2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
13 from nepi.util import server
17 # Map Node attribute to plcapi filter name
# For 'hostname' the attribute name and the filter name coincide.
18 'hostname' : 'hostname',
22 # Map Node attribute to (<tag name>, <plcapi filter expression>)
23 # There are replacements that are applied with string formatting,
24 # so '%' has to be escaped as '%%'.
# find_candidates() formats both the tag name and the expression with
# {'timeframe': self.timeframe}; fetch_node_info() formats the tag name only.
25 'architecture' : ('arch','value'),
26 'operating_system' : ('fcdistro','value'),
27 'pl_distro' : ('pldistro','value'),
# Each min_/max_ pair reads the same tag and differs only in the prefix of
# the filter expression.
# NOTE(review): ']value' / '[value' look like PLCAPI's ">=" / "<=" filter
# modifiers -- confirm against the PLCAPI filter documentation.
28 'min_reliability' : ('reliability%(timeframe)s', ']value'),
29 'max_reliability' : ('reliability%(timeframe)s', '[value'),
30 'min_bandwidth' : ('bw%(timeframe)s', ']value'),
31 'max_bandwidth' : ('bw%(timeframe)s', '[value'),
# Fixed paths read by install_dependencies() (both) and wait_dependencies()
# (pid file only) for the package-installer process.
# NOTE(review): presumably paths on the remote node -- confirm. Being fixed
# paths, they are shared by every run on the same node (see the TODO in
# install_dependencies()).
34 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
35 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
# Create a node description with no constraints set.
# NOTE(review): `api` is presumably stored as self._api, which the query
# methods (find_candidates, fetch_node_info) call -- confirm.
37 def __init__(self, api=None):
# Selection constraints. None means "no constraint": applicable_filters
# only reports attributes that are not None.
44 self.architecture = None
45 self.operating_system = None
49 self.min_reliability = None
50 self.max_reliability = None
51 self.min_bandwidth = None
52 self.max_bandwidth = None
# Bounds on the interface count, checked in find_candidates() against the
# length of each node's 'interface_ids'.
53 self.min_num_external_ifaces = None
54 self.max_num_external_ifaces = None
57 # Applications and routes add requirements to connected nodes
# required_packages feeds the yum command in install_dependencies();
# required_vsys is matched against node tag values in find_candidates().
58 self.required_packages = set()
59 self.required_vsys = set()
# Maps an environment variable name to a list of values;
# _nepi_testbed_environment_setup() writes one export per value.
61 self.env = collections.defaultdict(list)
63 # Testbed-derived attributes
# ident_path and server_key are passed as ident_key / server_key to the
# SSH helpers used by the remote-command methods.
65 self.ident_path = None
66 self.server_key = None
69 # Those are filled when an actual node is allocated
# Build the shell snippet that prepares the environment for remote commands.
# Returns a single string of `export` statements separated by ' ; '.
73 def _nepi_testbed_environment_setup(self):
74 command = cStringIO.StringIO()
# Every self.pythonpath entry is shell-escaped and made relative to ${HOME}.
75 command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
76 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
# NOTE(review): PATH is extended with the same self.pythonpath entries as
# PYTHONPATH above -- confirm this is intended and not a copy/paste slip.
78 command.write(' ; export PATH=$PATH:%s' % (
79 ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
# One export per (key, value) pair. When a key has several values, each
# export overrides the previous one in the resulting shell.
82 for envkey, envvals in self.env.iteritems():
83 for envval in envvals:
# NOTE(review): envkey/envval are written without shell escaping, unlike
# the path entries above -- verify callers only store shell-safe values.
84 command.write(' ; export %s=%s' % (envkey, envval))
85 return command.getvalue()
# Copy this node's attribute values into `target_filters`.
#   target_filters: dict that is updated in place.
#   filter_map: maps attribute name -> key to use in target_filters.
# find_candidates() uses the return value as its base filter dict.
87 def build_filters(self, target_filters, filter_map):
88 for attr, tag in filter_map.iteritems():
# Attributes that do not exist on the instance read as None.
89 value = getattr(self, attr, None)
91 target_filters[tag] = value
# Collect the names of the BASEFILTERS / TAGFILTERS attributes that are
# currently set (not None) on this node.
# NOTE(review): find_candidates() reads this without calling it, so it is
# presumably exposed as a property -- confirm.
95 def applicable_filters(self):
# True when the attribute exists and holds something other than None.
96 has = lambda att : getattr(self,att,None) is not None
# Python 2 filter() returns lists, so the two results can be joined with +.
98 filter(has, self.BASEFILTERS.iterkeys())
99 + filter(has, self.TAGFILTERS.iterkeys())
# Query the API for the ids of nodes that satisfy this node's constraints.
# Starts from the nodes matching the base filters, then narrows the set by
# tag filters, required vsys tags and interface count.
#   filter_slice_id: slice id placed in the '|slice_ids' base filter.
102 def find_candidates(self, filter_slice_id=None):
103 fields = ('node_id',)
# Substituted into the tag names / expressions of TAGFILTERS.
104 replacements = {'timeframe':self.timeframe}
106 # get initial candidates (no tag filters)
107 basefilters = self.build_filters({}, self.BASEFILTERS)
109 basefilters['|slice_ids'] = (filter_slice_id,)
111 # keyword-only "pseudofilters"
114 extra['peer'] = self.site
116 candidates = set(map(operator.itemgetter('node_id'),
117 self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
119 # filter by tag, one tag at a time
120 applicable = self.applicable_filters
121 for tagfilter in self.TAGFILTERS.iteritems():
122 attr, (tagname, expr) = tagfilter
124 # don't bother if there's no filter defined
125 if attr in applicable:
# `tagfilter` is rebound here from the (attr, spec) pair to the filter dict
# for this tag. The query is restricted to the surviving candidates.
126 tagfilter = basefilters.copy()
127 tagfilter['tagname'] = tagname % replacements
128 tagfilter[expr % replacements] = getattr(self,attr)
129 tagfilter['node_id'] = list(candidates)
# Keep only the candidates that this tag query also returned.
131 candidates &= set(map(operator.itemgetter('node_id'),
132 self._api.GetNodeTags(filters=tagfilter, fields=fields)))
134 # filter by vsys tags - special case since it doesn't follow
135 # the usual semantics
136 if self.required_vsys:
# Maps each required vsys value to the set of node ids that carry it.
137 newcandidates = collections.defaultdict(set)
139 vsys_tags = self._api.GetNodeTags(
141 node_id = list(candidates),
142 fields = ['node_id','value'])
# NOTE(review): itemgetter receives one list argument here, while the
# interface query below uses the two-argument form
# itemgetter('node_id','interface_ids'). A list key cannot index a dict and
# would not produce the (node_id, value) pairs unpacked below -- verify.
145 operator.itemgetter(['node_id','value']),
148 required_vsys = self.required_vsys
149 for node_id, value in vsys_tags:
150 if value in required_vsys:
151 newcandidates[value].add(node_id)
153 # take only those that have all the required vsys tags
# NOTE(review): only values seen in vsys_tags get an entry, so a required
# value that no node carries does not take part in this intersection --
# confirm that case is handled.
154 newcandidates = reduce(
155 lambda accum, new : accum & new,
156 newcandidates.itervalues(),
159 # filter by iface count
160 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
161 # fetch interfaces for all, in one go
# NOTE(review): `filters` is restricted to the current candidates, but the
# GetNodes call below is passed `basefilters`, so `filters` looks unused.
# The predicate below is only applied to `candidates`, so the visible effect
# would be a larger query -- confirm and pass `filters` if so.
162 filters = basefilters.copy()
163 filters['node_id'] = list(candidates)
164 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
165 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
167 # filter candidates by interface count
# A node missing from `ifaces` counts as having zero interfaces.
168 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
169 predicate = ( lambda node_id :
170 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
171 elif self.min_num_external_ifaces is not None:
172 predicate = ( lambda node_id :
173 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
175 predicate = ( lambda node_id :
176 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
178 candidates = set(filter(predicate, candidates))
# Bind this object to a concrete node and refresh its attributes from the
# API through fetch_node_info().
#   node_id: identifier stored as self._node_id and used for the API lookups.
182 def assign_node_id(self, node_id):
183 self._node_id = node_id
184 self.fetch_node_info()
# Load the record and tags of the node identified by self._node_id and
# overwrite this object's attributes with the actual values.
186 def fetch_node_info(self):
# GetNodes returns a sequence; the first entry is taken as this node's record.
187 info = self._api.GetNodes(self._node_id)[0]
# tagname -> value for every tag on the node.
188 tags = dict( (t['tagname'],t['value'])
189 for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
# Clear the interface-count bounds; they are set again below when the node
# record carries 'interface_ids'.
191 self.min_num_external_ifaces = None
192 self.max_num_external_ifaces = None
195 replacements = {'timeframe':self.timeframe}
196 for attr, tag in self.BASEFILTERS.iteritems():
199 setattr(self, attr, value)
# Only the tag name of each TAGFILTERS entry is needed here; the filter
# expression is ignored.
200 for attr, (tag,_) in self.TAGFILTERS.iteritems():
201 tag = tag % replacements
204 setattr(self, attr, value)
206 if 'peer_id' in info:
207 self.site = self._api.peer_map[info['peer_id']]
209 if 'interface_ids' in info:
# Both bounds are pinned to the actual number of interfaces.
210 self.min_num_external_ifaces = \
211 self.max_num_external_ifaces = len(info['interface_ids'])
213 if 'ssh_rsa_key' in info:
214 self.server_key = info['ssh_rsa_key']
# Configuration sanity checks: each raises AssertionError when a setting
# needed for remote access is missing (for ident_path, also when the file
# is not readable).
217 if self.home_path is None:
218 raise AssertionError, "Misconfigured node: missing home path"
219 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
220 raise AssertionError, "Misconfigured node: missing slice SSH key"
221 if self.slicename is None:
222 raise AssertionError, "Misconfigured node: unspecified slice"
# Start a `yum -y install` of self.required_packages on the node through
# rspawn.remote_spawn. Does nothing when no packages are required.
# Raises RuntimeError ("Failed to set up application: ...") on the failure
# path of the spawn.
224 def install_dependencies(self):
225 if self.required_packages:
226 # TODO: make dependent on the experiment somehow...
227 pidfile = self.DEPENDS_PIDFILE
228 logfile = self.DEPENDS_LOGFILE
230 # Start process in a "daemonized" way, using nohup and heavy
231 # stdin/out redirection to avoid connection issues
232 (out,err),proc = rspawn.remote_spawn(
# NOTE(review): package names are joined into the command line unescaped --
# verify that required_packages only ever holds shell-safe names.
233 "yum -y install %(packages)s" % {
234 'packages' : ' '.join(self.required_packages),
# NOTE(review): rspawn.STDOUT presumably sends stderr to the same target as
# stdout -- confirm against rspawn.
238 stderr = rspawn.STDOUT,
240 host = self.hostname,
242 user = self.slicename,
244 ident_key = self.ident_path,
245 server_key = self.server_key,
250 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
# Wait for the dependency installer (see install_dependencies) to finish.
# Does nothing when no packages are required.
#   pidprobe: NOTE(review) presumably the delay between pid-file probes --
#             confirm.
#   probe:    polling interval for the status loop; multiplied by 1.5 after
#             each check and capped at probemax.
#   pidmax:   maximum number of attempts to read the installer's pid file.
#   probemax: upper bound for `probe`.
# Raises RuntimeError when the pid file cannot be obtained.
252 def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
253 if self.required_packages:
254 pidfile = self.DEPENDS_PIDFILE
# At most pidmax attempts to read the installer's pid from the pid file.
258 for probenum in xrange(pidmax):
259 pidtuple = rspawn.remote_check_pid(
261 host = self.hostname,
263 user = self.slicename,
265 ident_key = self.ident_path,
266 server_key = self.server_key
274 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
276 # wait for it to finish
# Loops for as long as remote_status reports rspawn.RUNNING. The comparison
# is by identity, so it relies on remote_status returning that same constant
# object.
277 while rspawn.RUNNING is rspawn.remote_status(
279 host = self.hostname,
281 user = self.slicename,
283 ident_key = self.ident_path,
284 server_key = self.server_key
# Back off: grow the polling interval by 50%, never beyond probemax.
287 probe = min(probemax, 1.5*probe)
290 # Make sure all the paths are created where
291 # they have to be created for deployment
292 (out,err),proc = server.popen_ssh_command(
294 host = self.hostname,
296 user = self.slicename,
298 ident_key = self.ident_path,
299 server_key = self.server_key
# This branch requires empty stderr and stdout equal to 'ALIVE' once
# surrounding whitespace is stripped.
304 elif not err and out.strip() == 'ALIVE':
# Build one "add <dest>[/<prefix>] gw <nexthop> <dev>" rule per route and
# feed the rules, newline-separated, to /vsys/vroute.in over SSH.
#   routes: iterable of (dest, prefix, nexthop) tuples.
#   devs:   candidate devices; each is asked dev.routes_here(route).
# Raises RuntimeError for a route that cannot be bound to any virtual
# interface, and when the remote command exits non-zero or writes to stderr.
310 def configure_routes(self, routes, devs):
312 Add the specified routes to the node's routing table
318 if dev.routes_here(route):
320 dest, prefix, nexthop = route
322 "add %s%s gw %s %s" % (
# The "/<prefix>" suffix is left out when prefix is falsy or 32 (host route).
324 (("/%d" % (prefix,)) if prefix and prefix != 32 else ""),
333 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
334 "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
# The remote command copies /vsys/vroute.out to stderr in the background
# while stdin is written to /vsys/vroute.in. Anything vroute reports
# therefore makes `err` non-empty and triggers the RuntimeError below.
# NOTE(review): the command string has no %-placeholders, so the
# `% dict(home=...)` formatting leaves it unchanged and `home` looks unused.
336 (out,err),proc = server.popen_ssh_command(
337 "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in'" % dict(
338 home = server.shell_escape(self.home_path)),
339 host = self.hostname,
341 user = self.slicename,
343 ident_key = self.ident_path,
344 server_key = self.server_key,
345 stdin = '\n'.join(rules)
348 if proc.wait() or err:
349 raise RuntimeError, "Could not set routes: %s%s" % (out,err)