Do check dependency install after the fact.
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13
14 from nepi.util import server
15
16 class Node(object):
17     BASEFILTERS = {
18         # Map Node attribute to plcapi filter name
19         'hostname' : 'hostname',
20     }
21     
22     TAGFILTERS = {
23         # Map Node attribute to (<tag name>, <plcapi filter expression>)
24         #   There are replacements that are applied with string formatting,
25         #   so '%' has to be escaped as '%%'.
26         'architecture' : ('arch','value'),
27         'operatingSystem' : ('fcdistro','value'),
28         'pl_distro' : ('pldistro','value'),
29         'minReliability' : ('reliability%(timeframe)s', ']value'),
30         'maxReliability' : ('reliability%(timeframe)s', '[value'),
31         'minBandwidth' : ('bw%(timeframe)s', ']value'),
32         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
33     }    
34     
35     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
36     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
37     
38     def __init__(self, api=None):
39         if not api:
40             api = plcapi.PLCAPI()
41         self._api = api
42         
43         # Attributes
44         self.hostname = None
45         self.architecture = None
46         self.operatingSystem = None
47         self.pl_distro = None
48         self.site = None
49         self.emulation = None
50         self.minReliability = None
51         self.maxReliability = None
52         self.minBandwidth = None
53         self.maxBandwidth = None
54         self.min_num_external_ifaces = None
55         self.max_num_external_ifaces = None
56         self.timeframe = 'm'
57         
58         # Applications and routes add requirements to connected nodes
59         self.required_packages = set()
60         self.required_vsys = set()
61         self.pythonpath = []
62         self.env = collections.defaultdict(list)
63         
64         # Testbed-derived attributes
65         self.slicename = None
66         self.ident_path = None
67         self.server_key = None
68         self.home_path = None
69         
70         # Those are filled when an actual node is allocated
71         self._node_id = None
72     
73     @property
74     def _nepi_testbed_environment_setup(self):
75         command = cStringIO.StringIO()
76         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
77             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
78         ))
79         command.write(' ; export PATH=$PATH:%s' % (
80             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
81         ))
82         if self.env:
83             for envkey, envvals in self.env.iteritems():
84                 for envval in envvals:
85                     command.write(' ; export %s=%s' % (envkey, envval))
86         return command.getvalue()
87     
88     def build_filters(self, target_filters, filter_map):
89         for attr, tag in filter_map.iteritems():
90             value = getattr(self, attr, None)
91             if value is not None:
92                 target_filters[tag] = value
93         return target_filters
94     
95     @property
96     def applicable_filters(self):
97         has = lambda att : getattr(self,att,None) is not None
98         return (
99             filter(has, self.BASEFILTERS.iterkeys())
100             + filter(has, self.TAGFILTERS.iterkeys())
101         )
102     
103     def find_candidates(self, filter_slice_id=None):
104         fields = ('node_id',)
105         replacements = {'timeframe':self.timeframe}
106         
107         # get initial candidates (no tag filters)
108         basefilters = self.build_filters({}, self.BASEFILTERS)
109         rootfilters = basefilters.copy()
110         if filter_slice_id:
111             basefilters['|slice_ids'] = (filter_slice_id,)
112         
113         # only pick healthy nodes
114         basefilters['run_level'] = 'boot'
115         basefilters['boot_state'] = 'boot'
116         
117         # keyword-only "pseudofilters"
118         extra = {}
119         if self.site:
120             extra['peer'] = self.site
121             
122         candidates = set(map(operator.itemgetter('node_id'), 
123             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
124         
125         # filter by tag, one tag at a time
126         applicable = self.applicable_filters
127         for tagfilter in self.TAGFILTERS.iteritems():
128             attr, (tagname, expr) = tagfilter
129             
130             # don't bother if there's no filter defined
131             if attr in applicable:
132                 tagfilter = rootfilters.copy()
133                 tagfilter['tagname'] = tagname % replacements
134                 tagfilter[expr % replacements] = getattr(self,attr)
135                 tagfilter['node_id'] = list(candidates)
136                 
137                 candidates &= set(map(operator.itemgetter('node_id'),
138                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
139         
140         # filter by vsys tags - special case since it doesn't follow
141         # the usual semantics
142         if self.required_vsys:
143             newcandidates = collections.defaultdict(set)
144             
145             vsys_tags = self._api.GetNodeTags(
146                 tagname='vsys', 
147                 node_id = list(candidates), 
148                 fields = ['node_id','value'])
149             
150             vsys_tags = map(
151                 operator.itemgetter(['node_id','value']),
152                 vsys_tags)
153             
154             required_vsys = self.required_vsys
155             for node_id, value in vsys_tags:
156                 if value in required_vsys:
157                     newcandidates[value].add(node_id)
158             
159             # take only those that have all the required vsys tags
160             newcandidates = reduce(
161                 lambda accum, new : accum & new,
162                 newcandidates.itervalues(),
163                 candidates)
164         
165         # filter by iface count
166         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
167             # fetch interfaces for all, in one go
168             filters = basefilters.copy()
169             filters['node_id'] = list(candidates)
170             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
171                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
172             
173             # filter candidates by interface count
174             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
175                 predicate = ( lambda node_id : 
176                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
177             elif self.min_num_external_ifaces is not None:
178                 predicate = ( lambda node_id : 
179                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
180             else:
181                 predicate = ( lambda node_id : 
182                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
183             
184             candidates = set(filter(predicate, candidates))
185             
186         return candidates
187     
188     def make_filter_description(self):
189         """
190         Makes a human-readable description of filtering conditions
191         for find_candidates.
192         """
193         
194         # get initial candidates (no tag filters)
195         filters = self.build_filters({}, self.BASEFILTERS)
196         
197         # keyword-only "pseudofilters"
198         if self.site:
199             filters['peer'] = self.site
200             
201         # filter by tag, one tag at a time
202         applicable = self.applicable_filters
203         for tagfilter in self.TAGFILTERS.iteritems():
204             attr, (tagname, expr) = tagfilter
205             
206             # don't bother if there's no filter defined
207             if attr in applicable:
208                 filters[attr] = getattr(self,attr)
209         
210         # filter by vsys tags - special case since it doesn't follow
211         # the usual semantics
212         if self.required_vsys:
213             filters['vsys'] = ','.join(list(self.required_vsys))
214         
215         # filter by iface count
216         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
217             filters['num_ifaces'] = '-'.join([
218                 str(self.min_num_external_ifaces or '0'),
219                 str(self.max_num_external_ifaces or 'inf')
220             ])
221             
222         return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
223
224     def assign_node_id(self, node_id):
225         self._node_id = node_id
226         self.fetch_node_info()
227     
228     def fetch_node_info(self):
229         info = self._api.GetNodes(self._node_id)[0]
230         tags = dict( (t['tagname'],t['value'])
231                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
232
233         self.min_num_external_ifaces = None
234         self.max_num_external_ifaces = None
235         self.timeframe = 'm'
236         
237         replacements = {'timeframe':self.timeframe}
238         for attr, tag in self.BASEFILTERS.iteritems():
239             if tag in info:
240                 value = info[tag]
241                 setattr(self, attr, value)
242         for attr, (tag,_) in self.TAGFILTERS.iteritems():
243             tag = tag % replacements
244             if tag in tags:
245                 value = tags[tag]
246                 setattr(self, attr, value)
247         
248         if 'peer_id' in info:
249             self.site = self._api.peer_map[info['peer_id']]
250         
251         if 'interface_ids' in info:
252             self.min_num_external_ifaces = \
253             self.max_num_external_ifaces = len(info['interface_ids'])
254         
255         if 'ssh_rsa_key' in info:
256             self.server_key = info['ssh_rsa_key']
257
258     def validate(self):
259         if self.home_path is None:
260             raise AssertionError, "Misconfigured node: missing home path"
261         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
262             raise AssertionError, "Misconfigured node: missing slice SSH key"
263         if self.slicename is None:
264             raise AssertionError, "Misconfigured node: unspecified slice"
265
266     def install_dependencies(self):
267         if self.required_packages:
268             # TODO: make dependant on the experiment somehow...
269             pidfile = self.DEPENDS_PIDFILE
270             logfile = self.DEPENDS_LOGFILE
271             
272             # Start process in a "daemonized" way, using nohup and heavy
273             # stdin/out redirection to avoid connection issues
274             (out,err),proc = rspawn.remote_spawn(
275                 "( yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
276                     'packages' : ' '.join(self.required_packages),
277                 },
278                 pidfile = pidfile,
279                 stdout = logfile,
280                 stderr = rspawn.STDOUT,
281                 
282                 host = self.hostname,
283                 port = None,
284                 user = self.slicename,
285                 agent = None,
286                 ident_key = self.ident_path,
287                 server_key = self.server_key,
288                 sudo = True
289                 )
290             
291             if proc.wait():
292                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
293     
294     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
295         if self.required_packages:
296             pidfile = self.DEPENDS_PIDFILE
297             
298             # get PID
299             pid = ppid = None
300             for probenum in xrange(pidmax):
301                 pidtuple = rspawn.remote_check_pid(
302                     pidfile = pidfile,
303                     host = self.hostname,
304                     port = None,
305                     user = self.slicename,
306                     agent = None,
307                     ident_key = self.ident_path,
308                     server_key = self.server_key
309                     )
310                 if pidtuple:
311                     pid, ppid = pidtuple
312                     break
313                 else:
314                     time.sleep(pidprobe)
315             else:
316                 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
317         
318             # wait for it to finish
319             while rspawn.RUNNING is rspawn.remote_status(
320                     pid, ppid,
321                     host = self.hostname,
322                     port = None,
323                     user = self.slicename,
324                     agent = None,
325                     ident_key = self.ident_path,
326                     server_key = self.server_key
327                     ):
328                 time.sleep(probe)
329                 probe = min(probemax, 1.5*probe)
330             
331             # check results
332             logfile = self.DEPENDS_LOGFILE
333             
334             (out,err),proc = server.popen_ssh_command(
335                 "cat %s" % (server.shell_escape(logfile),),
336                 host = self.hostname,
337                 port = None,
338                 user = self.slicename,
339                 agent = None,
340                 ident_key = self.ident_path,
341                 server_key = self.server_key
342                 )
343             
344             if proc.wait():
345                 raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
346             
347             success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
348             if not success:
349                 raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
350         
351     def is_alive(self):
352         # Make sure all the paths are created where 
353         # they have to be created for deployment
354         (out,err),proc = server.popen_ssh_command(
355             "echo 'ALIVE'",
356             host = self.hostname,
357             port = None,
358             user = self.slicename,
359             agent = None,
360             ident_key = self.ident_path,
361             server_key = self.server_key
362             )
363         
364         if proc.wait():
365             return False
366         elif not err and out.strip() == 'ALIVE':
367             return True
368         else:
369             return False
370     
371
372     def configure_routes(self, routes, devs):
373         """
374         Add the specified routes to the node's routing table
375         """
376         rules = []
377         
378         for route in routes:
379             for dev in devs:
380                 if dev.routes_here(route):
381                     # Schedule rule
382                     dest, prefix, nexthop = route
383                     rules.append(
384                         "add %s%s gw %s %s" % (
385                             dest,
386                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
387                             nexthop,
388                             dev.if_name,
389                         )
390                     )
391                     
392                     # Stop checking
393                     break
394             else:
395                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
396                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
397         
398         (out,err),proc = server.popen_ssh_command(
399             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
400                 home = server.shell_escape(self.home_path)),
401             host = self.hostname,
402             port = None,
403             user = self.slicename,
404             agent = None,
405             ident_key = self.ident_path,
406             server_key = self.server_key,
407             stdin = '\n'.join(rules)
408             )
409         
410         if proc.wait() or err:
411             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
412         
413         
414