2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
14 from nepi.util import server
# NOTE(review): the entries below appear to be fragments of two class-level
# dicts (a base-filter map and a tag-filter map); the enclosing dict names
# and braces are elided in this excerpt - confirm against the full file.
18 # Map Node attribute to plcapi filter name
19 'hostname' : 'hostname',
23 # Map Node attribute to (<tag name>, <plcapi filter expression>)
24 # There are replacements that are applied with string formatting,
25 # so '%' has to be escaped as '%%'.
26 'architecture' : ('arch','value'),
27 'operatingSystem' : ('fcdistro','value'),
28 'pl_distro' : ('pldistro','value'),
# Presumably per PLCAPI filter syntax a ']' prefix means ">=" and '[' means
# "<=", so min* attributes become lower bounds and max* upper bounds -
# TODO confirm against the PLCAPI documentation.
29 'minReliability' : ('reliability%(timeframe)s', ']value'),
30 'maxReliability' : ('reliability%(timeframe)s', '[value'),
31 'minBandwidth' : ('bw%(timeframe)s', ']value'),
32 'maxBandwidth' : ('bw%(timeframe)s', '[value'),
# Remote pidfile/logfile used by install_dependencies()/wait_dependencies()
# for the detached package-install process.
35 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
36 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
# Initialize a node-selection descriptor: each constraint attribute starts
# as None ("don't care") and is later set by the user or derived from a
# concrete node by fetch_node_info().
# NOTE(review): several constructor lines are elided in this excerpt; do not
# assume this is the complete initializer.
38 def __init__(self, api=None):
# Hardware/software constraints matched through PLCAPI node tags.
45 self.architecture = None
46 self.operatingSystem = None
50 self.minReliability = None
51 self.maxReliability = None
52 self.minBandwidth = None
53 self.maxBandwidth = None
# External-interface-count bounds; filtered separately from tag filters.
54 self.min_num_external_ifaces = None
55 self.max_num_external_ifaces = None
58 # Applications and routes add requirements to connected nodes
59 self.required_packages = set()
60 self.required_vsys = set()
# Extra environment variables to export on the node; a key may carry
# several values, hence the list-valued defaultdict.
62 self.env = collections.defaultdict(list)
64 # Testbed-derived attributes
# SSH identity file path and the node's host key, consumed by the
# rspawn/server remote-execution helpers.
66 self.ident_path = None
67 self.server_key = None
70 # Those are filled when an actual node is allocated
74 def _nepi_testbed_environment_setup(self):
# Build a single shell command string that prepares the remote environment:
# extend PYTHONPATH and PATH with per-user directories and export any
# extra variables collected in self.env. Returns the command as a string.
75 command = cStringIO.StringIO()
76 command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
77 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
# (the closing parentheses of this write() call are elided in this excerpt)
79 command.write(' ; export PATH=$PATH:%s' % (
# NOTE(review): the PATH export also iterates self.pythonpath rather than a
# separate path attribute - confirm this is intended.
80 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
# One export per (key, value) pair; values are interpolated verbatim, so
# they are presumably trusted/pre-escaped - TODO confirm.
83 for envkey, envvals in self.env.iteritems():
84 for envval in envvals:
85 command.write(' ; export %s=%s' % (envkey, envval))
86 return command.getvalue()
88 def build_filters(self, target_filters, filter_map):
# Copy each set Node attribute into target_filters under the PLCAPI filter
# name given by filter_map (e.g. the base-filter map).
89 for attr, tag in filter_map.iteritems():
90 value = getattr(self, attr, None)
# NOTE(review): the guard between these lines (presumably
# "if value is not None:") and the trailing "return target_filters" are
# elided in this excerpt - confirm against the full file.
92 target_filters[tag] = value
96 def applicable_filters(self):
# Collect the names of filter attributes that are actually set (not None),
# drawn from both the base-filter and tag-filter maps.
97 has = lambda att : getattr(self,att,None) is not None
# Python 2: filter() over iterkeys() returns lists, so "+" concatenates.
# (the surrounding return/parentheses are elided in this excerpt)
99 filter(has, self.BASEFILTERS.iterkeys())
100 + filter(has, self.TAGFILTERS.iterkeys())
103 def find_candidates(self, filter_slice_id=None):
# Query PLCAPI for the set of node_ids satisfying all configured
# constraints: base filters first, then one GetNodeTags pass per tag
# filter, then the vsys and interface-count special cases.
# NOTE(review): several lines are elided throughout this excerpt.
104 fields = ('node_id',)
105 replacements = {'timeframe':self.timeframe}
107 # get initial candidates (no tag filters)
108 basefilters = self.build_filters({}, self.BASEFILTERS)
109 rootfilters = basefilters.copy()
# Restrict to nodes already attached to the given slice when requested.
111 basefilters['|slice_ids'] = (filter_slice_id,)
113 # only pick healthy nodes
114 basefilters['run_level'] = 'boot'
115 basefilters['boot_state'] = 'boot'
117 # keyword-only "pseudofilters"
120 extra['peer'] = self.site
122 candidates = set(map(operator.itemgetter('node_id'),
123 self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
125 # filter by tag, one tag at a time
126 applicable = self.applicable_filters
127 for tagfilter in self.TAGFILTERS.iteritems():
128 attr, (tagname, expr) = tagfilter
130 # don't bother if there's no filter defined
131 if attr in applicable:
# Expand %(timeframe)s placeholders, then intersect the candidate set with
# the nodes whose tag value satisfies the bound expression ('['/']').
132 tagfilter = rootfilters.copy()
133 tagfilter['tagname'] = tagname % replacements
134 tagfilter[expr % replacements] = getattr(self,attr)
135 tagfilter['node_id'] = list(candidates)
137 candidates &= set(map(operator.itemgetter('node_id'),
138 self._api.GetNodeTags(filters=tagfilter, fields=fields)))
140 # filter by vsys tags - special case since it doesn't follow
141 # the usual semantics
142 if self.required_vsys:
# Group candidate node_ids by the vsys value they expose, then keep only
# nodes present in every required group (set intersection via reduce).
143 newcandidates = collections.defaultdict(set)
145 vsys_tags = self._api.GetNodeTags(
147 node_id = list(candidates),
148 fields = ['node_id','value'])
151 operator.itemgetter(['node_id','value']),
154 required_vsys = self.required_vsys
155 for node_id, value in vsys_tags:
156 if value in required_vsys:
157 newcandidates[value].add(node_id)
159 # take only those that have all the required vsys tags
160 newcandidates = reduce(
161 lambda accum, new : accum & new,
162 newcandidates.itervalues(),
165 # filter by iface count
166 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
167 # fetch interfaces for all, in one go
168 filters = basefilters.copy()
169 filters['node_id'] = list(candidates)
# NOTE(review): "filters" (with the node_id restriction) is built above but
# "basefilters" is what gets passed to GetNodes below - this looks like a
# bug (the restriction is never applied); confirm before fixing.
170 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
171 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
173 # filter candidates by interface count
174 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
175 predicate = ( lambda node_id :
176 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
177 elif self.min_num_external_ifaces is not None:
178 predicate = ( lambda node_id :
179 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
# (the "else:" line for the max-only branch is elided in this excerpt)
181 predicate = ( lambda node_id :
182 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
184 candidates = set(filter(predicate, candidates))
188 def make_filter_description(self):
# (docstring delimiters are elided in this excerpt)
190 Makes a human-readable description of filtering conditions
194 # get initial candidates (no tag filters)
195 filters = self.build_filters({}, self.BASEFILTERS)
197 # keyword-only "pseudofilters"
199 filters['peer'] = self.site
201 # filter by tag, one tag at a time
202 applicable = self.applicable_filters
203 for tagfilter in self.TAGFILTERS.iteritems():
204 attr, (tagname, expr) = tagfilter
206 # don't bother if there's no filter defined
207 if attr in applicable:
# For display purposes the attribute name itself is used as the key,
# unlike find_candidates() which uses the PLCAPI tag name.
208 filters[attr] = getattr(self,attr)
210 # filter by vsys tags - special case since it doesn't follow
211 # the usual semantics
212 if self.required_vsys:
213 filters['vsys'] = ','.join(list(self.required_vsys))
215 # filter by iface count
216 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
# Render the allowed interface-count range as "min-max" with open ends
# shown as '0' and 'inf'.
217 filters['num_ifaces'] = '-'.join([
218 str(self.min_num_external_ifaces or '0'),
219 str(self.max_num_external_ifaces or 'inf')
# Join "key: value" pairs with "; " ('%s: %s'.__mod__ formats each tuple).
222 return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
224 def assign_node_id(self, node_id):
# Bind this descriptor to a concrete PlanetLab node and immediately pull
# that node's attributes and tags from PLCAPI.
225 self._node_id = node_id
226 self.fetch_node_info()
228 def fetch_node_info(self):
# Populate this object's attributes from the allocated node's PLCAPI
# record and its node tags (reverse of the filter construction).
# NOTE(review): several lines are elided in this excerpt.
229 info = self._api.GetNodes(self._node_id)[0]
230 tags = dict( (t['tagname'],t['value'])
231 for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
# Reset iface-count bounds; re-derived from interface_ids below.
233 self.min_num_external_ifaces = None
234 self.max_num_external_ifaces = None
237 replacements = {'timeframe':self.timeframe}
# Map base-filter fields and tag-filter tags back onto attributes.
238 for attr, tag in self.BASEFILTERS.iteritems():
# (the lookup assigning "value" from info is elided in this excerpt)
241 setattr(self, attr, value)
242 for attr, (tag,_) in self.TAGFILTERS.iteritems():
243 tag = tag % replacements
# (the lookup assigning "value" from tags is elided in this excerpt)
246 setattr(self, attr, value)
# Derive the owning peer/site from the node's peer_id, when present.
248 if 'peer_id' in info:
249 self.site = self._api.peer_map[info['peer_id']]
251 if 'interface_ids' in info:
# Pin both bounds to the concrete node's actual interface count.
252 self.min_num_external_ifaces = \
253 self.max_num_external_ifaces = len(info['interface_ids'])
255 if 'ssh_rsa_key' in info:
256 self.server_key = info['ssh_rsa_key']
# NOTE(review): the enclosing "def" line is elided in this excerpt; these
# checks presumably belong to a validate()-style method. Python 2 raise
# syntax. AssertionError is used for configuration errors here - arguably a
# ValueError would be more conventional, but callers may depend on it.
259 if self.home_path is None:
260 raise AssertionError, "Misconfigured node: missing home path"
261 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
262 raise AssertionError, "Misconfigured node: missing slice SSH key"
263 if self.slicename is None:
264 raise AssertionError, "Misconfigured node: unspecified slice"
266 def install_dependencies(self):
# Launch a detached "yum install" on the node covering all required
# packages; returns without waiting (completion is checked by
# wait_dependencies()). No-op when there are no required packages.
267 if self.required_packages:
268 # TODO: make dependant on the experiment somehow...
269 pidfile = self.DEPENDS_PIDFILE
270 logfile = self.DEPENDS_LOGFILE
272 # Start process in a "daemonized" way, using nohup and heavy
273 # stdin/out redirection to avoid connection issues
274 (out,err),proc = rspawn.remote_spawn(
275 "yum -y install %(packages)s" % {
276 'packages' : ' '.join(self.required_packages),
280 stderr = rspawn.STDOUT,
282 host = self.hostname,
284 user = self.slicename,
286 ident_key = self.ident_path,
287 server_key = self.server_key,
# (remaining remote_spawn arguments and the failure test guarding this
# raise are elided in this excerpt)
292 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
294 def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
# Wait for the dependency installer started by install_dependencies():
# poll for its pidfile (up to pidmax probes, presumably pidprobe seconds
# apart - the sleep lines are elided), then poll the process status until
# it leaves RUNNING, growing the probe interval each iteration.
295 if self.required_packages:
296 pidfile = self.DEPENDS_PIDFILE
300 for probenum in xrange(pidmax):
301 pidtuple = rspawn.remote_check_pid(
303 host = self.hostname,
305 user = self.slicename,
307 ident_key = self.ident_path,
308 server_key = self.server_key
# (pidtuple unpacking / success break are elided in this excerpt)
316 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
318 # wait for it to finish
319 while rspawn.RUNNING is rspawn.remote_status(
321 host = self.hostname,
323 user = self.slicename,
325 ident_key = self.ident_path,
326 server_key = self.server_key
# Back off the polling interval by 1.5x, capped at probemax.
329 probe = min(probemax, 1.5*probe)
# NOTE(review): the enclosing "def" line is elided in this excerpt; given
# the 'ALIVE' comparison below, this fragment appears to mix a deployment
# path-setup command with a liveness probe - confirm against the full file.
332 # Make sure all the paths are created where
333 # they have to be created for deployment
334 (out,err),proc = server.popen_ssh_command(
# (the command string argument is elided in this excerpt)
336 host = self.hostname,
338 user = self.slicename,
340 ident_key = self.ident_path,
341 server_key = self.server_key
# (surrounding if/elif chain is elided; this branch treats a clean 'ALIVE'
# stdout with empty stderr as success)
346 elif not err and out.strip() == 'ALIVE':
352 def configure_routes(self, routes, devs):
# (docstring delimiters are elided in this excerpt)
354 Add the specified routes to the node's routing table
# For each (dest, prefix, nexthop) route, find a virtual device willing to
# host it and build a vroute "add" rule; a route with no matching device
# is a hard error since PL only supports rules on virtual interfaces.
360 if dev.routes_here(route):
362 dest, prefix, nexthop = route
364 "add %s%s gw %s %s" % (
# Omit the /prefix suffix for host routes (prefix 32) or a missing prefix.
366 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
375 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
376 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
# Feed the accumulated rules to the vsys vroute pipe over SSH; the
# background "cat /vsys/vroute.out >&2" surfaces vroute's responses on
# stderr so failures can be reported below.
378 (out,err),proc = server.popen_ssh_command(
# NOTE(review): this command string is %-formatted with a dict but contains
# no %(...)s placeholder in the visible text - confirm the placeholder is
# not in an elided portion; otherwise the formatting is a no-op.
379 "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
380 home = server.shell_escape(self.home_path)),
381 host = self.hostname,
383 user = self.slicename,
385 ident_key = self.ident_path,
386 server_key = self.server_key,
387 stdin = '\n'.join(rules)
# Any nonzero exit status or any stderr output is treated as failure.
390 if proc.wait() or err:
391 raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)