2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
17 from nepi.util import server
18 from nepi.util import parallel
class UnresponsiveNodeError(RuntimeError):
    """Raised when a PlanetLab node does not respond within the allotted time
    (see wait_provisioning)."""
# Map Node attribute to plcapi filter name
# NOTE(review): the dict literal delimiters are not visible in this excerpt.
# These base filters are applied directly in GetNodes calls (see
# build_filters / find_candidates).
'hostname' : 'hostname',

# Map Node attribute to (<tag name>, <plcapi filter expression>)
# There are replacements that are applied with string formatting,
# so '%' has to be escaped as '%%'.
# The ']value' / '[value' expressions appear to be plcapi Filter
# comparison prefixes (']' = at-least, '[' = at-most) — confirm against
# the PLCAPI Filter documentation.
'architecture' : ('arch','value'),
'operatingSystem' : ('fcdistro','value'),
'pl_distro' : ('pldistro','value'),
'minReliability' : ('reliability%(timeframe)s', ']value'),
'maxReliability' : ('reliability%(timeframe)s', '[value'),
'minBandwidth' : ('bw%(timeframe)s', ']value'),
'maxBandwidth' : ('bw%(timeframe)s', '[value'),
# Remote pid/log files used by the detached dependency installer
# (written by install_dependencies, polled by wait_dependencies).
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
# RPM Fusion repository release packages; Fedora 12 needs a
# release-specific rpm, the stable one covers f13+ (see install_dependencies).
RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
def __init__(self, api=None):
    # Node-selection constraints used by find_candidates; None means
    # "no restriction". Several initializations (api/hostname/etc.) are
    # not visible in this excerpt.
    self.architecture = None
    self.operatingSystem = None
    self.minReliability = None
    self.maxReliability = None
    self.minBandwidth = None
    self.maxBandwidth = None
    # Interface-count constraints; checked client-side in find_candidates,
    # not through plcapi filters.
    self.min_num_external_ifaces = None
    self.max_num_external_ifaces = None

    # Applications and routes add requirements to connected nodes
    self.required_packages = set()
    self.required_vsys = set()
    # Whether the RPM Fusion repository must be installed before packages
    # (consumed by install_dependencies).
    self.rpmFusion = False
    # Extra environment exports: variable name -> list of values
    # (emitted by _nepi_testbed_environment_setup).
    self.env = collections.defaultdict(list)

    # Testbed-derived attributes
    self.ident_path = None   # path to the slice SSH identity key (ident_key in ssh calls)
    self.server_key = None   # node's SSH host key; filled from plcapi 'ssh_rsa_key'

    # Those are filled when an actual node is allocated
    self._logger = logging.getLogger('nepi.testbeds.planetlab')
def _nepi_testbed_environment_setup(self):
    """Build a shell snippet that exports PYTHONPATH, PATH and any extra
    environment variables required for nepi on the node.

    Returns the accumulated command string (';'-separated exports).
    """
    command = cStringIO.StringIO()
    # Prepend each pythonpath entry, shell-escaped and rooted at ${HOME}.
    # NOTE(review): the closing parentheses of these write() calls are not
    # visible in this excerpt.
    command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
        ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
    command.write(' ; export PATH=$PATH:%s' % (
        ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
    # Append user-requested exports, one per value.
    for envkey, envvals in self.env.iteritems():
        for envval in envvals:
            command.write(' ; export %s=%s' % (envkey, envval))
    return command.getvalue()
def build_filters(self, target_filters, filter_map):
    """Copy applicable filter values into *target_filters*.

    For each (attribute name, filter key) pair in *filter_map*, if this
    object has a non-None value for the attribute, store that value in
    *target_filters* under the mapped filter key.

    Parameters:
        target_filters (dict): destination mapping, mutated in place.
        filter_map (dict): attribute name -> plcapi filter key
            (e.g. BASEFILTERS).

    Returns:
        dict: the (mutated) *target_filters*, for chaining.
    """
    # .items() instead of the Python-2-only .iteritems(): iteration
    # behavior is identical here, and the method stays portable to
    # Python 3.
    for attr, tag in filter_map.items():
        value = getattr(self, attr, None)
        if value is not None:
            target_filters[tag] = value
    return target_filters
def applicable_filters(self):
    """List the filter attribute names that are actually set (non-None).

    Used by find_candidates / make_filter_description, which access it
    without calling — presumably decorated as a @property in the full
    source (decorator and 'return (' lines not visible here).
    """
    has = lambda att : getattr(self,att,None) is not None
    filter(has, self.BASEFILTERS.iterkeys())
    + filter(has, self.TAGFILTERS.iterkeys())
def find_candidates(self, filter_slice_id=None):
    """Query the PLC API for node_ids matching this node's constraints.

    Narrows candidates in stages: base filters (server-side), tag
    filters (one GetNodeTags query per tag), vsys requirements,
    interface-count limits, and a parallel DNS-resolvability check.
    Several lines of the full implementation are not visible in this
    excerpt.
    """
    self._logger.info("Finding candidates for %s", self.make_filter_description())

    fields = ('node_id',)
    replacements = {'timeframe':self.timeframe}

    # get initial candidates (no tag filters)
    basefilters = self.build_filters({}, self.BASEFILTERS)
    rootfilters = basefilters.copy()
    # Restrict to nodes already in the slice (presumably guarded by an
    # 'if filter_slice_id:' not visible here).
    basefilters['|slice_ids'] = (filter_slice_id,)

    # only pick healthy nodes
    basefilters['run_level'] = 'boot'
    basefilters['boot_state'] = 'boot'
    basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
    basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies

    # keyword-only "pseudofilters"
    # NOTE(review): the initialization of 'extra' is not visible here.
    extra['peer'] = self.site

    candidates = set(map(operator.itemgetter('node_id'),
        self._api.GetNodes(filters=basefilters, fields=fields, **extra)))

    # filter by tag, one tag at a time
    applicable = self.applicable_filters
    for tagfilter in self.TAGFILTERS.iteritems():
        attr, (tagname, expr) = tagfilter

        # don't bother if there's no filter defined
        if attr in applicable:
            # '%(timeframe)s' placeholders expanded here
            tagfilter = rootfilters.copy()
            tagfilter['tagname'] = tagname % replacements
            tagfilter[expr % replacements] = getattr(self,attr)
            tagfilter['node_id'] = list(candidates)

            # intersect: keep only nodes that match this tag filter too
            candidates &= set(map(operator.itemgetter('node_id'),
                self._api.GetNodeTags(filters=tagfilter, fields=fields)))

    # filter by vsys tags - special case since it doesn't follow
    # the usual semantics
    if self.required_vsys:
        newcandidates = collections.defaultdict(set)

        vsys_tags = self._api.GetNodeTags(
            node_id = list(candidates),
            fields = ['node_id','value'])

            operator.itemgetter(['node_id','value']),

        # group node_ids by the vsys tag value they provide
        required_vsys = self.required_vsys
        for node_id, value in vsys_tags:
            if value in required_vsys:
                newcandidates[value].add(node_id)

        # take only those that have all the required vsys tags
        newcandidates = reduce(
            lambda accum, new : accum & new,
            newcandidates.itervalues(),

    # filter by iface count
    if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
        # fetch interfaces for all, in one go
        filters = basefilters.copy()
        filters['node_id'] = list(candidates)
        # NOTE(review): passes basefilters rather than the narrowed
        # 'filters' just built — looks unintentional; confirm upstream.
        ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
            self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))

        # filter candidates by interface count
        if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
            predicate = ( lambda node_id :
                self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
        elif self.min_num_external_ifaces is not None:
            predicate = ( lambda node_id :
                self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
        # (max-only branch; an 'else:' presumably precedes this in the
        # full source — not visible here)
            predicate = ( lambda node_id :
                len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )

        candidates = set(filter(predicate, candidates))

    # make sure hostnames are resolvable
        self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))

        # map node_id -> hostname for the surviving candidates
        hostnames = dict(map(operator.itemgetter('node_id','hostname'),
            self._api.GetNodes(list(candidates), ['node_id','hostname'])

        def resolvable(node_id):
            # DNS lookup; presumably wrapped in try/except in the full
            # source so resolution failures drop the node.
                addr = socket.gethostbyname(hostnames[node_id])
                return addr is not None
        # probe all candidates concurrently
        candidates = set(parallel.pfilter(resolvable, candidates,

        self._logger.info(" Found %s reachable candidates.", len(candidates))
def make_filter_description(self):
    """
    Makes a human-readable description of filtering conditions
    (used for logging in find_candidates).
    """
    # get initial candidates (no tag filters)
    filters = self.build_filters({}, self.BASEFILTERS)

    # keyword-only "pseudofilters"
    filters['peer'] = self.site

    # filter by tag, one tag at a time
    applicable = self.applicable_filters
    for tagfilter in self.TAGFILTERS.iteritems():
        attr, (tagname, expr) = tagfilter

        # don't bother if there's no filter defined
        if attr in applicable:
            filters[attr] = getattr(self,attr)

    # filter by vsys tags - special case since it doesn't follow
    # the usual semantics
    if self.required_vsys:
        filters['vsys'] = ','.join(list(self.required_vsys))

    # filter by iface count
    if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
        filters['num_ifaces'] = '-'.join([
            str(self.min_num_external_ifaces or '0'),
            str(self.max_num_external_ifaces or 'inf')

    # render each (key, value) pair as "key: value", joined with '; '
    return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """Associate this object with an allocated PlanetLab node.

    Remembers *node_id*, then immediately pulls the node's attributes
    from the PLC API via fetch_node_info() (which also snapshots any
    values it overwrites so unassign_node() can roll them back).
    """
    self._node_id = node_id
    self.fetch_node_info()
def unassign_node(self):
    """Undo assign_node_id: restore the attribute values that
    fetch_node_info saved before overwriting them.
    (A reset of self._node_id is presumably part of the full source —
    not visible here.)
    """
    self.__dict__.update(self.__orig_attrs)
def fetch_node_info(self):
    """Fetch the assigned node's attributes and tags from the PLC API.

    Overwrites local filter attributes with the node's actual values,
    recording the previous values in self.__orig_attrs so
    unassign_node() can restore them. The initialization of
    'orig_attrs' and the per-attribute 'value' lookup are not visible
    in this excerpt.
    """
    info = self._api.GetNodes(self._node_id)[0]
    tags = dict( (t['tagname'],t['value'])
        for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )

    # Interface-count constraints are re-pinned to the real node below.
    orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
    orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
    self.min_num_external_ifaces = None
    self.max_num_external_ifaces = None

    replacements = {'timeframe':self.timeframe}
    # Adopt the node's actual values for every base-filter attribute.
    for attr, tag in self.BASEFILTERS.iteritems():
        if hasattr(self, attr):
            orig_attrs[attr] = getattr(self, attr)
            setattr(self, attr, value)
    # Same for tag-based attributes; '%(timeframe)s' expanded first.
    for attr, (tag,_) in self.TAGFILTERS.iteritems():
        tag = tag % replacements
        if hasattr(self, attr):
            orig_attrs[attr] = getattr(self, attr)
            setattr(self, attr, value)

    # Site is derived from the owning peer, when known.
    if 'peer_id' in info:
        orig_attrs['site'] = self.site
        self.site = self._api.peer_map[info['peer_id']]

    # Pin the iface-count range to exactly what the node has.
    if 'interface_ids' in info:
        self.min_num_external_ifaces = \
            self.max_num_external_ifaces = len(info['interface_ids'])

    # Node's SSH host key, used to authenticate later ssh connections.
    if 'ssh_rsa_key' in info:
        orig_attrs['server_key'] = self.server_key
        self.server_key = info['ssh_rsa_key']

    self.__orig_attrs = orig_attrs
# NOTE(review): these checks appear to be the body of a validate()-style
# method whose def line is not visible in this excerpt. They assert the
# node is configured well enough for deployment (home path, readable SSH
# identity key, slice name).
if self.home_path is None:
    raise AssertionError, "Misconfigured node: missing home path"
if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
    raise AssertionError, "Misconfigured node: missing slice SSH key"
if self.slicename is None:
    raise AssertionError, "Misconfigured node: unspecified slice"
def install_dependencies(self):
    """Kick off background installation of required_packages on the node.

    Spawns a detached 'yum -y install' over SSH (via rspawn), optionally
    preceded by RPM Fusion repository setup. Completion is tracked
    through DEPENDS_PIDFILE / DEPENDS_LOGFILE by wait_dependencies().
    Several lines of the full implementation are not visible in this
    excerpt (e.g. the rpmFusion command assembly and pidfile plumbing).
    """
    if self.required_packages:
        # TODO: make dependant on the experiment somehow...
        pidfile = self.DEPENDS_PIDFILE
        logfile = self.DEPENDS_LOGFILE

        # If we need rpmfusion, we must install the repo definition and the gpg keys
        if self.operatingSystem == 'f12':
            # Fedora 12 requires a different rpmfusion package
            RPM_FUSION_URL = self.RPM_FUSION_URL_F12
        # This one works for f13+
        # (an 'else:' presumably guards the next line in the full source)
        RPM_FUSION_URL = self.RPM_FUSION_URL

        # Shell fragment: install the rpmfusion release rpm only if absent.
            '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
            'RPM_FUSION_URL' : RPM_FUSION_URL

        # Start process in a "daemonized" way, using nohup and heavy
        # stdin/out redirection to avoid connection issues
        (out,err),proc = rspawn.remote_spawn(
            "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
                'packages' : ' '.join(self.required_packages),
                'rpmfusion' : rpmFusion,
            stderr = rspawn.STDOUT,
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key,
        raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
def wait_provisioning(self, timeout = 20*60):
    """Block until the node responds to is_alive(), up to *timeout* seconds.

    Probes with exponential backoff capped at 30s between attempts.
    (Initialization of 'sleeptime'/'totaltime' is not visible in this
    excerpt.)

    Raises:
        UnresponsiveNodeError: if the node stays unresponsive past *timeout*.
    """
    # recently provisioned nodes may not be up yet
    while not self.is_alive():
        time.sleep(sleeptime)
        totaltime += sleeptime
        sleeptime = min(30.0, sleeptime*1.5)

        if totaltime > timeout:
            # PlanetLab has a 15' delay on configuration propagation
            # If we're above that delay, the unresponsiveness is not due
            # to propagation, so give the node up for dead.
            raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
    """Wait for the background installer started by install_dependencies.

    Polls for the remote pidfile (up to *pidmax* attempts), then polls
    the process state with exponential backoff capped at *probemax*
    seconds, and finally inspects the install log: the last line must
    read SUCCESS (echoed by the install command). Several control-flow
    lines are not visible in this excerpt.

    Raises:
        RuntimeError: if the pidfile can't be obtained, the log can't be
            fetched, or the installer did not report SUCCESS.
    """
    if self.required_packages:
        pidfile = self.DEPENDS_PIDFILE

        # get the PID of the remote installer process
        for probenum in xrange(pidmax):
            pidtuple = rspawn.remote_check_pid(
                host = self.hostname,
                user = self.slicename,
                ident_key = self.ident_path,
                server_key = self.server_key
        raise RuntimeError, "Failed to obtain pidfile for dependency installer"

        # wait for it to finish
        while rspawn.RUNNING is rspawn.remote_status(
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key
            probe = min(probemax, 1.5*probe)

        # fetch the installer log to check the outcome
        logfile = self.DEPENDS_LOGFILE
        (out,err),proc = server.popen_ssh_command(
            "cat %s" % (server.shell_escape(logfile),),
            host = self.hostname,
            user = self.slicename,
            ident_key = self.ident_path,
            server_key = self.server_key
        raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)

        # last line of the log decides success/failure
        success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
        raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
# NOTE(review): this appears to be part of a liveness/setup check whose
# def line is not visible in this excerpt; it runs a command over SSH and
# inspects the output (expects the literal 'ALIVE').
# Make sure all the paths are created where
# they have to be created for deployment
(out,err),proc = server.eintr_retry(server.popen_ssh_command)(
    host = self.hostname,
    user = self.slicename,
    ident_key = self.ident_path,
    server_key = self.server_key
elif not err and out.strip() == 'ALIVE':
def configure_routes(self, routes, devs):
    """
    Add the specified routes to the node's routing table

    Matches each route to a virtual device that can host it (via
    dev.routes_here), builds vroute 'add' rules, and pushes them to the
    node through the /vsys/vroute pipe over SSH. Several lines (rule
    accumulation, loop headers) are not visible in this excerpt.
    """
    # Find a device willing to host each route; unmatched routes abort.
        if dev.routes_here(route):
            dest, prefix, nexthop, metric = route
            "add %s%s gw %s %s" % (
                # /32 (host) routes omit the prefix suffix
                (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
    raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
        "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)

    self._logger.info("Setting up routes for %s", self.hostname)

    # Feed the accumulated rules into the vroute vsys pipe; the
    # backgrounded cat surfaces vroute's output on stderr.
    # NOTE(review): the command string is %-formatted with dict(home=...)
    # but no %(home)s placeholder is visible — confirm against the full
    # source.
    (out,err),proc = server.popen_ssh_command(
        "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
            home = server.shell_escape(self.home_path)),
        host = self.hostname,
        user = self.slicename,
        ident_key = self.ident_path,
        server_key = self.server_key,
        stdin = '\n'.join(rules)

    if proc.wait() or err:
        raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)