1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID
19 from nepi.util import server
20 from nepi.util import parallel
class UnresponsiveNodeError(RuntimeError):
    """Raised when a PlanetLab node cannot be reached over SSH within the allotted time."""
29 def _castproperty(typ, propattr):
31 return getattr(self, propattr)
32 def _set(self, value):
33 if value is not None or (isinstance(value, basestring) and not value):
35 return setattr(self, propattr, value)
36 def _del(self, value):
37 return delattr(self, propattr)
38 _get.__name__ = propattr + '_get'
39 _set.__name__ = propattr + '_set'
40 _del.__name__ = propattr + '_del'
41 return property(_get, _set, _del)
        # Map Node attribute to plcapi filter name
        'hostname' : 'hostname',

        # Map Node attribute to (<tag name>, <plcapi filter expression>)
        # There are replacements that are applied with string formatting,
        # so '%' has to be escaped as '%%'.
        # NOTE(review): the ']'/'[' prefixes appear to be plcapi range-filter
        # operators (lower/upper bound) -- confirm against the PLCAPI Filter
        # specification.
        'architecture' : ('arch','value'),
        'operatingSystem' : ('fcdistro','value'),
        'pl_distro' : ('pldistro','value'),
        'city' : ('city','value'),
        'country' : ('country','value'),
        'region' : ('region','value'),
        'minReliability' : ('reliability%(timeframe)s', ']value'),
        'maxReliability' : ('reliability%(timeframe)s', '[value'),
        'minBandwidth' : ('bw%(timeframe)s', ']value'),
        'maxBandwidth' : ('bw%(timeframe)s', '[value'),
        'minLoad' : ('load%(timeframe)s', ']value'),
        'maxLoad' : ('load%(timeframe)s', '[value'),
        'minCpu' : ('cpu%(timeframe)s', ']value'),
        'maxCpu' : ('cpu%(timeframe)s', '[value'),

        # Node scoring factors used by rate_nodes:
        # (<tag name>, <weight>, <default>)
        # <default> is used when a node lacks the tag or reports 'n/a';
        # the weight's sign decides whether larger tag values raise or
        # lower a node's score.
        ('bw%(timeframe)s', -0.001, 1024.0),
        ('cpu%(timeframe)s', 0.1, 40.0),
        ('load%(timeframe)s', -0.2, 3.0),
        ('reliability%(timeframe)s', 1, 100.0),

    # Remote pidfile/logfile used by the p2p yum dependency installer
    DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
    DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
    # RPM Fusion repository packages; Fedora 12 slivers need a
    # release-specific URL (see install_dependencies)
    RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
    RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
    # Load/metric filter attributes: assignments are coerced to float by
    # _castproperty (backed by the underscore-prefixed instance slots).
    minReliability = _castproperty(float, '_minReliability')
    maxReliability = _castproperty(float, '_maxReliability')
    minBandwidth = _castproperty(float, '_minBandwidth')
    maxBandwidth = _castproperty(float, '_maxBandwidth')
    minCpu = _castproperty(float, '_minCpu')
    maxCpu = _castproperty(float, '_maxCpu')
    minLoad = _castproperty(float, '_minLoad')
    maxLoad = _castproperty(float, '_maxLoad')
    def __init__(self, api=None, sliceapi=None):
        # Slice-scoped plcapi proxy used for node discovery queries
        self._sliceapi = sliceapi

        # Platform filters (None means "don't care")
        self.architecture = None
        self.operatingSystem = None
        self.pl_distro = None

        # Metric filters, cast to float by the _castproperty descriptors
        self.minReliability = None
        self.maxReliability = None
        self.minBandwidth = None
        self.maxBandwidth = None

        # Interface-count constraints used by find_candidates
        self.min_num_external_ifaces = None
        self.max_num_external_ifaces = None

        # Applications and routes add requirements to connected nodes
        self.required_packages = set()
        self.required_vsys = set()
        self.rpmFusion = False
        # Extra environment variables to export on the sliver (name -> list of values)
        self.env = collections.defaultdict(list)

        # Some special applications - initialized when connected
        self.multicast_forwarder = None

        # Testbed-derived attributes
        self.slicename = None
        self.ident_path = None      # path to the slice SSH identity key
        self.server_key = None      # expected SSH host key of the node
        self.home_path = None
        self.enable_cleanup = False

        # Those are filled when an actual node is allocated
        self._yum_dependencies = None
        self._installed = False

        self._logger = logging.getLogger('nepi.testbeds.planetlab')
    def _nepi_testbed_environment_setup_get(self):
        # Build a shell snippet that extends PYTHONPATH and PATH with the
        # node's configured python paths and exports any extra env vars;
        # prepended to remotely executed commands.
        command = cStringIO.StringIO()
        command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
            ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
        command.write(' ; export PATH=$PATH:%s' % (
            ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
        for envkey, envvals in self.env.iteritems():
            for envval in envvals:
                command.write(' ; export %s=%s' % (envkey, envval))
        return command.getvalue()
    # Setter appears to discard the value -- the property is effectively
    # read-only from the outside (TODO confirm against the full body).
    def _nepi_testbed_environment_setup_set(self, value):
    _nepi_testbed_environment_setup = property(
        _nepi_testbed_environment_setup_get,
        _nepi_testbed_environment_setup_set)
162 def build_filters(self, target_filters, filter_map):
163 for attr, tag in filter_map.iteritems():
164 value = getattr(self, attr, None)
165 if value is not None:
166 target_filters[tag] = value
167 return target_filters
    def applicable_filters(self):
        # Names of the BASEFILTERS/TAGFILTERS attributes actually set on
        # this instance (used to skip undefined filters).
        # NOTE(review): call sites read this without calling it
        # (e.g. `applicable = self.applicable_filters` in find_candidates),
        # so it is presumably decorated as a @property -- confirm.
        has = lambda att : getattr(self,att,None) is not None
            filter(has, self.BASEFILTERS.iterkeys())
            + filter(has, self.TAGFILTERS.iterkeys())
    def find_candidates(self, filter_slice_id=None):
        # Query plcapi for nodes matching every configured filter, then
        # winnow the set by tags, vsys capabilities, interface count and
        # DNS resolvability.
        self._logger.info("Finding candidates for %s", self.make_filter_description())

        fields = ('node_id',)
        replacements = {'timeframe':self.timeframe}

        # get initial candidates (no tag filters)
        basefilters = self.build_filters({}, self.BASEFILTERS)
        rootfilters = basefilters.copy()
        basefilters['|slice_ids'] = (filter_slice_id,)

        # only pick healthy nodes
        basefilters['run_level'] = 'boot'
        basefilters['boot_state'] = 'boot'
        basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
        basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies

        # keyword-only "pseudofilters"
        extra['peer'] = self.site

        candidates = set(map(operator.itemgetter('node_id'),
            self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))

        # filter by tag, one tag at a time
        applicable = self.applicable_filters
        for tagfilter in self.TAGFILTERS.iteritems():
            attr, (tagname, expr) = tagfilter

            # don't bother if there's no filter defined
            if attr in applicable:
                tagfilter = rootfilters.copy()
                tagfilter['tagname'] = tagname % replacements
                tagfilter[expr % replacements] = getattr(self,attr)
                tagfilter['node_id'] = list(candidates)

                candidates &= set(map(operator.itemgetter('node_id'),
                    self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))

        # filter by vsys tags - special case since it doesn't follow
        # the usual semantics
        if self.required_vsys:
            newcandidates = collections.defaultdict(set)

            vsys_tags = self._sliceapi.GetNodeTags(
                node_id = list(candidates),
                fields = ['node_id','value'])

                # NOTE(review): itemgetter(['node_id','value']) passes a
                # *list* -- itemgetter expects separate arguments as used
                # elsewhere in this file; confirm this is not a latent bug.
                operator.itemgetter(['node_id','value']),

            # group candidate node_ids by which required vsys tag they carry
            required_vsys = self.required_vsys
            for node_id, value in vsys_tags:
                if value in required_vsys:
                    newcandidates[value].add(node_id)

            # take only those that have all the required vsys tags
            newcandidates = reduce(
                lambda accum, new : accum & new,
                newcandidates.itervalues(),

        # filter by iface count
        if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
            # fetch interfaces for all, in one go
            filters = basefilters.copy()
            filters['node_id'] = list(candidates)
            # FIXME(review): 'filters' (restricted to the current candidate
            # set) is built but 'basefilters' is what gets passed below, so
            # interfaces are fetched for *all* matching nodes, not just the
            # candidates.
            ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
                self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))

            # filter candidates by interface count
            if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
                predicate = ( lambda node_id :
                    self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
            elif self.min_num_external_ifaces is not None:
                predicate = ( lambda node_id :
                    self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
                predicate = ( lambda node_id :
                    len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
            candidates = set(filter(predicate, candidates))

        # make sure hostnames are resolvable
        self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))

        hostnames = dict(map(operator.itemgetter('node_id','hostname'),
            self._api.GetNodes(list(candidates), ['node_id','hostname'])

        def resolvable(node_id):
                addr = socket.gethostbyname(hostnames[node_id])
                return addr is not None
        candidates = set(parallel.pfilter(resolvable, candidates,

        self._logger.info(" Found %s reachable candidates.", len(candidates))

        # drop hostname entries for nodes that did not survive the filters
        for h in hostnames.keys():
            if h not in candidates:

        # invert to hostname -> node_id for the caller
        hostnames = dict((v,k) for k, v in hostnames.iteritems())
    def make_filter_description(self):
        Makes a human-readable description of filtering conditions

        # get initial candidates (no tag filters)
        filters = self.build_filters({}, self.BASEFILTERS)

        # keyword-only "pseudofilters"
        filters['peer'] = self.site

        # filter by tag, one tag at a time
        applicable = self.applicable_filters
        for tagfilter in self.TAGFILTERS.iteritems():
            attr, (tagname, expr) = tagfilter

            # don't bother if there's no filter defined
            if attr in applicable:
                filters[attr] = getattr(self,attr)

        # filter by vsys tags - special case since it doesn't follow
        # the usual semantics
        if self.required_vsys:
            filters['vsys'] = ','.join(list(self.required_vsys))

        # filter by iface count
        if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
            filters['num_ifaces'] = '-'.join([
                str(self.min_num_external_ifaces or '0'),
                str(self.max_num_external_ifaces or 'inf')

        # render as "name: value" pairs joined with semicolons
        return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
    def assign_node_id(self, node_id):
        # Bind this Node to a concrete plcapi node and pull its attributes
        # (fetch_node_info reads self._node_id, so the order matters).
        self._node_id = node_id
        self.fetch_node_info()
    def unassign_node(self):
        # Roll back the attribute values saved by fetch_node_info, releasing
        # the node binding so discovery can run again.
            orig_attrs = self.__orig_attrs
        except AttributeError:

        for key, value in orig_attrs.iteritems():
            setattr(self, key, value)
        del self.__orig_attrs
    def rate_nodes(self, nodes):
        # Score each node by the weighted sum of its RATE_FACTORS tag values;
        # missing or 'n/a' tags fall back to the factor's default.
        # Returns the scores in the same order as *nodes*.
        rates = collections.defaultdict(int)
        tags = collections.defaultdict(dict)
        replacements = {'timeframe':self.timeframe}
        tagnames = [ tagname % replacements
                     for tagname, weight, default in self.RATE_FACTORS ]

        taginfo = self._sliceapi.GetNodeTags(
            fields=('node_id','tagname','value'))

        # index tag values as tags[tagname][node_id] = float(value)
        unpack = operator.itemgetter('node_id','tagname','value')
        for value in taginfo:
            node, tagname, value = unpack(value)
            if value and value.lower() != 'n/a':
                tags[tagname][int(node)] = float(value)

        for tagname, weight, default in self.RATE_FACTORS:
            taginfo = tags[tagname % replacements].get
                rates[node] += weight * taginfo(node,default)

        return map(rates.__getitem__, nodes)
    def fetch_node_info(self):
        # Pull node attributes and tags from plcapi for the bound node,
        # recording the values they override in __orig_attrs so that
        # unassign_node can roll them back.
        info, tags = self._sliceapi.GetNodeInfo(self._node_id)

        tags = dict( (t['tagname'],t['value'])

        # iface-count filters are replaced by the node's actual count below
        orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
        orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
        self.min_num_external_ifaces = None
        self.max_num_external_ifaces = None

        replacements = {'timeframe':self.timeframe}
        for attr, tag in self.BASEFILTERS.iteritems():
                if hasattr(self, attr):
                    orig_attrs[attr] = getattr(self, attr)
                setattr(self, attr, value)
        for attr, (tag,_) in self.TAGFILTERS.iteritems():
            tag = tag % replacements
                if hasattr(self, attr):
                    orig_attrs[attr] = getattr(self, attr)
                # 'n/a' tag values are treated as unset
                if not value or value.lower() == 'n/a':
                setattr(self, attr, value)

        if 'peer_id' in info:
            orig_attrs['site'] = self.site
            self.site = self._api.peer_map[info['peer_id']]

        if 'interface_ids' in info:
            self.min_num_external_ifaces = \
                self.max_num_external_ifaces = len(info['interface_ids'])

        if 'ssh_rsa_key' in info:
            orig_attrs['server_key'] = self.server_key
            self.server_key = info['ssh_rsa_key']

        # cache the resolved address for later SSH operations
        self.hostip = socket.gethostbyname(self.hostname)

        except AttributeError:
            self.__orig_attrs = orig_attrs
        # Sanity checks: these must all be configured before deployment
        if self.home_path is None:
            raise AssertionError, "Misconfigured node: missing home path"
        if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
            raise AssertionError, "Misconfigured node: missing slice SSH key"
        if self.slicename is None:
            raise AssertionError, "Misconfigured node: unspecified slice"

        # Mark dependencies installed
        self._installed = True

        # Clear load attributes, they impair re-discovery
        self.minReliability = \
        self.maxReliability = \
        self.minBandwidth = \
        self.maxBandwidth = \
    def install_dependencies(self):
        # Install the RPM Fusion repository (if not present) on the sliver,
        # then kick off the asynchronous p2p yum installer prepared by
        # prepare_dependencies.
        if self.required_packages and not self._installed:
            # If we need rpmfusion, we must install the repo definition and the gpg keys
            if self.operatingSystem == 'f12':
                # Fedora 12 requires a different rpmfusion package
                RPM_FUSION_URL = self.RPM_FUSION_URL_F12
                # This one works for f13+
                RPM_FUSION_URL = self.RPM_FUSION_URL

                # idempotent: only install the repo package if not present
                'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
                'RPM_FUSION_URL' : RPM_FUSION_URL

            # run the setup command remotely over SSH
            (out,err),proc = server.popen_ssh_command(
                host = self.hostname,
                user = self.slicename,
                ident_key = self.ident_path,
                server_key = self.server_key,

            if self.check_bad_host(out,err):
                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)

            # Launch p2p yum dependency installer
            self._yum_dependencies.async_setup()
    def wait_provisioning(self, timeout = 20*60):
        # Wait for the p2p installer
        # Poll the node over SSH with exponential backoff (capped at 30s)
        # until it answers, raising UnresponsiveNodeError past *timeout*.
        while not self.is_alive():
            time.sleep(sleeptime)
            totaltime += sleeptime
            sleeptime = min(30.0, sleeptime*1.5)

            if totaltime > timeout:
                # PlanetLab has a 15' delay on configuration propagation
                # If we're above that delay, the unresponsiveness is not due
                if not self.is_alive(verbose=True):
                    raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)

        # Ensure the node is clean (no apps running that could interfere with operations)
        if self.enable_cleanup:
502 def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
503 # Wait for the p2p installer
504 if self._yum_dependencies and not self._installed:
505 self._yum_dependencies.async_setup_wait()
506 self._installed = True
    def is_alive(self, verbose = False):
        # Probe the node over SSH; only a clean 'ALIVE' echo with no stderr
        # counts as alive. Warnings are logged for diagnosis.
        # Make sure all the paths are created where
        # they have to be created for deployment
        (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key,
            err_on_timeout = False,

            self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
        elif not err and out.strip() == 'ALIVE':
            self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)

        if self.enable_cleanup:
            # record the node so later discovery runs skip it
            self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
            util.appendBlacklist(self.hostname)
    def do_cleanup(self):
        # Kill stray processes on the sliver so a fresh deployment starts
        # from a clean slate; skipped while recovering a previous session.
        if self.testbed().recovering:

        self._logger.info("Cleaning up %s", self.hostname)

            "sudo -S killall python tcpdump || /bin/true ; "
            "sudo -S killall python tcpdump || /bin/true ; "
            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
            "sudo -S killall -u %(slicename)s || /bin/true ",
            "sudo -S killall -u root || /bin/true ",
            "sudo -S killall -u %(slicename)s || /bin/true ",
            "sudo -S killall -u root || /bin/true ",

        (out,err),proc = server.popen_ssh_command(
            # Some apps need two kills
            'slicename' : self.slicename ,
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key,
            tty = True, # so that ps -N -T works as advertised...
580 def prepare_dependencies(self):
581 # Configure p2p yum dependency installer
582 if self.required_packages and not self._installed:
583 self._yum_dependencies = application.YumDependency(self._api)
584 self._yum_dependencies.node = self
585 self._yum_dependencies.home_path = "nepi-yumdep"
586 self._yum_dependencies.depends = ' '.join(self.required_packages)
    def routing_method(self, routes, vsys_vnet):
        There are two methods, vroute and sliceip.

        Modifies the node's routing table directly, validating that the IP
        range lies within the network given by the slice's vsys_vnet tag.
        This method is the most scalable for very small routing tables
        that need not modify other routes (including the default)

        Uses policy routing and iptables filters to create per-sliver
        routing tables. It's the most flexible way, but it doesn't scale
        as well since only 155 routing tables can be created this way.

        This method will return the most appropriate routing method, which will
        prefer vroute for small routing tables.

        # For now, sliceip results in kernel panics
        # so we HAVE to use vroute

        # We should not make the routing table grow too big
        if len(routes) > MAX_VROUTE_ROUTES:

        vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
            # every route must fit inside the slice's vsys_vnet range
            dest, prefix, nexthop, metric = route
            dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
            nexthop = ipaddr.IPAddress(nexthop)
            if dest not in vsys_vnet or nexthop not in vsys_vnet:
    def format_route(self, route, dev, method, action):
        # Render a (dest, prefix, nexthop, metric) route as a rule line for
        # the chosen routing method ('vroute' or 'sliceip'); *action* is the
        # verb to apply (e.g. 'add'/'del'). /32 prefixes are omitted.
        dest, prefix, nexthop, metric = route
        if method == 'vroute':
                "%s %s%s gw %s %s" % (
                    (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
        elif method == 'sliceip':
                "route %s to %s%s via %s metric %s dev %s" % (
                    (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
            raise AssertionError, "Unknown method"
    def _annotate_routes_with_devs(self, routes, devs, method):
        # Pair each route with the virtual interface that owns it, by
        # appending the device's if_name to the route tuple.
                if dev.routes_here(route):
                    dev_routes.append(tuple(route) + (dev.if_name,))

                # no device claimed the route: sliceip can still bind it to
                # eth0; vroute cannot handle it at all
                if method == 'sliceip':
                    dev_routes.append(tuple(route) + ('eth0',))
                    raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
                        "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
    def configure_routes(self, routes, devs, vsys_vnet):
        Add the specified routes to the node's routing table

        method = self.routing_method(routes, vsys_vnet)

        # annotate routes with devices
        dev_routes = self._annotate_routes_with_devs(routes, devs, method)
        for route in dev_routes:
            # split the device annotation back off the tuple
            route, dev = route[:-1], route[-1]
            rules.append(self.format_route(route, dev, method, 'add'))

        if method == 'sliceip':
            # sliceip needs its devices enabled before any route rules
            rules = map('enable '.__add__, tdevs) + rules

        self._logger.info("Setting up routes for %s using %s", self.hostname, method)
        self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))

        self.apply_route_rules(rules, method)

        # NOTE(review): the routes stored here are *not* device-annotated,
        # but reconfigure_routes diffs _configured_routes against annotated
        # tuples -- the two representations should be unified. Confirm.
        self._configured_routes = set(routes)
        self._configured_devs = tdevs
        self._configured_method = method
697 def reconfigure_routes(self, routes, devs, vsys_vnet):
699 Updates the routes in the node's routing table to match
702 method = self._configured_method
704 dev_routes = self._annotate_routes_with_devs(routes, devs, method)
706 current = self._configured_routes
707 current_devs = self._configured_devs
709 new = set(dev_routes)
710 new_devs = set(map(operator.itemgetter(-1), dev_routes))
712 deletions = current - new
713 insertions = new - current
715 dev_deletions = current_devs - new_devs
716 dev_insertions = new_devs - current_devs
721 # Rule deletions first
722 for route in deletions:
723 route, dev = route[:-1], route[-1]
724 rules.append(self.format_route(route, dev, method, 'del'))
726 if method == 'sliceip':
728 rules.extend(map('disable '.__add__, dev_deletions))
731 rules.extend(map('enable '.__add__, dev_insertions))
733 # Rule insertions now
734 for route in insertions:
735 route, dev = route[:-1], dev[-1]
736 rules.append(self.format_route(route, dev, method, 'add'))
739 self.apply_route_rules(rules, method)
741 self._configured_routes = dev_routes
742 self._configured_devs = new_devs
    def apply_route_rules(self, rules, method):
        # Feed the rule lines into the node's /vsys/<method>.in fifo over
        # SSH, echoing /vsys/<method>.out back on stderr for diagnostics.
        (out,err),proc = server.popen_ssh_command(
            "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
                home = server.shell_escape(self.home_path),
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key,
            stdin = '\n'.join(rules),

        if proc.wait() or err:
            raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)

        # FIXME(review): bare 'logger' is not defined in this method --
        # every other method here uses self._logger; unless a module-level
        # logger exists, this line raises NameError. Confirm and fix.
        logger.debug("%s said: %s%s", method, out, err)
    def check_bad_host(self, out, err):
        # Heuristically detect node-local breakage (DNS failure reaching the
        # rpmfusion mirror, disk I/O errors) in command output so the node
        # can be blacklisted; returns a truthy match object when detected.
        badre = re.compile(r'(?:'
            r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
            r'|Error: disk I/O error'
        return badre.search(out) or badre.search(err)