2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
    # Map Node attribute to plcapi filter name
    # (NOTE(review): the surrounding "BASEFILTERS = { ... }" lines are not
    # visible in this chunk; only the entries appear here.)
    'hostname' : 'hostname',

    # Map Node attribute to (<tag name>, <plcapi filter expression>)
    # There are replacements that are applied with string formatting,
    # so '%' has to be escaped as '%%'.
    # NOTE(review): the ']' / '[' prefixes on the filter expression are
    # presumably plcapi comparison operators (']' = ">=", '[' = "<="),
    # matching min_* -> ']value' and max_* -> '[value' -- confirm against
    # the plcapi filter documentation.
    # '%(timeframe)s' in the tag names is expanded at query time (see
    # find_candidates / fetch_node_info).
    'architecture' : ('arch','value'),
    'operating_system' : ('fcdistro','value'),
    'pl_distro' : ('pldistro','value'),
    'min_reliability' : ('reliability%(timeframe)s', ']value'),
    'max_reliability' : ('reliability%(timeframe)s', '[value'),
    'min_bandwidth' : ('bw%(timeframe)s', ']value'),
    'max_bandwidth' : ('bw%(timeframe)s', '[value'),
    def __init__(self, api=None):
        # NOTE(review): several original lines of this constructor are not
        # visible in this chunk (presumably self._api, hostname and other
        # base attributes).

        # Filterable node attributes; None means "do not filter on this".
        # Names match the keys of BASEFILTERS / TAGFILTERS so that
        # build_filters() can pick them up via getattr.
        self.architecture = None
        self.operating_system = None
        self.min_reliability = None
        self.max_reliability = None
        self.min_bandwidth = None
        self.max_bandwidth = None
        # Interface-count bounds are checked client-side in
        # find_candidates (plcapi cannot filter on these directly).
        self.min_num_external_ifaces = None
        self.max_num_external_ifaces = None

        # Applications add requirements to connected nodes
        # (consumed by install_dependencies / wait_dependencies).
        self.required_packages = set()

        # Testbed-derived attributes
        # Path to the SSH identity key used for remote operations.
        self.ident_path = None

        # Those are filled when an actual node is allocated
        # (see assign_node_id / fetch_node_info).
    def build_filters(self, target_filters, filter_map):
        """Copy each filterable attribute named in *filter_map* into
        *target_filters* under its plcapi filter name.

        target_filters: dict to populate (also the return value,
            judging by how find_candidates uses this method).
        filter_map: mapping of Node attribute name -> plcapi filter name
            (e.g. BASEFILTERS).
        """
        for attr, tag in filter_map.iteritems():
            value = getattr(self, attr, None)
            # NOTE(review): a line between the getattr and the assignment
            # is not visible in this chunk -- presumably a guard like
            # "if value is not None:", since unset attributes should not
            # become filters. The trailing "return target_filters" is also
            # not visible. Confirm before relying on this listing.
            target_filters[tag] = value
    def applicable_filters(self):
        """Names of filter attributes that are currently set (not None),
        drawn from both BASEFILTERS and TAGFILTERS.

        NOTE(review): the line between the lambda and the two filter()
        calls is not visible in this chunk -- presumably "return (" so the
        leading '+' on the second call is a parenthesized continuation.
        find_candidates accesses this as "self.applicable_filters" without
        calling it, which suggests a @property decorator on a line not
        shown here -- confirm.
        """
        has = lambda att : getattr(self,att,None) is not None
        # Py2 filter() returns lists, so '+' concatenates the two groups.
        filter(has, self.BASEFILTERS.iterkeys())
        + filter(has, self.TAGFILTERS.iterkeys())
    def find_candidates(self, filter_slice_id=None):
        """Query plcapi for node_ids matching the currently-set filter
        attributes, narrowing progressively: base filters first, then one
        tag filter at a time, then a client-side interface-count check.

        filter_slice_id: when given, restrict candidates to nodes in that
            slice via the '|slice_ids' pseudo-filter.

        Presumably returns the final 'candidates' set of node_ids; the
        return statement is on a line not visible in this chunk, as are
        the definitions of 'extra', 'fields' and the guards around the
        slice/peer pseudo-filters -- NOTE(review): confirm against the
        full file.
        """
        # Expand '%(timeframe)s' placeholders in TAGFILTERS tag names.
        replacements = {'timeframe':self.timeframe}

        # get initial candidates (no tag filters)
        basefilters = self.build_filters({}, self.BASEFILTERS)
        basefilters['|slice_ids'] = (filter_slice_id,)

        # keyword-only "pseudofilters"
        extra['peer'] = self.site

        candidates = set(map(operator.itemgetter('node_id'),
            self._api.GetNodes(filters=basefilters, fields=fields, **extra)))

        # filter by tag, one tag at a time
        applicable = self.applicable_filters
        for tagfilter in self.TAGFILTERS.iteritems():
            attr, (tagname, expr) = tagfilter

            # don't bother if there's no filter defined
            if attr in applicable:
                # 'tagfilter' is rebound here, shadowing the loop variable;
                # harmless since it was already unpacked above.
                tagfilter = basefilters.copy()
                tagfilter['tagname'] = tagname % replacements
                tagfilter[expr % replacements] = getattr(self,attr)
                # Restrict the tag query to surviving candidates only.
                tagfilter['node_id'] = list(candidates)

                # Intersect: a node must match every applicable tag filter.
                candidates &= set(map(operator.itemgetter('node_id'),
                    self._api.GetNodeTags(filters=tagfilter, fields=fields)))

        # filter by iface count
        if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
            # fetch interfaces for all, in one go
            filters = basefilters.copy()
            filters['node_id'] = list(candidates)
            # NOTE(review): 'filters' (restricted to current candidates) is
            # built just above but 'basefilters' is passed to GetNodes here.
            # Looks like a bug -- should probably be filters=filters.
            # Not changed here because surrounding lines are missing from
            # this chunk; confirm before fixing.
            ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
                self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))

            # filter candidates by interface count
            if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
                # Both bounds set: closed-range check.
                predicate = ( lambda node_id :
                    self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
            elif self.min_num_external_ifaces is not None:
                # Lower bound only.
                predicate = ( lambda node_id :
                    self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
                # Upper bound only -- the introducing 'else:' line is not
                # visible in this chunk (NOTE(review): confirm).
                predicate = ( lambda node_id :
                    len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )

            candidates = set(filter(predicate, candidates))
    def assign_node_id(self, node_id):
        """Bind this Node to a concrete plcapi node and immediately pull
        its attributes (fetch_node_info reads self._node_id, so the
        assignment must happen first)."""
        self._node_id = node_id
        self.fetch_node_info()
    def fetch_node_info(self):
        """Populate this Node's filterable attributes from live plcapi
        data for the bound node (self._node_id): base fields from
        GetNodes, tag values from GetNodeTags, plus site/peer and
        interface-count information.
        """
        # NOTE(review): plcapi GetNodes returns a list of node records,
        # yet 'info' is used as a dict below -- the indexing/unpacking is
        # presumably on a line not visible in this chunk; confirm.
        info = self._api.GetNodes(self._node_id)
        # tagname -> value for every tag on this node.
        tags = dict( (t['tagname'],t['value'])
            for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )

        # Reset; recomputed from 'interface_ids' below when available.
        self.min_num_external_ifaces = None
        self.max_num_external_ifaces = None

        # Expand '%(timeframe)s' placeholders in TAGFILTERS tag names.
        replacements = {'timeframe':self.timeframe}
        # NOTE(review): in both loops below, the guard/lookup lines that
        # define 'value' (presumably "if tag in info/tags: value = ...")
        # are not visible in this chunk.
        for attr, tag in self.BASEFILTERS.iteritems():
            setattr(self, attr, value)
        for attr, (tag,_) in self.TAGFILTERS.iteritems():
            tag = tag % replacements
            setattr(self, attr, value)

        if 'peer_id' in info:
            # Map the numeric peer_id to the api's peer/site object.
            self.site = self._api.peer_map[info['peer_id']]

        if 'interface_ids' in info:
            # Known exact count: pin both bounds to it.
            self.min_num_external_ifaces = \
                self.max_num_external_ifaces = len(info['interface_ids'])
    def install_dependencies(self):
        """Start a detached installation of self.required_packages on the
        remote node (yum via rspawn.remote_spawn over SSH).

        No-op when there are no required packages. Raises RuntimeError
        when the spawn fails (the guard checking the failure condition is
        on a line not visible in this chunk).
        """
        if self.required_packages:
            # TODO: make dependant on the experiment somehow...
            # pidfile/logfile let wait_dependencies track the detached job.
            pidfile = '/tmp/nepi-depends.pid'
            logfile = '/tmp/nepi-depends.log'

            # Start process in a "daemonized" way, using nohup and heavy
            # stdin/out redirection to avoid connection issues
            # NOTE(review): several keyword arguments of this call
            # (e.g. pidfile/stdout/connection parameters) are on lines not
            # visible in this chunk.
            (out,err),proc = rspawn.remote_spawn(
                "yum -y install %(packages)s" % {
                    'packages' : ' '.join(self.required_packages),
                stderr = rspawn.STDOUT,
                host = self.hostname,
                user = self.slicename,
                ident_key = self.ident_path,

            # Py2-style raise statement (consistent with the rest of this
            # Py2 file).
            raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
190 def wait_dependencies(self, pidprobe=1, probe=10, pidmax=10):
191 if self.required_packages:
194 for probenum in xrange(pidmax):
195 pidtuple = rspawn.remote_check_pid(
197 host = self.hostname,
199 user = self.slicename,
201 ident_key = self.ident_path
209 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
211 # wait for it to finish
212 while rspawn.RUNNING is rspawn.remote_status(
214 host = self.hostname,
216 user = self.slicename,
218 ident_key = self.ident_path