2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
20 from nepi.util import server
21 from nepi.util import parallel
class UnresponsiveNodeError(RuntimeError):
    """Raised when a PlanetLab node stops answering liveness checks."""
30 def _castproperty(typ, propattr):
32 return getattr(self, propattr)
33 def _set(self, value):
34 if value is not None or (isinstance(value, basestring) and not value):
36 return setattr(self, propattr, value)
37 def _del(self, value):
38 return delattr(self, propattr)
39 _get.__name__ = propattr + '_get'
40 _set.__name__ = propattr + '_set'
41 _del.__name__ = propattr + '_del'
42 return property(_get, _set, _del)
# Map Node attribute to plcapi filter name
'hostname' : 'hostname',
# Map Node attribute to (<tag name>, <plcapi filter expression>)
# There are replacements that are applied with string formatting,
# so '%' has to be escaped as '%%'.
# The ']'/'[' prefixes appear to encode >= / <= comparisons for the
# min*/max* attribute pairs — confirm against the plcapi filter docs.
'architecture' : ('arch','value'),
'operatingSystem' : ('fcdistro','value'),
'pl_distro' : ('pldistro','value'),
'city' : ('city','value'),
'country' : ('country','value'),
'region' : ('region','value'),
'minReliability' : ('reliability%(timeframe)s', ']value'),
'maxReliability' : ('reliability%(timeframe)s', '[value'),
'minBandwidth' : ('bw%(timeframe)s', ']value'),
'maxBandwidth' : ('bw%(timeframe)s', '[value'),
'minLoad' : ('load%(timeframe)s', ']value'),
'maxLoad' : ('load%(timeframe)s', '[value'),
'minCpu' : ('cpu%(timeframe)s', ']value'),
'maxCpu' : ('cpu%(timeframe)s', '[value'),
# (<tag name>, <weight>, <default>)
# Per-tag scoring factors consumed by rate_nodes(); <default> is used
# when a node has no value for the tag.
('bw%(timeframe)s', -0.001, 1024.0),
('cpu%(timeframe)s', 0.1, 40.0),
('load%(timeframe)s', -0.2, 3.0),
('reliability%(timeframe)s', 1, 100.0),
# Remote paths for the p2p dependency installer, and the rpmfusion
# repo packages installed when self.rpmFusion is requested.
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
# Numeric filter bounds; assignment casts through float (see _castproperty),
# the value itself is stored in the underscore-prefixed backing attribute.
minReliability = _castproperty(float, '_minReliability')
maxReliability = _castproperty(float, '_maxReliability')
minBandwidth = _castproperty(float, '_minBandwidth')
maxBandwidth = _castproperty(float, '_maxBandwidth')
minCpu = _castproperty(float, '_minCpu')
maxCpu = _castproperty(float, '_maxCpu')
minLoad = _castproperty(float, '_minLoad')
maxLoad = _castproperty(float, '_maxLoad')
def __init__(self, api=None):
    """Initialize an unassigned node description.

    api -- plcapi client used for node discovery/queries; presumably
           defaulted elsewhere when None — TODO confirm (the assignment
           is not visible in this chunk).
    """
    # Node-selection filter attributes (None means "don't filter")
    self.architecture = None
    self.operatingSystem = None
    self.pl_distro = None
    self.minReliability = None
    self.maxReliability = None
    self.minBandwidth = None
    self.maxBandwidth = None
    # Bounds on the number of external interfaces of an acceptable node
    self.min_num_external_ifaces = None
    self.max_num_external_ifaces = None
    # Applications and routes add requirements to connected nodes
    self.required_packages = set()
    self.required_vsys = set()
    self.rpmFusion = False
    # Extra environment variables to export on the node: name -> [values]
    self.env = collections.defaultdict(list)
    # Some special applications - initialized when connected
    self.multicast_forwarder = None
    # Testbed-derived attributes
    self.slicename = None
    self.ident_path = None
    self.server_key = None
    self.home_path = None
    self.enable_cleanup = False
    # Those are filled when an actual node is allocated
    self._yum_dependencies = None
    self._installed = False
    self._logger = logging.getLogger('nepi.testbeds.planetlab')
def _nepi_testbed_environment_setup_get(self):
    # Build the shell snippet that prepares the remote environment:
    # extend PYTHONPATH and PATH with the configured pythonpath entries,
    # then export every configured environment variable.
    command = cStringIO.StringIO()
    command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
        ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
    command.write(' ; export PATH=$PATH:%s' % (
        ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
    for envkey, envvals in self.env.iteritems():
        for envval in envvals:
            command.write(' ; export %s=%s' % (envkey, envval))
    return command.getvalue()
def _nepi_testbed_environment_setup_set(self, value):
    # NOTE(review): setter body is not visible here; the getter always
    # derives its value, so this looks like a no-op placeholder — confirm.
# Read/write attribute; the written value is ignored by the getter above.
_nepi_testbed_environment_setup = property(
    _nepi_testbed_environment_setup_get,
    _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
    """Fill *target_filters* from this node's attributes.

    filter_map maps attribute names on self to plcapi filter names;
    every attribute that is set (non-None) is copied into
    target_filters under its mapped name. The mutated dict is also
    returned so calls can be chained.
    """
    for source_attr, filter_name in filter_map.iteritems():
        current = getattr(self, source_attr, None)
        if current is None:
            continue
        target_filters[filter_name] = current
    return target_filters
def applicable_filters(self):
    # Names among BASEFILTERS/TAGFILTERS whose attribute is set (non-None)
    has = lambda att : getattr(self,att,None) is not None
    filter(has, self.BASEFILTERS.iterkeys())
    + filter(has, self.TAGFILTERS.iterkeys())
def find_candidates(self, filter_slice_id=None):
    """Query plcapi for the set of node_ids matching every configured filter.

    filter_slice_id -- when given, restrict candidates to nodes already
                       associated with that slice.
    Applies, in order: base filters + health checks, per-tag filters,
    vsys-tag requirements, interface-count bounds, and finally a
    parallel DNS-resolvability check.
    """
    self._logger.info("Finding candidates for %s", self.make_filter_description())
    fields = ('node_id',)
    replacements = {'timeframe':self.timeframe}
    # get initial candidates (no tag filters)
    basefilters = self.build_filters({}, self.BASEFILTERS)
    rootfilters = basefilters.copy()
    basefilters['|slice_ids'] = (filter_slice_id,)
    # only pick healthy nodes
    basefilters['run_level'] = 'boot'
    basefilters['boot_state'] = 'boot'
    basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
    basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
    # keyword-only "pseudofilters"
    extra['peer'] = self.site
    candidates = set(map(operator.itemgetter('node_id'),
        self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
    # filter by tag, one tag at a time
    applicable = self.applicable_filters
    for tagfilter in self.TAGFILTERS.iteritems():
        attr, (tagname, expr) = tagfilter
        # don't bother if there's no filter defined
        if attr in applicable:
            # intersect candidates with nodes whose tag satisfies the bound
            tagfilter = rootfilters.copy()
            tagfilter['tagname'] = tagname % replacements
            tagfilter[expr % replacements] = getattr(self,attr)
            tagfilter['node_id'] = list(candidates)
            candidates &= set(map(operator.itemgetter('node_id'),
                self._api.GetNodeTags(filters=tagfilter, fields=fields)))
    # filter by vsys tags - special case since it doesn't follow
    # the usual semantics
    if self.required_vsys:
        newcandidates = collections.defaultdict(set)
        vsys_tags = self._api.GetNodeTags(
            node_id = list(candidates),
            fields = ['node_id','value'])
        # NOTE(review): itemgetter with a single list argument indexes by
        # the list itself and fails on dicts; sibling calls use
        # itemgetter('node_id','value') — confirm.
        operator.itemgetter(['node_id','value']),
        required_vsys = self.required_vsys
        for node_id, value in vsys_tags:
            if value in required_vsys:
                newcandidates[value].add(node_id)
        # take only those that have all the required vsys tags
        newcandidates = reduce(
            lambda accum, new : accum & new,
            newcandidates.itervalues(),
    # filter by iface count
    if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
        # fetch interfaces for all, in one go
        filters = basefilters.copy()
        filters['node_id'] = list(candidates)
        # NOTE(review): this query passes 'basefilters', ignoring the
        # node_id-restricted 'filters' copy built just above — looks like
        # it should be filters=filters; confirm.
        ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
            self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
        # filter candidates by interface count
        if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
            predicate = ( lambda node_id :
                self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
        elif self.min_num_external_ifaces is not None:
            predicate = ( lambda node_id :
                self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
            predicate = ( lambda node_id :
                len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
        candidates = set(filter(predicate, candidates))
    # make sure hostnames are resolvable
    self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
    hostnames = dict(map(operator.itemgetter('node_id','hostname'),
        self._api.GetNodes(list(candidates), ['node_id','hostname'])
    def resolvable(node_id):
        # A node is kept only if its hostname resolves via DNS
        addr = socket.gethostbyname(hostnames[node_id])
        return addr is not None
    candidates = set(parallel.pfilter(resolvable, candidates,
    self._logger.info("  Found %s reachable candidates.", len(candidates))
def make_filter_description(self):
    """
    Makes a human-readable description of filtering conditions
    for find_candidates: one 'name: value' pair per active filter,
    joined with semicolons.
    """
    # get initial candidates (no tag filters)
    filters = self.build_filters({}, self.BASEFILTERS)
    # keyword-only "pseudofilters"
    filters['peer'] = self.site
    # filter by tag, one tag at a time
    applicable = self.applicable_filters
    for tagfilter in self.TAGFILTERS.iteritems():
        attr, (tagname, expr) = tagfilter
        # don't bother if there's no filter defined
        if attr in applicable:
            filters[attr] = getattr(self,attr)
    # filter by vsys tags - special case since it doesn't follow
    # the usual semantics
    if self.required_vsys:
        filters['vsys'] = ','.join(list(self.required_vsys))
    # filter by iface count
    if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
        filters['num_ifaces'] = '-'.join([
            str(self.min_num_external_ifaces or '0'),
            str(self.max_num_external_ifaces or 'inf')
    # render each (name, value) pair through the '%s: %s' format
    return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """Bind this description to the concrete PlanetLab node *node_id*,
    then pull the node's attributes from plcapi."""
    self._node_id = node_id
    self.fetch_node_info()
def unassign_node(self):
    """Undo fetch_node_info: restore every attribute saved in __orig_attrs."""
    # If nothing was ever saved there is nothing to restore
    orig_attrs = self.__orig_attrs
    except AttributeError:
    # Restore each saved attribute and drop the snapshot
    for key, value in orig_attrs.iteritems():
        setattr(self, key, value)
    del self.__orig_attrs
def rate_nodes(self, nodes):
    """Score *nodes* by their tag values weighted per RATE_FACTORS.

    Returns the scores in the same order as *nodes*; nodes without a
    usable tag value fall back to the factor's default.
    """
    rates = collections.defaultdict(int)
    tags = collections.defaultdict(dict)
    replacements = {'timeframe':self.timeframe}
    tagnames = [ tagname % replacements
        for tagname, weight, default in self.RATE_FACTORS ]
    taginfo = self._api.GetNodeTags(
        fields=('node_id','tagname','value'))
    # Collect float-parsable tag values per tag name, per node
    unpack = operator.itemgetter('node_id','tagname','value')
    for value in taginfo:
        node, tagname, value = unpack(value)
        if value and value.lower() != 'n/a':
            tags[tagname][int(node)] = float(value)
    # Accumulate the weighted score for every node
    for tagname, weight, default in self.RATE_FACTORS:
        taginfo = tags[tagname % replacements].get
        rates[node] += weight * taginfo(node,default)
    return map(rates.__getitem__, nodes)
def fetch_node_info(self):
    """Load the assigned node's attributes from plcapi, saving the values
    they replace into __orig_attrs so unassign_node can restore them."""
    # Batch the two queries into a single plcapi multicall round-trip
    self._api.StartMulticall()
    info = self._api.GetNodes(self._node_id)
    tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
    info, tags = self._api.FinishMulticall()
    # Index tag records by tag name
    tags = dict( (t['tagname'],t['value'])
    # Iface bounds are overwritten below, so snapshot and clear them first
    orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
    orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
    self.min_num_external_ifaces = None
    self.max_num_external_ifaces = None
    # Adopt the node's actual values for every base and tag filter attribute
    replacements = {'timeframe':self.timeframe}
    for attr, tag in self.BASEFILTERS.iteritems():
        if hasattr(self, attr):
            orig_attrs[attr] = getattr(self, attr)
        setattr(self, attr, value)
    for attr, (tag,_) in self.TAGFILTERS.iteritems():
        tag = tag % replacements
        if hasattr(self, attr):
            orig_attrs[attr] = getattr(self, attr)
        # Missing or 'n/a' tag values are not adopted
        if not value or value.lower() == 'n/a':
        setattr(self, attr, value)
    if 'peer_id' in info:
        orig_attrs['site'] = self.site
        self.site = self._api.peer_map[info['peer_id']]
    if 'interface_ids' in info:
        # Pin both bounds to the node's actual interface count
        self.min_num_external_ifaces = \
        self.max_num_external_ifaces = len(info['interface_ids'])
    if 'ssh_rsa_key' in info:
        orig_attrs['server_key'] = self.server_key
        self.server_key = info['ssh_rsa_key']
    # Resolve once and cache the node's IP address
    self.hostip = socket.gethostbyname(self.hostname)
    except AttributeError:
        self.__orig_attrs = orig_attrs
    # Sanity checks: these attributes must be configured before deployment
    if self.home_path is None:
        raise AssertionError, "Misconfigured node: missing home path"
    if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
        raise AssertionError, "Misconfigured node: missing slice SSH key"
    if self.slicename is None:
        raise AssertionError, "Misconfigured node: unspecified slice"
    # Mark dependencies installed
    self._installed = True
    # Clear load attributes, they impair re-discovery
    self.minReliability = \
    self.maxReliability = \
    self.minBandwidth = \
    self.maxBandwidth = \
def install_dependencies(self):
    """Install required packages on the node (rpmfusion repo first if
    needed), then kick off the asynchronous p2p yum installer."""
    if self.required_packages and not self._installed:
        # If we need rpmfusion, we must install the repo definition and the gpg keys
        if self.operatingSystem == 'f12':
            # Fedora 12 requires a different rpmfusion package
            RPM_FUSION_URL = self.RPM_FUSION_URL_F12
            # This one works for f13+
            RPM_FUSION_URL = self.RPM_FUSION_URL
        # Install the repo package only if not already present
        'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
        'RPM_FUSION_URL' : RPM_FUSION_URL
        # Run the repo installation over ssh on the node
        (out,err),proc = server.popen_ssh_command(
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key,
        # Known-bad output patterns mean the node itself is broken
        if self.check_bad_host(out,err):
        raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
        # Launch p2p yum dependency installer
        self._yum_dependencies.async_setup()
def wait_provisioning(self, timeout = 20*60):
    # Wait for the p2p installer
    # Poll with exponential backoff (capped at 30s) until the node
    # answers, raising UnresponsiveNodeError past the timeout.
    # NOTE(review): sleeptime/totaltime initialization is not visible in
    # this chunk; they must be set before this loop.
    while not self.is_alive():
        time.sleep(sleeptime)
        totaltime += sleeptime
        sleeptime = min(30.0, sleeptime*1.5)
        if totaltime > timeout:
            # PlanetLab has a 15' delay on configuration propagation
            # If we're above that delay, the unresponsiveness is not due
            if not self.is_alive(verbose=True):
                raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
    # Ensure the node is clean (no apps running that could interfere with operations)
    if self.enable_cleanup:
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
    """Block until the asynchronous p2p yum installer finishes, then mark
    this node's dependencies as installed. No-op when there is no
    pending installer or everything is installed already."""
    if self._installed or not self._yum_dependencies:
        return
    self._yum_dependencies.async_setup_wait()
    self._installed = True
def is_alive(self, verbose = False):
    """Probe the node over ssh; truthy when it answers the liveness check.

    verbose -- when True, log diagnostic output for unresponsive nodes.
    """
    # Make sure all the paths are created where
    # they have to be created for deployment
    (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
        host = self.hostname,
        user = self.slicename,
        ident_key = self.ident_path,
        server_key = self.server_key,
        err_on_timeout = False,
    # Non-zero exit, or anything but a clean 'ALIVE' echo, counts as dead
    if proc.wait():
        self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
    elif not err and out.strip() == 'ALIVE':
        self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
    # Tear down remote state only when cleanup was requested
    if self.enable_cleanup:
    # Record the node as malfunctioning so later runs avoid it
    self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
    util.appendBlacklist(self._node_id)
def do_cleanup(self):
    """Kill leftover processes on the node so later operations start clean."""
    # Skip cleanup entirely while the testbed is recovering
    if self.testbed().recovering:
    self._logger.info("Cleaning up %s", self.hostname)
    # Kill commands, each best-effort ('|| /bin/true'); the kill lines
    # are duplicated on purpose — see 'Some apps need two kills' below.
    "sudo -S killall python tcpdump || /bin/true ; "
    "sudo -S killall python tcpdump || /bin/true ; "
    "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
    "sudo -S killall -u %(slicename)s || /bin/true ",
    "sudo -S killall -u root || /bin/true ",
    "sudo -S killall -u %(slicename)s || /bin/true ",
    "sudo -S killall -u root || /bin/true ",
    # Run each cleanup command over ssh
    (out,err),proc = server.popen_ssh_command(
        # Some apps need two kills
        'slicename' : self.slicename ,
        host = self.hostname,
        user = self.slicename,
        ident_key = self.ident_path,
        server_key = self.server_key,
        tty = True, # so that ps -N -T works as advertised...
def prepare_dependencies(self):
    """Configure (but do not start) the p2p yum dependency installer that
    will later install this node's required packages."""
    if not self.required_packages or self._installed:
        return
    self._yum_dependencies = application.YumDependency(self._api)
    installer = self._yum_dependencies
    installer.node = self
    installer.home_path = "nepi-yumdep"
    installer.depends = ' '.join(self.required_packages)
def routing_method(self, routes, vsys_vnet):
    """
    There are two methods, vroute and sliceip.

    vroute:
        Modifies the node's routing table directly, validating that the IP
        range lies within the network given by the slice's vsys_vnet tag.
        This method is the most scalable for very small routing tables
        that need not modify other routes (including the default)

    sliceip:
        Uses policy routing and iptables filters to create per-sliver
        routing tables. It's the most flexible way, but it doesn't scale
        as well since only 155 routing tables can be created this way.

    This method will return the most appropriate routing method, which will
    prefer vroute for small routing tables.
    """
    # For now, sliceip results in kernel panics
    # so we HAVE to use vroute
    # We should not make the routing table grow too big
    if len(routes) > MAX_VROUTE_ROUTES:
    # vroute is only valid if every dest/nexthop falls inside vsys_vnet
    vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
    dest, prefix, nexthop, metric = route
    dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
    nexthop = ipaddr.IPAddress(nexthop)
    if dest not in vsys_vnet or nexthop not in vsys_vnet:
def format_route(self, route, dev, method, action):
    """Render one (dest, prefix, nexthop, metric) route as a rule line for
    the given vsys *method* ('vroute' or 'sliceip') and *action*."""
    dest, prefix, nexthop, metric = route
    if method == 'vroute':
        "%s %s%s gw %s %s" % (
        # /32 (host) routes omit the prefix suffix
        (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
    elif method == 'sliceip':
        "route %s to %s%s via %s metric %s dev %s" % (
        (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
    raise AssertionError, "Unknown method"
def _annotate_routes_with_devs(self, routes, devs, method):
    """Pair each route with the interface that should carry it, returning
    tuples of route fields + interface name."""
    # Prefer the device that claims the route
    if dev.routes_here(route):
        dev_routes.append(tuple(route) + (dev.if_name,))
    # No device claimed it: sliceip can fall back to eth0,
    # vroute cannot bind the rule at all
    if method == 'sliceip':
        dev_routes.append(tuple(route) + ('eth0',))
    raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
        "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
def configure_routes(self, routes, devs, vsys_vnet):
    """
    Add the specified routes to the node's routing table
    using whichever vsys method routing_method() selects, and remember
    the applied configuration for reconfigure_routes().
    """
    method = self.routing_method(routes, vsys_vnet)
    # annotate routes with devices
    dev_routes = self._annotate_routes_with_devs(routes, devs, method)
    for route in dev_routes:
        # Split the annotated tuple back into route fields and device
        route, dev = route[:-1], route[-1]
        rules.append(self.format_route(route, dev, method, 'add'))
    if method == 'sliceip':
        # sliceip needs its devices enabled before any route rules
        rules = map('enable '.__add__, tdevs) + rules
    self._logger.info("Setting up routes for %s using %s", self.hostname, method)
    self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
    self.apply_route_rules(rules, method)
    # Remember what was applied so reconfigure_routes can diff against it
    self._configured_routes = set(routes)
    self._configured_devs = tdevs
    self._configured_method = method
def reconfigure_routes(self, routes, devs, vsys_vnet):
    """
    Updates the routes in the node's routing table to match
    the given set, issuing only the necessary del/add rules (diff
    against the configuration stored by configure_routes).
    """
    method = self._configured_method
    dev_routes = self._annotate_routes_with_devs(routes, devs, method)
    current = self._configured_routes
    current_devs = self._configured_devs
    new = set(dev_routes)
    new_devs = set(map(operator.itemgetter(-1), dev_routes))
    deletions = current - new
    insertions = new - current
    dev_deletions = current_devs - new_devs
    dev_insertions = new_devs - current_devs
    # Rule deletions first
    for route in deletions:
        route, dev = route[:-1], route[-1]
        rules.append(self.format_route(route, dev, method, 'del'))
    if method == 'sliceip':
        # Device enable/disable rules only apply to sliceip
        rules.extend(map('disable '.__add__, dev_deletions))
        rules.extend(map('enable '.__add__, dev_insertions))
    # Rule insertions now
    for route in insertions:
        # NOTE(review): 'dev[-1]' looks wrong — the deletions loop and
        # configure_routes use 'route[-1]'; here 'dev' is stale from the
        # previous loop. Confirm and fix to route[-1].
        route, dev = route[:-1], dev[-1]
        rules.append(self.format_route(route, dev, method, 'add'))
    self.apply_route_rules(rules, method)
    self._configured_routes = dev_routes
    self._configured_devs = new_devs
def apply_route_rules(self, rules, method):
    """Feed the rule lines to the node's /vsys/<method> fifo over ssh and
    raise RuntimeError if the vsys backend reports anything on stderr."""
    # Reads the backend's .out fifo in the background while writing the
    # rules into the .in fifo; the sleep gives output time to drain.
    (out,err),proc = server.popen_ssh_command(
        "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
            home = server.shell_escape(self.home_path),
        host = self.hostname,
        user = self.slicename,
        ident_key = self.ident_path,
        server_key = self.server_key,
        stdin = '\n'.join(rules),
    if proc.wait() or err:
        raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
    # NOTE(review): bare 'logger' is not defined in this view — sibling
    # methods log through self._logger; confirm.
    logger.debug("%s said: %s%s", method, out, err)
def check_bad_host(self, out, err):
    """Return a truthy match when ssh output shows symptoms of a broken
    node (DNS failure reaching rpmfusion, disk I/O errors, ...)."""
    badre = re.compile(r'(?:'
        r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
        r'|Error: disk I/O error'
    return badre.search(out) or badre.search(err)