2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
12 from nepi.util import server
# NOTE(review): the dict headers are not visible in this excerpt. The entries
# below are presumably the bodies of the BASEFILTERS and TAGFILTERS maps that
# build_filters(), applicable_filters() and find_candidates() read -- confirm.
16 # Map Node attribute to plcapi filter name
17 'hostname' : 'hostname',
21 # Map Node attribute to (<tag name>, <plcapi filter expression>)
22 # There are replacements that are applied with string formatting,
23 # so '%' has to be escaped as '%%'.
# find_candidates() applies "% {'timeframe': self.timeframe}" to both the tag
# name and the filter expression; fetch_node_info() applies it to the tag name.
24 'architecture' : ('arch','value'),
25 'operating_system' : ('fcdistro','value'),
26 'pl_distro' : ('pldistro','value'),
# NOTE(review): the ']' / '[' prefixes on 'value' presumably select ">=" and
# "<=" comparisons in the plcapi filter syntax (consistent with the min_/max_
# attribute names) -- confirm against the PLCAPI filter documentation.
27 'min_reliability' : ('reliability%(timeframe)s', ']value'),
28 'max_reliability' : ('reliability%(timeframe)s', '[value'),
29 'min_bandwidth' : ('bw%(timeframe)s', ']value'),
30 'max_bandwidth' : ('bw%(timeframe)s', '[value'),
# Pid file and log file names used for the dependency installer; read by
# install_dependencies() and wait_dependencies().
# NOTE(review): these are fixed names under /tmp and are not specific to an
# experiment (see the TODO in install_dependencies()); presumably they are
# paths on the remote node -- confirm.
33 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
34 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
# Constructor: resets the node's selection attributes, requirement sets and
# testbed-derived attributes to empty defaults.
# NOTE(review): several original lines are missing from this excerpt (see the
# gaps in the embedded numbering), including whatever stores `api`; the other
# methods read it as self._api.
36 def __init__(self, api=None):
# Selection attributes. None means "not set": applicable_filters() only
# reports attributes whose value is not None.
43 self.architecture = None
44 self.operating_system = None
48 self.min_reliability = None
49 self.max_reliability = None
50 self.min_bandwidth = None
51 self.max_bandwidth = None
# Bounds on the interface count checked at the end of find_candidates();
# fetch_node_info() later sets both to the allocated node's actual count.
52 self.min_num_external_ifaces = None
53 self.max_num_external_ifaces = None
56 # Applications and routes add requirements to connected nodes
# required_packages feeds the "yum -y install" command in
# install_dependencies(); required_vsys is matched against node tag values
# in find_candidates().
57 self.required_packages = set()
58 self.required_vsys = set()
60 # Testbed-derived attributes
# Passed as ident_key / server_key to the remote helpers; server_key is
# overwritten from the node's 'ssh_rsa_key' in fetch_node_info().
62 self.ident_path = None
63 self.server_key = None
# NOTE(review): the assignments this comment introduces are not visible here.
66 # Those are filled when an actual node is allocated
# Copies this node's attribute values into `target_filters`, keyed by the
# filter name that `filter_map` associates with each attribute name.
# NOTE(review): the lines between the getattr() and the assignment, and the
# end of the method, are missing from this excerpt. Presumably there is a
# "value is not None" guard and a `return target_filters` (find_candidates()
# uses the return value) -- confirm.
69 def build_filters(self, target_filters, filter_map):
70 for attr, tag in filter_map.iteritems():
# Attributes that do not exist on the instance read as None.
71 value = getattr(self, attr, None)
73 target_filters[tag] = value
# Collects the BASEFILTERS / TAGFILTERS attribute names that are currently
# set (not None) on this node.
# NOTE(review): find_candidates() reads `self.applicable_filters` without
# calling it, so this is presumably decorated as a property on a line not
# shown here; the expression wrapping the two filter() calls is also missing.
77 def applicable_filters(self):
# Predicate: True when the attribute exists and is not None.
78 has = lambda att : getattr(self,att,None) is not None
# Concatenating with "+" relies on Python 2's filter() returning lists.
80 filter(has, self.BASEFILTERS.iterkeys())
81 + filter(has, self.TAGFILTERS.iterkeys())
# Queries the PLC API for node ids matching this node's constraints, narrowing
# a candidate set in stages: base filters, tag filters (one tag at a time),
# required vsys tags, and finally the interface-count bounds.
#
# filter_slice_id: stored in the base filters under the '|slice_ids' key.
#
# NOTE(review): many original lines are missing from this excerpt (the
# `fields` definition, the guards around the optional filters, parts of the
# vsys handling and the return statement), so the notes below are limited to
# what the visible lines show.
84 def find_candidates(self, filter_slice_id=None):
86 replacements = {'timeframe':self.timeframe}
88 # get initial candidates (no tag filters)
89 basefilters = self.build_filters({}, self.BASEFILTERS)
# NOTE(review): presumably guarded by a check on filter_slice_id -- the guard
# line is not visible.
91 basefilters['|slice_ids'] = (filter_slice_id,)
93 # keyword-only "pseudofilters"
# `extra` is forwarded to GetNodes() as keyword arguments.
96 extra['peer'] = self.site
# Keep only the node_id of every node returned for the base filters.
98 candidates = set(map(operator.itemgetter('node_id'),
99 self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
101 # filter by tag, one tag at a time
102 applicable = self.applicable_filters
103 for tagfilter in self.TAGFILTERS.iteritems():
104 attr, (tagname, expr) = tagfilter
106 # don't bother if there's no filter defined
107 if attr in applicable:
# The loop variable `tagfilter` is rebound here to the query dict; this is
# harmless because the item was already unpacked above.
108 tagfilter = basefilters.copy()
109 tagfilter['tagname'] = tagname % replacements
110 tagfilter[expr % replacements] = getattr(self,attr)
# Restrict the tag query to the nodes that are still candidates.
111 tagfilter['node_id'] = list(candidates)
113 candidates &= set(map(operator.itemgetter('node_id'),
114 self._api.GetNodeTags(filters=tagfilter, fields=fields)))
116 # filter by vsys tags - special case since it doesn't follow
117 # the usual semantics
118 if self.required_vsys:
# Maps each required vsys value to the set of node ids carrying it.
119 newcandidates = collections.defaultdict(set)
121 vsys_tags = self._api.GetNodeTags(
123 node_id = list(candidates),
124 fields = ['node_id','value'])
# NOTE(review): itemgetter() receives a single list argument here, so it
# looks up the list itself as one key instead of extracting the two fields.
# The loop below unpacks (node_id, value) pairs, so
# operator.itemgetter('node_id','value') was probably intended -- confirm.
127 operator.itemgetter(['node_id','value']),
130 required_vsys = self.required_vsys
131 for node_id, value in vsys_tags:
132 if value in required_vsys:
133 newcandidates[value].add(node_id)
135 # take only those that have all the required vsys tags
# Intersects the per-value node sets.
# NOTE(review): a required value that no candidate carries never gets an
# entry in `newcandidates`, so it would not shrink the intersection -- verify
# that the lines not shown here handle that case.
136 newcandidates = reduce(
137 lambda accum, new : accum & new,
138 newcandidates.itervalues(),
141 # filter by iface count
142 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
143 # fetch interfaces for all, in one go
144 filters = basefilters.copy()
145 filters['node_id'] = list(candidates)
# NOTE(review): the call below passes `basefilters`, not the `filters` dict
# built just above, so the query is not restricted to the current candidates
# and `filters` is unused in the visible lines. The predicate is only applied
# to `candidates`, so this affects the size of the query, not the result.
146 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
147 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
149 # filter candidates by interface count
# Nodes missing from `ifaces` count as having zero interfaces.
150 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
151 predicate = ( lambda node_id :
152 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
153 elif self.min_num_external_ifaces is not None:
154 predicate = ( lambda node_id :
155 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
# Remaining case (the branch line is not visible): only the maximum is set.
157 predicate = ( lambda node_id :
158 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
160 candidates = set(filter(predicate, candidates))
# Binds this object to a concrete node: stores `node_id` in self._node_id and
# then refreshes the node's attributes through fetch_node_info().
164 def assign_node_id(self, node_id):
165 self._node_id = node_id
166 self.fetch_node_info()
# Loads the record and tags of the node identified by self._node_id from the
# PLC API and overwrites this object's attributes with the actual values.
# NOTE(review): the lines that look up `value` inside both loops are missing
# from this excerpt.
168 def fetch_node_info(self):
# First record returned for this node id.
169 info = self._api.GetNodes(self._node_id)[0]
# tagname -> value for every tag returned for the node.
170 tags = dict( (t['tagname'],t['value'])
171 for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
# Clear the requested bounds; they are set again below only when the node
# record contains 'interface_ids'.
173 self.min_num_external_ifaces = None
174 self.max_num_external_ifaces = None
177 replacements = {'timeframe':self.timeframe}
178 for attr, tag in self.BASEFILTERS.iteritems():
181 setattr(self, attr, value)
182 for attr, (tag,_) in self.TAGFILTERS.iteritems():
# Expand the '%(timeframe)s' placeholder in the tag name.
183 tag = tag % replacements
186 setattr(self, attr, value)
188 if 'peer_id' in info:
189 self.site = self._api.peer_map[info['peer_id']]
191 if 'interface_ids' in info:
# Both bounds are set to the node's actual interface count.
192 self.min_num_external_ifaces = \
193 self.max_num_external_ifaces = len(info['interface_ids'])
195 if 'ssh_rsa_key' in info:
196 self.server_key = info['ssh_rsa_key']
# Configuration sanity checks: raise AssertionError when the home path, a
# readable slice SSH key file, or the slice name is missing.
# NOTE(review): the header of the enclosing definition is not visible in this
# excerpt.
199 if self.home_path is None:
200 raise AssertionError, "Misconfigured node: missing home path"
# The identity file must be set and readable by the current process.
201 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
202 raise AssertionError, "Misconfigured node: missing slice SSH key"
203 if self.slicename is None:
204 raise AssertionError, "Misconfigured node: unspecified slice"
# Starts installation of self.required_packages on the node by spawning
# "yum -y install <packages>" through rspawn.remote_spawn(); does nothing when
# no packages are required.
# Raises RuntimeError ("Failed to set up application: ...") -- the failure
# condition itself is on a line not visible in this excerpt.
# NOTE(review): several arguments of the remote_spawn() call are missing from
# this excerpt.
206 def install_dependencies(self):
207 if self.required_packages:
208 # TODO: make dependent on the experiment somehow...
209 pidfile = self.DEPENDS_PIDFILE
210 logfile = self.DEPENDS_LOGFILE
212 # Start process in a "daemonized" way, using nohup and heavy
213 # stdin/out redirection to avoid connection issues
214 (out,err),proc = rspawn.remote_spawn(
# NOTE(review): package names are joined into the command string without any
# quoting -- confirm they can only come from trusted configuration.
215 "yum -y install %(packages)s" % {
216 'packages' : ' '.join(self.required_packages),
220 stderr = rspawn.STDOUT,
222 host = self.hostname,
224 user = self.slicename,
226 ident_key = self.ident_path,
227 server_key = self.server_key,
232 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
# Waits for the dependency installer started by install_dependencies(): first
# probes up to `pidmax` times for the installer's pid via
# rspawn.remote_check_pid(), then loops while rspawn.remote_status() reports
# rspawn.RUNNING. Does nothing when no packages are required.
# Raises RuntimeError ("Failed to obtain pidfile for dependency installer");
# the condition is on a line not visible in this excerpt.
# NOTE(review): `pidprobe` and `probe` are not used in the visible lines;
# presumably they are the sleep intervals of the two loops -- confirm.
234 def wait_dependencies(self, pidprobe=1, probe=10, pidmax=10):
235 if self.required_packages:
236 pidfile = self.DEPENDS_PIDFILE
240 for probenum in xrange(pidmax):
241 pidtuple = rspawn.remote_check_pid(
243 host = self.hostname,
245 user = self.slicename,
247 ident_key = self.ident_path,
248 server_key = self.server_key
256 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
258 # wait for it to finish
259 while rspawn.RUNNING is rspawn.remote_status(
261 host = self.hostname,
263 user = self.slicename,
265 ident_key = self.ident_path,
266 server_key = self.server_key
271 # Make sure all the paths are created where
272 # they have to be created for deployment
273 (out,err),proc = server.popen_ssh_command(
275 host = self.hostname,
277 user = self.slicename,
279 ident_key = self.ident_path,
280 server_key = self.server_key
285 elif not err and out.strip() == 'ALIVE':