1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID
19 from nepi.util import server
20 from nepi.util import parallel
class UnresponsiveNodeError(RuntimeError):
    """Raised when a PlanetLab node does not answer liveness checks.

    wait_provisioning raises it once the node still fails a verbose
    is_alive() check after the provisioning timeout has elapsed.
    """
29 def _castproperty(typ, propattr):
31 return getattr(self, propattr)
32 def _set(self, value):
33 if value is not None or (isinstance(value, basestring) and not value):
35 return setattr(self, propattr, value)
36 def _del(self, value):
37 return delattr(self, propattr)
38 _get.__name__ = propattr + '_get'
39 _set.__name__ = propattr + '_set'
40 _del.__name__ = propattr + '_del'
41 return property(_get, _set, _del)
# Lookup tables read elsewhere as self.BASEFILTERS, self.TAGFILTERS and
# self.RATE_FACTORS. The opening/closing lines of these literals are not
# visible in this excerpt.
45 # Map Node attribute to plcapi filter name
46 'hostname' : 'hostname',
50 # Map Node attribute to (<tag name>, <plcapi filter expression>)
51 # There are replacements that are applied with string formatting,
52 # so '%' has to be escaped as '%%'.
# '%(timeframe)s' is filled from self._timeframe (find_candidates,
# fetch_node_info, rate_nodes). min* entries use a ']' prefix and max*
# entries a '[' prefix; presumably PLCAPI's >= and <= operators - confirm
# against the PLCAPI Filter documentation.
53 'architecture' : ('arch','value'),
54 'operatingSystem' : ('fcdistro','value'),
55 'pl_distro' : ('pldistro','value'),
56 'city' : ('city','value'),
57 'country' : ('country','value'),
58 'region' : ('region','value'),
59 'minReliability' : ('reliability%(timeframe)s', ']value'),
60 'maxReliability' : ('reliability%(timeframe)s', '[value'),
61 'minBandwidth' : ('bw%(timeframe)s', ']value'),
62 'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63 'minLoad' : ('load%(timeframe)s', ']value'),
64 'maxLoad' : ('load%(timeframe)s', '[value'),
65 'minCpu' : ('cpu%(timeframe)s', ']value'),
66 'maxCpu' : ('cpu%(timeframe)s', '[value'),
# Rating factors used by rate_nodes: each node's score accumulates
# weight * tag value, falling back to <default> when the node has no
# usable value for the tag (missing, empty or 'n/a').
70 # (<tag name>, <weight>, <default>)
71 ('bw%(timeframe)s', -0.001, 1024.0),
72 ('cpu%(timeframe)s', 0.1, 40.0),
73 ('load%(timeframe)s', -0.2, 3.0),
74 ('reliability%(timeframe)s', 1, 100.0),
# Remote files for the dependency installer (pid and log).
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
DEPENDS_LOGFILE = '/tmp/nepi-depends.log'

# rpmfusion release package used by install_dependencies on Fedora 13+.
RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
# Fedora 12 needs its own rpmfusion release package.
RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
# Numeric selection constraints. Each is stored in a private "_<name>"
# attribute through _castproperty, with float as the target type.
minReliability, maxReliability = (
    _castproperty(float, '_minReliability'),
    _castproperty(float, '_maxReliability'))
minBandwidth, maxBandwidth = (
    _castproperty(float, '_minBandwidth'),
    _castproperty(float, '_maxBandwidth'))
minCpu, maxCpu = (
    _castproperty(float, '_minCpu'),
    _castproperty(float, '_maxCpu'))
minLoad, maxLoad = (
    _castproperty(float, '_minLoad'),
    _castproperty(float, '_maxLoad'))
# Initialize an unallocated node: selection constraints, requirements added
# by connected elements, testbed-derived settings and allocation state.
# api: API wrapper; sliceapi: API used for node queries (GetNodes,
# GetNodeTags, ...), defaulting to api when not given. Some constructor
# lines are not visible in this excerpt.
92 def __init__(self, api=None, sliceapi=None):
96 self._sliceapi = sliceapi or api
# Selection constraints; None means "no constraint" (build_filters and
# applicable_filters skip None values).
100 self.architecture = None
101 self.operatingSystem = None
102 self.pl_distro = None
107 self.minReliability = None
108 self.maxReliability = None
109 self.minBandwidth = None
110 self.maxBandwidth = None
# Bounds on len(interface_ids), checked in find_candidates.
115 self.min_num_external_ifaces = None
116 self.max_num_external_ifaces = None
# Tag timeframe suffix substituted into '%(timeframe)s'; see set_timeframe.
117 self._timeframe = 'w'
119 # Applications and routes add requirements to connected nodes
120 self.required_packages = set()
121 self.required_vsys = set()
# Presumably gates the rpmfusion repo install in install_dependencies
# (the check itself is not visible) - confirm.
123 self.rpmFusion = False
# Environment variable name -> list of values; each value becomes one
# 'export NAME=value' in _nepi_testbed_environment_setup_get.
124 self.env = collections.defaultdict(list)
126 # Some special applications - initialized when connected
127 self.multicast_forwarder = None
129 # Testbed-derived attributes
130 self.slicename = None
131 self.ident_path = None
132 self.server_key = None
133 self.home_path = None
# Consulted by wait_provisioning after the node becomes reachable.
134 self.enable_proc_cleanup = False
135 self.enable_home_cleanup = False
137 # These are filled when an actual node is allocated
# Dependency-install state (prepare_dependencies / wait_dependencies).
139 self._yum_dependencies = None
140 self._installed = False
143 self._logger = logging.getLogger('nepi.testbeds.planetlab')
# Translate a human-readable timeframe into the tag suffix stored in
# self._timeframe: "month" -> "m", "year" -> "y", and the final branch
# (presumably an `else:` not visible here) -> "w". The "latest" branch body
# is not visible; get_timeframe recognizes "", so it presumably stores "" -
# confirm.
145 def set_timeframe(self, timeframe):
146 if timeframe == "latest":
148 elif timeframe == "month":
149 self._timeframe = "m"
150 elif timeframe == "year":
151 self._timeframe = "y"
153 self._timeframe = "w"
# Presumably the inverse of set_timeframe: inspects self._timeframe for
# "", "m" and "y". The returned strings are not visible in this excerpt.
155 def get_timeframe(self):
156 if self._timeframe == "":
158 if self._timeframe == "m":
160 if self._timeframe == "y":
# Public accessor combining the getter above with set_timeframe.
164 timeframe = property(get_timeframe, set_timeframe)
def _nepi_testbed_environment_setup_get(self):
    """Return the shell prelude that exports this node's environment.

    The result is one command line. PYTHONPATH and PATH are both extended
    with every ``self.pythonpath`` entry, shell-escaped and prefixed with
    ``${HOME}/``. These are followed by one ``export KEY=VALUE`` for each
    value listed under each key of ``self.env``. Parts are joined with ' ; '.
    """
    def home_relative_paths():
        # Colon-separated entries; computed separately for each variable.
        return ':'.join(["${HOME}/" + server.shell_escape(entry)
                         for entry in self.pythonpath])

    out = cStringIO.StringIO()
    out.write('export PYTHONPATH=$PYTHONPATH:%s' % home_relative_paths())
    out.write(' ; export PATH=$PATH:%s' % home_relative_paths())
    for name, values in self.env.iteritems():
        for val in values:
            out.write(' ; export %s=%s' % (name, val))
    return out.getvalue()
# Setter for _nepi_testbed_environment_setup. Its body is not visible in
# this excerpt. Presumably it ignores assignments, since the value is always
# computed by _nepi_testbed_environment_setup_get - confirm.
180 def _nepi_testbed_environment_setup_set(self, value):
# Property whose value is computed by _nepi_testbed_environment_setup_get.
183 _nepi_testbed_environment_setup = property(
184 _nepi_testbed_environment_setup_get,
185 _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
    """Merge this node's set (non-None) attributes into ``target_filters``.

    :param target_filters: dict updated in place.
    :param filter_map: maps attribute names on ``self`` to the filter key
        under which each attribute's value is stored.
    :returns: ``target_filters`` itself.
    """
    candidates = ((key, getattr(self, name, None))
                  for name, key in filter_map.items())
    target_filters.update(
        (key, val) for key, val in candidates if val is not None)
    return target_filters
# Names of BASEFILTERS/TAGFILTERS attributes that are currently set (not
# None). Callers read it without calling (`self.applicable_filters`), so the
# line above the def presumably carries @property; it is not visible here.
# `filter(...) + filter(...)` relies on Python 2's list-returning filter().
195 def applicable_filters(self):
196 has = lambda att : getattr(self,att,None) is not None
198 filter(has, self.BASEFILTERS.iterkeys())
199 + filter(has, self.TAGFILTERS.iterkeys())
# Query the slice API for node_ids matching this node's constraints.
# Stages:
#   1. base (non-tag) attribute filters plus health filters;
#   2. one GetNodeTags query per applicable tag filter, intersected in;
#   3. required vsys tags;
#   4. interface-count bounds;
#   5. DNS resolvability of the hostnames.
# filter_slice_id presumably restricts candidates to nodes in that slice via
# '|slice_ids' (the guarding line is not visible). The return statement is
# not visible in this excerpt.
202 def find_candidates(self, filter_slice_id=None):
203 self._logger.info("Finding candidates for %s", self.make_filter_description())
205 fields = ('node_id',)
206 replacements = {'timeframe':self._timeframe}
208 # get initial candidates (no tag filters)
209 basefilters = self.build_filters({}, self.BASEFILTERS)
# Copied before slice/health filters are added: the per-tag queries below
# start from the attribute filters only.
210 rootfilters = basefilters.copy()
212 basefilters['|slice_ids'] = (filter_slice_id,)
214 # only pick healthy nodes
215 basefilters['run_level'] = 'boot'
216 basefilters['boot_state'] = 'boot'
217 basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
218 basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
220 # keyword-only "pseudofilters"
223 extra['peer'] = self.site
225 candidates = set(map(operator.itemgetter('node_id'),
226 self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
228 # filter by tag, one tag at a time
229 applicable = self.applicable_filters
230 for tagfilter in self.TAGFILTERS.iteritems():
231 attr, (tagname, expr) = tagfilter
233 # don't bother if there's no filter defined
234 if attr in applicable:
# Each query is limited to the surviving node_ids; matches are intersected.
235 tagfilter = rootfilters.copy()
236 tagfilter['tagname'] = tagname % replacements
237 tagfilter[expr % replacements] = str(getattr(self,attr))
238 tagfilter['node_id'] = list(candidates)
240 candidates &= set(map(operator.itemgetter('node_id'),
241 self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
243 # filter by vsys tags - special case since it doesn't follow
244 # the usual semantics
245 if self.required_vsys:
# vsys value -> set of node_ids carrying it.
246 newcandidates = collections.defaultdict(set)
248 vsys_tags = self._sliceapi.GetNodeTags(
250 node_id = list(candidates),
251 fields = ['node_id','value'])
# NOTE(review): operator.itemgetter(['node_id','value']) uses the list
# itself as one key, which raises TypeError (unhashable) on a dict row.
# itemgetter('node_id','value') was presumably intended.
254 operator.itemgetter(['node_id','value']),
257 required_vsys = self.required_vsys
258 for node_id, value in vsys_tags:
259 if value in required_vsys:
260 newcandidates[value].add(node_id)
262 # take only those that have all the required vsys tags
# NOTE(review): only vsys values actually found get an entry above. A
# required vsys that no candidate carries never enters the intersection, so
# it does not eliminate any node - confirm intent. The reduce initializer
# line is not visible.
263 newcandidates = reduce(
264 lambda accum, new : accum & new,
265 newcandidates.itervalues(),
268 # filter by iface count
269 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
270 # fetch interfaces for all, in one go
271 filters = basefilters.copy()
272 filters['node_id'] = list(candidates)
# NOTE(review): `filters` (restricted to current candidates) is built above,
# but `basefilters` is passed. The query is therefore not limited to
# surviving candidates, and `filters` is unused.
273 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
274 self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
276 # filter candidates by interface count
277 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
278 predicate = ( lambda node_id :
279 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
280 elif self.min_num_external_ifaces is not None:
281 predicate = ( lambda node_id :
282 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
284 predicate = ( lambda node_id :
285 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
287 candidates = set(filter(predicate, candidates))
289 # make sure hostnames are resolvable
292 self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
# node_id -> hostname for the remaining candidates.
294 hostnames = dict(map(operator.itemgetter('node_id','hostname'),
295 self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])
# True when server.gethostbyname returns an address for the node's hostname.
298 def resolvable(node_id):
300 addr = server.gethostbyname(hostnames[node_id])
301 return addr is not None
# DNS lookups are filtered through parallel.pfilter (its extra arguments
# are not visible here).
304 candidates = set(parallel.pfilter(resolvable, candidates,
307 self._logger.info(" Found %s reachable candidates.", len(candidates))
# Presumably drops entries for unresolvable nodes (loop body not visible).
309 for h in hostnames.keys():
310 if h not in candidates:
# Invert to hostname -> node_id.
313 hostnames = dict((v,k) for k, v in hostnames.iteritems())
# Summarize the constraints find_candidates would apply, as
# "key: value; key: value".
317 def make_filter_description(self):
319 Makes a human-readable description of filtering conditions
323 # start from the plain attribute filters (no tag filters)
324 filters = self.build_filters({}, self.BASEFILTERS)
326 # keyword-only "pseudofilters"
328 filters['peer'] = self.site
330 # describe each applicable tag filter by its attribute name
331 applicable = self.applicable_filters
332 for tagfilter in self.TAGFILTERS.iteritems():
333 attr, (tagname, expr) = tagfilter
335 # don't bother if there's no filter defined
336 if attr in applicable:
337 filters[attr] = getattr(self,attr)
339 # required vsys tags, listed comma-separated (they don't follow
340 # the usual tag semantics)
341 if self.required_vsys:
342 filters['vsys'] = ','.join(list(self.required_vsys))
344 # interface-count bounds as "min-max"
# NOTE(review): `or` treats 0 as unset, so a max of 0 is shown as 'inf'.
345 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
346 filters['num_ifaces'] = '-'.join([
347 str(self.min_num_external_ifaces or '0'),
348 str(self.max_num_external_ifaces or 'inf')
351 return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """Bind this element to PLC node ``node_id`` and load that node's data.

    :param node_id: identifier of the allocated node.
    """
    self._node_id = node_id
    # Re-read attributes (hostname, tags, keys, ...) for the bound node.
    self.fetch_node_info()
# Undo fetch_node_info: restore every attribute saved in the private
# __orig_attrs snapshot, then discard the snapshot. Handling when no snapshot
# exists (the AttributeError branch) is not visible in this excerpt.
357 def unassign_node(self):
362 orig_attrs = self.__orig_attrs
363 except AttributeError:
366 for key, value in orig_attrs.iteritems():
367 setattr(self, key, value)
368 del self.__orig_attrs
# Score each node in `nodes` as sum(weight * tag value) over RATE_FACTORS.
# The factor's default is used when a node has no usable value for the tag.
# Returns the scores in the same order as `nodes` (a list under Python 2's
# map()).
370 def rate_nodes(self, nodes):
371 rates = collections.defaultdict(int)
# tag name -> {node_id: float value}
372 tags = collections.defaultdict(dict)
373 replacements = {'timeframe':self._timeframe}
374 tagnames = [ tagname % replacements
375 for tagname, weight, default in self.RATE_FACTORS ]
# The GetNodeTags filter arguments are not visible; presumably restricted to
# `nodes` and `tagnames`.
377 taginfo = self._sliceapi.GetNodeTags(
380 fields=('node_id','tagname','value'))
382 unpack = operator.itemgetter('node_id','tagname','value')
383 for value in taginfo:
384 node, tagname, value = unpack(value)
# Empty and 'n/a' values are skipped so the factor default applies.
# Other non-numeric values would make float() raise ValueError.
385 if value and value.lower() != 'n/a':
386 tags[tagname][node] = float(value)
388 for tagname, weight, default in self.RATE_FACTORS:
389 taginfo = tags[tagname % replacements].get
# Presumably inside a `for node in nodes:` loop (not visible).
391 rates[node] += weight * taginfo(node,default)
393 return map(rates.__getitem__, nodes)
# Load the bound node's actual data (self._node_id) and overwrite the
# selection attributes with it. Original values are recorded in orig_attrs
# so unassign_node can restore them.
395 def fetch_node_info(self):
398 info, tags = self._sliceapi.GetNodeInfo(self._node_id)
# Flatten the tag rows into tagname -> value.
401 tags = dict( (t['tagname'],t['value'])
404 orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
405 orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
406 self.min_num_external_ifaces = None
407 self.max_num_external_ifaces = None
# NOTE(review): an empty _timeframe (which get_timeframe treats as a valid
# value) is replaced with 'w' here. The change is not recorded in
# orig_attrs, so unassign_node will not restore it - confirm intent.
408 if not self._timeframe: self._timeframe = 'w'
410 replacements = {'timeframe':self._timeframe}
# Copy plain node fields onto matching attributes (the lines fetching
# `value` are not visible).
412 for attr, tag in self.BASEFILTERS.iteritems():
415 if hasattr(self, attr):
416 orig_attrs[attr] = getattr(self, attr)
417 setattr(self, attr, value)
# Copy node tag values onto attributes. Empty/'n/a' values are handled by a
# line not visible here (presumably mapped to None).
418 for attr, (tag,_) in self.TAGFILTERS.iteritems():
419 tag = tag % replacements
422 if hasattr(self, attr):
423 orig_attrs[attr] = getattr(self, attr)
424 if not value or value.lower() == 'n/a':
426 setattr(self, attr, value)
428 if 'peer_id' in info:
429 orig_attrs['site'] = self.site
430 self.site = self._sliceapi.peer_map[info['peer_id']]
# Pin both interface bounds to the node's actual interface count.
432 if 'interface_ids' in info:
433 self.min_num_external_ifaces = \
434 self.max_num_external_ifaces = len(info['interface_ids'])
436 if 'ssh_rsa_key' in info:
437 orig_attrs['server_key'] = self.server_key
438 self.server_key = info['ssh_rsa_key']
440 self.hostip = server.gethostbyname(self.hostname)
# The snapshot is stored only when none exists yet (the matching try is not
# visible), presumably so repeated fetches keep the first originals.
444 except AttributeError:
445 self.__orig_attrs = orig_attrs
# Body of a configuration check whose def line is not visible in this
# excerpt. It requires home_path, a readable ident_path (SSH key) and
# slicename, and raises AssertionError otherwise.
448 if self.home_path is None:
449 raise AssertionError, "Misconfigured node: missing home path"
450 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
451 raise AssertionError, "Misconfigured node: missing slice SSH key"
452 if self.slicename is None:
453 raise AssertionError, "Misconfigured node: unspecified slice"
# Fragment of a method whose def line is not visible. It marks dependencies
# installed and resets the load-related constraints; the chained assignment
# continues past the end of this excerpt.
456 # Mark dependencies installed
457 self._installed = True
459 # Clear load attributes, they impair re-discovery
460 self.minReliability = \
461 self.maxReliability = \
462 self.minBandwidth = \
463 self.maxBandwidth = \
# When packages are required and not yet installed: optionally install the
# rpmfusion release package over SSH, then start the asynchronous p2p yum
# installer created by prepare_dependencies (which uses the same guard).
469 def install_dependencies(self):
470 if self.required_packages and not self._installed:
471 # If we need rpmfusion, we must install the repo definition and the gpg keys
473 if self.operatingSystem == 'f12':
474 # Fedora 12 requires a different rpmfusion package
475 RPM_FUSION_URL = self.RPM_FUSION_URL_F12
477 # This one works for f13+
478 RPM_FUSION_URL = self.RPM_FUSION_URL
481 'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
483 'RPM_FUSION_URL' : RPM_FUSION_URL
489 (out,err),proc = server.popen_ssh_command(
493 user = self.slicename,
495 ident_key = self.ident_path,
496 server_key = self.server_key,
# check_bad_host matches known symptoms of a broken node (DNS failure for
# rpmfusion, disk I/O errors); the line following it is not visible.
501 if self.check_bad_host(out,err):
503 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (self.hostname, out,err,)
505 # Launch p2p yum dependency installer
506 self._yum_dependencies.async_setup()
# Block until the node answers is_alive(), sleeping with a growing interval
# (x1.5 per attempt, capped at 30 s). After more than `timeout` seconds of
# sleeping (default 20 minutes), do one verbose check and raise
# UnresponsiveNodeError if it still fails. Then run the enabled
# process/home cleanups.
508 def wait_provisioning(self, timeout = 20*60):
509 # Poll until the node responds to is_alive()
512 while not self.is_alive():
513 time.sleep(sleeptime)
514 totaltime += sleeptime
515 sleeptime = min(30.0, sleeptime*1.5)
517 if totaltime > timeout:
518 # PlanetLab has a 15' delay on configuration propagation
519 # If we're above that delay, the unresponsiveness is not due
521 if not self.is_alive(verbose=True):
522 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
524 # Ensure the node is clean (no apps running that could interfere with operations)
525 if self.enable_proc_cleanup:
526 self.do_proc_cleanup()
527 if self.enable_home_cleanup:
528 self.do_home_cleanup()
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
    """Wait for the asynchronous yum dependency installer to finish.

    Does nothing when no installer was prepared or dependencies are already
    marked installed. The probe parameters are accepted for interface
    compatibility and are not used here.
    """
    if not self._yum_dependencies or self._installed:
        return
    self._yum_dependencies.async_setup_wait()
    self._installed = True
# Probe the node over SSH (with EINTR retry and err_on_timeout=False).
# The remote command and return statements are not visible; presumably it
# returns True when stdout is 'ALIVE' and stderr is empty, logging the
# output on failure when `verbose` is set.
536 def is_alive(self, verbose = False):
# NOTE(review): the comment below describes path creation, which the
# visible lines do not show - confirm it matches the remote command.
537 # Make sure all the paths are created where
538 # they have to be created for deployment
539 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
543 user = self.slicename,
545 ident_key = self.ident_path,
546 server_key = self.server_key,
548 err_on_timeout = False,
554 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
556 elif not err and out.strip() == 'ALIVE':
560 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
# Fragments of methods whose def lines are not visible in this excerpt:
# an optional process cleanup, and adding this node's hostname to the
# blacklist (`util` is not imported in the visible lines).
564 if self.enable_proc_cleanup:
565 self.do_proc_cleanup()
569 self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
571 util.appendBlacklist(self.hostname)
# Kill leftover processes on the node over SSH; skipped while the testbed is
# recovering (the early-exit lines are not visible). Each step ends in
# `|| /bin/true` so a failing kill does not abort the rest. Most kills are
# issued twice ("Some apps need two kills").
573 def do_proc_cleanup(self):
574 if self.testbed().recovering:
578 self._logger.info("Cleaning up processes on %s", self.hostname)
581 "sudo -S killall python tcpdump || /bin/true ; "
582 "sudo -S killall python tcpdump || /bin/true ; "
583 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
584 "sudo -S killall -u %(slicename)s || /bin/true ",
585 "sudo -S killall -u root || /bin/true ",
586 "sudo -S killall -u %(slicename)s || /bin/true ",
587 "sudo -S killall -u root || /bin/true ",
591 (out,err),proc = server.popen_ssh_command(
592 # Some apps need two kills
594 'slicename' : self.slicename ,
598 user = self.slicename,
600 ident_key = self.ident_path,
601 server_key = self.server_key,
602 tty = True, # so that ps -N -T works as advertised...
# Delete .cache, .local, .config and nepi-* entries at the top level of the
# SSH working directory (presumably the slice home); skipped while the
# testbed is recovering.
608 def do_home_cleanup(self):
609 if self.testbed().recovering:
613 self._logger.info("Cleaning up home on %s", self.hostname)
616 "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
620 (out,err),proc = server.popen_ssh_command(
621 # slice name substitution (layout copied from do_proc_cleanup; nothing is killed here)
623 'slicename' : self.slicename ,
627 user = self.slicename,
629 ident_key = self.ident_path,
630 server_key = self.server_key,
631 tty = True, # kept as in do_proc_cleanup; this find/rm command runs no ps
def prepare_dependencies(self):
    """Configure the p2p yum installer for this node's required packages.

    Does nothing when no packages are required or they are already
    installed. The installer is started later by install_dependencies.
    """
    if not self.required_packages or self._installed:
        return
    # Chained assignment binds self._yum_dependencies first, then the local.
    self._yum_dependencies = installer = application.YumDependency(self._api)
    installer.node = self
    installer.home_path = "nepi-yumdep"
    installer.depends = ' '.join(self.required_packages)
# Choose 'vroute' or 'sliceip' for a set of routes (see docstring).
645 def routing_method(self, routes, vsys_vnet):
647 There are two methods, vroute and sliceip.
650 Modifies the node's routing table directly, validating that the IP
651 range lies within the network given by the slice's vsys_vnet tag.
652 This method is the most scalable for very small routing tables
653 that need not modify other routes (including the default)
656 Uses policy routing and iptables filters to create per-sliver
657 routing tables. It's the most flexible way, but it doesn't scale
658 as well since only 155 routing tables can be created this way.
660 This method will return the most appropriate routing method, which will
661 prefer vroute for small routing tables.
664 # For now, sliceip results in kernel panics
665 # so we HAVE to use vroute
# NOTE(review): the lines after this comment are not visible. If they return
# 'vroute' unconditionally, the checks below are unreachable - confirm.
# MAX_VROUTE_ROUTES is not defined in the visible source.
668 # We should not make the routing table grow too big
669 if len(routes) > MAX_VROUTE_ROUTES:
672 vsys_vnet = ipaddr.IPv4Network(vsys_vnet)
# Presumably loops over routes: each destination network and next hop must
# fall inside vsys_vnet (the loop header and action are not visible).
674 dest, prefix, nexthop, metric, device = route
675 dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix))
676 nexthop = ipaddr.IPAddress(nexthop)
677 if dest not in vsys_vnet or nexthop not in vsys_vnet:
# Render one route as a rule line for the vsys script of `method`.
# route is (dest, prefix, nexthop, metric, device); the "/prefix" suffix is
# omitted when prefix is falsy or 32. Unknown methods raise AssertionError.
682 def format_route(self, route, dev, method, action):
683 dest, prefix, nexthop, metric, device = route
684 if method == 'vroute':
686 "%s %s%s gw %s %s" % (
689 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
694 elif method == 'sliceip':
696 "route %s to %s%s via %s metric %s dev %s" % (
699 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
706 raise AssertionError, "Unknown method"
# Append to each route the if_name of a device whose routes_here(route) is
# true. When none matches, 'sliceip' falls back to 'eth0' and other methods
# raise RuntimeError. The loop headers and return are not visible here.
708 def _annotate_routes_with_devs(self, routes, devs, method):
712 if dev.routes_here(route):
713 dev_routes.append(tuple(route) + (dev.if_name,))
718 if method == 'sliceip':
719 dev_routes.append(tuple(route) + ('eth0',))
721 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
722 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
# Initial route setup: pick a routing method, bind routes to devices, send
# 'add' rules (prefixed by device 'enable' rules for sliceip) and remember
# the configured state for reconfigure_routes.
725 def configure_routes(self, routes, devs, vsys_vnet):
727 Add the specified routes to the node's routing table
730 method = self.routing_method(routes, vsys_vnet)
733 # annotate routes with devices
734 dev_routes = self._annotate_routes_with_devs(routes, devs, method)
735 for route in dev_routes:
736 route, dev = route[:-1], route[-1]
740 rules.append(self.format_route(route, dev, method, 'add'))
# `map(...) + rules` relies on Python 2's list-returning map().
742 if method == 'sliceip':
743 rules = map('enable '.__add__, tdevs) + rules
745 self._logger.info("Setting up routes for %s using %s", self.hostname, method)
746 self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
748 self.apply_route_rules(rules, method)
# NOTE(review): this stores the routes without their device annotation.
# reconfigure_routes diffs against device-annotated tuples from
# _annotate_routes_with_devs, so every configured route will look like a
# deletion there. set(dev_routes) was presumably intended.
750 self._configured_routes = set(routes)
751 self._configured_devs = tdevs
752 self._configured_method = method
def reconfigure_routes(self, routes, devs, vsys_vnet):
    """
    Updates the routes in the node's routing table to match
    the given list of routes, applying only the differences.

    The routing method recorded at configuration time is reused. Routes are
    bound to devices, diffed against the previously configured set, and the
    resulting rules are applied in this order: route deletions, then (for
    sliceip) device disable/enable rules, then route insertions.

    :param routes: route tuples ``(dest, prefix, nexthop, metric, device)``.
    :param devs: candidate virtual interfaces for binding the routes.
    :param vsys_vnet: not used by this method; kept for signature parity
        with configure_routes.
    """
    method = self._configured_method

    # Device-annotated routes: original route tuple + (if_name,)
    dev_routes = self._annotate_routes_with_devs(routes, devs, method)

    # Previously configured state, coerced to sets so the differences
    # below work even if a list was stored.
    current = set(self._configured_routes)
    current_devs = set(self._configured_devs)

    new = set(dev_routes)
    new_devs = set(map(operator.itemgetter(-1), dev_routes))

    deletions = current - new
    insertions = new - current

    dev_deletions = current_devs - new_devs
    dev_insertions = new_devs - current_devs

    rules = []

    # Rule deletions first
    for route in deletions:
        route, dev = route[:-1], route[-1]
        rules.append(self.format_route(route, dev, method, 'del'))

    if method == 'sliceip':
        # Disable devices that no longer carry routes, enable new ones
        rules.extend(map('disable '.__add__, dev_deletions))
        rules.extend(map('enable '.__add__, dev_insertions))

    # Rule insertions now
    for route in insertions:
        # The device is the last element of the annotated route. The old
        # `dev[-1]` took the last character of a stale `dev` instead.
        route, dev = route[:-1], route[-1]
        rules.append(self.format_route(route, dev, method, 'add'))

    self.apply_route_rules(rules, method)

    # Store a set (not the raw list) so the next call can diff against it.
    self._configured_routes = new
    self._configured_devs = new_devs
# Send `rules` (one per line, on stdin) to /vsys/<method>.in over SSH, while
# a background reader copies /vsys/<method>.out to stderr. Raises
# RuntimeError if the command fails or produced any stderr output.
801 def apply_route_rules(self, rules, method):
802 (out,err),proc = server.popen_ssh_command(
803 "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
804 home = server.shell_escape(self.home_path),
808 user = self.slicename,
810 ident_key = self.ident_path,
811 server_key = self.server_key,
812 stdin = '\n'.join(rules),
# vsys output is routed to stderr above, so any output from the vsys script
# counts as a failure here.
816 if proc.wait() or err:
817 raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
# NOTE(review): `logger` is not defined in the visible source; other methods
# use self._logger - confirm a module-level logger exists.
819 logger.debug("%s said: %s%s", method, out, err)
# Detect known symptoms of a broken node in command output: DNS failure for
# the rpmfusion mirror, disk I/O errors, and further alternatives not visible
# here. Returns a truthy regex match when either stream matches, else None.
821 def check_bad_host(self, out, err):
822 badre = re.compile(r'(?:'
823 r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
824 r'|Error: disk I/O error'
827 return badre.search(out) or badre.search(err)