Ticket #22: complitid
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11 import cStringIO
12 import resourcealloc
13
14 from nepi.util import server
15
16 class Node(object):
17     BASEFILTERS = {
18         # Map Node attribute to plcapi filter name
19         'hostname' : 'hostname',
20     }
21     
22     TAGFILTERS = {
23         # Map Node attribute to (<tag name>, <plcapi filter expression>)
24         #   There are replacements that are applied with string formatting,
25         #   so '%' has to be escaped as '%%'.
26         'architecture' : ('arch','value'),
27         'operating_system' : ('fcdistro','value'),
28         'pl_distro' : ('pldistro','value'),
29         'min_reliability' : ('reliability%(timeframe)s', ']value'),
30         'max_reliability' : ('reliability%(timeframe)s', '[value'),
31         'min_bandwidth' : ('bw%(timeframe)s', ']value'),
32         'max_bandwidth' : ('bw%(timeframe)s', '[value'),
33     }    
34     
35     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
36     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
37     
38     def __init__(self, api=None):
39         if not api:
40             api = plcapi.PLCAPI()
41         self._api = api
42         
43         # Attributes
44         self.hostname = None
45         self.architecture = None
46         self.operating_system = None
47         self.pl_distro = None
48         self.site = None
49         self.emulation = None
50         self.min_reliability = None
51         self.max_reliability = None
52         self.min_bandwidth = None
53         self.max_bandwidth = None
54         self.min_num_external_ifaces = None
55         self.max_num_external_ifaces = None
56         self.timeframe = 'm'
57         
58         # Applications and routes add requirements to connected nodes
59         self.required_packages = set()
60         self.required_vsys = set()
61         self.pythonpath = []
62         self.env = collections.defaultdict(list)
63         
64         # Testbed-derived attributes
65         self.slicename = None
66         self.ident_path = None
67         self.server_key = None
68         self.home_path = None
69         
70         # Those are filled when an actual node is allocated
71         self._node_id = None
72     
73     @property
74     def _nepi_testbed_environment_setup(self):
75         command = cStringIO.StringIO()
76         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
77             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
78         ))
79         command.write(' ; export PATH=$PATH:%s' % (
80             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
81         ))
82         if self.env:
83             for envkey, envvals in self.env.iteritems():
84                 for envval in envvals:
85                     command.write(' ; export %s=%s' % (envkey, envval))
86         return command.getvalue()
87     
88     def build_filters(self, target_filters, filter_map):
89         for attr, tag in filter_map.iteritems():
90             value = getattr(self, attr, None)
91             if value is not None:
92                 target_filters[tag] = value
93         return target_filters
94     
95     @property
96     def applicable_filters(self):
97         has = lambda att : getattr(self,att,None) is not None
98         return (
99             filter(has, self.BASEFILTERS.iterkeys())
100             + filter(has, self.TAGFILTERS.iterkeys())
101         )
102     
103     def find_candidates(self, filter_slice_id=None):
104         fields = ('node_id',)
105         replacements = {'timeframe':self.timeframe}
106         
107         # get initial candidates (no tag filters)
108         basefilters = self.build_filters({}, self.BASEFILTERS)
109         if filter_slice_id:
110             basefilters['|slice_ids'] = (filter_slice_id,)
111         
112         # only pick healthy nodes
113         basefilters['run_level'] = 'boot'
114         basefilters['boot_state'] = 'boot'
115         
116         # keyword-only "pseudofilters"
117         extra = {}
118         if self.site:
119             extra['peer'] = self.site
120             
121         candidates = set(map(operator.itemgetter('node_id'), 
122             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
123         
124         # filter by tag, one tag at a time
125         applicable = self.applicable_filters
126         for tagfilter in self.TAGFILTERS.iteritems():
127             attr, (tagname, expr) = tagfilter
128             
129             # don't bother if there's no filter defined
130             if attr in applicable:
131                 tagfilter = basefilters.copy()
132                 tagfilter['tagname'] = tagname % replacements
133                 tagfilter[expr % replacements] = getattr(self,attr)
134                 tagfilter['node_id'] = list(candidates)
135                 
136                 candidates &= set(map(operator.itemgetter('node_id'),
137                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
138         
139         # filter by vsys tags - special case since it doesn't follow
140         # the usual semantics
141         if self.required_vsys:
142             newcandidates = collections.defaultdict(set)
143             
144             vsys_tags = self._api.GetNodeTags(
145                 tagname='vsys', 
146                 node_id = list(candidates), 
147                 fields = ['node_id','value'])
148             
149             vsys_tags = map(
150                 operator.itemgetter(['node_id','value']),
151                 vsys_tags)
152             
153             required_vsys = self.required_vsys
154             for node_id, value in vsys_tags:
155                 if value in required_vsys:
156                     newcandidates[value].add(node_id)
157             
158             # take only those that have all the required vsys tags
159             newcandidates = reduce(
160                 lambda accum, new : accum & new,
161                 newcandidates.itervalues(),
162                 candidates)
163         
164         # filter by iface count
165         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
166             # fetch interfaces for all, in one go
167             filters = basefilters.copy()
168             filters['node_id'] = list(candidates)
169             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
170                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
171             
172             # filter candidates by interface count
173             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
174                 predicate = ( lambda node_id : 
175                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
176             elif self.min_num_external_ifaces is not None:
177                 predicate = ( lambda node_id : 
178                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
179             else:
180                 predicate = ( lambda node_id : 
181                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
182             
183             candidates = set(filter(predicate, candidates))
184             
185         return candidates
186
187     def assign_node_id(self, node_id):
188         self._node_id = node_id
189         self.fetch_node_info()
190     
191     def fetch_node_info(self):
192         info = self._api.GetNodes(self._node_id)[0]
193         tags = dict( (t['tagname'],t['value'])
194                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
195
196         self.min_num_external_ifaces = None
197         self.max_num_external_ifaces = None
198         self.timeframe = 'm'
199         
200         replacements = {'timeframe':self.timeframe}
201         for attr, tag in self.BASEFILTERS.iteritems():
202             if tag in info:
203                 value = info[tag]
204                 setattr(self, attr, value)
205         for attr, (tag,_) in self.TAGFILTERS.iteritems():
206             tag = tag % replacements
207             if tag in tags:
208                 value = tags[tag]
209                 setattr(self, attr, value)
210         
211         if 'peer_id' in info:
212             self.site = self._api.peer_map[info['peer_id']]
213         
214         if 'interface_ids' in info:
215             self.min_num_external_ifaces = \
216             self.max_num_external_ifaces = len(info['interface_ids'])
217         
218         if 'ssh_rsa_key' in info:
219             self.server_key = info['ssh_rsa_key']
220
221     def validate(self):
222         if self.home_path is None:
223             raise AssertionError, "Misconfigured node: missing home path"
224         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
225             raise AssertionError, "Misconfigured node: missing slice SSH key"
226         if self.slicename is None:
227             raise AssertionError, "Misconfigured node: unspecified slice"
228
229     def install_dependencies(self):
230         if self.required_packages:
231             # TODO: make dependant on the experiment somehow...
232             pidfile = self.DEPENDS_PIDFILE
233             logfile = self.DEPENDS_LOGFILE
234             
235             # Start process in a "daemonized" way, using nohup and heavy
236             # stdin/out redirection to avoid connection issues
237             (out,err),proc = rspawn.remote_spawn(
238                 "yum -y install %(packages)s" % {
239                     'packages' : ' '.join(self.required_packages),
240                 },
241                 pidfile = pidfile,
242                 stdout = logfile,
243                 stderr = rspawn.STDOUT,
244                 
245                 host = self.hostname,
246                 port = None,
247                 user = self.slicename,
248                 agent = None,
249                 ident_key = self.ident_path,
250                 server_key = self.server_key,
251                 sudo = True
252                 )
253             
254             if proc.wait():
255                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
256     
257     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
258         if self.required_packages:
259             pidfile = self.DEPENDS_PIDFILE
260             
261             # get PID
262             pid = ppid = None
263             for probenum in xrange(pidmax):
264                 pidtuple = rspawn.remote_check_pid(
265                     pidfile = pidfile,
266                     host = self.hostname,
267                     port = None,
268                     user = self.slicename,
269                     agent = None,
270                     ident_key = self.ident_path,
271                     server_key = self.server_key
272                     )
273                 if pidtuple:
274                     pid, ppid = pidtuple
275                     break
276                 else:
277                     time.sleep(pidprobe)
278             else:
279                 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
280         
281             # wait for it to finish
282             while rspawn.RUNNING is rspawn.remote_status(
283                     pid, ppid,
284                     host = self.hostname,
285                     port = None,
286                     user = self.slicename,
287                     agent = None,
288                     ident_key = self.ident_path,
289                     server_key = self.server_key
290                     ):
291                 time.sleep(probe)
292                 probe = min(probemax, 1.5*probe)
293         
294     def is_alive(self):
295         # Make sure all the paths are created where 
296         # they have to be created for deployment
297         (out,err),proc = server.popen_ssh_command(
298             "echo 'ALIVE'",
299             host = self.hostname,
300             port = None,
301             user = self.slicename,
302             agent = None,
303             ident_key = self.ident_path,
304             server_key = self.server_key
305             )
306         
307         if proc.wait():
308             return False
309         elif not err and out.strip() == 'ALIVE':
310             return True
311         else:
312             return False
313     
314
315     def configure_routes(self, routes, devs):
316         """
317         Add the specified routes to the node's routing table
318         """
319         rules = []
320         
321         for route in routes:
322             for dev in devs:
323                 if dev.routes_here(route):
324                     # Schedule rule
325                     dest, prefix, nexthop = route
326                     rules.append(
327                         "add %s%s gw %s %s" % (
328                             dest,
329                             (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
330                             nexthop,
331                             dev.if_name,
332                         )
333                     )
334                     
335                     # Stop checking
336                     break
337             else:
338                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
339                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
340         
341         (out,err),proc = server.popen_ssh_command(
342             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
343                 home = server.shell_escape(self.home_path)),
344             host = self.hostname,
345             port = None,
346             user = self.slicename,
347             agent = None,
348             ident_key = self.ident_path,
349             server_key = self.server_key,
350             stdin = '\n'.join(rules)
351             )
352         
353         if proc.wait() or err:
354             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
355         
356         
357