Add useful and important tag filters for node selection:
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13 import socket
14 import sys
15 import logging
16
17 from nepi.util import server
18 from nepi.util import parallel
19
20 import application
21
22 class UnresponsiveNodeError(RuntimeError):
23     pass
24
25 def _castproperty(typ, propattr):
26     def _get(self):
27         return getattr(self, propattr)
28     def _set(self, value):
29         if value is not None or (isinstance(value, basestring) and not value):
30             value = typ(value)
31         return setattr(self, propattr, value)
32     def _del(self, value):
33         return delattr(self, propattr)
34     _get.__name__ = propattr + '_get'
35     _set.__name__ = propattr + '_set'
36     _del.__name__ = propattr + '_del'
37     return property(_get, _set, _del)
38
39 class Node(object):
40     BASEFILTERS = {
41         # Map Node attribute to plcapi filter name
42         'hostname' : 'hostname',
43     }
44     
45     TAGFILTERS = {
46         # Map Node attribute to (<tag name>, <plcapi filter expression>)
47         #   There are replacements that are applied with string formatting,
48         #   so '%' has to be escaped as '%%'.
49         'architecture' : ('arch','value'),
50         'operatingSystem' : ('fcdistro','value'),
51         'pl_distro' : ('pldistro','value'),
52         'city' : ('city','value'),
53         'country' : ('country','value'),
54         'region' : ('region','value'),
55         'minReliability' : ('reliability%(timeframe)s', ']value'),
56         'maxReliability' : ('reliability%(timeframe)s', '[value'),
57         'minBandwidth' : ('bw%(timeframe)s', ']value'),
58         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
59         'minLoad' : ('load%(timeframe)s', ']value'),
60         'maxLoad' : ('load%(timeframe)s', '[value'),
61         'minCpu' : ('cpu%(timeframe)s', ']value'),
62         'maxCpu' : ('cpu%(timeframe)s', '[value'),
63     }    
64     
65     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
66     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
67     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
68     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
69     
70     minReliability = _castproperty(float, '_minReliability')
71     maxReliability = _castproperty(float, '_maxReliability')
72     minBandwidth = _castproperty(float, '_minBandwidth')
73     maxBandwidth = _castproperty(float, '_maxBandwidth')
74     minCpu = _castproperty(float, '_minCpu')
75     maxCpu = _castproperty(float, '_maxCpu')
76     minLoad = _castproperty(float, '_minLoad')
77     maxLoad = _castproperty(float, '_maxLoad')
78     
79     def __init__(self, api=None):
80         if not api:
81             api = plcapi.PLCAPI()
82         self._api = api
83         
84         # Attributes
85         self.hostname = None
86         self.architecture = None
87         self.operatingSystem = None
88         self.pl_distro = None
89         self.site = None
90         self.city = None
91         self.country = None
92         self.region = None
93         self.minReliability = None
94         self.maxReliability = None
95         self.minBandwidth = None
96         self.maxBandwidth = None
97         self.minCpu = None
98         self.maxCpu = None
99         self.minLoad = None
100         self.maxLoad = None
101         self.min_num_external_ifaces = None
102         self.max_num_external_ifaces = None
103         self.timeframe = 'm'
104         
105         # Applications and routes add requirements to connected nodes
106         self.required_packages = set()
107         self.required_vsys = set()
108         self.pythonpath = []
109         self.rpmFusion = False
110         self.env = collections.defaultdict(list)
111         
112         # Testbed-derived attributes
113         self.slicename = None
114         self.ident_path = None
115         self.server_key = None
116         self.home_path = None
117         
118         # Those are filled when an actual node is allocated
119         self._node_id = None
120         self._yum_dependencies = None
121         self._installed = False
122
123         # Logging
124         self._logger = logging.getLogger('nepi.testbeds.planetlab')
125     
126     def _nepi_testbed_environment_setup_get(self):
127         command = cStringIO.StringIO()
128         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
129             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
130         ))
131         command.write(' ; export PATH=$PATH:%s' % (
132             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
133         ))
134         if self.env:
135             for envkey, envvals in self.env.iteritems():
136                 for envval in envvals:
137                     command.write(' ; export %s=%s' % (envkey, envval))
138         return command.getvalue()
139     def _nepi_testbed_environment_setup_set(self, value):
140         pass
141     _nepi_testbed_environment_setup = property(
142         _nepi_testbed_environment_setup_get,
143         _nepi_testbed_environment_setup_set)
144     
145     def build_filters(self, target_filters, filter_map):
146         for attr, tag in filter_map.iteritems():
147             value = getattr(self, attr, None)
148             if value is not None:
149                 target_filters[tag] = value
150         return target_filters
151     
152     @property
153     def applicable_filters(self):
154         has = lambda att : getattr(self,att,None) is not None
155         return (
156             filter(has, self.BASEFILTERS.iterkeys())
157             + filter(has, self.TAGFILTERS.iterkeys())
158         )
159     
160     def find_candidates(self, filter_slice_id=None):
161         self._logger.info("Finding candidates for %s", self.make_filter_description())
162         
163         fields = ('node_id',)
164         replacements = {'timeframe':self.timeframe}
165         
166         # get initial candidates (no tag filters)
167         basefilters = self.build_filters({}, self.BASEFILTERS)
168         rootfilters = basefilters.copy()
169         if filter_slice_id:
170             basefilters['|slice_ids'] = (filter_slice_id,)
171         
172         # only pick healthy nodes
173         basefilters['run_level'] = 'boot'
174         basefilters['boot_state'] = 'boot'
175         basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
176         basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
177         
178         # keyword-only "pseudofilters"
179         extra = {}
180         if self.site:
181             extra['peer'] = self.site
182             
183         candidates = set(map(operator.itemgetter('node_id'), 
184             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
185         
186         # filter by tag, one tag at a time
187         applicable = self.applicable_filters
188         for tagfilter in self.TAGFILTERS.iteritems():
189             attr, (tagname, expr) = tagfilter
190             
191             # don't bother if there's no filter defined
192             if attr in applicable:
193                 tagfilter = rootfilters.copy()
194                 tagfilter['tagname'] = tagname % replacements
195                 tagfilter[expr % replacements] = getattr(self,attr)
196                 tagfilter['node_id'] = list(candidates)
197                 
198                 candidates &= set(map(operator.itemgetter('node_id'),
199                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
200         
201         # filter by vsys tags - special case since it doesn't follow
202         # the usual semantics
203         if self.required_vsys:
204             newcandidates = collections.defaultdict(set)
205             
206             vsys_tags = self._api.GetNodeTags(
207                 tagname='vsys', 
208                 node_id = list(candidates), 
209                 fields = ['node_id','value'])
210             
211             vsys_tags = map(
212                 operator.itemgetter(['node_id','value']),
213                 vsys_tags)
214             
215             required_vsys = self.required_vsys
216             for node_id, value in vsys_tags:
217                 if value in required_vsys:
218                     newcandidates[value].add(node_id)
219             
220             # take only those that have all the required vsys tags
221             newcandidates = reduce(
222                 lambda accum, new : accum & new,
223                 newcandidates.itervalues(),
224                 candidates)
225         
226         # filter by iface count
227         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
228             # fetch interfaces for all, in one go
229             filters = basefilters.copy()
230             filters['node_id'] = list(candidates)
231             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
232                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
233             
234             # filter candidates by interface count
235             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
236                 predicate = ( lambda node_id : 
237                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
238             elif self.min_num_external_ifaces is not None:
239                 predicate = ( lambda node_id : 
240                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
241             else:
242                 predicate = ( lambda node_id : 
243                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
244             
245             candidates = set(filter(predicate, candidates))
246         
247         # make sure hostnames are resolvable
248         if candidates:
249             self._logger.info("  Found %s candidates. Checking for reachability...", len(candidates))
250             
251             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
252                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
253             ))
254             def resolvable(node_id):
255                 try:
256                     addr = socket.gethostbyname(hostnames[node_id])
257                     return addr is not None
258                 except:
259                     return False
260             candidates = set(parallel.pfilter(resolvable, candidates,
261                 maxthreads = 16))
262
263             self._logger.info("  Found %s reachable candidates.", len(candidates))
264             
265         return candidates
266     
267     def make_filter_description(self):
268         """
269         Makes a human-readable description of filtering conditions
270         for find_candidates.
271         """
272         
273         # get initial candidates (no tag filters)
274         filters = self.build_filters({}, self.BASEFILTERS)
275         
276         # keyword-only "pseudofilters"
277         if self.site:
278             filters['peer'] = self.site
279             
280         # filter by tag, one tag at a time
281         applicable = self.applicable_filters
282         for tagfilter in self.TAGFILTERS.iteritems():
283             attr, (tagname, expr) = tagfilter
284             
285             # don't bother if there's no filter defined
286             if attr in applicable:
287                 filters[attr] = getattr(self,attr)
288         
289         # filter by vsys tags - special case since it doesn't follow
290         # the usual semantics
291         if self.required_vsys:
292             filters['vsys'] = ','.join(list(self.required_vsys))
293         
294         # filter by iface count
295         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
296             filters['num_ifaces'] = '-'.join([
297                 str(self.min_num_external_ifaces or '0'),
298                 str(self.max_num_external_ifaces or 'inf')
299             ])
300             
301         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
302
303     def assign_node_id(self, node_id):
304         self._node_id = node_id
305         self.fetch_node_info()
306     
307     def unassign_node(self):
308         self._node_id = None
309         self.__dict__.update(self.__orig_attrs)
310     
311     def fetch_node_info(self):
312         orig_attrs = {}
313         
314         info = self._api.GetNodes(self._node_id)[0]
315         tags = dict( (t['tagname'],t['value'])
316                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
317
318         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
319         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
320         self.min_num_external_ifaces = None
321         self.max_num_external_ifaces = None
322         self.timeframe = 'm'
323         
324         replacements = {'timeframe':self.timeframe}
325         for attr, tag in self.BASEFILTERS.iteritems():
326             if tag in info:
327                 value = info[tag]
328                 if hasattr(self, attr):
329                     orig_attrs[attr] = getattr(self, attr)
330                 setattr(self, attr, value)
331         for attr, (tag,_) in self.TAGFILTERS.iteritems():
332             tag = tag % replacements
333             if tag in tags:
334                 value = tags[tag]
335                 if hasattr(self, attr):
336                     orig_attrs[attr] = getattr(self, attr)
337                 setattr(self, attr, value)
338         
339         if 'peer_id' in info:
340             orig_attrs['site'] = self.site
341             self.site = self._api.peer_map[info['peer_id']]
342         
343         if 'interface_ids' in info:
344             self.min_num_external_ifaces = \
345             self.max_num_external_ifaces = len(info['interface_ids'])
346         
347         if 'ssh_rsa_key' in info:
348             orig_attrs['server_key'] = self.server_key
349             self.server_key = info['ssh_rsa_key']
350         
351         self.__orig_attrs = orig_attrs
352
353     def validate(self):
354         if self.home_path is None:
355             raise AssertionError, "Misconfigured node: missing home path"
356         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
357             raise AssertionError, "Misconfigured node: missing slice SSH key"
358         if self.slicename is None:
359             raise AssertionError, "Misconfigured node: unspecified slice"
360
361     def recover(self):
362         # Mark dependencies installed
363         self._installed = True
364         
365         # Clear load attributes, they impair re-discovery
366         self.minReliability = \
367         self.maxReliability = \
368         self.minBandwidth = \
369         self.maxBandwidth = \
370         self.minCpu = \
371         self.maxCpu = \
372         self.minLoad = \
373         self.maxLoad = None
374
375     def install_dependencies(self):
376         if self.required_packages and not self._installed:
377             # If we need rpmfusion, we must install the repo definition and the gpg keys
378             if self.rpmFusion:
379                 if self.operatingSystem == 'f12':
380                     # Fedora 12 requires a different rpmfusion package
381                     RPM_FUSION_URL = self.RPM_FUSION_URL_F12
382                 else:
383                     # This one works for f13+
384                     RPM_FUSION_URL = self.RPM_FUSION_URL
385                     
386                 rpmFusion = (
387                   '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
388                 ) % {
389                     'RPM_FUSION_URL' : RPM_FUSION_URL
390                 }
391             else:
392                 rpmFusion = ''
393             
394             if rpmFusion:
395                 (out,err),proc = server.popen_ssh_command(
396                     rpmFusion,
397                     host = self.hostname,
398                     port = None,
399                     user = self.slicename,
400                     agent = None,
401                     ident_key = self.ident_path,
402                     server_key = self.server_key
403                     )
404                 
405                 if proc.wait():
406                     raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
407             
408             # Launch p2p yum dependency installer
409             self._yum_dependencies.async_setup()
410     
411     def wait_provisioning(self, timeout = 20*60):
412         # Wait for the p2p installer
413         sleeptime = 1.0
414         totaltime = 0.0
415         while not self.is_alive():
416             time.sleep(sleeptime)
417             totaltime += sleeptime
418             sleeptime = min(30.0, sleeptime*1.5)
419             
420             if totaltime > timeout:
421                 # PlanetLab has a 15' delay on configuration propagation
422                 # If we're above that delay, the unresponsiveness is not due
423                 # to this delay.
424                 raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
425     
426     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
427         # Wait for the p2p installer
428         if self._yum_dependencies and not self._installed:
429             self._yum_dependencies.async_setup_wait()
430             self._installed = True
431         
432     def is_alive(self):
433         # Make sure all the paths are created where 
434         # they have to be created for deployment
435         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
436             "echo 'ALIVE'",
437             host = self.hostname,
438             port = None,
439             user = self.slicename,
440             agent = None,
441             ident_key = self.ident_path,
442             server_key = self.server_key
443             )
444         
445         if proc.wait():
446             return False
447         elif not err and out.strip() == 'ALIVE':
448             return True
449         else:
450             return False
451     
452     def prepare_dependencies(self):
453         # Configure p2p yum dependency installer
454         if self.required_packages and not self._installed:
455             self._yum_dependencies = application.YumDependency(self._api)
456             self._yum_dependencies.node = self
457             self._yum_dependencies.home_path = "nepi-yumdep"
458             self._yum_dependencies.depends = ' '.join(self.required_packages)
459
460     def configure_routes(self, routes, devs):
461         """
462         Add the specified routes to the node's routing table
463         """
464         rules = []
465         
466         for route in routes:
467             for dev in devs:
468                 if dev.routes_here(route):
469                     # Schedule rule
470                     dest, prefix, nexthop, metric = route
471                     rules.append(
472                         "add %s%s gw %s %s" % (
473                             dest,
474                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
475                             nexthop,
476                             dev.if_name,
477                         )
478                     )
479                     
480                     # Stop checking
481                     break
482             else:
483                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
484                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
485         
486         self._logger.info("Setting up routes for %s", self.hostname)
487         
488         (out,err),proc = server.popen_ssh_command(
489             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
490                 home = server.shell_escape(self.home_path)),
491             host = self.hostname,
492             port = None,
493             user = self.slicename,
494             agent = None,
495             ident_key = self.ident_path,
496             server_key = self.server_key,
497             stdin = '\n'.join(rules)
498             )
499         
500         if proc.wait() or err:
501             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
502         
503         
504