1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID
19 from nepi.util import server
20 from nepi.util import parallel
# Raised when a node stops answering liveness probes (see is_alive /
# wait_provisioning below).
class UnresponsiveNodeError(RuntimeError):
29 def _castproperty(typ, propattr):
31 return getattr(self, propattr)
32 def _set(self, value):
33 if value is not None or (isinstance(value, basestring) and not value):
35 return setattr(self, propattr, value)
36 def _del(self, value):
37 return delattr(self, propattr)
38 _get.__name__ = propattr + '_get'
39 _set.__name__ = propattr + '_set'
40 _del.__name__ = propattr + '_del'
41 return property(_get, _set, _del)
# Map Node attribute to plcapi filter name
'hostname' : 'hostname',
# Map Node attribute to (<tag name>, <plcapi filter expression>)
# There are replacements that are applied with string formatting,
# so '%' has to be escaped as '%%'.
# NOTE(review): the ']' / '[' prefixes below are presumably plcapi's
# ">=" / "<=" filter modifiers - confirm against the PLCAPI filter docs.
'architecture' : ('arch','value'),
'operatingSystem' : ('fcdistro','value'),
'pl_distro' : ('pldistro','value'),
'city' : ('city','value'),
'country' : ('country','value'),
'region' : ('region','value'),
'minReliability' : ('reliability%(timeframe)s', ']value'),
'maxReliability' : ('reliability%(timeframe)s', '[value'),
'minBandwidth' : ('bw%(timeframe)s', ']value'),
'maxBandwidth' : ('bw%(timeframe)s', '[value'),
'minLoad' : ('load%(timeframe)s', ']value'),
'maxLoad' : ('load%(timeframe)s', '[value'),
'minCpu' : ('cpu%(timeframe)s', ']value'),
'maxCpu' : ('cpu%(timeframe)s', '[value'),
# Weights used by rate_nodes() to score candidate nodes:
# (<tag name>, <weight>, <default>)
('bw%(timeframe)s', -0.001, 1024.0),
('cpu%(timeframe)s', 0.1, 40.0),
('load%(timeframe)s', -0.2, 3.0),
('reliability%(timeframe)s', 1, 100.0),
# Remote pid/log files used by the background dependency installer.
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
# RPM Fusion repository packages; Fedora 12 needs a release-specific RPM
# (see install_dependencies).
RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
# Numeric selection-filter attributes: assigned values are cast to float
# and stored in the corresponding underscored backing attribute.
minReliability = _castproperty(float, '_minReliability')
maxReliability = _castproperty(float, '_maxReliability')
minBandwidth = _castproperty(float, '_minBandwidth')
maxBandwidth = _castproperty(float, '_maxBandwidth')
minCpu = _castproperty(float, '_minCpu')
maxCpu = _castproperty(float, '_maxCpu')
minLoad = _castproperty(float, '_minLoad')
maxLoad = _castproperty(float, '_maxLoad')
def __init__(self, api=None):
# Hardware / distro selection filters (None = don't filter).
self.architecture = None
self.operatingSystem = None
self.pl_distro = None
# Numeric selection filters; these go through _castproperty and are
# cleared after allocation (they impair re-discovery).
self.minReliability = None
self.maxReliability = None
self.minBandwidth = None
self.maxBandwidth = None
# Interface-count constraints applied in find_candidates.
self.min_num_external_ifaces = None
self.max_num_external_ifaces = None
# Applications and routes add requirements to connected nodes
self.required_packages = set()
self.required_vsys = set()
# Whether the RPM Fusion repository must be installed on the node.
self.rpmFusion = False
# Extra environment variables exported by the setup command,
# name -> list of values (one export per value).
self.env = collections.defaultdict(list)
# Some special applications - initialized when connected
self.multicast_forwarder = None
# Testbed-derived attributes
self.slicename = None
self.ident_path = None
self.server_key = None
self.home_path = None
self.enable_cleanup = False
# Those are filled when an actual node is allocated
self._yum_dependencies = None
self._installed = False
self._logger = logging.getLogger('nepi.testbeds.planetlab')
# Builds the shell snippet that prepares the remote environment:
# extends PYTHONPATH/PATH and exports every variable in self.env.
def _nepi_testbed_environment_setup_get(self):
command = cStringIO.StringIO()
command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
# NOTE(review): PATH is extended with self.pythonpath as well -
# looks intentional (scripts live next to the modules) but confirm.
command.write(' ; export PATH=$PATH:%s' % (
':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
for envkey, envvals in self.env.iteritems():
for envval in envvals:
command.write(' ; export %s=%s' % (envkey, envval))
return command.getvalue()
# Setter exists only so the attribute can appear writable to the
# configuration machinery; assigned values are discarded.
def _nepi_testbed_environment_setup_set(self, value):
_nepi_testbed_environment_setup = property(
_nepi_testbed_environment_setup_get,
_nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
    """Copy this node's configured attributes into *target_filters*.

    *filter_map* maps attribute names on ``self`` to plcapi filter keys.
    Attributes that are unset (``None`` or missing) are skipped.
    Returns the mutated *target_filters* for convenient chaining.
    """
    for attribute, filter_key in filter_map.iteritems():
        attribute_value = getattr(self, attribute, None)
        if attribute_value is None:
            continue
        target_filters[filter_key] = attribute_value
    return target_filters
# Returns the names of the filter attributes that are actually set on
# this node (used by find_candidates / make_filter_description, which
# access it as ``self.applicable_filters`` without calling - presumably
# decorated with @property on a line not shown here; confirm.
def applicable_filters(self):
has = lambda att : getattr(self,att,None) is not None
filter(has, self.BASEFILTERS.iterkeys())
+ filter(has, self.TAGFILTERS.iterkeys())
# Query plcapi for the set of node_ids matching every configured filter:
# base attribute filters, per-tag filters, vsys requirements, interface
# count, and finally DNS resolvability. Returns a set of node ids.
def find_candidates(self, filter_slice_id=None):
self._logger.info("Finding candidates for %s", self.make_filter_description())
fields = ('node_id',)
replacements = {'timeframe':self.timeframe}
# get initial candidates (no tag filters)
basefilters = self.build_filters({}, self.BASEFILTERS)
rootfilters = basefilters.copy()
# Restrict to nodes already in the slice ('|' = OR-list in plcapi).
basefilters['|slice_ids'] = (filter_slice_id,)
# only pick healthy nodes
basefilters['run_level'] = 'boot'
basefilters['boot_state'] = 'boot'
basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
# keyword-only "pseudofilters"
# NOTE(review): 'extra' is initialized on a line not shown here.
extra['peer'] = self.site
candidates = set(map(operator.itemgetter('node_id'),
self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
# filter by tag, one tag at a time
applicable = self.applicable_filters
for tagfilter in self.TAGFILTERS.iteritems():
attr, (tagname, expr) = tagfilter
# don't bother if there's no filter defined
if attr in applicable:
tagfilter = rootfilters.copy()
tagfilter['tagname'] = tagname % replacements
tagfilter[expr % replacements] = getattr(self,attr)
tagfilter['node_id'] = list(candidates)
# Intersect: keep only nodes whose tag matches the expression.
candidates &= set(map(operator.itemgetter('node_id'),
self._api.GetNodeTags(filters=tagfilter, fields=fields)))
# filter by vsys tags - special case since it doesn't follow
# the usual semantics
if self.required_vsys:
newcandidates = collections.defaultdict(set)
vsys_tags = self._api.GetNodeTags(
node_id = list(candidates),
fields = ['node_id','value'])
# NOTE(review): itemgetter with a single list argument fetches ONE
# key (the list itself); to extract the two fields this should be
# operator.itemgetter('node_id','value') - confirm and fix.
operator.itemgetter(['node_id','value']),
required_vsys = self.required_vsys
for node_id, value in vsys_tags:
if value in required_vsys:
newcandidates[value].add(node_id)
# take only those that have all the required vsys tags
newcandidates = reduce(
lambda accum, new : accum & new,
newcandidates.itervalues(),
# filter by iface count
if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
# fetch interfaces for all, in one go
filters = basefilters.copy()
filters['node_id'] = list(candidates)
# NOTE(review): 'filters' (restricted to the candidates) is built
# above but 'basefilters' is passed instead - this refetches all
# nodes; almost certainly should be filters=filters.
ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
# filter candidates by interface count
if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
predicate = ( lambda node_id :
self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
elif self.min_num_external_ifaces is not None:
predicate = ( lambda node_id :
self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
predicate = ( lambda node_id :
len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
candidates = set(filter(predicate, candidates))
# make sure hostnames are resolvable
self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
hostnames = dict(map(operator.itemgetter('node_id','hostname'),
self._api.GetNodes(list(candidates), ['node_id','hostname'])
def resolvable(node_id):
addr = socket.gethostbyname(hostnames[node_id])
return addr is not None
# Resolve in parallel - DNS lookups dominate otherwise.
candidates = set(parallel.pfilter(resolvable, candidates,
self._logger.info(" Found %s reachable candidates.", len(candidates))
def make_filter_description(self):
Makes a human-readable description of filtering conditions
# get initial candidates (no tag filters)
filters = self.build_filters({}, self.BASEFILTERS)
# keyword-only "pseudofilters"
filters['peer'] = self.site
# filter by tag, one tag at a time
applicable = self.applicable_filters
for tagfilter in self.TAGFILTERS.iteritems():
attr, (tagname, expr) = tagfilter
# don't bother if there's no filter defined
if attr in applicable:
filters[attr] = getattr(self,attr)
# filter by vsys tags - special case since it doesn't follow
# the usual semantics
if self.required_vsys:
filters['vsys'] = ','.join(list(self.required_vsys))
# filter by iface count
if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
filters['num_ifaces'] = '-'.join([
str(self.min_num_external_ifaces or '0'),
str(self.max_num_external_ifaces or 'inf')
# Render "key: value" pairs joined by '; ' for log messages.
return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """Bind this instance to a concrete PLC node and load its metadata."""
    self._node_id = node_id
    self.fetch_node_info()
# Undo fetch_node_info: restore the attribute values saved in
# self.__orig_attrs and drop the snapshot. A no-op when no snapshot
# exists (the hidden lines handle the AttributeError case).
def unassign_node(self):
orig_attrs = self.__orig_attrs
except AttributeError:
for key, value in orig_attrs.iteritems():
setattr(self, key, value)
del self.__orig_attrs
# Score each node in *nodes* as a weighted sum of its RATE_FACTORS tag
# values (bandwidth, cpu, load, reliability), using the factor's default
# when the tag is missing or 'n/a'. Returns scores aligned with *nodes*.
def rate_nodes(self, nodes):
rates = collections.defaultdict(int)
tags = collections.defaultdict(dict)
replacements = {'timeframe':self.timeframe}
tagnames = [ tagname % replacements
for tagname, weight, default in self.RATE_FACTORS ]
taginfo = self._api.GetNodeTags(
fields=('node_id','tagname','value'))
# Index tag values as tags[tagname][node_id] = float(value).
unpack = operator.itemgetter('node_id','tagname','value')
for value in taginfo:
node, tagname, value = unpack(value)
if value and value.lower() != 'n/a':
tags[tagname][int(node)] = float(value)
for tagname, weight, default in self.RATE_FACTORS:
taginfo = tags[tagname % replacements].get
# (inner loop over the rated nodes is on lines not shown here)
rates[node] += weight * taginfo(node,default)
return map(rates.__getitem__, nodes)
# Pull the allocated node's record and tags from plcapi (in one
# multicall round-trip) and overwrite the matching local attributes,
# snapshotting the previous values into self.__orig_attrs so
# unassign_node can restore them.
def fetch_node_info(self):
self._api.StartMulticall()
info = self._api.GetNodes(self._node_id)
tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
info, tags = self._api.FinishMulticall()
# Index tags by name for the attribute mapping below.
tags = dict( (t['tagname'],t['value'])
orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
self.min_num_external_ifaces = None
self.max_num_external_ifaces = None
replacements = {'timeframe':self.timeframe}
# Base attributes come straight from the node record.
for attr, tag in self.BASEFILTERS.iteritems():
if hasattr(self, attr):
orig_attrs[attr] = getattr(self, attr)
setattr(self, attr, value)
# Tag-backed attributes come from the (timeframe-expanded) tag names.
for attr, (tag,_) in self.TAGFILTERS.iteritems():
tag = tag % replacements
if hasattr(self, attr):
orig_attrs[attr] = getattr(self, attr)
if not value or value.lower() == 'n/a':
setattr(self, attr, value)
# Derive the site from the peer the node belongs to.
if 'peer_id' in info:
orig_attrs['site'] = self.site
self.site = self._api.peer_map[info['peer_id']]
if 'interface_ids' in info:
self.min_num_external_ifaces = \
self.max_num_external_ifaces = len(info['interface_ids'])
# Pin the host key so SSH connections can verify the server.
if 'ssh_rsa_key' in info:
orig_attrs['server_key'] = self.server_key
self.server_key = info['ssh_rsa_key']
self.hostip = socket.gethostbyname(self.hostname)
except AttributeError:
self.__orig_attrs = orig_attrs
# Configuration sanity checks (interior of a validation method whose
# def line is not shown): fail fast on missing deploy prerequisites.
if self.home_path is None:
raise AssertionError, "Misconfigured node: missing home path"
if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
raise AssertionError, "Misconfigured node: missing slice SSH key"
if self.slicename is None:
raise AssertionError, "Misconfigured node: unspecified slice"
# Mark dependencies installed
self._installed = True
# Clear load attributes, they impair re-discovery
# (chained assignment continues on lines not shown here).
self.minReliability = \
self.maxReliability = \
self.minBandwidth = \
self.maxBandwidth = \
# Install the RPM Fusion repository if required and kick off the
# asynchronous yum dependency installer on the node.
def install_dependencies(self):
if self.required_packages and not self._installed:
# If we need rpmfusion, we must install the repo definition and the gpg keys
if self.operatingSystem == 'f12':
# Fedora 12 requires a different rpmfusion package
RPM_FUSION_URL = self.RPM_FUSION_URL_F12
# This one works for f13+
RPM_FUSION_URL = self.RPM_FUSION_URL
# Idempotent: only install the repo RPM if not already present.
'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
'RPM_FUSION_URL' : RPM_FUSION_URL
(out,err),proc = server.popen_ssh_command(
host = self.hostname,
user = self.slicename,
ident_key = self.ident_path,
server_key = self.server_key,
# Known-bad output patterns mean the node itself is broken.
if self.check_bad_host(out,err):
raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
# Launch p2p yum dependency installer
self._yum_dependencies.async_setup()
# Poll the node with exponential backoff until it answers, raising
# UnresponsiveNodeError past *timeout* seconds; optionally clean it up.
def wait_provisioning(self, timeout = 20*60):
# Wait for the p2p installer
while not self.is_alive():
time.sleep(sleeptime)
totaltime += sleeptime
# Exponential backoff capped at 30s between probes.
sleeptime = min(30.0, sleeptime*1.5)
if totaltime > timeout:
# PlanetLab has a 15' delay on configuration propagation
# If we're above that delay, the unresponsiveness is not due
if not self.is_alive(verbose=True):
raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
# Ensure the node is clean (no apps running that could interfere with operations)
if self.enable_cleanup:
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
    """Block until the p2p yum installer finishes, then mark installed."""
    # Nothing to do when no installer was scheduled or install already done.
    if not self._yum_dependencies or self._installed:
        return
    self._yum_dependencies.async_setup_wait()
    self._installed = True
# Probe the node over SSH; returns truthy only when the remote command
# echoes 'ALIVE' cleanly. With verbose=True, failures are logged.
def is_alive(self, verbose = False):
# Make sure all the paths are created where
# they have to be created for deployment
(out,err),proc = server.eintr_retry(server.popen_ssh_command)(
host = self.hostname,
user = self.slicename,
ident_key = self.ident_path,
server_key = self.server_key,
err_on_timeout = False,
self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
elif not err and out.strip() == 'ALIVE':
self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
# Interior of the blacklisting method (def line not shown): record the
# malfunctioning node so later discoveries skip it.
if self.enable_cleanup:
self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
util.appendBlacklist(self._node_id)
# Kill every leftover process in the sliver over SSH so previous
# experiments cannot interfere; skipped while recovering, since the
# running apps belong to the experiment being recovered.
def do_cleanup(self):
if self.testbed().recovering:
self._logger.info("Cleaning up %s", self.hostname)
# Repeated kill commands: some apps need more than one pass.
"sudo -S killall python tcpdump || /bin/true ; "
"sudo -S killall python tcpdump || /bin/true ; "
"sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
"sudo -S killall -u %(slicename)s || /bin/true ",
"sudo -S killall -u root || /bin/true ",
"sudo -S killall -u %(slicename)s || /bin/true ",
"sudo -S killall -u root || /bin/true ",
(out,err),proc = server.popen_ssh_command(
# Some apps need two kills
'slicename' : self.slicename ,
host = self.hostname,
user = self.slicename,
ident_key = self.ident_path,
server_key = self.server_key,
tty = True, # so that ps -N -T works as advertised...
def prepare_dependencies(self):
    """Configure (but do not launch) the p2p yum dependency installer."""
    # Skip when there is nothing to install or it is already done.
    if not self.required_packages or self._installed:
        return
    installer = application.YumDependency(self._api)
    installer.node = self
    installer.home_path = "nepi-yumdep"
    installer.depends = ' '.join(self.required_packages)
    self._yum_dependencies = installer
# Choose between the 'vroute' and 'sliceip' vsys routing backends for
# the given routes, validating vroute destinations against the slice's
# vsys_vnet prefix.
def routing_method(self, routes, vsys_vnet):
There are two methods, vroute and sliceip.
Modifies the node's routing table directly, validating that the IP
range lies within the network given by the slice's vsys_vnet tag.
This method is the most scalable for very small routing tables
that need not modify other routes (including the default)
Uses policy routing and iptables filters to create per-sliver
routing tables. It's the most flexible way, but it doesn't scale
as well since only 155 routing tables can be created this way.
This method will return the most appropriate routing method, which will
prefer vroute for small routing tables.
# For now, sliceip results in kernel panics
# so we HAVE to use vroute
# We should not make the routing table grow too big
if len(routes) > MAX_VROUTE_ROUTES:
vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
dest, prefix, nexthop, metric = route
dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
nexthop = ipaddr.IPAddress(nexthop)
# vroute can only touch routes inside the slice's vnet.
if dest not in vsys_vnet or nexthop not in vsys_vnet:
# Render one (dest, prefix, nexthop, metric) route as the command line
# expected by the chosen vsys backend; *action* is e.g. 'add'/'del'.
def format_route(self, route, dev, method, action):
dest, prefix, nexthop, metric = route
if method == 'vroute':
"%s %s%s gw %s %s" % (
# /32 (host) routes omit the prefix suffix.
(("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
elif method == 'sliceip':
"route %s to %s%s via %s metric %s dev %s" % (
(("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
raise AssertionError, "Unknown method"
# Pair each route with the virtual interface that should carry it
# (appending the device name to the route tuple); sliceip can fall back
# to eth0, vroute cannot route outside the virtual interfaces.
def _annotate_routes_with_devs(self, routes, devs, method):
if dev.routes_here(route):
dev_routes.append(tuple(route) + (dev.if_name,))
if method == 'sliceip':
dev_routes.append(tuple(route) + ('eth0',))
raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
"- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
def configure_routes(self, routes, devs, vsys_vnet):
Add the specified routes to the node's routing table
method = self.routing_method(routes, vsys_vnet)
# annotate routes with devices
dev_routes = self._annotate_routes_with_devs(routes, devs, method)
for route in dev_routes:
# Device name was appended last by _annotate_routes_with_devs.
route, dev = route[:-1], route[-1]
rules.append(self.format_route(route, dev, method, 'add'))
# sliceip needs its devices enabled before any route rules.
if method == 'sliceip':
rules = map('enable '.__add__, tdevs) + rules
self._logger.info("Setting up routes for %s using %s", self.hostname, method)
self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
self.apply_route_rules(rules, method)
# Remember what was configured so reconfigure_routes can diff later.
self._configured_routes = set(routes)
self._configured_devs = tdevs
self._configured_method = method
def reconfigure_routes(self, routes, devs, vsys_vnet):
Updates the routes in the node's routing table to match
# Keep the backend already in use; switching methods mid-flight is
# not supported here.
method = self._configured_method
dev_routes = self._annotate_routes_with_devs(routes, devs, method)
current = self._configured_routes
current_devs = self._configured_devs
new = set(dev_routes)
new_devs = set(map(operator.itemgetter(-1), dev_routes))
# Diff old vs new configuration to emit only the needed changes.
deletions = current - new
insertions = new - current
dev_deletions = current_devs - new_devs
dev_insertions = new_devs - current_devs
# Rule deletions first
for route in deletions:
route, dev = route[:-1], route[-1]
rules.append(self.format_route(route, dev, method, 'del'))
if method == 'sliceip':
rules.extend(map('disable '.__add__, dev_deletions))
rules.extend(map('enable '.__add__, dev_insertions))
# Rule insertions now
for route in insertions:
# NOTE(review): 'dev[-1]' reuses the stale 'dev' from the previous
# loop; the deletions loop above uses route[-1] - this should
# almost certainly be: route, dev = route[:-1], route[-1]
route, dev = route[:-1], dev[-1]
rules.append(self.format_route(route, dev, method, 'add'))
self.apply_route_rules(rules, method)
self._configured_routes = dev_routes
self._configured_devs = new_devs
# Push the rule lines into the node's /vsys/<method>.in fifo over SSH,
# echoing /vsys/<method>.out back for diagnostics, and raise on errors.
def apply_route_rules(self, rules, method):
(out,err),proc = server.popen_ssh_command(
"( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
home = server.shell_escape(self.home_path),
host = self.hostname,
user = self.slicename,
ident_key = self.ident_path,
server_key = self.server_key,
stdin = '\n'.join(rules),
if proc.wait() or err:
raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
# NOTE(review): bare 'logger' is undefined in this module; every other
# method logs via self._logger - this should be self._logger.debug(...).
logger.debug("%s said: %s%s", method, out, err)
# Heuristic: scan stdout/stderr for symptoms of a broken node (DNS
# failure reaching rpmfusion, disk I/O errors); truthy match means the
# host itself is bad, not the command. (More alternatives and the
# closing paren of the regex are on lines not shown here.)
def check_bad_host(self, out, err):
badre = re.compile(r'(?:'
r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
r'|Error: disk I/O error'
return badre.search(out) or badre.search(err)