1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID
19 from nepi.util import server
20 from nepi.util import parallel
26 class UnresponsiveNodeError(RuntimeError):
29 def _castproperty(typ, propattr):
31 return getattr(self, propattr)
32 def _set(self, value):
33 if value is not None or (isinstance(value, basestring) and not value):
35 return setattr(self, propattr, value)
36 def _del(self, value):
37 return delattr(self, propattr)
38 _get.__name__ = propattr + '_get'
39 _set.__name__ = propattr + '_set'
40 _del.__name__ = propattr + '_del'
41 return property(_get, _set, _del)
45 # Map Node attribute to plcapi filter name
46 'hostname' : 'hostname',
50 # Map Node attribute to (<tag name>, <plcapi filter expression>)
51 # There are replacements that are applied with string formatting,
52 # so '%' has to be escaped as '%%'.
53 'architecture' : ('arch','value'),
54 'operatingSystem' : ('fcdistro','value'),
55 'pl_distro' : ('pldistro','value'),
56 'city' : ('city','value'),
57 'country' : ('country','value'),
58 'region' : ('region','value'),
59 'minReliability' : ('reliability%(timeframe)s', ']value'),
60 'maxReliability' : ('reliability%(timeframe)s', '[value'),
61 'minBandwidth' : ('bw%(timeframe)s', ']value'),
62 'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63 'minLoad' : ('load%(timeframe)s', ']value'),
64 'maxLoad' : ('load%(timeframe)s', '[value'),
65 'minCpu' : ('cpu%(timeframe)s', ']value'),
66 'maxCpu' : ('cpu%(timeframe)s', '[value'),
70 # (<tag name>, <weight>, <default>)
71 ('bw%(timeframe)s', -0.001, 1024.0),
72 ('cpu%(timeframe)s', 0.1, 40.0),
73 ('load%(timeframe)s', -0.2, 3.0),
74 ('reliability%(timeframe)s', 1, 100.0),
77 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
78 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
79 RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
80 RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
82 minReliability = _castproperty(float, '_minReliability')
83 maxReliability = _castproperty(float, '_maxReliability')
84 minBandwidth = _castproperty(float, '_minBandwidth')
85 maxBandwidth = _castproperty(float, '_maxBandwidth')
86 minCpu = _castproperty(float, '_minCpu')
87 maxCpu = _castproperty(float, '_maxCpu')
88 minLoad = _castproperty(float, '_minLoad')
89 maxLoad = _castproperty(float, '_maxLoad')
91 def __init__(self, api=None, sliceapi=None):
95 self._sliceapi = sliceapi or api
99 self.architecture = None
100 self.operatingSystem = None
101 self.pl_distro = None
106 self.minReliability = None
107 self.maxReliability = None
108 self.minBandwidth = None
109 self.maxBandwidth = None
114 self.min_num_external_ifaces = None
115 self.max_num_external_ifaces = None
116 self._timeframe = 'w'
118 # Applications and routes add requirements to connected nodes
119 self.required_packages = set()
120 self.required_vsys = set()
122 self.rpmFusion = False
123 self.env = collections.defaultdict(list)
125 # Some special applications - initialized when connected
126 self.multicast_forwarder = None
128 # Testbed-derived attributes
129 self.slicename = None
130 self.ident_path = None
131 self.server_key = None
132 self.home_path = None
133 self.enable_proc_cleanup = False
134 self.enable_home_cleanup = False
136 # Those are filled when an actual node is allocated
138 self._yum_dependencies = None
139 self._installed = False
142 self._logger = logging.getLogger('nepi.testbeds.planetlab')
def set_timeframe(self, timeframe):
    """
    Store the short code for a human-readable timeframe name.

    "latest" maps to "", "month" to "m" and "year" to "y"; every other
    value falls back to the weekly code "w".
    """
    known_codes = (
        ("latest", ""),
        ("month", "m"),
        ("year", "y"),
    )
    for label, code in known_codes:
        if timeframe == label:
            self._timeframe = code
            return
    # Anything unrecognized is treated as weekly
    self._timeframe = "w"
154 def get_timeframe(self):
155 if self._timeframe == "":
157 if self._timeframe == "m":
159 if self._timeframe == "y":
163 timeframe = property(get_timeframe, set_timeframe)
165 def _nepi_testbed_environment_setup_get(self):
166 command = cStringIO.StringIO()
167 command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
168 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
170 command.write(' ; export PATH=$PATH:%s' % (
171 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
174 for envkey, envvals in self.env.iteritems():
175 for envval in envvals:
176 command.write(' ; export %s=%s' % (envkey, envval))
177 return command.getvalue()
179 def _nepi_testbed_environment_setup_set(self, value):
182 _nepi_testbed_environment_setup = property(
183 _nepi_testbed_environment_setup_get,
184 _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
    """
    Copy the filter attributes defined on this object into target_filters.

    filter_map maps attribute names of this object to the key under which
    the attribute's value is stored. Attributes that are missing or None
    are skipped. target_filters is updated in place and also returned.
    """
    for attr_name, filter_key in filter_map.iteritems():
        attr_value = getattr(self, attr_name, None)
        if attr_value is None:
            continue
        target_filters[filter_key] = attr_value
    return target_filters
194 def applicable_filters(self):
195 has = lambda att : getattr(self,att,None) is not None
197 filter(has, self.BASEFILTERS.iterkeys())
198 + filter(has, self.TAGFILTERS.iterkeys())
201 def find_candidates(self, filter_slice_id=None):
202 self._logger.info("Finding candidates for %s", self.make_filter_description())
204 fields = ('node_id',)
205 replacements = {'timeframe':self._timeframe}
207 # get initial candidates (no tag filters)
208 basefilters = self.build_filters({}, self.BASEFILTERS)
209 rootfilters = basefilters.copy()
211 basefilters['|slice_ids'] = (filter_slice_id,)
213 # only pick healthy nodes
214 basefilters['run_level'] = 'boot'
215 basefilters['boot_state'] = 'boot'
216 basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
217 basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
219 # keyword-only "pseudofilters"
222 extra['peer'] = self.site
224 candidates = set(map(operator.itemgetter('node_id'),
225 self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra)))
227 # filter by tag, one tag at a time
228 applicable = self.applicable_filters
229 for tagfilter in self.TAGFILTERS.iteritems():
230 attr, (tagname, expr) = tagfilter
232 # don't bother if there's no filter defined
233 if attr in applicable:
234 tagfilter = rootfilters.copy()
235 tagfilter['tagname'] = tagname % replacements
236 tagfilter[expr % replacements] = str(getattr(self,attr))
237 tagfilter['node_id'] = list(candidates)
239 candidates &= set(map(operator.itemgetter('node_id'),
240 self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields)))
242 # filter by vsys tags - special case since it doesn't follow
243 # the usual semantics
244 if self.required_vsys:
245 newcandidates = collections.defaultdict(set)
247 vsys_tags = self._sliceapi.GetNodeTags(
249 node_id = list(candidates),
250 fields = ['node_id','value'])
253 operator.itemgetter(['node_id','value']),
256 required_vsys = self.required_vsys
257 for node_id, value in vsys_tags:
258 if value in required_vsys:
259 newcandidates[value].add(node_id)
261 # take only those that have all the required vsys tags
262 newcandidates = reduce(
263 lambda accum, new : accum & new,
264 newcandidates.itervalues(),
267 # filter by iface count
268 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
269 # fetch interfaces for all, in one go
270 filters = basefilters.copy()
271 filters['node_id'] = list(candidates)
272 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
273 self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
275 # filter candidates by interface count
276 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
277 predicate = ( lambda node_id :
278 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
279 elif self.min_num_external_ifaces is not None:
280 predicate = ( lambda node_id :
281 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
283 predicate = ( lambda node_id :
284 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
286 candidates = set(filter(predicate, candidates))
288 # make sure hostnames are resolvable
291 self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
293 hostnames = dict(map(operator.itemgetter('node_id','hostname'),
294 self._sliceapi.GetNodes(list(candidates), ['node_id','hostname'])
297 def resolvable(node_id):
299 addr = socket.gethostbyname(hostnames[node_id])
300 return addr is not None
303 candidates = set(parallel.pfilter(resolvable, candidates,
306 self._logger.info(" Found %s reachable candidates.", len(candidates))
308 for h in hostnames.keys():
309 if h not in candidates:
312 hostnames = dict((v,k) for k, v in hostnames.iteritems())
316 def make_filter_description(self):
318 Makes a human-readable description of filtering conditions
322 # get initial candidates (no tag filters)
323 filters = self.build_filters({}, self.BASEFILTERS)
325 # keyword-only "pseudofilters"
327 filters['peer'] = self.site
329 # filter by tag, one tag at a time
330 applicable = self.applicable_filters
331 for tagfilter in self.TAGFILTERS.iteritems():
332 attr, (tagname, expr) = tagfilter
334 # don't bother if there's no filter defined
335 if attr in applicable:
336 filters[attr] = getattr(self,attr)
338 # filter by vsys tags - special case since it doesn't follow
339 # the usual semantics
340 if self.required_vsys:
341 filters['vsys'] = ','.join(list(self.required_vsys))
343 # filter by iface count
344 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
345 filters['num_ifaces'] = '-'.join([
346 str(self.min_num_external_ifaces or '0'),
347 str(self.max_num_external_ifaces or 'inf')
350 return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """
    Bind this object to the given node id, then refresh its attributes
    by calling fetch_node_info().
    """
    # The id must be stored first: fetch_node_info() is invoked without
    # arguments and so has to pick it up from the instance.
    self._node_id = node_id
    self.fetch_node_info()
356 def unassign_node(self):
361 orig_attrs = self.__orig_attrs
362 except AttributeError:
365 for key, value in orig_attrs.iteritems():
366 setattr(self, key, value)
367 del self.__orig_attrs
369 def rate_nodes(self, nodes):
370 rates = collections.defaultdict(int)
371 tags = collections.defaultdict(dict)
372 replacements = {'timeframe':self._timeframe}
373 tagnames = [ tagname % replacements
374 for tagname, weight, default in self.RATE_FACTORS ]
376 taginfo = self._sliceapi.GetNodeTags(
379 fields=('node_id','tagname','value'))
381 unpack = operator.itemgetter('node_id','tagname','value')
382 for value in taginfo:
383 node, tagname, value = unpack(value)
384 if value and value.lower() != 'n/a':
385 tags[tagname][node] = float(value)
387 for tagname, weight, default in self.RATE_FACTORS:
388 taginfo = tags[tagname % replacements].get
390 rates[node] += weight * taginfo(node,default)
392 return map(rates.__getitem__, nodes)
394 def fetch_node_info(self):
397 info, tags = self._sliceapi.GetNodeInfo(self._node_id)
400 tags = dict( (t['tagname'],t['value'])
403 orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
404 orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
405 self.min_num_external_ifaces = None
406 self.max_num_external_ifaces = None
407 if not self._timeframe: self._timeframe = 'w'
409 replacements = {'timeframe':self._timeframe}
411 for attr, tag in self.BASEFILTERS.iteritems():
414 if hasattr(self, attr):
415 orig_attrs[attr] = getattr(self, attr)
416 setattr(self, attr, value)
417 for attr, (tag,_) in self.TAGFILTERS.iteritems():
418 tag = tag % replacements
421 if hasattr(self, attr):
422 orig_attrs[attr] = getattr(self, attr)
423 if not value or value.lower() == 'n/a':
425 setattr(self, attr, value)
427 if 'peer_id' in info:
428 orig_attrs['site'] = self.site
429 self.site = self._sliceapi.peer_map[info['peer_id']]
431 if 'interface_ids' in info:
432 self.min_num_external_ifaces = \
433 self.max_num_external_ifaces = len(info['interface_ids'])
435 if 'ssh_rsa_key' in info:
436 orig_attrs['server_key'] = self.server_key
437 self.server_key = info['ssh_rsa_key']
439 self.hostip = socket.gethostbyname(self.hostname)
443 except AttributeError:
444 self.__orig_attrs = orig_attrs
447 if self.home_path is None:
448 raise AssertionError, "Misconfigured node: missing home path"
449 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
450 raise AssertionError, "Misconfigured node: missing slice SSH key"
451 if self.slicename is None:
452 raise AssertionError, "Misconfigured node: unspecified slice"
455 # Mark dependencies installed
456 self._installed = True
458 # Clear load attributes, they impair re-discovery
459 self.minReliability = \
460 self.maxReliability = \
461 self.minBandwidth = \
462 self.maxBandwidth = \
468 def install_dependencies(self):
469 if self.required_packages and not self._installed:
470 # If we need rpmfusion, we must install the repo definition and the gpg keys
472 if self.operatingSystem == 'f12':
473 # Fedora 12 requires a different rpmfusion package
474 RPM_FUSION_URL = self.RPM_FUSION_URL_F12
476 # This one works for f13+
477 RPM_FUSION_URL = self.RPM_FUSION_URL
480 'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s'
482 'RPM_FUSION_URL' : RPM_FUSION_URL
488 (out,err),proc = server.popen_ssh_command(
490 host = self.hostname,
492 user = self.slicename,
494 ident_key = self.ident_path,
495 server_key = self.server_key,
500 if self.check_bad_host(out,err):
502 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
504 # Launch p2p yum dependency installer
505 self._yum_dependencies.async_setup()
507 def wait_provisioning(self, timeout = 20*60):
508 # Wait for the p2p installer
511 while not self.is_alive():
512 time.sleep(sleeptime)
513 totaltime += sleeptime
514 sleeptime = min(30.0, sleeptime*1.5)
516 if totaltime > timeout:
517 # PlanetLab has a 15' delay on configuration propagation
518 # If we're above that delay, the unresponsiveness is not due
520 if not self.is_alive(verbose=True):
521 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
523 # Ensure the node is clean (no apps running that could interfere with operations)
524 if self.enable_proc_cleanup:
525 self.do_proc_cleanup()
526 if self.enable_home_cleanup:
527 self.do_home_cleanup()
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
    """
    Block until the yum dependency installer has finished, then mark the
    node as installed.

    Does nothing when no installer is configured or when the node is
    already flagged as installed. The pidprobe, probe, pidmax and probemax
    parameters are accepted but not used here.
    """
    # Wait for the p2p installer
    installer = self._yum_dependencies
    if not installer or self._installed:
        return
    installer.async_setup_wait()
    self._installed = True
535 def is_alive(self, verbose = False):
536 # Make sure all the paths are created where
537 # they have to be created for deployment
538 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
540 host = self.hostname,
542 user = self.slicename,
544 ident_key = self.ident_path,
545 server_key = self.server_key,
547 err_on_timeout = False,
553 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
555 elif not err and out.strip() == 'ALIVE':
559 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
563 if self.enable_proc_cleanup:
564 self.do_proc_cleanup()
568 self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
570 util.appendBlacklist(self.hostname)
572 def do_proc_cleanup(self):
573 if self.testbed().recovering:
577 self._logger.info("Cleaning up processes on %s", self.hostname)
580 "sudo -S killall python tcpdump || /bin/true ; "
581 "sudo -S killall python tcpdump || /bin/true ; "
582 "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ",
583 "sudo -S killall -u %(slicename)s || /bin/true ",
584 "sudo -S killall -u root || /bin/true ",
585 "sudo -S killall -u %(slicename)s || /bin/true ",
586 "sudo -S killall -u root || /bin/true ",
590 (out,err),proc = server.popen_ssh_command(
591 # Some apps need two kills
593 'slicename' : self.slicename ,
595 host = self.hostname,
597 user = self.slicename,
599 ident_key = self.ident_path,
600 server_key = self.server_key,
601 tty = True, # so that ps -N -T works as advertised...
607 def do_home_cleanup(self):
608 if self.testbed().recovering:
612 self._logger.info("Cleaning up home on %s", self.hostname)
615 "find . -maxdepth 1 -name 'nepi-*' -execdir rm -rf {} + "
619 (out,err),proc = server.popen_ssh_command(
620 # Some apps need two kills
622 'slicename' : self.slicename ,
624 host = self.hostname,
626 user = self.slicename,
628 ident_key = self.ident_path,
629 server_key = self.server_key,
630 tty = True, # so that ps -N -T works as advertised...
def prepare_dependencies(self):
    """
    Configure the p2p yum dependency installer for this node.

    Nothing happens when there are no required packages or when the node
    is already flagged as installed. Otherwise a YumDependency is created
    with this node's api, attached to the node, and told which packages
    to install (the required package names joined by spaces).
    """
    if not self.required_packages or self._installed:
        return

    installer = self._yum_dependencies = application.YumDependency(self._api)
    installer.node = self
    installer.home_path = "nepi-yumdep"
    installer.depends = ' '.join(self.required_packages)
644 def routing_method(self, routes, vsys_vnet):
646 There are two methods, vroute and sliceip.
649 Modifies the node's routing table directly, validating that the IP
650 range lies within the network given by the slice's vsys_vnet tag.
651 This method is the most scalable for very small routing tables
652 that need not modify other routes (including the default)
655 Uses policy routing and iptables filters to create per-sliver
656 routing tables. It's the most flexible way, but it doesn't scale
657 as well since only 155 routing tables can be created this way.
659 This method will return the most appropriate routing method, which will
660 prefer vroute for small routing tables.
663 # For now, sliceip results in kernel panics
664 # so we HAVE to use vroute
667 # We should not make the routing table grow too big
668 if len(routes) > MAX_VROUTE_ROUTES:
671 vsys_vnet = ipaddr.IPv4Network(vsys_vnet)
673 dest, prefix, nexthop, metric, device = route
674 dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix))
675 nexthop = ipaddr.IPAddress(nexthop)
676 if dest not in vsys_vnet or nexthop not in vsys_vnet:
681 def format_route(self, route, dev, method, action):
682 dest, prefix, nexthop, metric, device = route
683 if method == 'vroute':
685 "%s %s%s gw %s %s" % (
688 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
693 elif method == 'sliceip':
695 "route %s to %s%s via %s metric %s dev %s" % (
698 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
705 raise AssertionError, "Unknown method"
707 def _annotate_routes_with_devs(self, routes, devs, method):
711 if dev.routes_here(route):
712 dev_routes.append(tuple(route) + (dev.if_name,))
717 if method == 'sliceip':
718 dev_routes.append(tuple(route) + ('eth0',))
720 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
721 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
def configure_routes(self, routes, devs, vsys_vnet):
    """
    Add the specified routes to the node's routing table

    routes    -- iterable of route tuples
    devs      -- candidate devices, handed to _annotate_routes_with_devs()
    vsys_vnet -- handed to routing_method() to pick the routing method

    The applied state is remembered in _configured_routes,
    _configured_devs and _configured_method so that reconfigure_routes()
    can later compute a difference against it.
    """
    method = self.routing_method(routes, vsys_vnet)

    # annotate routes with devices
    dev_routes = self._annotate_routes_with_devs(routes, devs, method)

    rules = []
    tdevs = set()
    for dev_route in dev_routes:
        # The device name is the last element of each annotated route
        route, dev = dev_route[:-1], dev_route[-1]
        tdevs.add(dev)
        rules.append(self.format_route(route, dev, method, 'add'))

    if method == 'sliceip':
        # Devices have to be enabled before their rules are added
        rules = ['enable ' + dev for dev in tdevs] + rules

    self._logger.info("Setting up routes for %s using %s", self.hostname, method)
    self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))

    self.apply_route_rules(rules, method)

    # Save the device-annotated routes: reconfigure_routes() compares this
    # set against annotated routes and splits the device off each entry.
    # Saving the bare input routes would make every old route look deleted
    # and hand format_route() a route that is one element short.
    self._configured_routes = set(dev_routes)
    self._configured_devs = tdevs
    self._configured_method = method
def reconfigure_routes(self, routes, devs, vsys_vnet):
    """
    Updates the routes in the node's routing table to match
    the given route list

    routes    -- iterable of route tuples describing the desired state
    devs      -- candidate devices, handed to _annotate_routes_with_devs()
    vsys_vnet -- accepted for signature parity with configure_routes();
                 the method chosen at configuration time is reused

    Only the difference against the previously configured state is
    applied: removed routes are deleted, then (for 'sliceip') devices are
    disabled/enabled, then new routes are added.
    """
    method = self._configured_method

    dev_routes = self._annotate_routes_with_devs(routes, devs, method)

    # set() so the difference below works even if the saved state is a list
    current = set(self._configured_routes)
    current_devs = self._configured_devs

    new = set(dev_routes)
    new_devs = set(dev_route[-1] for dev_route in dev_routes)

    deletions = current - new
    insertions = new - current

    dev_deletions = current_devs - new_devs
    dev_insertions = new_devs - current_devs

    rules = []

    # Rule deletions first
    for dev_route in deletions:
        rules.append(
            self.format_route(dev_route[:-1], dev_route[-1], method, 'del'))

    if method == 'sliceip':
        rules.extend('disable ' + dev for dev in dev_deletions)
        rules.extend('enable ' + dev for dev in dev_insertions)

    # Rule insertions now
    for dev_route in insertions:
        # The device comes from the route being inserted. It used to be
        # taken as dev[-1], i.e. the last character of whatever device
        # name the deletion loop left behind.
        rules.append(
            self.format_route(dev_route[:-1], dev_route[-1], method, 'add'))

    self.apply_route_rules(rules, method)

    # Save a set, not the annotated list, so the next call can diff it
    self._configured_routes = new
    self._configured_devs = new_devs
800 def apply_route_rules(self, rules, method):
801 (out,err),proc = server.popen_ssh_command(
802 "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
803 home = server.shell_escape(self.home_path),
805 host = self.hostname,
807 user = self.slicename,
809 ident_key = self.ident_path,
810 server_key = self.server_key,
811 stdin = '\n'.join(rules),
815 if proc.wait() or err:
816 raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
818 logger.debug("%s said: %s%s", method, out, err)
820 def check_bad_host(self, out, err):
821 badre = re.compile(r'(?:'
822 r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'"
823 r'|Error: disk I/O error'
826 return badre.search(out) or badre.search(err)