Dependency support fixes:
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9 import os
10
11 class Node(object):
12     BASEFILTERS = {
13         # Map Node attribute to plcapi filter name
14         'hostname' : 'hostname',
15     }
16     
17     TAGFILTERS = {
18         # Map Node attribute to (<tag name>, <plcapi filter expression>)
19         #   There are replacements that are applied with string formatting,
20         #   so '%' has to be escaped as '%%'.
21         'architecture' : ('arch','value'),
22         'operating_system' : ('fcdistro','value'),
23         'pl_distro' : ('pldistro','value'),
24         'min_reliability' : ('reliability%(timeframe)s', ']value'),
25         'max_reliability' : ('reliability%(timeframe)s', '[value'),
26         'min_bandwidth' : ('bw%(timeframe)s', ']value'),
27         'max_bandwidth' : ('bw%(timeframe)s', '[value'),
28     }    
29     
30     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
31     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
32     
33     def __init__(self, api=None):
34         if not api:
35             api = plcapi.PLCAPI()
36         self._api = api
37         
38         # Attributes
39         self.hostname = None
40         self.architecture = None
41         self.operating_system = None
42         self.pl_distro = None
43         self.site = None
44         self.emulation = None
45         self.min_reliability = None
46         self.max_reliability = None
47         self.min_bandwidth = None
48         self.max_bandwidth = None
49         self.min_num_external_ifaces = None
50         self.max_num_external_ifaces = None
51         self.timeframe = 'm'
52         
53         # Applications add requirements to connected nodes
54         self.required_packages = set()
55         
56         # Testbed-derived attributes
57         self.slicename = None
58         self.ident_path = None
59         self.home_path = None
60         
61         # Those are filled when an actual node is allocated
62         self._node_id = None
63     
64     def build_filters(self, target_filters, filter_map):
65         for attr, tag in filter_map.iteritems():
66             value = getattr(self, attr, None)
67             if value is not None:
68                 target_filters[tag] = value
69         return target_filters
70     
71     @property
72     def applicable_filters(self):
73         has = lambda att : getattr(self,att,None) is not None
74         return (
75             filter(has, self.BASEFILTERS.iterkeys())
76             + filter(has, self.TAGFILTERS.iterkeys())
77         )
78     
79     def find_candidates(self, filter_slice_id=None):
80         fields = ('node_id',)
81         replacements = {'timeframe':self.timeframe}
82         
83         # get initial candidates (no tag filters)
84         basefilters = self.build_filters({}, self.BASEFILTERS)
85         if filter_slice_id:
86             basefilters['|slice_ids'] = (filter_slice_id,)
87         
88         # keyword-only "pseudofilters"
89         extra = {}
90         if self.site:
91             extra['peer'] = self.site
92             
93         candidates = set(map(operator.itemgetter('node_id'), 
94             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
95         
96         # filter by tag, one tag at a time
97         applicable = self.applicable_filters
98         for tagfilter in self.TAGFILTERS.iteritems():
99             attr, (tagname, expr) = tagfilter
100             
101             # don't bother if there's no filter defined
102             if attr in applicable:
103                 tagfilter = basefilters.copy()
104                 tagfilter['tagname'] = tagname % replacements
105                 tagfilter[expr % replacements] = getattr(self,attr)
106                 tagfilter['node_id'] = list(candidates)
107                 
108                 candidates &= set(map(operator.itemgetter('node_id'),
109                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
110         
111         # filter by iface count
112         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
113             # fetch interfaces for all, in one go
114             filters = basefilters.copy()
115             filters['node_id'] = list(candidates)
116             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
117                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
118             
119             # filter candidates by interface count
120             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
121                 predicate = ( lambda node_id : 
122                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
123             elif self.min_num_external_ifaces is not None:
124                 predicate = ( lambda node_id : 
125                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
126             else:
127                 predicate = ( lambda node_id : 
128                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
129             
130             candidates = set(filter(predicate, candidates))
131             
132         return candidates
133
134     def assign_node_id(self, node_id):
135         self._node_id = node_id
136         self.fetch_node_info()
137     
138     def fetch_node_info(self):
139         info = self._api.GetNodes(self._node_id)
140         tags = dict( (t['tagname'],t['value'])
141                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
142
143         self.min_num_external_ifaces = None
144         self.max_num_external_ifaces = None
145         self.timeframe = 'm'
146         
147         replacements = {'timeframe':self.timeframe}
148         for attr, tag in self.BASEFILTERS.iteritems():
149             if tag in info:
150                 value = info[tag]
151                 setattr(self, attr, value)
152         for attr, (tag,_) in self.TAGFILTERS.iteritems():
153             tag = tag % replacements
154             if tag in tags:
155                 value = tags[tag]
156                 setattr(self, attr, value)
157         
158         if 'peer_id' in info:
159             self.site = self._api.peer_map[info['peer_id']]
160         
161         if 'interface_ids' in info:
162             self.min_num_external_ifaces = \
163             self.max_num_external_ifaces = len(info['interface_ids'])
164
165     def validate(self):
166         if self.home_path is None:
167             raise AssertionError, "Misconfigured node: missing home path"
168         if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
169             raise AssertionError, "Misconfigured node: missing slice SSH key"
170         if self.slicename is None:
171             raise AssertionError, "Misconfigured node: unspecified slice"
172
173     def install_dependencies(self):
174         if self.required_packages:
175             # TODO: make dependant on the experiment somehow...
176             pidfile = self.DEPENDS_PIDFILE
177             logfile = self.DEPENDS_LOGFILE
178             
179             # Start process in a "daemonized" way, using nohup and heavy
180             # stdin/out redirection to avoid connection issues
181             (out,err),proc = rspawn.remote_spawn(
182                 "yum -y install %(packages)s" % {
183                     'packages' : ' '.join(self.required_packages),
184                 },
185                 pidfile = pidfile,
186                 stdout = logfile,
187                 stderr = rspawn.STDOUT,
188                 
189                 host = self.hostname,
190                 port = None,
191                 user = self.slicename,
192                 agent = None,
193                 ident_key = self.ident_path,
194                 sudo = True
195                 )
196             
197             if proc.wait():
198                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
199     
200     def wait_dependencies(self, pidprobe=1, probe=10, pidmax=10):
201         if self.required_packages:
202             pidfile = self.DEPENDS_PIDFILE
203             
204             # get PID
205             pid = ppid = None
206             for probenum in xrange(pidmax):
207                 pidtuple = rspawn.remote_check_pid(
208                     pidfile = pidfile,
209                     host = self.hostname,
210                     port = None,
211                     user = self.slicename,
212                     agent = None,
213                     ident_key = self.ident_path
214                     )
215                 if pidtuple:
216                     pid, ppid = pidtuple
217                     break
218                 else:
219                     time.sleep(pidprobe)
220             else:
221                 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
222         
223             # wait for it to finish
224             while rspawn.RUNNING is rspawn.remote_status(
225                     pid, ppid,
226                     host = self.hostname,
227                     port = None,
228                     user = self.slicename,
229                     agent = None,
230                     ident_key = self.ident_path
231                     ):
232                 time.sleep(probe)
233         
234
235