Make integration tests also use the new environment variables
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10 import collections
11
12 from nepi.util import server
13
14 class Node(object):
15     BASEFILTERS = {
16         # Map Node attribute to plcapi filter name
17         'hostname' : 'hostname',
18     }
19     
20     TAGFILTERS = {
21         # Map Node attribute to (<tag name>, <plcapi filter expression>)
22         #   There are replacements that are applied with string formatting,
23         #   so '%' has to be escaped as '%%'.
24         'architecture' : ('arch','value'),
25         'operating_system' : ('fcdistro','value'),
26         'pl_distro' : ('pldistro','value'),
27         'min_reliability' : ('reliability%(timeframe)s', ']value'),
28         'max_reliability' : ('reliability%(timeframe)s', '[value'),
29         'min_bandwidth' : ('bw%(timeframe)s', ']value'),
30         'max_bandwidth' : ('bw%(timeframe)s', '[value'),
31     }    
32     
33     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
34     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
35     
36     def __init__(self, api=None):
37         if not api:
38             api = plcapi.PLCAPI()
39         self._api = api
40         
41         # Attributes
42         self.hostname = None
43         self.architecture = None
44         self.operating_system = None
45         self.pl_distro = None
46         self.site = None
47         self.emulation = None
48         self.min_reliability = None
49         self.max_reliability = None
50         self.min_bandwidth = None
51         self.max_bandwidth = None
52         self.min_num_external_ifaces = None
53         self.max_num_external_ifaces = None
54         self.timeframe = 'm'
55         
56         # Applications and routes add requirements to connected nodes
57         self.required_packages = set()
58         self.required_vsys = set()
59         self.pythonpath = []
60         
61         # Testbed-derived attributes
62         self.slicename = None
63         self.ident_path = None
64         self.server_key = None
65         self.home_path = None
66         
67         # Those are filled when an actual node is allocated
68         self._node_id = None
69     
70     def build_filters(self, target_filters, filter_map):
71         for attr, tag in filter_map.iteritems():
72             value = getattr(self, attr, None)
73             if value is not None:
74                 target_filters[tag] = value
75         return target_filters
76     
77     @property
78     def applicable_filters(self):
79         has = lambda att : getattr(self,att,None) is not None
80         return (
81             filter(has, self.BASEFILTERS.iterkeys())
82             + filter(has, self.TAGFILTERS.iterkeys())
83         )
84     
85     def find_candidates(self, filter_slice_id=None):
86         fields = ('node_id',)
87         replacements = {'timeframe':self.timeframe}
88         
89         # get initial candidates (no tag filters)
90         basefilters = self.build_filters({}, self.BASEFILTERS)
91         if filter_slice_id:
92             basefilters['|slice_ids'] = (filter_slice_id,)
93         
94         # keyword-only "pseudofilters"
95         extra = {}
96         if self.site:
97             extra['peer'] = self.site
98             
99         candidates = set(map(operator.itemgetter('node_id'), 
100             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
101         
102         # filter by tag, one tag at a time
103         applicable = self.applicable_filters
104         for tagfilter in self.TAGFILTERS.iteritems():
105             attr, (tagname, expr) = tagfilter
106             
107             # don't bother if there's no filter defined
108             if attr in applicable:
109                 tagfilter = basefilters.copy()
110                 tagfilter['tagname'] = tagname % replacements
111                 tagfilter[expr % replacements] = getattr(self,attr)
112                 tagfilter['node_id'] = list(candidates)
113                 
114                 candidates &= set(map(operator.itemgetter('node_id'),
115                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
116         
117         # filter by vsys tags - special case since it doesn't follow
118         # the usual semantics
119         if self.required_vsys:
120             newcandidates = collections.defaultdict(set)
121             
122             vsys_tags = self._api.GetNodeTags(
123                 tagname='vsys', 
124                 node_id = list(candidates), 
125                 fields = ['node_id','value'])
126             
127             vsys_tags = map(
128                 operator.itemgetter(['node_id','value']),
129                 vsys_tags)
130             
131             required_vsys = self.required_vsys
132             for node_id, value in vsys_tags:
133                 if value in required_vsys:
134                     newcandidates[value].add(node_id)
135             
136             # take only those that have all the required vsys tags
137             newcandidates = reduce(
138                 lambda accum, new : accum & new,
139                 newcandidates.itervalues(),
140                 candidates)
141         
142         # filter by iface count
143         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
144             # fetch interfaces for all, in one go
145             filters = basefilters.copy()
146             filters['node_id'] = list(candidates)
147             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
148                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
149             
150             # filter candidates by interface count
151             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
152                 predicate = ( lambda node_id : 
153                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
154             elif self.min_num_external_ifaces is not None:
155                 predicate = ( lambda node_id : 
156                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
157             else:
158                 predicate = ( lambda node_id : 
159                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
160             
161             candidates = set(filter(predicate, candidates))
162             
163         return candidates
164
165     def assign_node_id(self, node_id):
166         self._node_id = node_id
167         self.fetch_node_info()
168     
169     def fetch_node_info(self):
170         info = self._api.GetNodes(self._node_id)[0]
171         tags = dict( (t['tagname'],t['value'])
172                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
173
174         self.min_num_external_ifaces = None
175         self.max_num_external_ifaces = None
176         self.timeframe = 'm'
177         
178         replacements = {'timeframe':self.timeframe}
179         for attr, tag in self.BASEFILTERS.iteritems():
180             if tag in info:
181                 value = info[tag]
182                 setattr(self, attr, value)
183         for attr, (tag,_) in self.TAGFILTERS.iteritems():
184             tag = tag % replacements
185             if tag in tags:
186                 value = tags[tag]
187                 setattr(self, attr, value)
188         
189         if 'peer_id' in info:
190             self.site = self._api.peer_map[info['peer_id']]
191         
192         if 'interface_ids' in info:
193             self.min_num_external_ifaces = \
194             self.max_num_external_ifaces = len(info['interface_ids'])
195         
196         if 'ssh_rsa_key' in info:
197             self.server_key = info['ssh_rsa_key']
198
199     def validate(self):
200         if self.home_path is None:
201             raise AssertionError, "Misconfigured node: missing home path"
202         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
203             raise AssertionError, "Misconfigured node: missing slice SSH key"
204         if self.slicename is None:
205             raise AssertionError, "Misconfigured node: unspecified slice"
206
207     def install_dependencies(self):
208         if self.required_packages:
209             # TODO: make dependant on the experiment somehow...
210             pidfile = self.DEPENDS_PIDFILE
211             logfile = self.DEPENDS_LOGFILE
212             
213             # Start process in a "daemonized" way, using nohup and heavy
214             # stdin/out redirection to avoid connection issues
215             (out,err),proc = rspawn.remote_spawn(
216                 "yum -y install %(packages)s" % {
217                     'packages' : ' '.join(self.required_packages),
218                 },
219                 pidfile = pidfile,
220                 stdout = logfile,
221                 stderr = rspawn.STDOUT,
222                 
223                 host = self.hostname,
224                 port = None,
225                 user = self.slicename,
226                 agent = None,
227                 ident_key = self.ident_path,
228                 server_key = self.server_key,
229                 sudo = True
230                 )
231             
232             if proc.wait():
233                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
234     
235     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
236         if self.required_packages:
237             pidfile = self.DEPENDS_PIDFILE
238             
239             # get PID
240             pid = ppid = None
241             for probenum in xrange(pidmax):
242                 pidtuple = rspawn.remote_check_pid(
243                     pidfile = pidfile,
244                     host = self.hostname,
245                     port = None,
246                     user = self.slicename,
247                     agent = None,
248                     ident_key = self.ident_path,
249                     server_key = self.server_key
250                     )
251                 if pidtuple:
252                     pid, ppid = pidtuple
253                     break
254                 else:
255                     time.sleep(pidprobe)
256             else:
257                 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
258         
259             # wait for it to finish
260             while rspawn.RUNNING is rspawn.remote_status(
261                     pid, ppid,
262                     host = self.hostname,
263                     port = None,
264                     user = self.slicename,
265                     agent = None,
266                     ident_key = self.ident_path,
267                     server_key = self.server_key
268                     ):
269                 time.sleep(probe)
270                 probe = min(probemax, 1.5*probe)
271         
272     def is_alive(self):
273         # Make sure all the paths are created where 
274         # they have to be created for deployment
275         (out,err),proc = server.popen_ssh_command(
276             "echo 'ALIVE'",
277             host = self.hostname,
278             port = None,
279             user = self.slicename,
280             agent = None,
281             ident_key = self.ident_path,
282             server_key = self.server_key
283             )
284         
285         if proc.wait():
286             return False
287         elif not err and out.strip() == 'ALIVE':
288             return True
289         else:
290             return False
291     
292