* Some refactoring, modularizing daemonized remote spawning with log capture and...
[nepi.git] / src / nepi / testbeds / planetlab / node.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 import plcapi
6 import operator
7 import rspawn
8 import time
9
10 class Node(object):
11     BASEFILTERS = {
12         # Map Node attribute to plcapi filter name
13         'hostname' : 'hostname',
14     }
15     
16     TAGFILTERS = {
17         # Map Node attribute to (<tag name>, <plcapi filter expression>)
18         #   There are replacements that are applied with string formatting,
19         #   so '%' has to be escaped as '%%'.
20         'architecture' : ('arch','value'),
21         'operating_system' : ('fcdistro','value'),
22         'pl_distro' : ('pldistro','value'),
23         'min_reliability' : ('reliability%(timeframe)s', ']value'),
24         'max_reliability' : ('reliability%(timeframe)s', '[value'),
25         'min_bandwidth' : ('bw%(timeframe)s', ']value'),
26         'max_bandwidth' : ('bw%(timeframe)s', '[value'),
27     }    
28     
29     def __init__(self, api=None):
30         if not api:
31             api = plcapi.PLCAPI()
32         self._api = api
33         
34         # Attributes
35         self.hostname = None
36         self.architecture = None
37         self.operating_system = None
38         self.pl_distro = None
39         self.site = None
40         self.emulation = None
41         self.min_reliability = None
42         self.max_reliability = None
43         self.min_bandwidth = None
44         self.max_bandwidth = None
45         self.min_num_external_ifaces = None
46         self.max_num_external_ifaces = None
47         self.timeframe = 'm'
48         
49         # Applications add requirements to connected nodes
50         self.required_packages = set()
51         
52         # Testbed-derived attributes
53         self.slicename = None
54         self.ident_path = None
55         
56         # Those are filled when an actual node is allocated
57         self._node_id = None
58     
59     def build_filters(self, target_filters, filter_map):
60         for attr, tag in filter_map.iteritems():
61             value = getattr(self, attr, None)
62             if value is not None:
63                 target_filters[tag] = value
64         return target_filters
65     
66     @property
67     def applicable_filters(self):
68         has = lambda att : getattr(self,att,None) is not None
69         return (
70             filter(has, self.BASEFILTERS.iterkeys())
71             + filter(has, self.TAGFILTERS.iterkeys())
72         )
73     
74     def find_candidates(self, filter_slice_id=None):
75         fields = ('node_id',)
76         replacements = {'timeframe':self.timeframe}
77         
78         # get initial candidates (no tag filters)
79         basefilters = self.build_filters({}, self.BASEFILTERS)
80         if filter_slice_id:
81             basefilters['|slice_ids'] = (filter_slice_id,)
82         
83         # keyword-only "pseudofilters"
84         extra = {}
85         if self.site:
86             extra['peer'] = self.site
87             
88         candidates = set(map(operator.itemgetter('node_id'), 
89             self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
90         
91         # filter by tag, one tag at a time
92         applicable = self.applicable_filters
93         for tagfilter in self.TAGFILTERS.iteritems():
94             attr, (tagname, expr) = tagfilter
95             
96             # don't bother if there's no filter defined
97             if attr in applicable:
98                 tagfilter = basefilters.copy()
99                 tagfilter['tagname'] = tagname % replacements
100                 tagfilter[expr % replacements] = getattr(self,attr)
101                 tagfilter['node_id'] = list(candidates)
102                 
103                 candidates &= set(map(operator.itemgetter('node_id'),
104                     self._api.GetNodeTags(filters=tagfilter, fields=fields)))
105         
106         # filter by iface count
107         if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
108             # fetch interfaces for all, in one go
109             filters = basefilters.copy()
110             filters['node_id'] = list(candidates)
111             ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
112                 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
113             
114             # filter candidates by interface count
115             if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
116                 predicate = ( lambda node_id : 
117                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
118             elif self.min_num_external_ifaces is not None:
119                 predicate = ( lambda node_id : 
120                     self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
121             else:
122                 predicate = ( lambda node_id : 
123                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
124             
125             candidates = set(filter(predicate, candidates))
126             
127         return candidates
128
129     def assign_node_id(self, node_id):
130         self._node_id = node_id
131         self.fetch_node_info()
132     
133     def fetch_node_info(self):
134         info = self._api.GetNodes(self._node_id)
135         tags = dict( (t['tagname'],t['value'])
136                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
137
138         self.min_num_external_ifaces = None
139         self.max_num_external_ifaces = None
140         self.timeframe = 'm'
141         
142         replacements = {'timeframe':self.timeframe}
143         for attr, tag in self.BASEFILTERS.iteritems():
144             if tag in info:
145                 value = info[tag]
146                 setattr(self, attr, value)
147         for attr, (tag,_) in self.TAGFILTERS.iteritems():
148             tag = tag % replacements
149             if tag in tags:
150                 value = tags[tag]
151                 setattr(self, attr, value)
152         
153         if 'peer_id' in info:
154             self.site = self._api.peer_map[info['peer_id']]
155         
156         if 'interface_ids' in info:
157             self.min_num_external_ifaces = \
158             self.max_num_external_ifaces = len(info['interface_ids'])
159
160     def validate(self):
161         pass
162
163     def install_dependencies(self):
164         if self.required_packages:
165             # TODO: make dependant on the experiment somehow...
166             pidfile = '/tmp/nepi-depends.pid'
167             logfile = '/tmp/nepi-depends.log'
168             
169             # Start process in a "daemonized" way, using nohup and heavy
170             # stdin/out redirection to avoid connection issues
171             (out,err),proc = rspawn.remote_spawn(
172                 "yum -y install %(packages)s" % {
173                     'packages' : ' '.join(self.required_packages),
174                 },
175                 pidfile = pidfile,
176                 stdout = logfile,
177                 stderr = rspawn.STDOUT,
178                 
179                 host = self.hostname,
180                 port = None,
181                 user = self.slicename,
182                 agent = None,
183                 ident_key = self.ident_path,
184                 sudo = True
185                 )
186             
187             if proc.wait():
188                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
189     
190     def wait_dependencies(self, pidprobe=1, probe=10, pidmax=10):
191         if self.required_packages:
192             # get PID
193             pid = ppid = None
194             for probenum in xrange(pidmax):
195                 pidtuple = rspawn.remote_check_pid(
196                     pidfile = pidfile,
197                     host = self.hostname,
198                     port = None,
199                     user = self.slicename,
200                     agent = None,
201                     ident_key = self.ident_path
202                     )
203                 if pidtuple:
204                     pid, ppid = pidtuple
205                     break
206                 else:
207                     time.sleep(pidprobe)
208             else:
209                 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
210         
211             # wait for it to finish
212             while rspawn.RUNNING is rspawn.remote_status(
213                     pid, ppid,
214                     host = self.hostname,
215                     port = None,
216                     user = self.slicename,
217                     agent = None,
218                     ident_key = self.ident_path
219                     ):
220                 time.sleep(probe)
221         
222
223