2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
17 from nepi.util import server
18 from nepi.util import parallel
# Raised by wait_provisioning() when the host still has not answered
# is_alive() after the provisioning timeout has elapsed.
22 class UnresponsiveNodeError(RuntimeError):
25 def _castproperty(typ, propattr):
27 return getattr(self, propattr)
28 def _set(self, value):
29 if value is not None or (isinstance(value, basestring) and not value):
31 return setattr(self, propattr, value)
32 def _del(self, value):
33 return delattr(self, propattr)
34 _get.__name__ = propattr + '_get'
35 _set.__name__ = propattr + '_set'
36 _del.__name__ = propattr + '_del'
37 return property(_get, _set, _del)
# BASEFILTERS entries (Node attribute -> GetNodes filter field):
# build_filters() copies every non-None attribute listed here into the
# filters passed to GetNodes().
41 # Map Node attribute to plcapi filter name
42 'hostname' : 'hostname',
# TAGFILTERS entries: find_candidates() issues one GetNodeTags() query per
# attribute that is set, after expanding %(timeframe)s in both the tag name
# and the filter expression.
46 # Map Node attribute to (<tag name>, <plcapi filter expression>)
47 # There are replacements that are applied with string formatting,
48 # so '%' has to be escaped as '%%'.
49 'architecture' : ('arch','value'),
50 'operatingSystem' : ('fcdistro','value'),
51 'pl_distro' : ('pldistro','value'),
52 'city' : ('city','value'),
53 'country' : ('country','value'),
54 'region' : ('region','value'),
# min*/max* pairs use ']value' vs '[value' -- presumably PLCAPI's ">=" and
# "<=" filter modifiers, making min* a lower and max* an upper bound
# (confirm against the PLCAPI Filter documentation).
55 'minReliability' : ('reliability%(timeframe)s', ']value'),
56 'maxReliability' : ('reliability%(timeframe)s', '[value'),
57 'minBandwidth' : ('bw%(timeframe)s', ']value'),
58 'maxBandwidth' : ('bw%(timeframe)s', '[value'),
59 'minLoad' : ('load%(timeframe)s', ']value'),
60 'maxLoad' : ('load%(timeframe)s', '[value'),
61 'minCpu' : ('cpu%(timeframe)s', ']value'),
62 'maxCpu' : ('cpu%(timeframe)s', '[value'),
# Paths on the node for the dependency installer's pid and log files; not
# referenced in the code visible here -- TODO confirm which component reads them.
65 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
66 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
# rpmfusion release packages: install_dependencies() uses the F12-specific
# one when operatingSystem == 'f12' and the generic one otherwise.
67 RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
68 RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
# Numeric selection bounds: assigned values are cast to float (None is kept
# as None) and stored in the matching underscore-prefixed attribute.
70 minReliability = _castproperty(float, '_minReliability')
71 maxReliability = _castproperty(float, '_maxReliability')
72 minBandwidth = _castproperty(float, '_minBandwidth')
73 maxBandwidth = _castproperty(float, '_maxBandwidth')
74 minCpu = _castproperty(float, '_minCpu')
75 maxCpu = _castproperty(float, '_maxCpu')
76 minLoad = _castproperty(float, '_minLoad')
77 maxLoad = _castproperty(float, '_maxLoad')
# Initialise selection criteria, requirements added by connected elements,
# testbed-provided settings and allocation state.
79 def __init__(self, api=None):
# Selection criteria; None means "no constraint" (build_filters() and
# applicable_filters skip attributes that are None).
86 self.architecture = None
87 self.operatingSystem = None
# These go through the float-casting properties; None is stored as-is.
93 self.minReliability = None
94 self.maxReliability = None
95 self.minBandwidth = None
96 self.maxBandwidth = None
# Bounds on the node's interface count, enforced in find_candidates().
101 self.min_num_external_ifaces = None
102 self.max_num_external_ifaces = None
105 # Applications and routes add requirements to connected nodes
106 self.required_packages = set()
107 self.required_vsys = set()
109 self.rpmFusion = False
# Extra environment variables: name -> list of values, each exported by
# _nepi_testbed_environment_setup.
110 self.env = collections.defaultdict(list)
112 # Testbed-derived attributes
113 self.slicename = None
114 self.ident_path = None
115 self.server_key = None
116 self.home_path = None
117 self.enable_cleanup = False
119 # These are filled in when an actual node is allocated
121 self._yum_dependencies = None
122 self._installed = False
125 self._logger = logging.getLogger('nepi.testbeds.planetlab')
# Build a shell prefix that extends PYTHONPATH and PATH with every
# self.pythonpath entry (shell-escaped, relative to ${HOME}) and then
# exports each value listed in self.env.
# NOTE(review): PATH is extended from self.pythonpath as well -- confirm this
# is intended rather than a separate bin-path list.
# NOTE(review): env keys/values are written without shell escaping; values
# containing spaces or shell metacharacters would break the command.
127 def _nepi_testbed_environment_setup_get(self):
128 command = cStringIO.StringIO()
129 command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
130 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
132 command.write(' ; export PATH=$PATH:%s' % (
133 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
136 for envkey, envvals in self.env.iteritems():
137 for envval in envvals:
138 command.write(' ; export %s=%s' % (envkey, envval))
139 return command.getvalue()
# Setter registered for _nepi_testbed_environment_setup -- confirm whether
# assignments are meant to be ignored.
140 def _nepi_testbed_environment_setup_set(self, value):
# Exposes the generated environment-setup shell prefix as an attribute.
142 _nepi_testbed_environment_setup = property(
143 _nepi_testbed_environment_setup_get,
144 _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
    """Merge this node's set attributes into a PLCAPI filter dict.

    For every ``attr -> tag`` pair in *filter_map*, the value of
    ``self.<attr>`` (``None`` when the attribute does not exist) is stored
    as ``target_filters[tag]`` unless it is ``None``.

    :param target_filters: dict that is updated in place.
    :param filter_map: mapping of attribute name to filter key; must provide
        ``iteritems()`` (e.g. BASEFILTERS).
    :returns: *target_filters* itself.
    """
    lookups = ((tag, getattr(self, attr, None))
               for attr, tag in filter_map.iteritems())
    target_filters.update((tag, value) for tag, value in lookups
                          if value is not None)
    return target_filters
# Names of BASEFILTERS and TAGFILTERS attributes that are set (not None) on
# this node. Callers read it without calling it (self.applicable_filters),
# so it is presumably decorated with @property -- confirm.
154 def applicable_filters(self):
155 has = lambda att : getattr(self,att,None) is not None
157 filter(has, self.BASEFILTERS.iterkeys())
158 + filter(has, self.TAGFILTERS.iterkeys())
# Narrow down the set of PLC node_ids that satisfy this node's criteria:
# base filters plus health constraints via GetNodes(), then each applicable
# tag filter, required vsys tags, interface-count bounds and finally DNS
# resolvability of each hostname (checked in parallel).
161 def find_candidates(self, filter_slice_id=None):
162 self._logger.info("Finding candidates for %s", self.make_filter_description())
164 fields = ('node_id',)
165 replacements = {'timeframe':self.timeframe}
167 # get initial candidates (no tag filters)
168 basefilters = self.build_filters({}, self.BASEFILTERS)
# Snapshot taken before the slice/health constraints below are added; the
# per-tag GetNodeTags() queries start from this copy.
169 rootfilters = basefilters.copy()
171 basefilters['|slice_ids'] = (filter_slice_id,)
173 # only pick healthy nodes
174 basefilters['run_level'] = 'boot'
175 basefilters['boot_state'] = 'boot'
176 basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
177 basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
179 # keyword-only "pseudofilters"
182 extra['peer'] = self.site
184 candidates = set(map(operator.itemgetter('node_id'),
185 self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
187 # filter by tag, one tag at a time
188 applicable = self.applicable_filters
189 for tagfilter in self.TAGFILTERS.iteritems():
190 attr, (tagname, expr) = tagfilter
192 # don't bother if there's no filter defined
193 if attr in applicable:
# Rebinds the loop variable to the query dict; harmless because the
# (attr, (tagname, expr)) tuple was already unpacked above.
194 tagfilter = rootfilters.copy()
195 tagfilter['tagname'] = tagname % replacements
196 tagfilter[expr % replacements] = getattr(self,attr)
197 tagfilter['node_id'] = list(candidates)
199 candidates &= set(map(operator.itemgetter('node_id'),
200 self._api.GetNodeTags(filters=tagfilter, fields=fields)))
202 # filter by vsys tags - special case since it doesn't follow
203 # the usual semantics
204 if self.required_vsys:
205 newcandidates = collections.defaultdict(set)
# BUG(review): operator.itemgetter(['node_id','value']) below looks up one
# list-valued key, which raises TypeError (unhashable list) when applied to
# the GetNodeTags() dict rows; operator.itemgetter('node_id','value') was
# presumably intended so that "for node_id, value in vsys_tags" unpacks pairs.
207 vsys_tags = self._api.GetNodeTags(
209 node_id = list(candidates),
210 fields = ['node_id','value'])
213 operator.itemgetter(['node_id','value']),
216 required_vsys = self.required_vsys
217 for node_id, value in vsys_tags:
218 if value in required_vsys:
219 newcandidates[value].add(node_id)
# NOTE(review): newcandidates only has keys for vsys values that some
# candidate actually carries, so a required vsys present on no node adds no
# term to this intersection -- verify that case is rejected elsewhere.
221 # take only those that have all the required vsys tags
222 newcandidates = reduce(
223 lambda accum, new : accum & new,
224 newcandidates.itervalues(),
227 # filter by iface count
228 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
229 # fetch interfaces for all, in one go
# NOTE(review): `filters` (base filters restricted to the current candidates)
# is built but GetNodes() is called with `basefilters`, so interface ids are
# fetched for every node matching the base filters. The predicates only
# consult candidate ids, so the result is unchanged, but `filters=filters`
# was likely intended.
230 filters = basefilters.copy()
231 filters['node_id'] = list(candidates)
232 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
233 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
235 # filter candidates by interface count
236 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
237 predicate = ( lambda node_id :
238 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
239 elif self.min_num_external_ifaces is not None:
240 predicate = ( lambda node_id :
241 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
243 predicate = ( lambda node_id :
244 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
246 candidates = set(filter(predicate, candidates))
248 # make sure hostnames are resolvable
250 self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
252 hostnames = dict(map(operator.itemgetter('node_id','hostname'),
253 self._api.GetNodes(list(candidates), ['node_id','hostname'])
# DNS check. socket.gethostbyname() raises on lookup failure rather than
# returning None -- confirm the failure is caught so pfilter drops the node.
255 def resolvable(node_id):
257 addr = socket.gethostbyname(hostnames[node_id])
258 return addr is not None
261 candidates = set(parallel.pfilter(resolvable, candidates,
264 self._logger.info(" Found %s reachable candidates.", len(candidates))
# Render the active selection criteria as "key: value; key: value" for
# logging. Tag criteria are keyed by attribute name, not by PLC tag name.
# NOTE(review): the interface range uses `or`, so a max of 0 is shown as
# 'inf' (and a min of 0 as '0'); an explicit `is None` check would render
# max 0 as '0'.
268 def make_filter_description(self):
270 Makes a human-readable description of filtering conditions
274 # start from the base (non-tag) filters
275 filters = self.build_filters({}, self.BASEFILTERS)
277 # keyword-only "pseudofilters"
279 filters['peer'] = self.site
281 # filter by tag, one tag at a time
282 applicable = self.applicable_filters
283 for tagfilter in self.TAGFILTERS.iteritems():
284 attr, (tagname, expr) = tagfilter
286 # don't bother if there's no filter defined
287 if attr in applicable:
288 filters[attr] = getattr(self,attr)
290 # filter by vsys tags - special case since it doesn't follow
291 # the usual semantics
292 if self.required_vsys:
293 filters['vsys'] = ','.join(list(self.required_vsys))
295 # filter by iface count
296 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
297 filters['num_ifaces'] = '-'.join([
298 str(self.min_num_external_ifaces or '0'),
299 str(self.max_num_external_ifaces or 'inf')
302 return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """Bind this element to the PLC node ``node_id`` and load its details.

    The id is stored first because fetch_node_info() reads
    ``self._node_id`` to query the PLC API.
    """
    self._node_id = node_id
    self.fetch_node_info()
# Undo fetch_node_info(): restore the attribute values it overwrote, which
# it saved in self.__orig_attrs. NOTE(review): that attribute only exists
# after fetch_node_info() has run -- confirm callers never unassign first.
308 def unassign_node(self):
310 self.__dict__.update(self.__orig_attrs)
# Overwrite this element's selection attributes with the real values of the
# allocated PLC node (self._node_id): base fields from GetNodes(), tag values
# from GetNodeTags(), peer site, interface count and SSH host key. Previous
# values are saved in self.__orig_attrs so unassign_node() can restore them.
# NOTE(review): PLCAPI may report peer_id as None for local nodes; confirm
# self._api.peer_map handles that key.
312 def fetch_node_info(self):
315 info = self._api.GetNodes(self._node_id)[0]
316 tags = dict( (t['tagname'],t['value'])
317 for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
319 orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
320 orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
# Reset the interface bounds; they are re-pinned below from the node's
# actual interface count when GetNodes() reports interface_ids.
321 self.min_num_external_ifaces = None
322 self.max_num_external_ifaces = None
325 replacements = {'timeframe':self.timeframe}
326 for attr, tag in self.BASEFILTERS.iteritems():
329 if hasattr(self, attr):
330 orig_attrs[attr] = getattr(self, attr)
331 setattr(self, attr, value)
332 for attr, (tag,_) in self.TAGFILTERS.iteritems():
333 tag = tag % replacements
336 if hasattr(self, attr):
337 orig_attrs[attr] = getattr(self, attr)
338 setattr(self, attr, value)
340 if 'peer_id' in info:
341 orig_attrs['site'] = self.site
342 self.site = self._api.peer_map[info['peer_id']]
344 if 'interface_ids' in info:
345 self.min_num_external_ifaces = \
346 self.max_num_external_ifaces = len(info['interface_ids'])
348 if 'ssh_rsa_key' in info:
349 orig_attrs['server_key'] = self.server_key
350 self.server_key = info['ssh_rsa_key']
352 self.__orig_attrs = orig_attrs
# Configuration sanity checks (enclosing method header not shown here):
# raise AssertionError when home_path is unset, when ident_path is unset or
# not readable, or when slicename is unset. These are explicit raises, so
# they are not stripped by `python -O` the way `assert` statements are.
355 if self.home_path is None:
356 raise AssertionError, "Misconfigured node: missing home path"
357 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
358 raise AssertionError, "Misconfigured node: missing slice SSH key"
359 if self.slicename is None:
360 raise AssertionError, "Misconfigured node: unspecified slice"
# Fragment (enclosing method header not shown here): mark the dependencies
# as installed and reset the reliability/bandwidth bounds so they do not
# constrain a later re-discovery of the node.
363 # Mark dependencies installed
364 self._installed = True
366 # Clear load attributes, they impair re-discovery
367 self.minReliability = \
368 self.maxReliability = \
369 self.minBandwidth = \
370 self.maxBandwidth = \
# When packages are required and not yet installed: set up the rpmfusion
# repository over SSH (presumably only when self.rpmFusion is set), using
# the F12-specific release package for operatingSystem == 'f12'. Then start
# the asynchronous p2p yum installer created by prepare_dependencies().
# The `rpm -q ... || rpm -i ...` command installs the release package only
# if it is not already installed.
# NOTE(review): self._yum_dependencies is None unless prepare_dependencies()
# ran first; confirm the call order.
376 def install_dependencies(self):
377 if self.required_packages and not self._installed:
378 # If we need rpmfusion, we must install the repo definition and the gpg keys
380 if self.operatingSystem == 'f12':
381 # Fedora 12 requires a different rpmfusion package
382 RPM_FUSION_URL = self.RPM_FUSION_URL_F12
384 # This one works for f13+
385 RPM_FUSION_URL = self.RPM_FUSION_URL
388 '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
390 'RPM_FUSION_URL' : RPM_FUSION_URL
396 (out,err),proc = server.popen_ssh_command(
398 host = self.hostname,
400 user = self.slicename,
402 ident_key = self.ident_path,
403 server_key = self.server_key
407 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
409 # Launch p2p yum dependency installer
410 self._yum_dependencies.async_setup()
# Poll is_alive() with a growing delay (x1.5 per attempt, capped at 30 s)
# and raise UnresponsiveNodeError once more than `timeout` seconds (default
# 20*60 = 20 minutes) have been spent waiting.
412 def wait_provisioning(self, timeout = 20*60):
413 # Wait until the node answers is_alive()
416 while not self.is_alive():
417 time.sleep(sleeptime)
418 totaltime += sleeptime
419 sleeptime = min(30.0, sleeptime*1.5)
421 if totaltime > timeout:
422 # PlanetLab has a 15' delay on configuration propagation
423 # If we're above that delay, the unresponsiveness is not due
425 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
427 # Ensure the node is clean (no apps running that could interfere with operations)
428 if self.enable_cleanup:
# Block until the asynchronous yum installer finishes, then mark the node's
# dependencies as installed.
# NOTE(review): pidprobe/probe/pidmax/probemax are not used in the visible
# body -- confirm whether they are still needed.
431 def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
432 # Wait for the p2p installer
433 if self._yum_dependencies and not self._installed:
434 self._yum_dependencies.async_setup_wait()
435 self._installed = True
# Liveness-probe fragment (enclosing method header not shown here): runs a
# command over SSH, retrying on EINTR, and treats an error-free reply of
# exactly 'ALIVE' as success.
438 # Make sure all the paths are created where
439 # they have to be created for deployment
440 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
442 host = self.hostname,
444 user = self.slicename,
446 ident_key = self.ident_path,
447 server_key = self.server_key
452 elif not err and out.strip() == 'ALIVE':
# Separate fragment; enclosing method not shown here.
458 if self.enable_cleanup:
# Kill every process owned by the slice user and by root on the node, via
# sudo over SSH; each killall is issued twice. The adjacent string literals
# are joined at compile time, so the `%` substitution applies to the whole
# command. NOTE(review): the SSH result is not checked in the visible lines
# -- presumably best-effort, since killall fails when nothing matches.
461 def do_cleanup(self):
462 self._logger.info("Cleaning up %s", self.hostname)
464 (out,err),proc = server.popen_ssh_command(
465 # Some apps need two kills
466 "sudo -S killall -u %(slicename)s ; "
467 "sudo -S killall -u %(slicename)s ; "
468 "sudo -S killall -u root ; "
469 "sudo -S killall -u root " % {
470 'slicename' : self.slicename ,
472 host = self.hostname,
474 user = self.slicename,
476 ident_key = self.ident_path,
477 server_key = self.server_key
def prepare_dependencies(self):
    """Create and configure the p2p yum dependency installer for this node.

    Does nothing when ``required_packages`` is empty or the dependencies are
    already marked as installed. Otherwise stores a new
    ``application.YumDependency`` (bound to this node's API) in
    ``self._yum_dependencies``, attaches it to this node, gives it the
    ``"nepi-yumdep"`` home directory and the space-separated package list.
    """
    if not self.required_packages or self._installed:
        return
    installer = self._yum_dependencies = application.YumDependency(self._api)
    installer.node = self
    installer.home_path = "nepi-yumdep"
    installer.depends = ' '.join(self.required_packages)
# Translate each (dest, prefix, nexthop, metric) route into an
# "add <dest>[/<prefix>] gw <nexthop> ..." rule; the prefix is omitted when
# it is empty or 32. Raise RuntimeError if no device in `devs` routes it.
# The rules are fed through the vroute vsys pipe (/vsys/vroute.in), with
# /vsys/vroute.out copied to stderr. Any stderr output or a non-zero exit
# status is treated as failure.
# NOTE(review): the command string has no %(home)s placeholder, so the
# dict(home=...) passed to `%` is unused (harmless with a mapping).
489 def configure_routes(self, routes, devs):
491 Add the specified routes to the node's routing table
497 if dev.routes_here(route):
499 dest, prefix, nexthop, metric = route
501 "add %s%s gw %s %s" % (
503 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
512 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
513 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
515 self._logger.info("Setting up routes for %s", self.hostname)
516 self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
518 (out,err),proc = server.popen_ssh_command(
519 "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.5" % dict(
520 home = server.shell_escape(self.home_path)),
521 host = self.hostname,
523 user = self.slicename,
525 ident_key = self.ident_path,
526 server_key = self.server_key,
527 stdin = '\n'.join(rules)
530 if proc.wait() or err:
531 raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
# NOTE(review): uses a module-level `logger` while the rest of this class
# logs through self._logger; confirm `logger` is defined at module level.
533 logger.debug("Routes said: %s%s", out, err)