Ticket #73: part 1, enable spanning tree deployment of package dependencies
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15 import logging
16
17 from nepi.util import server
18 from nepi.util import parallel
19
20 import application
21
22 class UnresponsiveNodeError(RuntimeError):
23     pass
24
25 class Node(object):
26     BASEFILTERS = {
27         # Map Node attribute to plcapi filter name
28         'hostname' : 'hostname',
29     }
30     
31     TAGFILTERS = {
32         # Map Node attribute to (<tag name>, <plcapi filter expression>)
33         #   There are replacements that are applied with string formatting,
34         #   so '%' has to be escaped as '%%'.
35         'architecture' : ('arch','value'),
36         'operatingSystem' : ('fcdistro','value'),
37         'pl_distro' : ('pldistro','value'),
38         'minReliability' : ('reliability%(timeframe)s', ']value'),
39         'maxReliability' : ('reliability%(timeframe)s', '[value'),
40         'minBandwidth' : ('bw%(timeframe)s', ']value'),
41         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
42     }    
43     
44     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
45     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
46     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
47     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
48     
49     def __init__(self, api=None):
50         if not api:
51             api = plcapi.PLCAPI()
52         self._api = api
53         
54         # Attributes
55         self.hostname = None
56         self.architecture = None
57         self.operatingSystem = None
58         self.pl_distro = None
59         self.site = None
60         self.minReliability = None
61         self.maxReliability = None
62         self.minBandwidth = None
63         self.maxBandwidth = None
64         self.min_num_external_ifaces = None
65         self.max_num_external_ifaces = None
66         self.timeframe = 'm'
67         
68         # Applications and routes add requirements to connected nodes
69         self.required_packages = set()
70         self.required_vsys = set()
71         self.pythonpath = []
72         self.rpmFusion = False
73         self.env = collections.defaultdict(list)
74         
75         # Testbed-derived attributes
76         self.slicename = None
77         self.ident_path = None
78         self.server_key = None
79         self.home_path = None
80         
81         # Those are filled when an actual node is allocated
82         self._node_id = None
83         self._yum_dependencies = None
84
85         # Logging
86         self._logger = logging.getLogger('nepi.testbeds.planetlab')
87     
88     @property
89     def _nepi_testbed_environment_setup(self):
90         command = cStringIO.StringIO()
91         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
92             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
93         ))
94         command.write(' ; export PATH=$PATH:%s' % (
95             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
96         ))
97         if self.env:
98             for envkey, envvals in self.env.iteritems():
99                 for envval in envvals:
100                     command.write(' ; export %s=%s' % (envkey, envval))
101         return command.getvalue()
102     
103     def build_filters(self, target_filters, filter_map):
104         for attr, tag in filter_map.iteritems():
105             value = getattr(self, attr, None)
106             if value is not None:
107                 target_filters[tag] = value
108         return target_filters
109     
110     @property
111     def applicable_filters(self):
112         has = lambda att : getattr(self,att,None) is not None
113         return (
114             filter(has, self.BASEFILTERS.iterkeys())
115             + filter(has, self.TAGFILTERS.iterkeys())
116         )
117     
118     def find_candidates(self, filter_slice_id=None):
119         self._logger.info("Finding candidates for %s", self.make_filter_description())
120         
121         fields = ('node_id',)
122         replacements = {'timeframe':self.timeframe}
123         
124         # get initial candidates (no tag filters)
125         basefilters = self.build_filters({}, self.BASEFILTERS)
126         rootfilters = basefilters.copy()
127         if filter_slice_id:
128             basefilters['|slice_ids'] = (filter_slice_id,)
129         
130         # only pick healthy nodes
131         basefilters['run_level'] = 'boot'
132         basefilters['boot_state'] = 'boot'
133         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
134         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
135         
136         # keyword-only "pseudofilters"
137         extra = {}
138         if self.site:
139             extra['peer'] = self.site
140             
141         candidates = set(map(operator.itemgetter('node_id'), 
142             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
143         
144         # filter by tag, one tag at a time
145         applicable = self.applicable_filters
146         for tagfilter in self.TAGFILTERS.iteritems():
147             attr, (tagname, expr) = tagfilter
148             
149             # don't bother if there's no filter defined
150             if attr in applicable:
151                 tagfilter = rootfilters.copy()
152                 tagfilter['tagname'] = tagname % replacements
153                 tagfilter[expr % replacements] = getattr(self,attr)
154                 tagfilter['node_id'] = list(candidates)
155                 
156                 candidates &= set(map(operator.itemgetter('node_id'),
157                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
158         
159         # filter by vsys tags - special case since it doesn't follow
160         # the usual semantics
161         if self.required_vsys:
162             newcandidates = collections.defaultdict(set)
163             
164             vsys_tags = self._api.GetNodeTags(
165                 tagname='vsys', 
166                 node_id = list(candidates), 
167                 fields = ['node_id','value'])
168             
169             vsys_tags = map(
170                 operator.itemgetter(['node_id','value']),
171                 vsys_tags)
172             
173             required_vsys = self.required_vsys
174             for node_id, value in vsys_tags:
175                 if value in required_vsys:
176                     newcandidates[value].add(node_id)
177             
178             # take only those that have all the required vsys tags
179             newcandidates = reduce(
180                 lambda accum, new : accum & new,
181                 newcandidates.itervalues(),
182                 candidates)
183         
184         # filter by iface count
185         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
186             # fetch interfaces for all, in one go
187             filters = basefilters.copy()
188             filters['node_id'] = list(candidates)
189             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
190                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
191             
192             # filter candidates by interface count
193             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
194                 predicate = ( lambda node_id : 
195                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
196             elif self.min_num_external_ifaces is not None:
197                 predicate = ( lambda node_id : 
198                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
199             else:
200                 predicate = ( lambda node_id : 
201                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
202             
203             candidates = set(filter(predicate, candidates))
204         
205         # make sure hostnames are resolvable
206         if candidates:
207             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
208             
209             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
210                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
211             ))
212             def resolvable(node_id):
213                 try:
214                     addr = socket.gethostbyname(hostnames[node_id])
215                     return addr is not None
216                 except:
217                     return False
218             candidates = set(parallel.pfilter(resolvable, candidates,
219                 maxthreads = 16))
220
221             self._logger.info("  Found %s reachable candidates.", len(candidates))
222             
223         return candidates
224     
225     def make_filter_description(self):
226         """
227         Makes a human-readable description of filtering conditions
228         for find_candidates.
229         """
230         
231         # get initial candidates (no tag filters)
232         filters = self.build_filters({}, self.BASEFILTERS)
233         
234         # keyword-only "pseudofilters"
235         if self.site:
236             filters['peer'] = self.site
237             
238         # filter by tag, one tag at a time
239         applicable = self.applicable_filters
240         for tagfilter in self.TAGFILTERS.iteritems():
241             attr, (tagname, expr) = tagfilter
242             
243             # don't bother if there's no filter defined
244             if attr in applicable:
245                 filters[attr] = getattr(self,attr)
246         
247         # filter by vsys tags - special case since it doesn't follow
248         # the usual semantics
249         if self.required_vsys:
250             filters['vsys'] = ','.join(list(self.required_vsys))
251         
252         # filter by iface count
253         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
254             filters['num_ifaces'] = '-'.join([
255                 str(self.min_num_external_ifaces or '0'),
256                 str(self.max_num_external_ifaces or 'inf')
257             ])
258             
259         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
260
261     def assign_node_id(self, node_id):
262         self._node_id = node_id
263         self.fetch_node_info()
264     
265     def unassign_node(self):
266         self._node_id = None
267         self.__dict__.update(self.__orig_attrs)
268     
269     def fetch_node_info(self):
270         orig_attrs = {}
271         
272         info = self._api.GetNodes(self._node_id)[0]
273         tags = dict( (t['tagname'],t['value'])
274                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
275
276         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
277         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
278         self.min_num_external_ifaces = None
279         self.max_num_external_ifaces = None
280         self.timeframe = 'm'
281         
282         replacements = {'timeframe':self.timeframe}
283         for attr, tag in self.BASEFILTERS.iteritems():
284             if tag in info:
285                 value = info[tag]
286                 if hasattr(self, attr):
287                     orig_attrs[attr] = getattr(self, attr)
288                 setattr(self, attr, value)
289         for attr, (tag,_) in self.TAGFILTERS.iteritems():
290             tag = tag % replacements
291             if tag in tags:
292                 value = tags[tag]
293                 if hasattr(self, attr):
294                     orig_attrs[attr] = getattr(self, attr)
295                 setattr(self, attr, value)
296         
297         if 'peer_id' in info:
298             orig_attrs['site'] = self.site
299             self.site = self._api.peer_map[info['peer_id']]
300         
301         if 'interface_ids' in info:
302             self.min_num_external_ifaces = \
303             self.max_num_external_ifaces = len(info['interface_ids'])
304         
305         if 'ssh_rsa_key' in info:
306             orig_attrs['server_key'] = self.server_key
307             self.server_key = info['ssh_rsa_key']
308         
309         self.__orig_attrs = orig_attrs
310
311     def validate(self):
312         if self.home_path is None:
313             raise AssertionError, "Misconfigured node: missing home path"
314         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
315             raise AssertionError, "Misconfigured node: missing slice SSH key"
316         if self.slicename is None:
317             raise AssertionError, "Misconfigured node: unspecified slice"
318
319     def install_dependencies(self):
320         if self.required_packages:
321             # If we need rpmfusion, we must install the repo definition and the gpg keys
322             if self.rpmFusion:
323                 if self.operatingSystem == 'f12':
324                     # Fedora 12 requires a different rpmfusion package
325                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
326                 else:
327                     # This one works for f13+
328                     RPM_FUSION_URL = self.RPM_FUSION_URL
329                     
330                 rpmFusion = (
331                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
332                 ) % {
333                     'RPM_FUSION_URL' : RPM_FUSION_URL
334                 }
335             else:
336                 rpmFusion = ''
337             
338             if rpmFusion:
339                 (out,err),proc = server.popen_ssh_command(
340                     rpmFusion,
341                     host = self.hostname,
342                     port = None,
343                     user = self.slicename,
344                     agent = None,
345                     ident_key = self.ident_path,
346                     server_key = self.server_key
347                     )
348                 
349                 if proc.wait():
350                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
351             
352             # Launch p2p yum dependency installer
353             self._yum_dependencies.async_setup()
354     
355     def wait_provisioning(self, timeout = 20*60):
356         # Wait for the p2p installer
357         sleeptime = 1.0
358         totaltime = 0.0
359         while not self.is_alive():
360             time.sleep(sleeptime)
361             totaltime += sleeptime
362             sleeptime = min(30.0, sleeptime*1.5)
363             
364             if totaltime > timeout:
365                 # PlanetLab has a 15' delay on configuration propagation
366                 # If we're above that delay, the unresponsiveness is not due
367                 # to this delay.
368                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
369     
370     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
371         # Wait for the p2p installer
372         if self._yum_dependencies:
373             self._yum_dependencies.async_setup_wait()
374         
375     def is_alive(self):
376         # Make sure all the paths are created where 
377         # they have to be created for deployment
378         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
379             "echo 'ALIVE'",
380             host = self.hostname,
381             port = None,
382             user = self.slicename,
383             agent = None,
384             ident_key = self.ident_path,
385             server_key = self.server_key
386             )
387         
388         if proc.wait():
389             return False
390         elif not err and out.strip() == 'ALIVE':
391             return True
392         else:
393             return False
394     
395     def prepare_dependencies(self):
396         # Configure p2p yum dependency installer
397         if self.required_packages:
398             self._yum_dependencies = application.YumDependency(self._api)
399             self._yum_dependencies.node = self
400             self._yum_dependencies.home_path = "nepi-yumdep"
401             self._yum_dependencies.depends = ' '.join(self.required_packages)
402
403     def configure_routes(self, routes, devs):
404         """
405         Add the specified routes to the node's routing table
406         """
407         rules = []
408         
409         for route in routes:
410             for dev in devs:
411                 if dev.routes_here(route):
412                     # Schedule rule
413                     dest, prefix, nexthop, metric = route
414                     rules.append(
415                         "add %s%s gw %s %s" % (
416                             dest,
417                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
418                             nexthop,
419                             dev.if_name,
420                         )
421                     )
422                     
423                     # Stop checking
424                     break
425             else:
426                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
427                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
428         
429         self._logger.info("Setting up routes for %s", self.hostname)
430         
431         (out,err),proc = server.popen_ssh_command(
432             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
433                 home = server.shell_escape(self.home_path)),
434             host = self.hostname,
435             port = None,
436             user = self.slicename,
437             agent = None,
438             ident_key = self.ident_path,
439             server_key = self.server_key,
440             stdin = '\n'.join(rules)
441             )
442         
443         if proc.wait() or err:
444             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
445         
446         
447