2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
17 from nepi.util import server
18 from nepi.util import parallel
class UnresponsiveNodeError(RuntimeError):
    """
    Raised when a node does not become responsive within the allowed time.
    """
# NOTE(review): the assignment headers of these two mappings are not part of
# this excerpt (the embedded numbering skips 29-31). Other code in this file
# reads self.BASEFILTERS and self.TAGFILTERS, so these are presumably their
# entries -- confirm against the complete file.
27 # Map Node attribute to plcapi filter name
28 'hostname' : 'hostname',
32 # Map Node attribute to (<tag name>, <plcapi filter expression>)
33 # There are replacements that are applied with string formatting,
34 # so '%' has to be escaped as '%%'.
35 'architecture' : ('arch','value'),
36 'operatingSystem' : ('fcdistro','value'),
37 'pl_distro' : ('pldistro','value'),
# The tag names below contain a %(timeframe)s placeholder; the tag-filtering
# code fills it in from a {'timeframe': self.timeframe} dictionary.
# NOTE(review): the ']' / '[' prefixes on 'value' look like PLCAPI range
# modifiers (>= and <=) -- confirm against the PLCAPI filter documentation.
38 'minReliability' : ('reliability%(timeframe)s', ']value'),
39 'maxReliability' : ('reliability%(timeframe)s', '[value'),
40 'minBandwidth' : ('bw%(timeframe)s', ']value'),
41 'maxBandwidth' : ('bw%(timeframe)s', '[value'),
# File paths for the dependency installer's pid and log files.
# NOTE(review): neither path is referenced in the visible part of this file;
# presumably they are consumed elsewhere -- confirm.
44 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
45 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
# RPM Fusion repository packages. install_dependencies picks the _F12 variant
# when operatingSystem == 'f12' and the generic one otherwise.
46 RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
47 RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
# Initialises every selection attribute to "no constraint" (None) and the
# requirement containers to empty collections.
# NOTE(review): this excerpt is incomplete (the embedded numbering skips
# 50-55, 58-59, 66-67, 71, 74, 76, 79-80, 82 and 85-86), so attributes read
# elsewhere in this file (for instance _api, hostname, site, timeframe,
# home_path, slicename, pythonpath) are presumably set on the missing lines.
49 def __init__(self, api=None):
# Selection attributes; None means the attribute is skipped when filters are
# built (see build_filters / applicable_filters).
56 self.architecture = None
57 self.operatingSystem = None
60 self.minReliability = None
61 self.maxReliability = None
62 self.minBandwidth = None
63 self.maxBandwidth = None
64 self.min_num_external_ifaces = None
65 self.max_num_external_ifaces = None
68 # Applications and routes add requirements to connected nodes
69 self.required_packages = set()
70 self.required_vsys = set()
72 self.rpmFusion = False
# Maps an environment variable name to a list of values; each value becomes
# one "export" statement in the environment setup command.
73 self.env = collections.defaultdict(list)
75 # Testbed-derived attributes
77 self.ident_path = None
78 self.server_key = None
81 # Those are filled when an actual node is allocated
83 self._yum_dependencies = None
# Set to True once dependencies have been waited for (wait_dependencies).
84 self._installed = False
87 self._logger = logging.getLogger('nepi.testbeds.planetlab')
# Property whose getter builds one shell command line of ';'-separated
# "export" statements: PYTHONPATH, PATH and every entry of self.env.
# NOTE(review): this excerpt is incomplete (the embedded numbering skips 93,
# 96-97 and 103): the closing parentheses of both write() calls and the body
# of the setter are not visible here.
89 def _nepi_testbed_environment_setup_get(self):
90 command = cStringIO.StringIO()
# Each pythonpath entry is shell-escaped and anchored under ${HOME}.
91 command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
92 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
# NOTE(review): PATH is extended with the same self.pythonpath entries as
# PYTHONPATH above -- confirm that this is intended.
94 command.write(' ; export PATH=$PATH:%s' % (
95 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
# One export per (name, value) pair; names and values are written unescaped.
98 for envkey, envvals in self.env.iteritems():
99 for envval in envvals:
100 command.write(' ; export %s=%s' % (envkey, envval))
101 return command.getvalue()
102 def _nepi_testbed_environment_setup_set(self, value):
104 _nepi_testbed_environment_setup = property(
105 _nepi_testbed_environment_setup_get,
106 _nepi_testbed_environment_setup_set)
def build_filters(self, target_filters, filter_map):
    """
    Copy the attributes that are set on this node into a filter dictionary.

    ``filter_map`` maps attribute names to filter keys. Each attribute is
    looked up on ``self``; when it exists and is not ``None``, its value is
    stored in ``target_filters`` under the mapped filter key. Attributes
    that are missing or ``None`` are skipped.

    ``target_filters`` is modified in place and also returned, so the call
    can be chained or started from an empty dictionary.
    """
    # items() rather than iteritems() so the method works on both Python 2
    # and Python 3 dictionaries.
    for attr, tag in filter_map.items():
        value = getattr(self, attr, None)
        if value is not None:
            target_filters[tag] = value
    return target_filters
# Collects the names of the BASEFILTERS / TAGFILTERS attributes that are set
# (not None) on this node.
# NOTE(review): this excerpt is incomplete (the embedded numbering skips 115,
# 118 and 121); callers read it as an attribute (self.applicable_filters), so
# it is presumably decorated as a property, and the expression below is
# presumably wrapped and returned -- confirm against the complete file.
116 def applicable_filters(self):
# True when the named attribute exists on self and is not None.
117 has = lambda att : getattr(self,att,None) is not None
119 filter(has, self.BASEFILTERS.iterkeys())
120 + filter(has, self.TAGFILTERS.iterkeys())
# Looks up the ids of the nodes that satisfy this node's constraints,
# narrowing the set step by step: base filters, tag filters, vsys tags,
# interface count and finally hostname resolution.
# NOTE(review): this excerpt is incomplete (the embedded numbering has gaps,
# e.g. 132, 142-143, 168, 170, 173-174, 187-188, 204, 211, 216, 218, 221-222
# and 227-229). Guards, some call arguments, the code surrounding the DNS
# lookup and the final return are therefore not visible here.
123 def find_candidates(self, filter_slice_id=None):
124 self._logger.info("Finding candidates for %s", self.make_filter_description())
126 fields = ('node_id',)
# Substituted into the %(timeframe)s placeholders of the tag filters.
127 replacements = {'timeframe':self.timeframe}
129 # get initial candidates (no tag filters)
130 basefilters = self.build_filters({}, self.BASEFILTERS)
# Snapshot taken before the slice and health filters are added below; it is
# the starting point of every per-tag query.
131 rootfilters = basefilters.copy()
133 basefilters['|slice_ids'] = (filter_slice_id,)
135 # only pick healthy nodes
136 basefilters['run_level'] = 'boot'
137 basefilters['boot_state'] = 'boot'
138 basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
139 basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
141 # keyword-only "pseudofilters"
144 extra['peer'] = self.site
146 candidates = set(map(operator.itemgetter('node_id'),
147 self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
149 # filter by tag, one tag at a time
150 applicable = self.applicable_filters
151 for tagfilter in self.TAGFILTERS.iteritems():
152 attr, (tagname, expr) = tagfilter
154 # don't bother if there's no filter defined
155 if attr in applicable:
# The loop variable is rebound here to the query dictionary for this tag.
156 tagfilter = rootfilters.copy()
157 tagfilter['tagname'] = tagname % replacements
158 tagfilter[expr % replacements] = getattr(self,attr)
159 tagfilter['node_id'] = list(candidates)
# Keep only the candidates that also carry a matching tag.
161 candidates &= set(map(operator.itemgetter('node_id'),
162 self._api.GetNodeTags(filters=tagfilter, fields=fields)))
164 # filter by vsys tags - special case since it doesn't follow
165 # the usual semantics
166 if self.required_vsys:
# Maps each required vsys value to the set of node ids that carry it.
167 newcandidates = collections.defaultdict(set)
169 vsys_tags = self._api.GetNodeTags(
171 node_id = list(candidates),
172 fields = ['node_id','value'])
# NOTE(review): itemgetter receives a single list here. If the records are
# dicts (as the 'node_id' getters above suggest), indexing with a list raises
# TypeError; the loop below unpacks (node_id, value) pairs, which would need
# operator.itemgetter('node_id','value') -- confirm and fix.
175 operator.itemgetter(['node_id','value']),
178 required_vsys = self.required_vsys
179 for node_id, value in vsys_tags:
180 if value in required_vsys:
181 newcandidates[value].add(node_id)
183 # take only those that have all the required vsys tags
184 newcandidates = reduce(
185 lambda accum, new : accum & new,
186 newcandidates.itervalues(),
189 # filter by iface count
190 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
191 # fetch interfaces for all, in one go
# NOTE(review): 'filters' (restricted to the current candidates) is built on
# the next two lines, but the GetNodes call passes 'basefilters' instead, so
# the restricted copy is never used and every base-filter match is fetched --
# confirm whether filters=filters was intended.
192 filters = basefilters.copy()
193 filters['node_id'] = list(candidates)
194 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
195 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
197 # filter candidates by interface count
# Nodes absent from 'ifaces' count as having zero interfaces.
198 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
199 predicate = ( lambda node_id :
200 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
201 elif self.min_num_external_ifaces is not None:
202 predicate = ( lambda node_id :
203 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
205 predicate = ( lambda node_id :
206 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
208 candidates = set(filter(predicate, candidates))
210 # make sure hostnames are resolvable
212 self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
214 hostnames = dict(map(operator.itemgetter('node_id','hostname'),
215 self._api.GetNodes(list(candidates), ['node_id','hostname'])
# Predicate for the parallel filter below: resolves the node's hostname.
217 def resolvable(node_id):
219 addr = socket.gethostbyname(hostnames[node_id])
220 return addr is not None
223 candidates = set(parallel.pfilter(resolvable, candidates,
226 self._logger.info(" Found %s reachable candidates.", len(candidates))
# NOTE(review): this excerpt is incomplete (the embedded numbering skips 231,
# 233-235, 240, 251, 256 and 262-263): the docstring delimiters, the guard
# before the 'peer' entry and the closing brackets of the join() call are not
# visible here.
230 def make_filter_description(self):
232 Makes a human-readable description of filtering conditions
236 # get initial candidates (no tag filters)
237 filters = self.build_filters({}, self.BASEFILTERS)
239 # keyword-only "pseudofilters"
241 filters['peer'] = self.site
243 # filter by tag, one tag at a time
244 applicable = self.applicable_filters
245 for tagfilter in self.TAGFILTERS.iteritems():
246 attr, (tagname, expr) = tagfilter
248 # don't bother if there's no filter defined
249 if attr in applicable:
# Keyed by attribute name (not by tag name), since this is only a description.
250 filters[attr] = getattr(self,attr)
252 # filter by vsys tags - special case since it doesn't follow
253 # the usual semantics
254 if self.required_vsys:
255 filters['vsys'] = ','.join(list(self.required_vsys))
257 # filter by iface count
258 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
# Rendered as "<min>-<max>"; an unset (or zero) bound falls back to '0' for
# the minimum and 'inf' for the maximum.
259 filters['num_ifaces'] = '-'.join([
260 str(self.min_num_external_ifaces or '0'),
261 str(self.max_num_external_ifaces or 'inf')
# Each entry is formatted as "key: value"; entries are joined with '; '.
264 return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
    """
    Bind this object to the node identified by ``node_id``.

    The id is stored in ``self._node_id`` and ``fetch_node_info()`` is
    called right away to refresh the attributes for that node.
    """
    self._node_id = node_id
    self.fetch_node_info()
# Restores the attribute values saved by fetch_node_info by writing the saved
# dictionary back into the instance __dict__.
# NOTE(review): this excerpt is incomplete (the embedded numbering skips
# 271); presumably the missing line clears the node id -- confirm.
270 def unassign_node(self):
272 self.__dict__.update(self.__orig_attrs)
# Queries the API for the assigned node and overwrites this object's
# selection attributes with the node's actual values, remembering every
# previous value in a dictionary so that unassign_node can restore it.
# NOTE(review): this excerpt is incomplete (the embedded numbering skips
# 275-276, 280, 285-286, 289-290, 296-297 and others): the creation of
# 'orig_attrs' and the lines that compute 'value' inside both loops are not
# visible here.
274 def fetch_node_info(self):
277 info = self._api.GetNodes(self._node_id)[0]
# Tag name -> tag value for the assigned node.
278 tags = dict( (t['tagname'],t['value'])
279 for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
# Interface-count bounds are saved and cleared first; they are set again
# further down when the node record carries 'interface_ids'.
281 orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
282 orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
283 self.min_num_external_ifaces = None
284 self.max_num_external_ifaces = None
287 replacements = {'timeframe':self.timeframe}
288 for attr, tag in self.BASEFILTERS.iteritems():
291 if hasattr(self, attr):
292 orig_attrs[attr] = getattr(self, attr)
293 setattr(self, attr, value)
294 for attr, (tag,_) in self.TAGFILTERS.iteritems():
# Resolve the %(timeframe)s placeholder in the tag name.
295 tag = tag % replacements
298 if hasattr(self, attr):
299 orig_attrs[attr] = getattr(self, attr)
300 setattr(self, attr, value)
302 if 'peer_id' in info:
303 orig_attrs['site'] = self.site
304 self.site = self._api.peer_map[info['peer_id']]
# Both bounds are pinned to the node's actual interface count.
306 if 'interface_ids' in info:
307 self.min_num_external_ifaces = \
308 self.max_num_external_ifaces = len(info['interface_ids'])
310 if 'ssh_rsa_key' in info:
311 orig_attrs['server_key'] = self.server_key
312 self.server_key = info['ssh_rsa_key']
# Double-underscore name: subject to Python name mangling inside a class body.
314 self.__orig_attrs = orig_attrs
# NOTE(review): the 'def' lines that own the statements below are not part of
# this excerpt (the embedded numbering skips 315-316 and 323-324), so the
# names of the enclosing methods cannot be confirmed from here.
# Configuration checks: each raises AssertionError when a required setting is
# missing (home path, a readable slice SSH key file, slice name).
317 if self.home_path is None:
318 raise AssertionError, "Misconfigured node: missing home path"
# os.access(..., os.R_OK) also rejects a key path that is not readable.
319 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
320 raise AssertionError, "Misconfigured node: missing slice SSH key"
321 if self.slicename is None:
322 raise AssertionError, "Misconfigured node: unspecified slice"
325 # Just mark dependencies installed
326 self._installed = True
# When packages are required and not yet installed: runs a remote command
# over SSH that installs the RPM Fusion release package if it is not already
# present, then starts the asynchronous yum dependency installer.
# NOTE(review): this excerpt is incomplete (the embedded numbering skips 331,
# 335, 338-339, 341, 343-347, 349, 351, 353, 356-358 and 360): the rpmFusion
# guard, the 'else' branch, the command construction and the failure check
# before the raise are not visible here.
328 def install_dependencies(self):
329 if self.required_packages and not self._installed:
330 # If we need rpmfusion, we must install the repo definition and the gpg keys
332 if self.operatingSystem == 'f12':
333 # Fedora 12 requires a different rpmfusion package
334 RPM_FUSION_URL = self.RPM_FUSION_URL_F12
336 # This one works for f13+
337 RPM_FUSION_URL = self.RPM_FUSION_URL
340 '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
342 'RPM_FUSION_URL' : RPM_FUSION_URL
348 (out,err),proc = server.popen_ssh_command(
350 host = self.hostname,
352 user = self.slicename,
354 ident_key = self.ident_path,
355 server_key = self.server_key
359 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
361 # Launch p2p yum dependency installer
362 self._yum_dependencies.async_setup()
# Polls is_alive() until the node answers, sleeping between attempts, and
# raises UnresponsiveNodeError once the accumulated sleep time exceeds
# 'timeout' (in seconds; the default is 20 minutes).
# NOTE(review): this excerpt is incomplete (the embedded numbering skips
# 366-367, 372 and 376): the initial values of 'totaltime' and 'sleeptime'
# are not visible here.
364 def wait_provisioning(self, timeout = 20*60):
365 # Wait for the p2p installer
368 while not self.is_alive():
369 time.sleep(sleeptime)
370 totaltime += sleeptime
# Back off: each wait is 1.5x the previous one, capped at 30 seconds.
371 sleeptime = min(30.0, sleeptime*1.5)
373 if totaltime > timeout:
374 # PlanetLab has a 15' delay on configuration propagation
375 # If we're above that delay, the unresponsiveness is not due
377 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
    """
    Block until the yum dependency installer has finished.

    Nothing happens when no installer was prepared
    (``self._yum_dependencies`` is falsy) or when the node is already
    marked as installed. Otherwise the installer's ``async_setup_wait()``
    is called and the node is then marked as installed, so later calls
    return immediately.

    The ``pidprobe``, ``probe``, ``pidmax`` and ``probemax`` parameters are
    accepted for interface compatibility; this method does not read them.
    """
    # Wait for the p2p installer
    if self._yum_dependencies and not self._installed:
        self._yum_dependencies.async_setup_wait()
        self._installed = True
# NOTE(review): the 'def' line that owns this fragment is not part of this
# excerpt (the embedded numbering skips 384-385, 389, 391, 393, 396-399 and
# 401-404). wait_provisioning polls self.is_alive(), so this is presumably
# that method's body -- confirm against the complete file.
386 # Make sure all the paths are created where
387 # they have to be created for deployment
# The SSH helper is wrapped in server.eintr_retry before being called.
388 (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
390 host = self.hostname,
392 user = self.slicename,
394 ident_key = self.ident_path,
395 server_key = self.server_key
# This branch applies when stderr is empty and the trimmed stdout is exactly
# 'ALIVE'.
400 elif not err and out.strip() == 'ALIVE':
def prepare_dependencies(self):
    """
    Create and configure the yum dependency installer for this node.

    Nothing happens when there are no required packages or when the node is
    already marked as installed. Otherwise a new
    ``application.YumDependency`` is stored in ``self._yum_dependencies``,
    pointed at this node, given the ``"nepi-yumdep"`` home path, and handed
    the required package names as one space-separated string.
    """
    # Configure p2p yum dependency installer
    if self.required_packages and not self._installed:
        self._yum_dependencies = application.YumDependency(self._api)
        self._yum_dependencies.node = self
        self._yum_dependencies.home_path = "nepi-yumdep"
        self._yum_dependencies.depends = ' '.join(self.required_packages)
# NOTE(review): this excerpt is incomplete (the embedded numbering skips 414,
# 416-420, 422, 424, 426, 428-435, 438, 440, 445, 447 and 451-452): the
# docstring delimiters, the loops over routes and devices, the construction
# of 'rules' and several call arguments are not visible here.
413 def configure_routes(self, routes, devs):
415 Add the specified routes to the node's routing table
421 if dev.routes_here(route):
423 dest, prefix, nexthop, metric = route
425 "add %s%s gw %s %s" % (
427 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
436 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
437 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
439 self._logger.info("Setting up routes for %s", self.hostname)
# The rules are sent on stdin and written to /vsys/vroute.in on the node,
# while /vsys/vroute.out is copied to stderr in the background.
# NOTE(review): the command string has no %(...)s placeholders, so the
# '% dict(home=...)' formatting leaves it unchanged -- confirm whether the
# 'home' substitution is still needed.
441 (out,err),proc = server.popen_ssh_command(
442 "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
443 home = server.shell_escape(self.home_path)),
444 host = self.hostname,
446 user = self.slicename,
448 ident_key = self.ident_path,
449 server_key = self.server_key,
450 stdin = '\n'.join(rules)
# Fails on a non-zero exit status or on any stderr output.
453 if proc.wait() or err:
454 raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)