2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
16 from nepi.util import server
17 from nepi.util import parallel
# Raised when a node stays unreachable past the provisioning grace period
# (see wait_provisioning below, which raises it after ~20 minutes).
19 class UnresponsiveNodeError(RuntimeError):
# NOTE(review): the enclosing class header and the dict assignments these
# entries belong to (presumably BASEFILTERS = {...} and TAGFILTERS = {...},
# both referenced by the methods below) are elided from this view.
24 # Map Node attribute to plcapi filter name
25 'hostname' : 'hostname',
29 # Map Node attribute to (<tag name>, <plcapi filter expression>)
30 # There are replacements that are applied with string formatting,
31 # so '%' has to be escaped as '%%'.
32 'architecture' : ('arch','value'),
33 'operatingSystem' : ('fcdistro','value'),
34 'pl_distro' : ('pldistro','value'),
# The ']'/'[' prefixes are plcapi filter range operators -- presumably
# ']' = lower bound (>=) and '[' = upper bound (<=), matching the
# min*/max* attribute names; confirm against PLCAPI filter docs.
35 'minReliability' : ('reliability%(timeframe)s', ']value'),
36 'maxReliability' : ('reliability%(timeframe)s', '[value'),
37 'minBandwidth' : ('bw%(timeframe)s', ']value'),
38 'maxBandwidth' : ('bw%(timeframe)s', '[value'),
# Well-known remote paths used by install_dependencies / wait_dependencies
# to track the background package installer.
41 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
42 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
44 def __init__(self, api=None):
# NOTE(review): several initializations referenced by other methods
# (self._api, hostname, slicename, home_path, timeframe, site,
# pythonpath, _node_id, ...) are elided from this view.
# Filter attributes: None means "no constraint" on node selection
# (only non-None ones are reported by applicable_filters).
51 self.architecture = None
52 self.operatingSystem = None
56 self.minReliability = None
57 self.maxReliability = None
58 self.minBandwidth = None
59 self.maxBandwidth = None
# Interface-count constraints are filtered client-side in
# find_candidates, since plcapi cannot express them directly.
60 self.min_num_external_ifaces = None
61 self.max_num_external_ifaces = None
64 # Applications and routes add requirements to connected nodes
65 self.required_packages = set()
66 self.required_vsys = set()
# Environment variables to export on the node, name -> list of values
# (consumed by _nepi_testbed_environment_setup).
68 self.env = collections.defaultdict(list)
70 # Testbed-derived attributes
72 self.ident_path = None
73 self.server_key = None
76 # Those are filled when an actual node is allocated
80 def _nepi_testbed_environment_setup(self):
# Build a single shell command string that prepares the remote
# environment: extends PYTHONPATH and PATH with the configured
# directories (relative to $HOME) and exports every variable
# collected in self.env.  Closing parentheses of the write() calls
# are elided from this view.
81 command = cStringIO.StringIO()
82 command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
83 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
# NOTE(review): PATH is extended with self.pythonpath as well --
# looks deliberate (executables shipped alongside the modules?)
# but worth confirming.
85 command.write(' ; export PATH=$PATH:%s' % (
86 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
89 for envkey, envvals in self.env.iteritems():
90 for envval in envvals:
# Values are interpolated as-is (not shell-escaped); callers must
# supply shell-safe values.
91 command.write(' ; export %s=%s' % (envkey, envval))
92 return command.getvalue()
94 def build_filters(self, target_filters, filter_map):
# Copy each set (non-None) Node attribute into target_filters under
# its plcapi filter name.  NOTE(review): the guard between the next
# two lines (presumably 'if value is not None:') and the trailing
# 'return target_filters' are elided -- callers below use the
# return value.
95 for attr, tag in filter_map.iteritems():
96 value = getattr(self, attr, None)
98 target_filters[tag] = value
# Names of all filter attributes currently set (non-None).
# NOTE(review): find_candidates and make_filter_description access this
# without calling it, so an elided line presumably decorates it
# @property -- confirm.
102 def applicable_filters(self):
103 has = lambda att : getattr(self,att,None) is not None
105 filter(has, self.BASEFILTERS.iterkeys())
106 + filter(has, self.TAGFILTERS.iterkeys())
109 def find_candidates(self, filter_slice_id=None):
# Query plcapi for node_ids matching all configured filters, then
# successively narrow the candidate set: base filters, per-tag
# filters, vsys tags, interface count, and finally DNS resolvability.
110 print >>sys.stderr, "Finding candidates for", self.make_filter_description()
112 fields = ('node_id',)
113 replacements = {'timeframe':self.timeframe}
115 # get initial candidates (no tag filters)
116 basefilters = self.build_filters({}, self.BASEFILTERS)
117 rootfilters = basefilters.copy()
# Restrict to nodes already in the given slice, when provided.
119 basefilters['|slice_ids'] = (filter_slice_id,)
121 # only pick healthy nodes
122 basefilters['run_level'] = 'boot'
123 basefilters['boot_state'] = 'boot'
124 basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
125 basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
127 # keyword-only "pseudofilters"
130 extra['peer'] = self.site
132 candidates = set(map(operator.itemgetter('node_id'),
133 self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
135 # filter by tag, one tag at a time
136 applicable = self.applicable_filters
137 for tagfilter in self.TAGFILTERS.iteritems():
138 attr, (tagname, expr) = tagfilter
140 # don't bother if there's no filter defined
141 if attr in applicable:
# 'tagfilter' is rebound here from the loop tuple to a query dict.
142 tagfilter = rootfilters.copy()
143 tagfilter['tagname'] = tagname % replacements
144 tagfilter[expr % replacements] = getattr(self,attr)
145 tagfilter['node_id'] = list(candidates)
# Intersect: keep only candidates that also match this tag.
147 candidates &= set(map(operator.itemgetter('node_id'),
148 self._api.GetNodeTags(filters=tagfilter, fields=fields)))
150 # filter by vsys tags - special case since it doesn't follow
151 # the usual semantics
152 if self.required_vsys:
# value -> set of node_ids exposing that vsys script
153 newcandidates = collections.defaultdict(set)
155 vsys_tags = self._api.GetNodeTags(
157 node_id = list(candidates),
158 fields = ['node_id','value'])
# NOTE(review): itemgetter is given a single list argument here,
# which indexes each mapping with the whole list -- likely meant
# operator.itemgetter('node_id','value'); confirm against the
# unelided source.
161 operator.itemgetter(['node_id','value']),
164 required_vsys = self.required_vsys
165 for node_id, value in vsys_tags:
166 if value in required_vsys:
167 newcandidates[value].add(node_id)
169 # take only those that have all the required vsys tags
170 newcandidates = reduce(
171 lambda accum, new : accum & new,
172 newcandidates.itervalues(),
175 # filter by iface count
176 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
177 # fetch interfaces for all, in one go
178 filters = basefilters.copy()
179 filters['node_id'] = list(candidates)
# NOTE(review): 'filters' is built with the node_id restriction but
# 'basefilters' is passed to GetNodes below, so the restriction is
# dropped -- likely should be filters=filters.  Harmless for
# correctness (the later predicate re-checks) but wasteful.
180 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
181 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
183 # filter candidates by interface count
184 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
185 predicate = ( lambda node_id :
186 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
187 elif self.min_num_external_ifaces is not None:
188 predicate = ( lambda node_id :
189 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
191 predicate = ( lambda node_id :
192 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
194 candidates = set(filter(predicate, candidates))
196 # make sure hostnames are resolvable
198 print >>sys.stderr, " Found", len(candidates), "candidates. Checking for reachability..."
200 hostnames = dict(map(operator.itemgetter('node_id','hostname'),
201 self._api.GetNodes(list(candidates), ['node_id','hostname'])
203 def resolvable(node_id):
# Elided lines presumably wrap this in try/except for DNS failures.
205 addr = socket.gethostbyname(hostnames[node_id])
206 return addr is not None
# DNS lookups are slow, so resolve candidates in parallel.
209 candidates = set(parallel.pfilter(resolvable, candidates,
212 print >>sys.stderr, " Found", len(candidates), "reachable candidates."
216 def make_filter_description(self):
218 Makes a human-readable description of filtering conditions
222 # get initial candidates (no tag filters)
223 filters = self.build_filters({}, self.BASEFILTERS)
225 # keyword-only "pseudofilters"
227 filters['peer'] = self.site
229 # filter by tag, one tag at a time
230 applicable = self.applicable_filters
231 for tagfilter in self.TAGFILTERS.iteritems():
232 attr, (tagname, expr) = tagfilter
234 # don't bother if there's no filter defined
235 if attr in applicable:
236 filters[attr] = getattr(self,attr)
238 # filter by vsys tags - special case since it doesn't follow
239 # the usual semantics
240 if self.required_vsys:
241 filters['vsys'] = ','.join(list(self.required_vsys))
243 # filter by iface count
244 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
# "<min>-<max>" with '0'/'inf' standing in for an unset bound.
245 filters['num_ifaces'] = '-'.join([
246 str(self.min_num_external_ifaces or '0'),
247 str(self.max_num_external_ifaces or 'inf')
# '%s: %s'.__mod__ formats each (key, value) pair; join into one line.
250 return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """Bind this helper to a concrete PLC node and refresh its cached info.

    Stores the node_id, then pulls the node's attributes from plcapi via
    fetch_node_info (which also snapshots the values it overwrites, so
    unassign_node can restore them).
    """
    self._node_id = node_id
    self.fetch_node_info()
256 def unassign_node(self):
# Restore every attribute that fetch_node_info overwrote (the
# snapshot lives in self.__orig_attrs).  NOTE(review): the elided
# line in between presumably also clears self._node_id.
258 self.__dict__.update(self.__orig_attrs)
260 def fetch_node_info(self):
# Pull the assigned node's record and tags from plcapi and overwrite
# the local filter attributes with the node's actual values, saving
# the previous values in orig_attrs so unassign_node can roll back.
# NOTE(review): the 'orig_attrs = {}' initialization and the lines
# computing 'value' inside both loops (presumably lookups into
# info/tags) are elided from this view.
263 info = self._api.GetNodes(self._node_id)[0]
264 tags = dict( (t['tagname'],t['value'])
265 for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
267 orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
268 orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
269 self.min_num_external_ifaces = None
270 self.max_num_external_ifaces = None
273 replacements = {'timeframe':self.timeframe}
274 for attr, tag in self.BASEFILTERS.iteritems():
277 if hasattr(self, attr):
278 orig_attrs[attr] = getattr(self, attr)
279 setattr(self, attr, value)
280 for attr, (tag,_) in self.TAGFILTERS.iteritems():
# Expand the %(timeframe)s placeholder to the concrete tag name.
281 tag = tag % replacements
284 if hasattr(self, attr):
285 orig_attrs[attr] = getattr(self, attr)
286 setattr(self, attr, value)
288 if 'peer_id' in info:
289 orig_attrs['site'] = self.site
290 self.site = self._api.peer_map[info['peer_id']]
# Pin both iface bounds to the node's actual interface count.
292 if 'interface_ids' in info:
293 self.min_num_external_ifaces = \
294 self.max_num_external_ifaces = len(info['interface_ids'])
296 if 'ssh_rsa_key' in info:
297 orig_attrs['server_key'] = self.server_key
298 self.server_key = info['ssh_rsa_key']
300 self.__orig_attrs = orig_attrs
# NOTE(review): the enclosing 'def' line (presumably validate(self)) is
# elided from this view.  Sanity-checks the attributes required before
# any SSH-based operation can be attempted.
303 if self.home_path is None:
304 raise AssertionError, "Misconfigured node: missing home path"
305 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
306 raise AssertionError, "Misconfigured node: missing slice SSH key"
307 if self.slicename is None:
308 raise AssertionError, "Misconfigured node: unspecified slice"
310 def install_dependencies(self):
# Kick off a detached 'yum install' of self.required_packages on the
# node; completion is checked later by wait_dependencies via the
# SUCCESS/FAILURE marker echoed into DEPENDS_LOGFILE.
311 if self.required_packages:
312 # TODO: make dependant on the experiment somehow...
313 pidfile = self.DEPENDS_PIDFILE
314 logfile = self.DEPENDS_LOGFILE
316 # Start process in a "daemonized" way, using nohup and heavy
317 # stdin/out redirection to avoid connection issues
318 (out,err),proc = rspawn.remote_spawn(
319 "( yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
320 'packages' : ' '.join(self.required_packages),
324 stderr = rspawn.STDOUT,
326 host = self.hostname,
328 user = self.slicename,
330 ident_key = self.ident_path,
331 server_key = self.server_key,
# Elided lines presumably check proc.wait()/exit status before this.
336 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
338 def wait_provisioning(self):
339 # recently provisioned nodes may not be up yet
# Poll is_alive with exponential backoff (capped at 30s).
# NOTE(review): the initializations of sleeptime/totaltime are
# elided from this view.
342 while not self.is_alive():
343 time.sleep(sleeptime)
344 totaltime += sleeptime
345 sleeptime = min(30.0, sleeptime*1.5)
# Give up after ~20 minutes total.
347 if totaltime > 20*60:
348 # PlanetLab has a 15' delay on configuration propagation
349 # If we're above that delay, the unresponsiveness is not due
351 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
353 def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
# Wait for the installer launched by install_dependencies:
#  1. poll for its pidfile (up to pidmax probes, pidprobe seconds apart),
#  2. poll its run status until it stops (probe interval grows 1.5x,
#     capped at probemax),
#  3. read the logfile and require a trailing SUCCESS marker.
354 if self.required_packages:
355 pidfile = self.DEPENDS_PIDFILE
359 for probenum in xrange(pidmax):
360 pidtuple = rspawn.remote_check_pid(
362 host = self.hostname,
364 user = self.slicename,
366 ident_key = self.ident_path,
367 server_key = self.server_key
# Elided lines presumably break on success / sleep(pidprobe);
# falling through all probes means the pidfile never appeared.
375 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
377 # wait for it to finish
378 while rspawn.RUNNING is rspawn.remote_status(
380 host = self.hostname,
382 user = self.slicename,
384 ident_key = self.ident_path,
385 server_key = self.server_key
388 probe = min(probemax, 1.5*probe)
# installation finished -- check the build log for the outcome
391 logfile = self.DEPENDS_LOGFILE
393 (out,err),proc = server.popen_ssh_command(
394 "cat %s" % (server.shell_escape(logfile),),
395 host = self.hostname,
397 user = self.slicename,
399 ident_key = self.ident_path,
400 server_key = self.server_key
404 raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
# The last log line must be the SUCCESS marker echoed by the installer.
406 success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
408 raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
# NOTE(review): the enclosing 'def' line is elided from this view.  The
# 'ALIVE' check below suggests this is an is_alive-style liveness probe
# run over SSH; the path-creation comment may belong to an adjacent
# elided method -- confirm against the unelided source.
411 # Make sure all the paths are created where
412 # they have to be created for deployment
413 (out,err),proc = server.popen_ssh_command(
415 host = self.hostname,
417 user = self.slicename,
419 ident_key = self.ident_path,
420 server_key = self.server_key
# Alive only when the command succeeded cleanly and echoed ALIVE.
425 elif not err and out.strip() == 'ALIVE':
431 def configure_routes(self, routes, devs):
433 Add the specified routes to the node's routing table
# For each route, find a virtual interface (dev) willing to host it
# and build a vroute rule line; elided lines presumably accumulate
# these into 'rules' and track whether a dev matched.
439 if dev.routes_here(route):
441 dest, prefix, nexthop = route
443 "add %s%s gw %s %s" % (
# Omit the /prefix suffix for host routes (/32) or unset prefixes.
445 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
454 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
455 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
# Feed the rules to the vsys vroute fifo; its output is cat'ed to
# stderr in the background so errors surface in 'err'.
# NOTE(review): the format string has no %(home)s placeholder, so the
# '% dict(home=...)' below is a no-op -- possibly a leftover; confirm.
457 (out,err),proc = server.popen_ssh_command(
458 "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
459 home = server.shell_escape(self.home_path)),
460 host = self.hostname,
462 user = self.slicename,
464 ident_key = self.ident_path,
465 server_key = self.server_key,
466 stdin = '\n'.join(rules)
# Any nonzero exit or stderr output means the routes were rejected.
469 if proc.wait() or err:
470 raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)