1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID
19 from nepi.util import server
20 from nepi.util import parallel
class UnresponsiveNodeError(RuntimeError):
    """Raised when a PlanetLab node does not answer probes over SSH
    (see wait_provisioning / is_alive)."""
29 def _castproperty(typ, propattr):
31 return getattr(self, propattr)
32 def _set(self, value):
33 if value is not None or (isinstance(value, basestring) and not value):
35 return setattr(self, propattr, value)
36 def _del(self, value):
37 return delattr(self, propattr)
38 _get.__name__ = propattr + '_get'
39 _set.__name__ = propattr + '_set'
40 _del.__name__ = propattr + '_del'
41 return property(_get, _set, _del)
    # Map Node attribute to plcapi filter name
    # (these are passed verbatim as GetNodes filters by build_filters /
    # find_candidates; the literal's assignment line is not visible here)
    'hostname' : 'hostname',
    # Map Node attribute to (<tag name>, <plcapi filter expression>)
    # There are replacements that are applied with string formatting,
    # so '%' has to be escaped as '%%'.
    # Per the min*/max* attribute naming, the ']' prefix on 'value' marks a
    # lower-bound comparison and '[' an upper bound (PLCAPI filter operator
    # prefixes); '%(timeframe)s' is substituted via the 'replacements' dict
    # in find_candidates/rate_nodes.
    'architecture' : ('arch','value'),
    'operatingSystem' : ('fcdistro','value'),
    'pl_distro' : ('pldistro','value'),
    'city' : ('city','value'),
    'country' : ('country','value'),
    'region' : ('region','value'),
    'minReliability' : ('reliability%(timeframe)s', ']value'),
    'maxReliability' : ('reliability%(timeframe)s', '[value'),
    'minBandwidth' : ('bw%(timeframe)s', ']value'),
    'maxBandwidth' : ('bw%(timeframe)s', '[value'),
    'minLoad' : ('load%(timeframe)s', ']value'),
    'maxLoad' : ('load%(timeframe)s', '[value'),
    'minCpu' : ('cpu%(timeframe)s', ']value'),
    'maxCpu' : ('cpu%(timeframe)s', '[value'),
70 # (<tag name>, <weight>, <default>)
71 ('bw%(timeframe)s', -0.001, 1024.0),
72 ('cpu%(timeframe)s', 0.1, 40.0),
73 ('load%(timeframe)s', -0.2, 3.0),
74 ('reliability%(timeframe)s', 1, 100.0),
    # Remote pid/log files; presumably used by the background yum dependency
    # installer - the referencing code is not visible here, confirm.
    DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
    DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
    # rpmfusion repository package; Fedora 12 needs a release-specific URL
    # (selected in install_dependencies based on operatingSystem).
    RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
    RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
    # Load/constraint attributes: values assigned here are cast to float by
    # _castproperty; None means "no constraint" (see __init__ and the
    # post-allocation clearing code).
    minReliability = _castproperty(float, '_minReliability')
    maxReliability = _castproperty(float, '_maxReliability')
    minBandwidth = _castproperty(float, '_minBandwidth')
    maxBandwidth = _castproperty(float, '_maxBandwidth')
    minCpu = _castproperty(float, '_minCpu')
    maxCpu = _castproperty(float, '_maxCpu')
    minLoad = _castproperty(float, '_minLoad')
    maxLoad = _castproperty(float, '_maxLoad')
    def __init__(self, api=None, sliceapi=None):
        """Create an unbound node spec.

        api      -- plcapi proxy (exact role not visible in this view)
        sliceapi -- slice-level API proxy; falls back to *api* when absent.
        """
        self._sliceapi = sliceapi or api
        # Hardware/software constraints used as discovery filters; None = any
        self.architecture = None
        self.operatingSystem = None
        self.pl_distro = None
        # Load constraints (cast to float by the _castproperty descriptors)
        self.minReliability = None
        self.maxReliability = None
        self.minBandwidth = None
        self.maxBandwidth = None
        # Interface-count constraints (exact match pinned in fetch_node_info)
        self.min_num_external_ifaces = None
        self.max_num_external_ifaces = None
        # Applications and routes add requirements to connected nodes
        self.required_packages = set()
        self.required_vsys = set()
        self.rpmFusion = False
        # Extra environment variables to export on the node: name -> [values]
        self.env = collections.defaultdict(list)
        # Some special applications - initialized when connected
        self.multicast_forwarder = None
        # Testbed-derived attributes
        self.slicename = None
        self.ident_path = None
        self.server_key = None
        self.home_path = None
        self.enable_cleanup = False
        # Those are filled when an actual node is allocated
        self._yum_dependencies = None
        self._installed = False
        self._logger = logging.getLogger('nepi.testbeds.planetlab')
143 def _nepi_testbed_environment_setup_get(self):
144 command = cStringIO.StringIO()
145 command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
146 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
148 command.write(' ; export PATH=$PATH:%s' % (
149 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
152 for envkey, envvals in self.env.iteritems():
153 for envval in envvals:
154 command.write(' ; export %s=%s' % (envkey, envval))
155 return command.getvalue()
    def _nepi_testbed_environment_setup_set(self, value):
        # Setter body is not visible in this view - presumably a no-op so the
        # property tolerates assignment; TODO confirm.
    _nepi_testbed_environment_setup = property(
        _nepi_testbed_environment_setup_get,
        _nepi_testbed_environment_setup_set)
162 def build_filters(self, target_filters, filter_map):
163 for attr, tag in filter_map.iteritems():
164 value = getattr(self, attr, None)
165 if value is not None:
166 target_filters[tag] = value
167 return target_filters
    def applicable_filters(self):
        # Names of the BASEFILTERS/TAGFILTERS attributes that are actually
        # set on this instance; accessed without parentheses elsewhere, so
        # this is presumably decorated as a @property (decorator line not
        # visible here - TODO confirm).
        has = lambda att : getattr(self,att,None) is not None
        filter(has, self.BASEFILTERS.iterkeys())
        + filter(has, self.TAGFILTERS.iterkeys())
    def find_candidates(self, filter_slice_id=None):
        """Query the slice API for the set of node_ids matching every
        configured constraint: base filters, per-tag filters, vsys tags,
        interface count and DNS resolvability."""
        self._logger.info("Finding candidates for %s", self.make_filter_description())

        fields = ('node_id',)
        replacements = {'timeframe':self.timeframe}

        # get initial candidates (no tag filters)
        basefilters = self.build_filters({}, self.BASEFILTERS)
        rootfilters = basefilters.copy()
        basefilters['|slice_ids'] = (filter_slice_id,)

        # only pick healthy nodes
        basefilters['run_level'] = 'boot'
        basefilters['boot_state'] = 'boot'
        basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
        basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies

        # keyword-only "pseudofilters"
        extra['peer'] = self.site

        candidates = set(map(operator.itemgetter('node_id'),
            self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))

        # filter by tag, one tag at a time
        applicable = self.applicable_filters
        for tagfilter in self.TAGFILTERS.iteritems():
            attr, (tagname, expr) = tagfilter

            # don't bother if there's no filter defined
            if attr in applicable:
                # narrow the unrestricted filters to this tag and to the
                # candidates that survived so far
                tagfilter = rootfilters.copy()
                tagfilter['tagname'] = tagname % replacements
                tagfilter[expr % replacements] = getattr(self,attr)
                tagfilter['node_id'] = list(candidates)

                candidates &= set(map(operator.itemgetter('node_id'),
                    self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))

        # filter by vsys tags - special case since it doesn't follow
        # the usual semantics
        if self.required_vsys:
            newcandidates = collections.defaultdict(set)

            vsys_tags = self._sliceapi.GetNodeTags(
                node_id = list(candidates),
                fields = ['node_id','value'])

                operator.itemgetter(['node_id','value']),

            # group node_ids by which required vsys tag they carry
            required_vsys = self.required_vsys
            for node_id, value in vsys_tags:
                if value in required_vsys:
                    newcandidates[value].add(node_id)

            # take only those that have all the required vsys tags
            newcandidates = reduce(
                lambda accum, new : accum & new,
                newcandidates.itervalues(),

        # filter by iface count
        if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
            # fetch interfaces for all, in one go
            filters = basefilters.copy()
            filters['node_id'] = list(candidates)
            # NOTE(review): the call below passes 'basefilters', ignoring the
            # node_id-restricted 'filters' copy built just above - this looks
            # like a bug; confirm it should be filters=filters.
            ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
                self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))

            # filter candidates by interface count
            if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
                predicate = ( lambda node_id :
                    self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
            elif self.min_num_external_ifaces is not None:
                predicate = ( lambda node_id :
                    self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
                # (the 'else:' branch header is not visible in this view)
                predicate = ( lambda node_id :
                    len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )

            candidates = set(filter(predicate, candidates))

        # make sure hostnames are resolvable
        self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))

        hostnames = dict(map(operator.itemgetter('node_id','hostname'),
            self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])

        def resolvable(node_id):
            # a node is usable only if its hostname resolves
            addr = socket.gethostbyname(hostnames[node_id])
            return addr is not None
        candidates = set(parallel.pfilter(resolvable, candidates,

        self._logger.info(" Found %s reachable candidates.", len(candidates))

        # drop hostname entries for nodes that didn't survive filtering
        for h in hostnames.keys():
            if h not in candidates:

        # invert the mapping: hostname -> node_id
        hostnames = dict((v,k) for k, v in hostnames.iteritems())
    def make_filter_description(self):
        Makes a human-readable description of filtering conditions
        # (the docstring's quote lines are not visible in this view)

        # get initial candidates (no tag filters)
        filters = self.build_filters({}, self.BASEFILTERS)

        # keyword-only "pseudofilters"
        filters['peer'] = self.site

        # filter by tag, one tag at a time
        applicable = self.applicable_filters
        for tagfilter in self.TAGFILTERS.iteritems():
            attr, (tagname, expr) = tagfilter

            # don't bother if there's no filter defined
            if attr in applicable:
                filters[attr] = getattr(self,attr)

        # filter by vsys tags - special case since it doesn't follow
        # the usual semantics
        if self.required_vsys:
            filters['vsys'] = ','.join(list(self.required_vsys))

        # filter by iface count
        if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
            filters['num_ifaces'] = '-'.join([
                str(self.min_num_external_ifaces or '0'),
                str(self.max_num_external_ifaces or 'inf')

        # render as "name: value; name: value; ..."
        return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
    def assign_node_id(self, node_id):
        # Bind this spec to a concrete PlanetLab node, then pull its
        # attributes (tags, keys, interface count) from the slice API.
        self._node_id = node_id
        self.fetch_node_info()
    def unassign_node(self):
        # Detach from the concrete node by restoring the attribute values
        # fetch_node_info saved in self.__orig_attrs.
        # (the enclosing 'try:' line is not visible in this view)
            orig_attrs = self.__orig_attrs
        except AttributeError:
        for key, value in orig_attrs.iteritems():
            setattr(self, key, value)
        del self.__orig_attrs
    def rate_nodes(self, nodes):
        """Score each node in *nodes* as the weighted sum of its RATE_FACTORS
        tag values; returns the ratings in the same order as *nodes*."""
        rates = collections.defaultdict(int)
        tags = collections.defaultdict(dict)
        replacements = {'timeframe':self.timeframe}
        tagnames = [ tagname % replacements
                     for tagname, weight, default in self.RATE_FACTORS ]

        taginfo = self._sliceapi.GetNodeTags(
            fields=('node_id','tagname','value'))

        # index tag values per (tagname, node); 'n/a' counts as missing
        unpack = operator.itemgetter('node_id','tagname','value')
        for value in taginfo:
            node, tagname, value = unpack(value)
            if value and value.lower() != 'n/a':
                tags[tagname][node] = float(value)

        for tagname, weight, default in self.RATE_FACTORS:
            taginfo = tags[tagname % replacements].get
                # accumulate weight * tag value, falling back to the
                # factor's declared default when the node lacks the tag
                rates[node] += weight * taginfo(node,default)

        return map(rates.__getitem__, nodes)
    def fetch_node_info(self):
        """Load attributes of the assigned node from the slice API, saving
        the previous values so unassign_node can restore them."""
        info, tags = self._sliceapi.GetNodeInfo(self._node_id)

        tags = dict( (t['tagname'],t['value'])

        # save and reset iface-count constraints; they get pinned to the
        # node's actual interface count below
        orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
        orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
        self.min_num_external_ifaces = None
        self.max_num_external_ifaces = None

        replacements = {'timeframe':self.timeframe}

        # mirror base-filter attributes from the node record
        for attr, tag in self.BASEFILTERS.iteritems():
            if hasattr(self, attr):
                orig_attrs[attr] = getattr(self, attr)
            setattr(self, attr, value)
        # mirror tag-filter attributes from the node's tags
        for attr, (tag,_) in self.TAGFILTERS.iteritems():
            tag = tag % replacements
            if hasattr(self, attr):
                orig_attrs[attr] = getattr(self, attr)
            # 'n/a' markers are treated as missing values
            if not value or value.lower() == 'n/a':
            setattr(self, attr, value)

        if 'peer_id' in info:
            orig_attrs['site'] = self.site
            self.site = self._sliceapi.peer_map[info['peer_id']]

        if 'interface_ids' in info:
            # pin the constraint to the node's actual interface count
            self.min_num_external_ifaces = \
            self.max_num_external_ifaces = len(info['interface_ids'])

        if 'ssh_rsa_key' in info:
            orig_attrs['server_key'] = self.server_key
            self.server_key = info['ssh_rsa_key']

        self.hostip = socket.gethostbyname(self.hostname)

        # first assignment: remember originals for unassign_node
        except AttributeError:
            self.__orig_attrs = orig_attrs
        # Sanity checks (the enclosing method's def line is not visible in
        # this view - presumably a validate() method; TODO confirm)
        if self.home_path is None:
            raise AssertionError, "Misconfigured node: missing home path"
        if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
            raise AssertionError, "Misconfigured node: missing slice SSH key"
        if self.slicename is None:
            raise AssertionError, "Misconfigured node: unspecified slice"

        # Mark dependencies installed
        self._installed = True

        # Clear load attributes, they impair re-discovery
        self.minReliability = \
        self.maxReliability = \
        self.minBandwidth = \
        self.maxBandwidth = \
    def install_dependencies(self):
        """Install the rpmfusion repository on the node when needed, then
        launch the asynchronous yum dependency installer."""
        if self.required_packages and not self._installed:
            # If we need rpmfusion, we must install the repo definition and the gpg keys
            if self.operatingSystem == 'f12':
                # Fedora 12 requires a different rpmfusion package
                RPM_FUSION_URL = self.RPM_FUSION_URL_F12
                # This one works for f13+
                RPM_FUSION_URL = self.RPM_FUSION_URL

                # skip installation if the repo package is already present
                'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
                'RPM_FUSION_URL' : RPM_FUSION_URL

            (out,err),proc = server.popen_ssh_command(
                host = self.hostname,
                user = self.slicename,
                ident_key = self.ident_path,
                server_key = self.server_key,

            # a "bad host" signature means the node, not the command, failed
            if self.check_bad_host(out,err):
            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)

        # Launch p2p yum dependency installer
        self._yum_dependencies.async_setup()
    def wait_provisioning(self, timeout = 20*60):
        """Poll the node with exponential backoff until it responds, raising
        UnresponsiveNodeError once *timeout* seconds have elapsed."""
        # Wait for the p2p installer
        while not self.is_alive():
            time.sleep(sleeptime)
            totaltime += sleeptime
            # exponential backoff, capped at 30s between probes
            sleeptime = min(30.0, sleeptime*1.5)

            if totaltime > timeout:
                # PlanetLab has a 15' delay on configuration propagation
                # If we're above that delay, the unresponsiveness is not due
                if not self.is_alive(verbose=True):
                    raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)

        # Ensure the node is clean (no apps running that could interfere with operations)
        if self.enable_cleanup:
503 def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
504 # Wait for the p2p installer
505 if self._yum_dependencies and not self._installed:
506 self._yum_dependencies.async_setup_wait()
507 self._installed = True
    def is_alive(self, verbose = False):
        """Probe the node over SSH; the node is considered alive only when
        the probe returns 'ALIVE' on stdout with no stderr output."""
        # Make sure all the paths are created where
        # they have to be created for deployment
        (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key,
            err_on_timeout = False,

            self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
        elif not err and out.strip() == 'ALIVE':
            self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)

        # NOTE(review): the lines below appear to belong to a separate
        # blacklisting helper whose def line is not visible in this view.
        if self.enable_cleanup:
        self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
        util.appendBlacklist(self.hostname)
    def do_cleanup(self):
        """Kill leftover processes on the node so later operations start from
        a clean sliver; skipped while the testbed is recovering."""
        if self.testbed().recovering:

        self._logger.info("Cleaning up %s", self.hostname)

            "sudo -S killall python tcpdump || /bin/true ; "
            "sudo -S killall python tcpdump || /bin/true ; "
            "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
            "sudo -S killall -u %(slicename)s || /bin/true ",
            "sudo -S killall -u root || /bin/true ",
            "sudo -S killall -u %(slicename)s || /bin/true ",
            "sudo -S killall -u root || /bin/true ",

        (out,err),proc = server.popen_ssh_command(
            # Some apps need two kills
            'slicename' : self.slicename ,
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key,
            tty = True, # so that ps -N -T works as advertised...
581 def prepare_dependencies(self):
582 # Configure p2p yum dependency installer
583 if self.required_packages and not self._installed:
584 self._yum_dependencies = application.YumDependency(self._api)
585 self._yum_dependencies.node = self
586 self._yum_dependencies.home_path = "nepi-yumdep"
587 self._yum_dependencies.depends = ' '.join(self.required_packages)
    def routing_method(self, routes, vsys_vnet):
        # (docstring quote lines are not visible in this view)
        There are two methods, vroute and sliceip.

        Modifies the node's routing table directly, validating that the IP
        range lies within the network given by the slice's vsys_vnet tag.
        This method is the most scalable for very small routing tables
        that need not modify other routes (including the default)

        Uses policy routing and iptables filters to create per-sliver
        routing tables. It's the most flexible way, but it doesn't scale
        as well since only 155 routing tables can be created this way.

        This method will return the most appropriate routing method, which will
        prefer vroute for small routing tables.

        # For now, sliceip results in kernel panics
        # so we HAVE to use vroute

        # We should not make the routing table grow too big
        if len(routes) > MAX_VROUTE_ROUTES:

        vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
            # every route's destination and nexthop must fall inside the
            # slice's vsys_vnet network for vroute to be usable
            dest, prefix, nexthop, metric = route
            dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
            nexthop = ipaddr.IPAddress(nexthop)
            if dest not in vsys_vnet or nexthop not in vsys_vnet:
    def format_route(self, route, dev, method, action):
        """Render one route command for the given vsys *method* ('vroute' or
        'sliceip'); *route* is (dest, prefix, nexthop, metric) and *action*
        is presumably 'add' or 'del' (see configure/reconfigure_routes)."""
        dest, prefix, nexthop, metric = route
        if method == 'vroute':
            "%s %s%s gw %s %s" % (
            # /32 host routes omit the prefix suffix
            (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
        elif method == 'sliceip':
            "route %s to %s%s via %s metric %s dev %s" % (
            # /32 host routes omit the prefix suffix
            (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
            raise AssertionError, "Unknown method"
    def _annotate_routes_with_devs(self, routes, devs, method):
        """Attach to each route the name of the virtual interface that should
        carry it; raises RuntimeError when no interface matches."""
            if dev.routes_here(route):
                # annotate the route tuple with the matching device name
                dev_routes.append(tuple(route) + (dev.if_name,))

            if method == 'sliceip':
                # sliceip tolerates a default device; fall back to eth0
                dev_routes.append(tuple(route) + ('eth0',))
                raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
                    "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
    def configure_routes(self, routes, devs, vsys_vnet):
        Add the specified routes to the node's routing table
        # (docstring quote lines are not visible in this view)
        method = self.routing_method(routes, vsys_vnet)

        # annotate routes with devices
        dev_routes = self._annotate_routes_with_devs(routes, devs, method)
        for route in dev_routes:
            # split the annotated tuple back into (route, device)
            route, dev = route[:-1], route[-1]

            rules.append(self.format_route(route, dev, method, 'add'))

        if method == 'sliceip':
            # sliceip interfaces must be enabled before routes are pushed
            rules = map('enable '.__add__, tdevs) + rules

        self._logger.info("Setting up routes for %s using %s", self.hostname, method)
        self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))

        self.apply_route_rules(rules, method)

        # remember what was configured so reconfigure_routes can diff later
        self._configured_routes = set(routes)
        self._configured_devs = tdevs
        self._configured_method = method
    def reconfigure_routes(self, routes, devs, vsys_vnet):
        Updates the routes in the node's routing table to match
        # (docstring quote lines are not visible in this view)
        method = self._configured_method

        dev_routes = self._annotate_routes_with_devs(routes, devs, method)

        current = self._configured_routes
        current_devs = self._configured_devs

        new = set(dev_routes)
        new_devs = set(map(operator.itemgetter(-1), dev_routes))

        # diff against what configure_routes previously applied
        deletions = current - new
        insertions = new - current

        dev_deletions = current_devs - new_devs
        dev_insertions = new_devs - current_devs

        # Rule deletions first
        for route in deletions:
            route, dev = route[:-1], route[-1]
            rules.append(self.format_route(route, dev, method, 'del'))

        if method == 'sliceip':
            rules.extend(map('disable '.__add__, dev_deletions))
            rules.extend(map('enable '.__add__, dev_insertions))

        # Rule insertions now
        for route in insertions:
            # NOTE(review): 'dev[-1]' reuses the 'dev' variable leaked from
            # the deletions loop and takes its last character; the deletions
            # loop uses 'route[-1]' here. This looks like a bug - confirm it
            # should read: route, dev = route[:-1], route[-1]
            route, dev = route[:-1], dev[-1]
            rules.append(self.format_route(route, dev, method, 'add'))

        self.apply_route_rules(rules, method)

        # NOTE(review): configure_routes stores set(routes) (no device
        # annotations) while this stores the annotated dev_routes - the
        # inconsistency would skew the next diff; confirm intended.
        self._configured_routes = dev_routes
        self._configured_devs = new_devs
    def apply_route_rules(self, rules, method):
        """Pipe *rules* into the node's /vsys/<method>.in fifo over SSH,
        echoing anything the .out side reports; raises RuntimeError on
        failure."""
        (out,err),proc = server.popen_ssh_command(
            "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
                home = server.shell_escape(self.home_path),
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key,
            stdin = '\n'.join(rules),

        if proc.wait() or err:
            raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)

        # NOTE(review): bare 'logger' - the rest of this class uses
        # self._logger; confirm a module-level logger exists, otherwise this
        # line raises NameError.
        logger.debug("%s said: %s%s", method, out, err)
    def check_bad_host(self, out, err):
        """Scan a remote command's (out, err) for signatures indicating the
        node itself is broken (DNS failure, disk errors) rather than the
        command merely failing; returns a truthy match or None."""
        badre = re.compile(r'(?:'
            r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
            r'|Error: disk I/O error'
        # (additional alternatives and the closing paren are not visible in
        # this view)
        return badre.search(out) or badre.search(err)