1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID
19 from nepi.util import server
20 from nepi.util import parallel
26 class UnresponsiveNodeError(RuntimeError):
def _castproperty(typ, propattr):
    """
    Build a property backed by the instance attribute named ``propattr``
    whose setter casts every non-None value through ``typ``.

    typ      -- callable applied to assigned values (e.g. ``float``)
    propattr -- name of the attribute that actually holds the value

    Assigning None stores None untouched.  Any other value is stored as
    ``typ(value)``; whatever ``typ`` raises for bad input propagates to
    the caller.  Deleting the property deletes the backing attribute.
    """
    def _get(self):
        return getattr(self, propattr)

    def _set(self, value):
        # None means "no constraint" and is kept as-is.  An additional
        # "empty string" clause used to be or-ed in here, but it could never
        # change the outcome: an empty string is already "not None".
        if value is not None:
            value = typ(value)
        setattr(self, propattr, value)

    def _del(self):
        # property() calls the deleter with the instance only; accepting a
        # second argument made "del obj.prop" fail with TypeError.
        delattr(self, propattr)

    # Give the accessors recognizable names in tracebacks
    _get.__name__ = propattr + '_get'
    _set.__name__ = propattr + '_set'
    _del.__name__ = propattr + '_del'
    return property(_get, _set, _del)
45 # Map Node attribute to plcapi filter name
46 'hostname' : 'hostname',
50 # Map Node attribute to (<tag name>, <plcapi filter expression>)
51 # There are replacements that are applied with string formatting,
52 # so '%' has to be escaped as '%%'.
53 'architecture' : ('arch','value'),
54 'operatingSystem' : ('fcdistro','value'),
55 'pl_distro' : ('pldistro','value'),
56 'city' : ('city','value'),
57 'country' : ('country','value'),
58 'region' : ('region','value'),
59 'minReliability' : ('reliability%(timeframe)s', ']value'),
60 'maxReliability' : ('reliability%(timeframe)s', '[value'),
61 'minBandwidth' : ('bw%(timeframe)s', ']value'),
62 'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63 'minLoad' : ('load%(timeframe)s', ']value'),
64 'maxLoad' : ('load%(timeframe)s', '[value'),
65 'minCpu' : ('cpu%(timeframe)s', ']value'),
66 'maxCpu' : ('cpu%(timeframe)s', '[value'),
70 # (<tag name>, <weight>, <default>)
71 ('bw%(timeframe)s', -0.001, 1024.0),
72 ('cpu%(timeframe)s', 0.1, 40.0),
73 ('load%(timeframe)s', -0.2, 3.0),
74 ('reliability%(timeframe)s', 1, 100.0),
77 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
80 RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
81 RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
83 minReliability = _castproperty(float, '_minReliability')
84 maxReliability = _castproperty(float, '_maxReliability')
85 minBandwidth = _castproperty(float, '_minBandwidth')
86 maxBandwidth = _castproperty(float, '_maxBandwidth')
87 minCpu = _castproperty(float, '_minCpu')
88 maxCpu = _castproperty(float, '_maxCpu')
89 minLoad = _castproperty(float, '_minLoad')
90 maxLoad = _castproperty(float, '_maxLoad')
92 def __init__(self, api=None, sliceapi=None):
96 self._sliceapi = sliceapi or api
100 self.architecture = None
101 self.operatingSystem = None
102 self.pl_distro = None
107 self.minReliability = None
108 self.maxReliability = None
109 self.minBandwidth = None
110 self.maxBandwidth = None
115 self.min_num_external_ifaces = None
116 self.max_num_external_ifaces = None
117 self._timeframe = 'w'
119 # Applications and routes add requirements to connected nodes
120 self.required_packages = set()
121 self.required_vsys = set()
123 self.rpmFusion = False
124 self.env = collections.defaultdict(list)
126 # Some special applications - initialized when connected
127 self.multicast_forwarder = None
129 # Testbed-derived attributes
130 self.slicename = None
131 self.ident_path = None
132 self.server_key = None
133 self.home_path = None
134 self.enable_proc_cleanup = False
135 self.enable_home_cleanup = False
137 # Those are filled when an actual node is allocated
139 self._yum_dependencies = None
140 self._installed = False
143 self._logger = logging.getLogger('nepi.testbeds.planetlab')
def set_timeframe(self, timeframe):
    """
    Store the statistics window as the suffix kept in ``self._timeframe``.

    "latest" maps to "" (no suffix), "month" to "m" and "year" to "y";
    every other value falls back to the weekly window, "w".
    """
    known_suffixes = (
        ("latest", ""),
        ("month", "m"),
        ("year", "y"),
    )
    for label, suffix in known_suffixes:
        if timeframe == label:
            self._timeframe = suffix
            return
    # Anything unrecognized means "week"
    self._timeframe = "w"
155 def get_timeframe(self):
156 if self._timeframe == "":
158 if self._timeframe == "m":
160 if self._timeframe == "y":
164 timeframe = property(get_timeframe, set_timeframe)
def _nepi_testbed_environment_setup_get(self):
    """
    Build the shell snippet that prepares the remote environment.

    The result is a single string of ' ; '-separated ``export`` statements:
    PYTHONPATH and PATH are each extended with the entries of
    ``self.pythonpath`` (made relative to ${HOME} and shell-escaped),
    followed by one export per value found in ``self.env``.
    """
    def home_relative_entries():
        # Every entry is shell-escaped and anchored at the remote home
        return ':'.join(
            ["${HOME}/" + server.shell_escape(entry) for entry in self.pythonpath])

    script = cStringIO.StringIO()
    script.write('export PYTHONPATH=$PYTHONPATH:%s' % (home_relative_entries(),))
    # NOTE(review): PATH is extended with the pythonpath entries as well;
    # confirm this is intended and not a copy of the line above.
    script.write(' ; export PATH=$PATH:%s' % (home_relative_entries(),))

    # A key with several values yields several exports, in list order
    for name, values in self.env.iteritems():
        for single_value in values:
            script.write(' ; export %s=%s' % (name, single_value))
    return script.getvalue()
180 def _nepi_testbed_environment_setup_set(self, value):
183 _nepi_testbed_environment_setup = property(
184 _nepi_testbed_environment_setup_get,
185 _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
    """
    Copy the attributes of this node that carry a value into a filter dict.

    filter_map maps attribute names to filter keys.  For every attribute
    whose value on ``self`` is not None (a missing attribute counts as
    None), ``target_filters[key]`` is set to that value.

    target_filters is modified in place and is also the return value.
    """
    for attr_name in filter_map:
        current = getattr(self, attr_name, None)
        if current is None:
            # No constraint configured for this attribute
            continue
        target_filters[filter_map[attr_name]] = current
    return target_filters
195 def applicable_filters(self):
196 has = lambda att : getattr(self,att,None) is not None
198 filter(has, self.BASEFILTERS.iterkeys())
199 + filter(has, self.TAGFILTERS.iterkeys())
202 def find_candidates(self, filter_slice_id=None):
203 self._logger.info("Finding candidates for %s", self.make_filter_description())
205 fields = ('node_id',)
206 replacements = {'timeframe':self._timeframe}
208 # get initial candidates (no tag filters)
209 basefilters = self.build_filters({}, self.BASEFILTERS)
210 rootfilters = basefilters.copy()
212 basefilters['|slice_ids'] = (filter_slice_id,)
214 # only pick healthy nodes
215 basefilters['run_level'] = 'boot'
216 basefilters['boot_state'] = 'boot'
217 basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
218 basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
220 # keyword-only "pseudofilters"
223 extra['peer'] = self.site
225 candidates = set(map(operator.itemgetter('node_id'),
226 self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
228 # filter by tag, one tag at a time
229 applicable = self.applicable_filters
230 for tagfilter in self.TAGFILTERS.iteritems():
231 attr, (tagname, expr) = tagfilter
233 # don't bother if there's no filter defined
234 if attr in applicable:
235 tagfilter = rootfilters.copy()
236 tagfilter['tagname'] = tagname % replacements
237 tagfilter[expr % replacements] = str(getattr(self,attr))
238 tagfilter['node_id'] = list(candidates)
240 candidates &= set(map(operator.itemgetter('node_id'),
241 self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
243 # filter by vsys tags - special case since it doesn't follow
244 # the usual semantics
245 if self.required_vsys:
246 newcandidates = collections.defaultdict(set)
248 vsys_tags = self._sliceapi.GetNodeTags(
250 node_id = list(candidates),
251 fields = ['node_id','value'])
254 operator.itemgetter(['node_id','value']),
257 required_vsys = self.required_vsys
258 for node_id, value in vsys_tags:
259 if value in required_vsys:
260 newcandidates[value].add(node_id)
262 # take only those that have all the required vsys tags
263 newcandidates = reduce(
264 lambda accum, new : accum & new,
265 newcandidates.itervalues(),
268 # filter by iface count
269 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
270 # fetch interfaces for all, in one go
271 filters = basefilters.copy()
272 filters['node_id'] = list(candidates)
273 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
274 self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
276 # filter candidates by interface count
277 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
278 predicate = ( lambda node_id :
279 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
280 elif self.min_num_external_ifaces is not None:
281 predicate = ( lambda node_id :
282 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
284 predicate = ( lambda node_id :
285 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
287 candidates = set(filter(predicate, candidates))
289 # make sure hostnames are resolvable
292 self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
294 hostnames = dict(map(operator.itemgetter('node_id','hostname'),
295 self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])
298 def resolvable(node_id):
300 addr = server.gethostbyname(hostnames[node_id])
301 return addr is not None
304 candidates = set(parallel.pfilter(resolvable, candidates,
307 self._logger.info(" Found %s reachable candidates.", len(candidates))
309 for h in hostnames.keys():
310 if h not in candidates:
313 hostnames = dict((v,k) for k, v in hostnames.iteritems())
317 def make_filter_description(self):
319 Makes a human-readable description of filtering conditions
323 # get initial candidates (no tag filters)
324 filters = self.build_filters({}, self.BASEFILTERS)
326 # keyword-only "pseudofilters"
328 filters['peer'] = self.site
330 # filter by tag, one tag at a time
331 applicable = self.applicable_filters
332 for tagfilter in self.TAGFILTERS.iteritems():
333 attr, (tagname, expr) = tagfilter
335 # don't bother if there's no filter defined
336 if attr in applicable:
337 filters[attr] = getattr(self,attr)
339 # filter by vsys tags - special case since it doesn't follow
340 # the usual semantics
341 if self.required_vsys:
342 filters['vsys'] = ','.join(list(self.required_vsys))
344 # filter by iface count
345 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
346 filters['num_ifaces'] = '-'.join([
347 str(self.min_num_external_ifaces or '0'),
348 str(self.max_num_external_ifaces or 'inf')
351 return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """
    Bind this object to the node identified by ``node_id``.

    The id is stored in ``self._node_id`` and ``fetch_node_info()`` is
    called right away.
    """
    # The id must be in place before fetch_node_info() runs
    self._node_id = node_id

    self.fetch_node_info()
357 def unassign_node(self):
362 orig_attrs = self.__orig_attrs
363 except AttributeError:
366 for key, value in orig_attrs.iteritems():
367 setattr(self, key, value)
368 del self.__orig_attrs
370 def rate_nodes(self, nodes):
371 rates = collections.defaultdict(int)
372 tags = collections.defaultdict(dict)
373 replacements = {'timeframe':self._timeframe}
374 tagnames = [ tagname % replacements
375 for tagname, weight, default in self.RATE_FACTORS ]
377 taginfo = self._sliceapi.GetNodeTags(
380 fields=('node_id','tagname','value'))
382 unpack = operator.itemgetter('node_id','tagname','value')
383 for value in taginfo:
384 node, tagname, value = unpack(value)
385 if value and value.lower() != 'n/a':
386 tags[tagname][node] = float(value)
388 for tagname, weight, default in self.RATE_FACTORS:
389 taginfo = tags[tagname % replacements].get
391 rates[node] += weight * taginfo(node,default)
393 return map(rates.__getitem__, nodes)
395 def fetch_node_info(self):
398 info, tags = self._sliceapi.GetNodeInfo(self._node_id)
401 tags = dict( (t['tagname'],t['value'])
404 orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
405 orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
406 self.min_num_external_ifaces = None
407 self.max_num_external_ifaces = None
408 if not self._timeframe: self._timeframe = 'w'
410 replacements = {'timeframe':self._timeframe}
412 for attr, tag in self.BASEFILTERS.iteritems():
415 if hasattr(self, attr):
416 orig_attrs[attr] = getattr(self, attr)
417 setattr(self, attr, value)
418 for attr, (tag,_) in self.TAGFILTERS.iteritems():
419 tag = tag % replacements
422 if hasattr(self, attr):
423 orig_attrs[attr] = getattr(self, attr)
424 if not value or value.lower() == 'n/a':
426 setattr(self, attr, value)
428 if 'peer_id' in info:
429 orig_attrs['site'] = self.site
430 self.site = self._sliceapi.peer_map[info['peer_id']]
432 if 'interface_ids' in info:
433 self.min_num_external_ifaces = \
434 self.max_num_external_ifaces = len(info['interface_ids'])
436 if 'ssh_rsa_key' in info:
437 orig_attrs['server_key'] = self.server_key
438 self.server_key = info['ssh_rsa_key']
440 self.hostip = server.gethostbyname(self.hostname)
444 except AttributeError:
445 self.__orig_attrs = orig_attrs
448 if self.home_path is None:
449 raise AssertionError, "Misconfigured node: missing home path"
450 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
451 raise AssertionError, "Misconfigured node: missing slice SSH key"
452 if self.slicename is None:
453 raise AssertionError, "Misconfigured node: unspecified slice"
456 # Mark dependencies installed
457 self._installed = True
459 # Clear load attributes, they impair re-discovery
460 self.minReliability = \
461 self.maxReliability = \
462 self.minBandwidth = \
463 self.maxBandwidth = \
469 def install_dependencies(self):
470 if self.required_packages and not self._installed:
471 # If we need rpmfusion, we must install the repo definition and the gpg keys
473 if self.operatingSystem == 'f12':
474 # Fedora 12 requires a different rpmfusion package
475 RPM_FUSION_URL = self.RPM_FUSION_URL_F12
477 # This one works for f13+
478 RPM_FUSION_URL = self.RPM_FUSION_URL
481 'rpm -q rpmfusion-free-release || sudo -S rpm -i %(RPM_FUSION_URL)s'
483 'RPM_FUSION_URL' : RPM_FUSION_URL
489 (out,err),proc = server.popen_ssh_command(
493 user = self.slicename,
495 ident_key = self.ident_path,
496 server_key = self.server_key,
501 if self.check_bad_host(out,err):
503 raise RuntimeError, "Failed to set up application on host %s: %s %s" % (self.hostname, out,err,)
505 # Launch p2p yum dependency installer
506 self._yum_dependencies.async_setup()
508 def wait_provisioning(self, timeout = 20*60):
509 # Wait for the p2p installer
512 while not self.is_alive():
513 time.sleep(sleeptime)
514 totaltime += sleeptime
515 sleeptime = min(30.0, sleeptime*1.5)
517 if totaltime > timeout:
518 # PlanetLab has a 15' delay on configuration propagation
519 # If we're above that delay, the unresponsiveness is not due
521 if not self.is_alive(verbose=True):
522 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
524 # Ensure the node is clean (no apps running that could interfere with operations)
525 if self.enable_proc_cleanup:
526 self.do_proc_cleanup()
527 if self.enable_home_cleanup:
528 self.do_home_cleanup()
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
    """
    Block until the yum dependency installer has finished.

    Does nothing when no installer was prepared or when the dependencies
    are already marked as installed.  Otherwise waits on the installer's
    ``async_setup_wait()`` and then sets ``self._installed``.

    The probing parameters are accepted but not used by this method.
    """
    installer = self._yum_dependencies
    if not installer or self._installed:
        return

    # Wait for the p2p installer
    installer.async_setup_wait()
    self._installed = True
536 def is_alive(self, verbose = False):
537 # Make sure all the paths are created where
538 # they have to be created for deployment
539 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
543 user = self.slicename,
545 ident_key = self.ident_path,
546 server_key = self.server_key,
548 err_on_timeout = False,
554 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
556 elif not err and out.strip() == 'ALIVE':
560 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
564 if self.enable_proc_cleanup:
565 self.do_proc_cleanup()
569 self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
571 util.appendBlacklist(self.hostname)
573 def do_proc_cleanup(self):
574 if self.testbed().recovering:
578 self._logger.info("Cleaning up processes on %s", self.hostname)
581 "sudo -S killall python tcpdump || /bin/true ; "
582 "sudo -S killall python tcpdump || /bin/true ; "
583 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
584 "sudo -S killall -u %(slicename)s || /bin/true ",
585 "sudo -S killall -u root || /bin/true ",
586 "sudo -S killall -u %(slicename)s || /bin/true ",
587 "sudo -S killall -u root || /bin/true ",
591 (out,err),proc = server.popen_ssh_command(
592 # Some apps need two kills
594 'slicename' : self.slicename ,
598 user = self.slicename,
600 ident_key = self.ident_path,
601 server_key = self.server_key,
602 tty = True, # so that ps -N -T works as advertised...
608 def do_home_cleanup(self):
609 if self.testbed().recovering:
613 self._logger.info("Cleaning up home on %s", self.hostname)
616 "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + "
620 (out,err),proc = server.popen_ssh_command(
621 # Some apps need two kills
625 user = self.slicename,
627 ident_key = self.ident_path,
628 server_key = self.server_key,
629 tty = True, # so that ps -N -T works as advertised...
def prepare_dependencies(self):
    """
    Configure the yum dependency installer for this node.

    When packages are required and not yet marked as installed, a
    ``YumDependency`` is created on ``self._api``, stored in
    ``self._yum_dependencies`` and pointed at this node, with
    "nepi-yumdep" as its home path and the required package names,
    space-separated, as its dependency list.  Otherwise nothing happens.
    """
    if not self.required_packages or self._installed:
        return

    # Configure p2p yum dependency installer
    installer = self._yum_dependencies = application.YumDependency(self._api)
    installer.node = self
    installer.home_path = "nepi-yumdep"
    installer.depends = ' '.join(self.required_packages)
643 def routing_method(self, routes, vsys_vnet):
645 There are two methods, vroute and sliceip.
648 Modifies the node's routing table directly, validating that the IP
649 range lies within the network given by the slice's vsys_vnet tag.
650 This method is the most scalable for very small routing tables
651 that need not modify other routes (including the default)
654 Uses policy routing and iptables filters to create per-sliver
655 routing tables. It's the most flexible way, but it doesn't scale
656 as well since only 155 routing tables can be created this way.
658 This method will return the most appropriate routing method, which will
659 prefer vroute for small routing tables.
662 # For now, sliceip results in kernel panics
663 # so we HAVE to use vroute
666 # We should not make the routing table grow too big
667 if len(routes) > MAX_VROUTE_ROUTES:
670 vsys_vnet = ipaddr.IPv4Network(vsys_vnet)
672 dest, prefix, nexthop, metric, device = route
673 dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix))
674 nexthop = ipaddr.IPAddress(nexthop)
675 if dest not in vsys_vnet or nexthop not in vsys_vnet:
680 def format_route(self, route, dev, method, action):
681 dest, prefix, nexthop, metric, device = route
682 if method == 'vroute':
684 "%s %s%s gw %s %s" % (
687 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
692 elif method == 'sliceip':
694 "route %s to %s%s via %s metric %s dev %s" % (
697 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
704 raise AssertionError, "Unknown method"
706 def _annotate_routes_with_devs(self, routes, devs, method):
710 if dev.routes_here(route):
711 dev_routes.append(tuple(route) + (dev.if_name,))
716 if method == 'sliceip':
717 dev_routes.append(tuple(route) + ('eth0',))
719 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
720 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
723 def configure_routes(self, routes, devs, vsys_vnet):
725 Add the specified routes to the node's routing table
728 method = self.routing_method(routes, vsys_vnet)
731 # annotate routes with devices
732 dev_routes = self._annotate_routes_with_devs(routes, devs, method)
733 for route in dev_routes:
734 route, dev = route[:-1], route[-1]
738 rules.append(self.format_route(route, dev, method, 'add'))
740 if method == 'sliceip':
741 rules = map('enable '.__add__, tdevs) + rules
743 self._logger.info("Setting up routes for %s using %s", self.hostname, method)
744 self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
746 self.apply_route_rules(rules, method)
748 self._configured_routes = set(routes)
749 self._configured_devs = tdevs
750 self._configured_method = method
def reconfigure_routes(self, routes, devs, vsys_vnet):
    """
    Updates the routes in the node's routing table to match
    the given route list.

    Only the difference against the previously configured state is
    applied: routes no longer wanted are deleted first, then (for the
    'sliceip' method only) devices are disabled/enabled, and finally the
    new routes are added.  The routing method chosen when the routes were
    first configured is reused; ``vsys_vnet`` is accepted but not used.

    routes -- iterable of route tuples, as accepted by
              _annotate_routes_with_devs
    devs   -- candidate devices the routes can be bound to
    """
    method = self._configured_method

    dev_routes = self._annotate_routes_with_devs(routes, devs, method)

    # Normalize to sets so the differences below work whatever container
    # the previous state was stored in.
    current = set(self._configured_routes)
    current_devs = set(self._configured_devs)

    new = set(dev_routes)
    new_devs = set(map(operator.itemgetter(-1), dev_routes))

    deletions = current - new
    insertions = new - current

    dev_deletions = current_devs - new_devs
    dev_insertions = new_devs - current_devs

    rules = []

    # Rule deletions first
    for annotated in deletions:
        route, dev = annotated[:-1], annotated[-1]
        rules.append(self.format_route(route, dev, method, 'del'))

    if method == 'sliceip':
        # Devices no longer used, then devices newly used
        rules.extend(map('disable '.__add__, dev_deletions))
        rules.extend(map('enable '.__add__, dev_insertions))

    # Rule insertions now.  The device is the last element of the annotated
    # route itself (it used to be taken from a leftover variable of the
    # deletion loop, yielding a wrong device or a NameError).
    for annotated in insertions:
        route, dev = annotated[:-1], annotated[-1]
        rules.append(self.format_route(route, dev, method, 'add'))

    self.apply_route_rules(rules, method)

    # Keep the state as sets so the next reconfiguration can diff against it
    self._configured_routes = new
    self._configured_devs = new_devs
799 def apply_route_rules(self, rules, method):
800 (out,err),proc = server.popen_ssh_command(
801 "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
802 home = server.shell_escape(self.home_path),
806 user = self.slicename,
808 ident_key = self.ident_path,
809 server_key = self.server_key,
810 stdin = '\n'.join(rules),
814 if proc.wait() or err:
815 raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
817 logger.debug("%s said: %s%s", method, out, err)
def check_bad_host(self, out, err):
    """
    Tell whether command output reveals a malfunctioning host.

    out, err -- captured standard output and standard error text

    Returns a match object (truthy) when either stream contains a known
    failure signature, and None otherwise.
    """
    # The alternation must not begin with '|': an empty first alternative
    # matches every string, which reported every host as bad.
    # A further signature, currently disabled, was:
    #   curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'
    # NOTE(review): case-insensitive matching is assumed here - confirm.
    badre = re.compile(r'(?:'
        r'Error: disk I/O error'
        r')',
        re.I)
    return badre.search(out) or badre.search(err)