2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
# NOTE(review): the dict headers that enclose these entries are not visible in
# this view; the grouping into BASEFILTERS / TAGFILTERS is inferred from how
# those two names are consumed by the methods below -- confirm.
#
# Plain 'attribute' : 'filter name' entries have the shape build_filters()
# expects: it copies the attribute's value into a filter dict under that name.
13 # Map Node attribute to plcapi filter name
14 'hostname' : 'hostname',
# Tuple entries have the shape find_candidates() unpacks from TAGFILTERS:
# (tagname, expr). Both strings are %-formatted with {'timeframe': ...}
# before use, which is why a literal '%' would need escaping.
18 # Map Node attribute to (<tag name>, <plcapi filter expression>)
19 # There are replacements that are applied with string formatting,
20 # so '%' has to be escaped as '%%'.
21 'architecture' : ('arch','value'),
22 'operating_system' : ('fcdistro','value'),
23 'pl_distro' : ('pldistro','value'),
# NOTE(review): the ']' / '[' prefixes are presumably PLCAPI filter modifiers
# for lower / upper bounds (min_* uses ']', max_* uses '[') -- confirm against
# the PLCAPI filter documentation.
24 'min_reliability' : ('reliability%(timeframe)s', ']value'),
25 'max_reliability' : ('reliability%(timeframe)s', '[value'),
26 'min_bandwidth' : ('bw%(timeframe)s', ']value'),
27 'max_bandwidth' : ('bw%(timeframe)s', '[value'),
# Fixed paths for the dependency installer's pidfile and logfile.
# install_dependencies() reads both; wait_dependencies() reads the pidfile.
# NOTE(review): presumably these are paths on the remote node, not the local
# host -- the lines that pass them to rspawn are not visible here; confirm.
30 DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
31 DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
# Create a node description with no constraints set: every attribute visible
# below starts as None, and required_packages starts as an empty set.
#
# api: NOTE(review): presumably the PLCAPI client later used as self._api;
#      the assignment is on a line not visible in this view -- confirm.
33 def __init__(self, api=None):
# Selection constraints. A value of None means "not set": applicable_filters
# only reports attributes whose value is not None.
40 self.architecture = None
41 self.operating_system = None
45 self.min_reliability = None
46 self.max_reliability = None
47 self.min_bandwidth = None
48 self.max_bandwidth = None
# Bounds on the number of interfaces, checked in find_candidates() against
# the length of each node's 'interface_ids'.
49 self.min_num_external_ifaces = None
50 self.max_num_external_ifaces = None
53 # Applications add requirements to connected nodes
# Package names; install_dependencies() joins them into one yum command.
54 self.required_packages = set()
56 # Testbed-derived attributes
# Used as ident_key for the rspawn calls, and checked for readability by the
# "missing slice SSH key" sanity check.
58 self.ident_path = None
61 # Those are filled when an actual node is allocated
# Copy this node's attribute values into target_filters, keyed by the filter
# names given in filter_map.
#
# target_filters: dict that is mutated in place.
# filter_map:     dict mapping attribute name -> key to use in target_filters.
64 def build_filters(self, target_filters, filter_map):
65 for attr, tag in filter_map.iteritems():
# Attributes that do not exist on the instance read as None.
66 value = getattr(self, attr, None)
# NOTE(review): one line is elided before the assignment below -- presumably
# a guard that skips None values; confirm.
68 target_filters[tag] = value
# NOTE(review): the return statement is not visible. find_candidates() treats
# the result of this call as a dict, so target_filters is presumably returned.
# Names of the BASEFILTERS and TAGFILTERS attributes that are currently set
# (not None) on this node.
#
# NOTE(review): find_candidates() reads self.applicable_filters without
# calling it and then tests `attr in applicable`, so the elided line above the
# def presumably carries a @property decorator -- confirm.
72 def applicable_filters(self):
# Predicate: the attribute exists on self and is not None.
73 has = lambda att : getattr(self,att,None) is not None
# NOTE(review): the line opening this expression (presumably `return (`) is
# elided. The two filter() results are concatenated with '+', which relies on
# Python 2's filter() returning lists.
75 filter(has, self.BASEFILTERS.iterkeys())
76 + filter(has, self.TAGFILTERS.iterkeys())
# Build the set of node ids that satisfy this node's constraints: query nodes
# with the base filters, intersect with the nodes matching each set tag
# filter, then filter by interface count.
#
# filter_slice_id: when used, added to the base filters under the
#                  '|slice_ids' key as a one-element tuple.
#
# NOTE(review): several lines of this method are elided in this view (the
# definitions of `fields` and `extra`, some guards, an `else:` and the final
# return). Presumably the method returns `candidates` -- confirm.
79 def find_candidates(self, filter_slice_id=None):
# Substitutions applied to the TAGFILTERS tag names and filter expressions.
81 replacements = {'timeframe':self.timeframe}
83 # get initial candidates (no tag filters)
84 basefilters = self.build_filters({}, self.BASEFILTERS)
# NOTE(review): the guard on filter_slice_id is elided; presumably this is
# only done when a slice id was given.
86 basefilters['|slice_ids'] = (filter_slice_id,)
88 # keyword-only "pseudofilters"
# NOTE(review): the creation of `extra` and the guard on self.site are elided.
91 extra['peer'] = self.site
# Initial candidate set: the node_id of every node returned for the base
# filters; `extra` is passed as additional keyword arguments.
93 candidates = set(map(operator.itemgetter('node_id'),
94 self._api.GetNodes(filters=basefilters, fields=fields, **extra)))
96 # filter by tag, one tag at a time
# Read as an attribute, not called (see applicable_filters).
97 applicable = self.applicable_filters
98 for tagfilter in self.TAGFILTERS.iteritems():
99 attr, (tagname, expr) = tagfilter
101 # don't bother if there's no filter defined
102 if attr in applicable:
# `tagfilter` is rebound here from the (attr, spec) pair to the query dict;
# the pair has already been unpacked above, so nothing is lost.
103 tagfilter = basefilters.copy()
104 tagfilter['tagname'] = tagname % replacements
105 tagfilter[expr % replacements] = getattr(self,attr)
# Restrict the tag query to the nodes that are still candidates.
106 tagfilter['node_id'] = list(candidates)
# Keep only candidates that also appear in the tag query result.
108 candidates &= set(map(operator.itemgetter('node_id'),
109 self._api.GetNodeTags(filters=tagfilter, fields=fields)))
111 # filter by iface count
112 if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
113 # fetch interfaces for all, in one go
114 filters = basefilters.copy()
115 filters['node_id'] = list(candidates)
# NOTE(review): the call below passes `basefilters`, not the `filters` dict
# built just above, so `filters` is unused and the query is not restricted to
# the current candidates. The final result is unaffected because only
# candidate ids are looked up in `ifaces`, but `filters=filters` looks like
# the intent -- confirm and fix.
116 ifaces = dict(map(operator.itemgetter('node_id','interface_ids'),
117 self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) ))
119 # filter candidates by interface count
# A node missing from `ifaces` counts as having zero interfaces.
120 if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None:
121 predicate = ( lambda node_id :
122 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
123 elif self.min_num_external_ifaces is not None:
124 predicate = ( lambda node_id :
125 self.min_num_external_ifaces <= len(ifaces.get(node_id,())) )
# NOTE(review): an `else:` is presumably elided here (only the maximum set).
127 predicate = ( lambda node_id :
128 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
130 candidates = set(filter(predicate, candidates))
# Bind this object to a concrete node: store node_id as self._node_id and
# immediately refresh the node's attributes through fetch_node_info().
134 def assign_node_id(self, node_id):
135 self._node_id = node_id
136 self.fetch_node_info()
# Refresh this object's attributes from the API records of the assigned node
# (self._node_id): node info, node tags, peer and interface count.
138 def fetch_node_info(self):
# NOTE(review): `info` is used as a mapping below ('peer_id' in info,
# info['interface_ids']); confirm that GetNodes returns a single record here
# rather than a list of records.
139 info = self._api.GetNodes(self._node_id)
# Map of tagname -> value for every tag returned for this node.
140 tags = dict( (t['tagname'],t['value'])
141 for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
# Reset the interface-count bounds; they are set again below only if the
# node info carries 'interface_ids'.
143 self.min_num_external_ifaces = None
144 self.max_num_external_ifaces = None
147 replacements = {'timeframe':self.timeframe}
148 for attr, tag in self.BASEFILTERS.iteritems():
# NOTE(review): the lines that compute `value` are elided; presumably it is
# read from `info` under `tag` when present -- confirm.
151 setattr(self, attr, value)
152 for attr, (tag,_) in self.TAGFILTERS.iteritems():
# Expand the timeframe placeholder in the tag name.
153 tag = tag % replacements
# NOTE(review): the lines that compute `value` are elided; presumably it is
# read from `tags` under the expanded tag name when present -- confirm.
156 setattr(self, attr, value)
# Translate the peer id into a site through the API object's peer_map.
158 if 'peer_id' in info:
159 self.site = self._api.peer_map[info['peer_id']]
# An allocated node has a known interface count, so both bounds are set to
# that same number.
161 if 'interface_ids' in info:
162 self.min_num_external_ifaces = \
163 self.max_num_external_ifaces = len(info['interface_ids'])
# NOTE(review): the embedded numbering skips two lines here; the sanity checks
# below may belong to a separate method whose def line is not visible in this
# view -- confirm.
# Each check raises AssertionError when a required setting is missing.
166 if self.home_path is None:
167 raise AssertionError, "Misconfigured node: missing home path"
# The key file must be set and readable by the current process.
168 if self.ident_path is None or not os.access(self.ident_path, os.R_OK):
169 raise AssertionError, "Misconfigured node: missing slice SSH key"
170 if self.slicename is None:
171 raise AssertionError, "Misconfigured node: unspecified slice"
# Start installing required_packages through rspawn.remote_spawn, running
# `yum -y install <packages>`. Does nothing when required_packages is empty.
#
# Raises RuntimeError ("Failed to set up application: ...") on failure.
# NOTE(review): several call arguments and the condition guarding the raise
# are elided in this view; presumably the raise depends on the spawned
# process's outcome -- confirm.
173 def install_dependencies(self):
174 if self.required_packages:
175 # TODO: make dependant on the experiment somehow...
176 pidfile = self.DEPENDS_PIDFILE
177 logfile = self.DEPENDS_LOGFILE
179 # Start process in a "daemonized" way, using nohup and heavy
180 # stdin/out redirection to avoid connection issues
181 (out,err),proc = rspawn.remote_spawn(
# Package names are joined with single spaces into one yum command line.
# NOTE(review): the names are interpolated unquoted; assumes they are trusted.
182 "yum -y install %(packages)s" % {
183 'packages' : ' '.join(self.required_packages),
187 stderr = rspawn.STDOUT,
# Connection parameters: target host, login user (the slice name) and key.
189 host = self.hostname,
191 user = self.slicename,
193 ident_key = self.ident_path,
198 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
200 def wait_dependencies(self, pidprobe=1, probe=10, pidmax=10):
201 if self.required_packages:
202 pidfile = self.DEPENDS_PIDFILE
206 for probenum in xrange(pidmax):
207 pidtuple = rspawn.remote_check_pid(
209 host = self.hostname,
211 user = self.slicename,
213 ident_key = self.ident_path
221 raise RuntimeError, "Failed to obtain pidfile for dependency installer"
223 # wait for it to finish
224 while rspawn.RUNNING is rspawn.remote_status(
226 host = self.hostname,
228 user = self.slicename,
230 ident_key = self.ident_path