1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID
19 from nepi.util import server
20 from nepi.util import parallel
26 class UnresponsiveNodeError(RuntimeError):
29 def _castproperty(typ, propattr):
31 return getattr(self, propattr)
32 def _set(self, value):
33 if value is not None or (isinstance(value, basestring) and not value):
35 return setattr(self, propattr, value)
36 def _del(self, value):
37 return delattr(self, propattr)
38 _get.__name__ = propattr + '_get'
39 _set.__name__ = propattr + '_set'
40 _del.__name__ = propattr + '_del'
41 return property(_get, _set, _del)
# Map Node attribute to plcapi filter name.
# NOTE(review): fragment of the BASEFILTERS mapping consumed by
# build_filters() / find_candidates(); the surrounding dict delimiters
# are not visible in this chunk.
'hostname' : 'hostname',
# Map Node attribute to (<tag name>, <plcapi filter expression>)
# There are replacements that are applied with string formatting,
# so '%' has to be escaped as '%%'.
# '%(timeframe)s' is substituted at query time with self.timeframe
# (see find_candidates / rate_nodes).
# NOTE(review): the ']' / '[' prefixes look like plcapi range filter
# operators — min* attributes use ']' and max* use '[' (presumably
# >= and <= respectively); confirm against the PLCAPI filter docs.
'architecture' : ('arch','value'),
'operatingSystem' : ('fcdistro','value'),
'pl_distro' : ('pldistro','value'),
'city' : ('city','value'),
'country' : ('country','value'),
'region' : ('region','value'),
'minReliability' : ('reliability%(timeframe)s', ']value'),
'maxReliability' : ('reliability%(timeframe)s', '[value'),
'minBandwidth' : ('bw%(timeframe)s', ']value'),
'maxBandwidth' : ('bw%(timeframe)s', '[value'),
'minLoad' : ('load%(timeframe)s', ']value'),
'maxLoad' : ('load%(timeframe)s', '[value'),
'minCpu' : ('cpu%(timeframe)s', ']value'),
'maxCpu' : ('cpu%(timeframe)s', '[value'),
70 # (<tag name>, <weight>, <default>)
71 ('bw%(timeframe)s', -0.001, 1024.0),
72 ('cpu%(timeframe)s', 0.1, 40.0),
73 ('load%(timeframe)s', -0.2, 3.0),
74 ('reliability%(timeframe)s', 1, 100.0),
# Remote paths used on the node by the p2p yum dependency installer.
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
# RPM Fusion repository packages; install_dependencies picks the
# F12-specific URL when operatingSystem == 'f12', the stable one otherwise.
RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
# Candidate-filtering constraint attributes (keys of TAGFILTERS).
# Each is a property built by _castproperty(float, '<backing slot>');
# values are presumably coerced to float on assignment and None clears
# the constraint — the cast happens inside _castproperty, whose body is
# not fully visible in this chunk.
minReliability = _castproperty(float, '_minReliability')
maxReliability = _castproperty(float, '_maxReliability')
minBandwidth = _castproperty(float, '_minBandwidth')
maxBandwidth = _castproperty(float, '_maxBandwidth')
minCpu = _castproperty(float, '_minCpu')
maxCpu = _castproperty(float, '_maxCpu')
minLoad = _castproperty(float, '_minLoad')
maxLoad = _castproperty(float, '_maxLoad')
91 def __init__(self, api=None, sliceapi=None):
95 self._sliceapi = sliceapi
99 self.architecture = None
100 self.operatingSystem = None
101 self.pl_distro = None
106 self.minReliability = None
107 self.maxReliability = None
108 self.minBandwidth = None
109 self.maxBandwidth = None
114 self.min_num_external_ifaces = None
115 self.max_num_external_ifaces = None
118 # Applications and routes add requirements to connected nodes
119 self.required_packages = set()
120 self.required_vsys = set()
122 self.rpmFusion = False
123 self.env = collections.defaultdict(list)
125 # Some special applications - initialized when connected
126 self.multicast_forwarder = None
128 # Testbed-derived attributes
129 self.slicename = None
130 self.ident_path = None
131 self.server_key = None
132 self.home_path = None
133 self.enable_cleanup = False
135 # Those are filled when an actual node is allocated
137 self._yum_dependencies = None
138 self._installed = False
141 self._logger = logging.getLogger('nepi.testbeds.planetlab')
143 def _nepi_testbed_environment_setup_get(self):
144 command = cStringIO.StringIO()
145 command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
146 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
148 command.write(' ; export PATH=$PATH:%s' % (
149 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
152 for envkey, envvals in self.env.iteritems():
153 for envval in envvals:
154 command.write(' ; export %s=%s' % (envkey, envval))
155 return command.getvalue()
156 def _nepi_testbed_environment_setup_set(self, value):
158 _nepi_testbed_environment_setup = property(
159 _nepi_testbed_environment_setup_get,
160 _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
    """Copy this node's filterable attributes into a plcapi filter dict.

    For every (attribute name, plcapi filter key) pair in filter_map,
    if the node has a non-None value for that attribute, it is stored
    in target_filters under the plcapi key.

    Parameters:
        target_filters: dict to populate (mutated in place).
        filter_map: maps Node attribute name -> plcapi filter key
            (e.g. BASEFILTERS).

    Returns:
        target_filters, to allow call chaining.
    """
    # items() behaves identically here and, unlike the Python-2-only
    # iteritems(), also works under Python 3.
    for attr, tag in filter_map.items():
        value = getattr(self, attr, None)
        if value is not None:
            target_filters[tag] = value
    return target_filters
170 def applicable_filters(self):
171 has = lambda att : getattr(self,att,None) is not None
173 filter(has, self.BASEFILTERS.iterkeys())
174 + filter(has, self.TAGFILTERS.iterkeys())
177 def find_candidates(self, filter_slice_id=None):
178 self._logger.info("Finding candidates for %s", self.make_filter_description())
180 fields = ('node_id',)
181 replacements = {'timeframe':self.timeframe}
183 # get initial candidates (no tag filters)
184 basefilters = self.build_filters({}, self.BASEFILTERS)
185 rootfilters = basefilters.copy()
187 basefilters['|slice_ids'] = (filter_slice_id,)
189 # only pick healthy nodes
190 basefilters['run_level'] = 'boot'
191 basefilters['boot_state'] = 'boot'
192 basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
193 basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
195 # keyword-only "pseudofilters"
198 extra['peer'] = self.site
200 candidates = set(map(operator.itemgetter('node_id'),
201 self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
203 # filter by tag, one tag at a time
204 applicable = self.applicable_filters
205 for tagfilter in self.TAGFILTERS.iteritems():
206 attr, (tagname, expr) = tagfilter
208 # don't bother if there's no filter defined
209 if attr in applicable:
210 tagfilter = rootfilters.copy()
211 tagfilter['tagname'] = tagname % replacements
212 tagfilter[expr % replacements] = getattr(self,attr)
213 tagfilter['node_id'] = list(candidates)
215 candidates &= set(map(operator.itemgetter('node_id'),
216 self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
218 # filter by vsys tags - special case since it doesn't follow
219 # the usual semantics
220 if self.required_vsys:
221 newcandidates = collections.defaultdict(set)
223 vsys_tags = self._sliceapi.GetNodeTags(
225 node_id = list(candidates),
226 fields = ['node_id','value'])
229 operator.itemgetter(['node_id','value']),
232 required_vsys = self.required_vsys
233 for node_id, value in vsys_tags:
234 if value in required_vsys:
235 newcandidates[value].add(node_id)
237 # take only those that have all the required vsys tags
238 newcandidates = reduce(
239 lambda accum, new : accum & new,
240 newcandidates.itervalues(),
243 # filter by iface count
244 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
245 # fetch interfaces for all, in one go
# fetch interfaces for all, in one go
filters = basefilters.copy()
filters['node_id'] = list(candidates)
# BUG FIX: this call previously passed filters=basefilters, which
# ignored the node_id restriction just built into `filters` (leaving it
# dead) and fetched interface lists for every node matching the base
# filters instead of just the current candidates.
ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
    self._sliceapi.GetNodes(filters=filters, fields=('node_id','interface_ids')) ))
251 # filter candidates by interface count
252 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
253 predicate = ( lambda node_id :
254 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
255 elif self.min_num_external_ifaces is not None:
256 predicate = ( lambda node_id :
257 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
259 predicate = ( lambda node_id :
260 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
262 candidates = set(filter(predicate, candidates))
264 # make sure hostnames are resolvable
267 self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
269 hostnames = dict(map(operator.itemgetter('node_id','hostname'),
270 self._api.GetNodes(list(candidates), ['node_id','hostname'])
272 def resolvable(node_id):
274 addr = socket.gethostbyname(hostnames[node_id])
275 return addr is not None
278 candidates = set(parallel.pfilter(resolvable, candidates,
281 self._logger.info(" Found %s reachable candidates.", len(candidates))
283 for h in hostnames.keys():
284 if h not in candidates:
287 hostnames = dict((v,k) for k, v in hostnames.iteritems())
291 def make_filter_description(self):
293 Makes a human-readable description of filtering conditions
297 # get initial candidates (no tag filters)
298 filters = self.build_filters({}, self.BASEFILTERS)
300 # keyword-only "pseudofilters"
302 filters['peer'] = self.site
304 # filter by tag, one tag at a time
305 applicable = self.applicable_filters
306 for tagfilter in self.TAGFILTERS.iteritems():
307 attr, (tagname, expr) = tagfilter
309 # don't bother if there's no filter defined
310 if attr in applicable:
311 filters[attr] = getattr(self,attr)
313 # filter by vsys tags - special case since it doesn't follow
314 # the usual semantics
315 if self.required_vsys:
316 filters['vsys'] = ','.join(list(self.required_vsys))
318 # filter by iface count
319 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
320 filters['num_ifaces'] = '-'.join([
321 str(self.min_num_external_ifaces or '0'),
322 str(self.max_num_external_ifaces or 'inf')
325 return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """Bind this Node to a concrete PlanetLab node and load its metadata.

    Stores the plcapi node_id and immediately refreshes this object's
    attributes from the API via fetch_node_info().
    """
    self._node_id = node_id
    self.fetch_node_info()
331 def unassign_node(self):
336 orig_attrs = self.__orig_attrs
337 except AttributeError:
340 for key, value in orig_attrs.iteritems():
341 setattr(self, key, value)
342 del self.__orig_attrs
344 def rate_nodes(self, nodes):
345 rates = collections.defaultdict(int)
346 tags = collections.defaultdict(dict)
347 replacements = {'timeframe':self.timeframe}
348 tagnames = [ tagname % replacements
349 for tagname, weight, default in self.RATE_FACTORS ]
351 taginfo = self._sliceapi.GetNodeTags(
354 fields=('node_id','tagname','value'))
356 unpack = operator.itemgetter('node_id','tagname','value')
357 for value in taginfo:
358 node, tagname, value = unpack(value)
359 if value and value.lower() != 'n/a':
360 tags[tagname][int(node)] = float(value)
362 for tagname, weight, default in self.RATE_FACTORS:
363 taginfo = tags[tagname % replacements].get
365 rates[node] += weight * taginfo(node,default)
367 return map(rates.__getitem__, nodes)
369 def fetch_node_info(self):
372 info, tags = self._sliceapi.GetNodeInfo(self._node_id)
375 tags = dict( (t['tagname'],t['value'])
378 orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
379 orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
380 self.min_num_external_ifaces = None
381 self.max_num_external_ifaces = None
384 replacements = {'timeframe':self.timeframe}
385 for attr, tag in self.BASEFILTERS.iteritems():
388 if hasattr(self, attr):
389 orig_attrs[attr] = getattr(self, attr)
390 setattr(self, attr, value)
391 for attr, (tag,_) in self.TAGFILTERS.iteritems():
392 tag = tag % replacements
395 if hasattr(self, attr):
396 orig_attrs[attr] = getattr(self, attr)
397 if not value or value.lower() == 'n/a':
399 setattr(self, attr, value)
401 if 'peer_id' in info:
402 orig_attrs['site'] = self.site
403 self.site = self._api.peer_map[info['peer_id']]
405 if 'interface_ids' in info:
406 self.min_num_external_ifaces = \
407 self.max_num_external_ifaces = len(info['interface_ids'])
409 if 'ssh_rsa_key' in info:
410 orig_attrs['server_key'] = self.server_key
411 self.server_key = info['ssh_rsa_key']
413 self.hostip = socket.gethostbyname(self.hostname)
417 except AttributeError:
418 self.__orig_attrs = orig_attrs
421 if self.home_path is None:
422 raise AssertionError, "Misconfigured node: missing home path"
423 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
424 raise AssertionError, "Misconfigured node: missing slice SSH key"
425 if self.slicename is None:
426 raise AssertionError, "Misconfigured node: unspecified slice"
429 # Mark dependencies installed
430 self._installed = True
432 # Clear load attributes, they impair re-discovery
433 self.minReliability = \
434 self.maxReliability = \
435 self.minBandwidth = \
436 self.maxBandwidth = \
442 def install_dependencies(self):
443 if self.required_packages and not self._installed:
444 # If we need rpmfusion, we must install the repo definition and the gpg keys
446 if self.operatingSystem == 'f12':
447 # Fedora 12 requires a different rpmfusion package
448 RPM_FUSION_URL = self.RPM_FUSION_URL_F12
450 # This one works for f13+
451 RPM_FUSION_URL = self.RPM_FUSION_URL
454 'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
456 'RPM_FUSION_URL' : RPM_FUSION_URL
462 (out,err),proc = server.popen_ssh_command(
464 host = self.hostname,
466 user = self.slicename,
468 ident_key = self.ident_path,
469 server_key = self.server_key,
474 if self.check_bad_host(out,err):
476 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
478 # Launch p2p yum dependency installer
479 self._yum_dependencies.async_setup()
481 def wait_provisioning(self, timeout = 20*60):
482 # Wait for the p2p installer
485 while not self.is_alive():
486 time.sleep(sleeptime)
487 totaltime += sleeptime
488 sleeptime = min(30.0, sleeptime*1.5)
490 if totaltime > timeout:
491 # PlanetLab has a 15' delay on configuration propagation
492 # If we're above that delay, the unresponsiveness is not due
494 if not self.is_alive(verbose=True):
495 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
497 # Ensure the node is clean (no apps running that could interfere with operations)
498 if self.enable_cleanup:
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
    """Block until the p2p yum dependency installer has finished.

    No-op when dependencies are already installed or no installer was
    configured; otherwise waits on the installer and marks the node as
    installed. (The probe-interval parameters are currently unused.)
    """
    if self._installed or not self._yum_dependencies:
        return
    self._yum_dependencies.async_setup_wait()
    self._installed = True
507 def is_alive(self, verbose = False):
508 # Make sure all the paths are created where
509 # they have to be created for deployment
510 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
512 host = self.hostname,
514 user = self.slicename,
516 ident_key = self.ident_path,
517 server_key = self.server_key,
519 err_on_timeout = False,
525 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
527 elif not err and out.strip() == 'ALIVE':
531 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
535 if self.enable_cleanup:
540 self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
542 util.appendBlacklist(self.hostname)
544 def do_cleanup(self):
545 if self.testbed().recovering:
549 self._logger.info("Cleaning up %s", self.hostname)
552 "sudo -S killall python tcpdump || /bin/true ; "
553 "sudo -S killall python tcpdump || /bin/true ; "
554 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
555 "sudo -S killall -u %(slicename)s || /bin/true ",
556 "sudo -S killall -u root || /bin/true ",
557 "sudo -S killall -u %(slicename)s || /bin/true ",
558 "sudo -S killall -u root || /bin/true ",
562 (out,err),proc = server.popen_ssh_command(
563 # Some apps need two kills
565 'slicename' : self.slicename ,
567 host = self.hostname,
569 user = self.slicename,
571 ident_key = self.ident_path,
572 server_key = self.server_key,
573 tty = True, # so that ps -N -T works as advertised...
def prepare_dependencies(self):
    """Configure (but do not start) the p2p yum dependency installer.

    Only acts when the node has required packages and they are not yet
    installed: builds a YumDependency application bound to this node,
    homed at "nepi-yumdep", with a space-separated package list.
    The installer itself is launched later (install_dependencies calls
    async_setup; wait_dependencies joins it).
    """
    # Configure p2p yum dependency installer
    if self.required_packages and not self._installed:
        self._yum_dependencies = application.YumDependency(self._api)
        self._yum_dependencies.node = self
        self._yum_dependencies.home_path = "nepi-yumdep"
        self._yum_dependencies.depends = ' '.join(self.required_packages)
587 def routing_method(self, routes, vsys_vnet):
589 There are two methods, vroute and sliceip.
592 Modifies the node's routing table directly, validating that the IP
593 range lies within the network given by the slice's vsys_vnet tag.
594 This method is the most scalable for very small routing tables
595 that need not modify other routes (including the default)
598 Uses policy routing and iptables filters to create per-sliver
599 routing tables. It's the most flexible way, but it doesn't scale
600 as well since only 155 routing tables can be created this way.
602 This method will return the most appropriate routing method, which will
603 prefer vroute for small routing tables.
606 # For now, sliceip results in kernel panics
607 # so we HAVE to use vroute
610 # We should not make the routing table grow too big
611 if len(routes) > MAX_VROUTE_ROUTES:
614 vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
616 dest, prefix, nexthop, metric = route
617 dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
618 nexthop = ipaddr.IPAddress(nexthop)
619 if dest not in vsys_vnet or nexthop not in vsys_vnet:
624 def format_route(self, route, dev, method, action):
625 dest, prefix, nexthop, metric = route
626 if method == 'vroute':
628 "%s %s%s gw %s %s" % (
631 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
636 elif method == 'sliceip':
638 "route %s to %s%s via %s metric %s dev %s" % (
641 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
648 raise AssertionError, "Unknown method"
650 def _annotate_routes_with_devs(self, routes, devs, method):
654 if dev.routes_here(route):
655 dev_routes.append(tuple(route) + (dev.if_name,))
660 if method == 'sliceip':
661 dev_routes.append(tuple(route) + ('eth0',))
663 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
664 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
667 def configure_routes(self, routes, devs, vsys_vnet):
669 Add the specified routes to the node's routing table
672 method = self.routing_method(routes, vsys_vnet)
675 # annotate routes with devices
676 dev_routes = self._annotate_routes_with_devs(routes, devs, method)
677 for route in dev_routes:
678 route, dev = route[:-1], route[-1]
682 rules.append(self.format_route(route, dev, method, 'add'))
684 if method == 'sliceip':
685 rules = map('enable '.__add__, tdevs) + rules
687 self._logger.info("Setting up routes for %s using %s", self.hostname, method)
688 self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
690 self.apply_route_rules(rules, method)
692 self._configured_routes = set(routes)
693 self._configured_devs = tdevs
694 self._configured_method = method
696 def reconfigure_routes(self, routes, devs, vsys_vnet):
698 Updates the routes in the node's routing table to match
701 method = self._configured_method
703 dev_routes = self._annotate_routes_with_devs(routes, devs, method)
705 current = self._configured_routes
706 current_devs = self._configured_devs
708 new = set(dev_routes)
709 new_devs = set(map(operator.itemgetter(-1), dev_routes))
711 deletions = current - new
712 insertions = new - current
714 dev_deletions = current_devs - new_devs
715 dev_insertions = new_devs - current_devs
720 # Rule deletions first
721 for route in deletions:
722 route, dev = route[:-1], route[-1]
723 rules.append(self.format_route(route, dev, method, 'del'))
725 if method == 'sliceip':
727 rules.extend(map('disable '.__add__, dev_deletions))
730 rules.extend(map('enable '.__add__, dev_insertions))
# Rule insertions now
for route in insertions:
    # BUG FIX: was `route, dev = route[:-1], dev[-1]`, which reused the
    # stale `dev` left over from the deletions loop (and crashes with a
    # NameError when there were no deletions). The device name is the
    # last element of the annotated route, as in the deletions loop and
    # in configure_routes.
    route, dev = route[:-1], route[-1]
    rules.append(self.format_route(route, dev, method, 'add'))
738 self.apply_route_rules(rules, method)
740 self._configured_routes = dev_routes
741 self._configured_devs = new_devs
743 def apply_route_rules(self, rules, method):
744 (out,err),proc = server.popen_ssh_command(
745 "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
746 home = server.shell_escape(self.home_path),
748 host = self.hostname,
750 user = self.slicename,
752 ident_key = self.ident_path,
753 server_key = self.server_key,
754 stdin = '\n'.join(rules),
758 if proc.wait() or err:
759 raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
761 logger.debug("%s said: %s%s", method, out, err)
763 def check_bad_host(self, out, err):
764 badre = re.compile(r'(?:'
765 r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
766 r'|Error: disk I/O error'
769 return badre.search(out) or badre.search(err)