2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
19 from nepi.util import server
20 from nepi.util import parallel
26 class UnresponsiveNodeError(RuntimeError):
29 def _castproperty(typ, propattr):
31 return getattr(self, propattr)
32 def _set(self, value):
33 if value is not None or (isinstance(value, basestring) and not value):
35 return setattr(self, propattr, value)
36 def _del(self, value):
37 return delattr(self, propattr)
38 _get.__name__ = propattr + '_get'
39 _set.__name__ = propattr + '_set'
40 _del.__name__ = propattr + '_del'
41 return property(_get, _set, _del)
45 # Map Node attribute to plcapi filter name
46 'hostname' : 'hostname',
50 # Map Node attribute to (<tag name>, <plcapi filter expression>)
51 # There are replacements that are applied with string formatting,
52 # so '%' has to be escaped as '%%'.
53 'architecture' : ('arch','value'),
54 'operatingSystem' : ('fcdistro','value'),
55 'pl_distro' : ('pldistro','value'),
56 'city' : ('city','value'),
57 'country' : ('country','value'),
58 'region' : ('region','value'),
59 'minReliability' : ('reliability%(timeframe)s', ']value'),
60 'maxReliability' : ('reliability%(timeframe)s', '[value'),
61 'minBandwidth' : ('bw%(timeframe)s', ']value'),
62 'maxBandwidth' : ('bw%(timeframe)s', '[value'),
63 'minLoad' : ('load%(timeframe)s', ']value'),
64 'maxLoad' : ('load%(timeframe)s', '[value'),
65 'minCpu' : ('cpu%(timeframe)s', ']value'),
66 'maxCpu' : ('cpu%(timeframe)s', '[value'),
69 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
70 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
71 RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
72 RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
74 minReliability = _castproperty(float, '_minReliability')
75 maxReliability = _castproperty(float, '_maxReliability')
76 minBandwidth = _castproperty(float, '_minBandwidth')
77 maxBandwidth = _castproperty(float, '_maxBandwidth')
78 minCpu = _castproperty(float, '_minCpu')
79 maxCpu = _castproperty(float, '_maxCpu')
80 minLoad = _castproperty(float, '_minLoad')
81 maxLoad = _castproperty(float, '_maxLoad')
83 def __init__(self, api=None):
90 self.architecture = None
91 self.operatingSystem = None
97 self.minReliability = None
98 self.maxReliability = None
99 self.minBandwidth = None
100 self.maxBandwidth = None
105 self.min_num_external_ifaces = None
106 self.max_num_external_ifaces = None
109 # Applications and routes add requirements to connected nodes
110 self.required_packages = set()
111 self.required_vsys = set()
113 self.rpmFusion = False
114 self.env = collections.defaultdict(list)
116 # Testbed-derived attributes
117 self.slicename = None
118 self.ident_path = None
119 self.server_key = None
120 self.home_path = None
121 self.enable_cleanup = False
123 # Those are filled when an actual node is allocated
125 self._yum_dependencies = None
126 self._installed = False
129 self._logger = logging.getLogger('nepi.testbeds.planetlab')
131 def _nepi_testbed_environment_setup_get(self):
132 command = cStringIO.StringIO()
133 command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
134 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
136 command.write(' ; export PATH=$PATH:%s' % (
137 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
140 for envkey, envvals in self.env.iteritems():
141 for envval in envvals:
142 command.write(' ; export %s=%s' % (envkey, envval))
143 return command.getvalue()
144 def _nepi_testbed_environment_setup_set(self, value):
146 _nepi_testbed_environment_setup = property(
147 _nepi_testbed_environment_setup_get,
148 _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
    """Fill *target_filters* from this node's restriction attributes.

    For each (attribute name, filter name) pair in *filter_map*, if the
    corresponding attribute of ``self`` is set (i.e. not None), store its
    value in *target_filters* under the mapped plcapi filter name.
    Attributes that are absent or None are skipped.

    Parameters:
        target_filters: dict updated in place with the applicable filters.
        filter_map: mapping of Node attribute name -> plcapi filter name
            (e.g. BASEFILTERS).

    Returns:
        The same *target_filters* dict, for chaining convenience.
    """
    # Use .items() instead of the Python-2-only .iteritems(): it behaves
    # identically under Python 2 and keeps the method working on Python 3.
    for attr, tag in filter_map.items():
        value = getattr(self, attr, None)
        if value is not None:
            target_filters[tag] = value
    return target_filters
158 def applicable_filters(self):
159 has = lambda att : getattr(self,att,None) is not None
161 filter(has, self.BASEFILTERS.iterkeys())
162 + filter(has, self.TAGFILTERS.iterkeys())
165 def find_candidates(self, filter_slice_id=None):
166 self._logger.info("Finding candidates for %s", self.make_filter_description())
168 fields = ('node_id',)
169 replacements = {'timeframe':self.timeframe}
171 # get initial candidates (no tag filters)
172 basefilters = self.build_filters({}, self.BASEFILTERS)
173 rootfilters = basefilters.copy()
175 basefilters['|slice_ids'] = (filter_slice_id,)
177 # only pick healthy nodes
178 basefilters['run_level'] = 'boot'
179 basefilters['boot_state'] = 'boot'
180 basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
181 basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
183 # keyword-only "pseudofilters"
186 extra['peer'] = self.site
188 candidates = set(map(operator.itemgetter('node_id'),
189 self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
191 # filter by tag, one tag at a time
192 applicable = self.applicable_filters
193 for tagfilter in self.TAGFILTERS.iteritems():
194 attr, (tagname, expr) = tagfilter
196 # don't bother if there's no filter defined
197 if attr in applicable:
198 tagfilter = rootfilters.copy()
199 tagfilter['tagname'] = tagname % replacements
200 tagfilter[expr % replacements] = getattr(self,attr)
201 tagfilter['node_id'] = list(candidates)
203 candidates &= set(map(operator.itemgetter('node_id'),
204 self._api.GetNodeTags(filters=tagfilter, fields=fields)))
206 # filter by vsys tags - special case since it doesn't follow
207 # the usual semantics
208 if self.required_vsys:
209 newcandidates = collections.defaultdict(set)
211 vsys_tags = self._api.GetNodeTags(
213 node_id = list(candidates),
214 fields = ['node_id','value'])
217 operator.itemgetter(['node_id','value']),
220 required_vsys = self.required_vsys
221 for node_id, value in vsys_tags:
222 if value in required_vsys:
223 newcandidates[value].add(node_id)
225 # take only those that have all the required vsys tags
226 newcandidates = reduce(
227 lambda accum, new : accum & new,
228 newcandidates.itervalues(),
231 # filter by iface count
232 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
233 # fetch interfaces for all, in one go
234 filters = basefilters.copy()
235 filters['node_id'] = list(candidates)
236 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
237 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
239 # filter candidates by interface count
240 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
241 predicate = ( lambda node_id :
242 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
243 elif self.min_num_external_ifaces is not None:
244 predicate = ( lambda node_id :
245 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
247 predicate = ( lambda node_id :
248 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
250 candidates = set(filter(predicate, candidates))
252 # make sure hostnames are resolvable
254 self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
256 hostnames = dict(map(operator.itemgetter('node_id','hostname'),
257 self._api.GetNodes(list(candidates), ['node_id','hostname'])
259 def resolvable(node_id):
261 addr = socket.gethostbyname(hostnames[node_id])
262 return addr is not None
265 candidates = set(parallel.pfilter(resolvable, candidates,
268 self._logger.info(" Found %s reachable candidates.", len(candidates))
272 def make_filter_description(self):
274 Makes a human-readable description of filtering conditions
278 # get initial candidates (no tag filters)
279 filters = self.build_filters({}, self.BASEFILTERS)
281 # keyword-only "pseudofilters"
283 filters['peer'] = self.site
285 # filter by tag, one tag at a time
286 applicable = self.applicable_filters
287 for tagfilter in self.TAGFILTERS.iteritems():
288 attr, (tagname, expr) = tagfilter
290 # don't bother if there's no filter defined
291 if attr in applicable:
292 filters[attr] = getattr(self,attr)
294 # filter by vsys tags - special case since it doesn't follow
295 # the usual semantics
296 if self.required_vsys:
297 filters['vsys'] = ','.join(list(self.required_vsys))
299 # filter by iface count
300 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
301 filters['num_ifaces'] = '-'.join([
302 str(self.min_num_external_ifaces or '0'),
303 str(self.max_num_external_ifaces or 'inf')
306 return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """Bind this Node to the concrete PlanetLab node *node_id*.

    Records the node id and immediately refreshes the locally cached
    metadata for that node (tags, site, interface count, SSH key).
    """
    self._node_id = node_id
    self.fetch_node_info()
312 def unassign_node(self):
314 self.__dict__.update(self.__orig_attrs)
316 def fetch_node_info(self):
319 info = self._api.GetNodes(self._node_id)[0]
320 tags = dict( (t['tagname'],t['value'])
321 for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
323 orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
324 orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
325 self.min_num_external_ifaces = None
326 self.max_num_external_ifaces = None
329 replacements = {'timeframe':self.timeframe}
330 for attr, tag in self.BASEFILTERS.iteritems():
333 if hasattr(self, attr):
334 orig_attrs[attr] = getattr(self, attr)
335 setattr(self, attr, value)
336 for attr, (tag,_) in self.TAGFILTERS.iteritems():
337 tag = tag % replacements
340 if hasattr(self, attr):
341 orig_attrs[attr] = getattr(self, attr)
342 setattr(self, attr, value)
344 if 'peer_id' in info:
345 orig_attrs['site'] = self.site
346 self.site = self._api.peer_map[info['peer_id']]
348 if 'interface_ids' in info:
349 self.min_num_external_ifaces = \
350 self.max_num_external_ifaces = len(info['interface_ids'])
352 if 'ssh_rsa_key' in info:
353 orig_attrs['server_key'] = self.server_key
354 self.server_key = info['ssh_rsa_key']
356 self.__orig_attrs = orig_attrs
359 if self.home_path is None:
360 raise AssertionError, "Misconfigured node: missing home path"
361 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
362 raise AssertionError, "Misconfigured node: missing slice SSH key"
363 if self.slicename is None:
364 raise AssertionError, "Misconfigured node: unspecified slice"
367 # Mark dependencies installed
368 self._installed = True
370 # Clear load attributes, they impair re-discovery
371 self.minReliability = \
372 self.maxReliability = \
373 self.minBandwidth = \
374 self.maxBandwidth = \
380 def install_dependencies(self):
381 if self.required_packages and not self._installed:
382 # If we need rpmfusion, we must install the repo definition and the gpg keys
384 if self.operatingSystem == 'f12':
385 # Fedora 12 requires a different rpmfusion package
386 RPM_FUSION_URL = self.RPM_FUSION_URL_F12
388 # This one works for f13+
389 RPM_FUSION_URL = self.RPM_FUSION_URL
392 '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
394 'RPM_FUSION_URL' : RPM_FUSION_URL
400 (out,err),proc = server.popen_ssh_command(
402 host = self.hostname,
404 user = self.slicename,
406 ident_key = self.ident_path,
407 server_key = self.server_key
411 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
413 # Launch p2p yum dependency installer
414 self._yum_dependencies.async_setup()
416 def wait_provisioning(self, timeout = 20*60):
417 # Wait for the p2p installer
420 while not self.is_alive():
421 time.sleep(sleeptime)
422 totaltime += sleeptime
423 sleeptime = min(30.0, sleeptime*1.5)
425 if totaltime > timeout:
426 # PlanetLab has a 15' delay on configuration propagation
427 # If we're above that delay, the unresponsiveness is not due
429 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
431 # Ensure the node is clean (no apps running that could interfere with operations)
432 if self.enable_cleanup:
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
    """Block until the p2p yum dependency installer has finished.

    No-op when there is no pending installer or the dependencies were
    already marked installed; otherwise waits on the asynchronous setup
    and flags the node as installed.  The probe/pid arguments are kept
    for interface compatibility with other dependency waiters.
    """
    # Guard clauses: nothing to wait for.
    if self._installed:
        return
    if not self._yum_dependencies:
        return
    # Join the asynchronous installer, then record completion.
    self._yum_dependencies.async_setup_wait()
    self._installed = True
442 # Make sure all the paths are created where
443 # they have to be created for deployment
444 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
446 host = self.hostname,
448 user = self.slicename,
450 ident_key = self.ident_path,
451 server_key = self.server_key
456 elif not err and out.strip() == 'ALIVE':
462 if self.enable_cleanup:
465 def do_cleanup(self):
466 if self.testbed().recovering:
470 self._logger.info("Cleaning up %s", self.hostname)
472 (out,err),proc = server.popen_ssh_command(
473 # Some apps need two kills
474 "sudo -S killall python tcpdump || /bin/true ; "
475 "sudo -S killall python tcpdump || /bin/true ; "
476 "sudo -S kill $(ps -N T -o pid --no-heading | sort) || /bin/true ; "
477 "sudo -S killall -u %(slicename)s || /bin/true ; "
478 "sudo -S killall -u %(slicename)s || /bin/true ; "
479 "sudo -S killall -u root || /bin/true ; "
480 "sudo -S killall -u root || /bin/true " % {
481 'slicename' : self.slicename ,
483 host = self.hostname,
485 user = self.slicename,
487 ident_key = self.ident_path,
488 server_key = self.server_key,
489 tty = True, # so that ps -N -T works as advertised...
def prepare_dependencies(self):
    """Configure the peer-to-peer yum dependency installer, if needed.

    Builds a YumDependency application bound to this node, with the
    package list taken from ``self.required_packages``.  Does nothing
    when no packages are required or dependencies are already installed.
    """
    # Guard clause: only set up the installer when there is real work.
    if not self.required_packages or self._installed:
        return
    self._yum_dependencies = dep = application.YumDependency(self._api)
    dep.node = self
    dep.home_path = "nepi-yumdep"
    dep.depends = ' '.join(self.required_packages)
501 def routing_method(self, routes, vsys_vnet):
503 There are two methods, vroute and sliceip.
506 Modifies the node's routing table directly, validating that the IP
507 range lies within the network given by the slice's vsys_vnet tag.
508 This method is the most scalable for very small routing tables
509 that need not modify other routes (including the default)
512 Uses policy routing and iptables filters to create per-sliver
513 routing tables. It's the most flexible way, but it doesn't scale
514 as well since only 155 routing tables can be created this way.
516 This method will return the most appropriate routing method, which will
517 prefer vroute for small routing tables.
520 # For now, sliceip results in kernel panics
521 # so we HAVE to use vroute
524 # We should not make the routing table grow too big
525 if len(routes) > MAX_VROUTE_ROUTES:
528 vsys_vnet = ipaddr.IPNetwork(vsys_vnet)
530 dest, prefix, nexthop, metric = route
531 dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix))
532 nexthop = ipaddr.IPAddress(nexthop)
533 if dest not in vsys_vnet or nexthop not in vsys_vnet:
538 def format_route(self, route, dev, method, action):
539 dest, prefix, nexthop, metric = route
540 if method == 'vroute':
542 "%s %s%s gw %s %s" % (
545 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
550 elif method == 'sliceip':
552 "route %s to %s%s via %s metric %s dev %s" % (
555 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
562 raise AssertionError, "Unknown method"
564 def _annotate_routes_with_devs(self, routes, devs, method):
568 if dev.routes_here(route):
569 dev_routes.append(tuple(route) + (dev.if_name,))
574 if method == 'sliceip':
575 dev_routes.append(tuple(route) + ('eth0',))
577 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
578 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
581 def configure_routes(self, routes, devs, vsys_vnet):
583 Add the specified routes to the node's routing table
586 method = self.routing_method(routes, vsys_vnet)
589 # annotate routes with devices
590 dev_routes = self._annotate_routes_with_devs(routes, devs, method)
591 for route in dev_routes:
592 route, dev = route[:-1], route[-1]
596 rules.append(self.format_route(route, dev, method, 'add'))
598 if method == 'sliceip':
599 rules = map('enable '.__add__, tdevs) + rules
601 self._logger.info("Setting up routes for %s using %s", self.hostname, method)
602 self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
604 self.apply_route_rules(rules, method)
606 self._configured_routes = set(routes)
607 self._configured_devs = tdevs
608 self._configured_method = method
610 def reconfigure_routes(self, routes, devs, vsys_vnet):
612 Updates the routes in the node's routing table to match
615 method = self._configured_method
617 dev_routes = self._annotate_routes_with_devs(routes, devs, method)
619 current = self._configured_routes
620 current_devs = self._configured_devs
622 new = set(dev_routes)
623 new_devs = set(map(operator.itemgetter(-1), dev_routes))
625 deletions = current - new
626 insertions = new - current
628 dev_deletions = current_devs - new_devs
629 dev_insertions = new_devs - current_devs
634 # Rule deletions first
635 for route in deletions:
636 route, dev = route[:-1], route[-1]
637 rules.append(self.format_route(route, dev, method, 'del'))
639 if method == 'sliceip':
641 rules.extend(map('disable '.__add__, dev_deletions))
644 rules.extend(map('enable '.__add__, dev_insertions))
646 # Rule insertions now
647 for route in insertions:
648 route, dev = route[:-1], dev[-1]
649 rules.append(self.format_route(route, dev, method, 'add'))
652 self.apply_route_rules(rules, method)
654 self._configured_routes = dev_routes
655 self._configured_devs = new_devs
657 def apply_route_rules(self, rules, method):
658 (out,err),proc = server.popen_ssh_command(
659 "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
660 home = server.shell_escape(self.home_path),
662 host = self.hostname,
664 user = self.slicename,
666 ident_key = self.ident_path,
667 server_key = self.server_key,
668 stdin = '\n'.join(rules)
671 if proc.wait() or err:
672 raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
674 logger.debug("%s said: %s%s", method, out, err)