2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
23 from nepi.util.constants import TESTBED_STATUS_CONFIGURED
25 class TempKeyError(Exception):
28 class TestbedController(testbed_impl.TestbedController):
29 def __init__(self, testbed_version):
30 super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
31 self._home_directory = None
35 import node, interfaces, application
37 self._interfaces = interfaces
38 self._app = application
41 def home_directory(self):
42 return self._home_directory
46 if not hasattr(self, '_plapi'):
50 self._plapi = plcapi.PLCAPI(
51 username = self.authUser,
52 password = self.authString,
53 hostname = self.plcHost,
54 urlpattern = self.plcUrl
57 # anonymous access - may not be enough for much
58 self._plapi = plcapi.PLCAPI()
63 if not hasattr(self, '_slice_id'):
64 slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
66 self._slice_id = slices[0]['slice_id']
68 # If it wasn't found, don't remember this failure, keep trying
73 self._home_directory = self._attributes.\
74 get_attribute_value("homeDirectory")
75 self.slicename = self._attributes.\
76 get_attribute_value("slice")
77 self.authUser = self._attributes.\
78 get_attribute_value("authUser")
79 self.authString = self._attributes.\
80 get_attribute_value("authPass")
81 self.sliceSSHKey = self._attributes.\
82 get_attribute_value("sliceSSHKey")
83 self.sliceSSHKeyPass = None
84 self.plcHost = self._attributes.\
85 get_attribute_value("plcHost")
86 self.plcUrl = self._attributes.\
87 get_attribute_value("plcUrl")
88 super(TestbedController, self).do_setup()
90 def do_post_asynclaunch(self, guid):
91 # Dependencies were launched asynchronously,
93 dep = self._elements[guid]
94 if isinstance(dep, self._app.Dependency):
95 dep.async_setup_wait()
97 # Two-phase configuration for asynchronous launch
98 do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
99 do_poststep_configure = staticmethod(do_post_asynclaunch)
101 def do_preconfigure(self):
102 # Perform resource discovery if we don't have
103 # specific resources assigned yet
104 self.do_resource_discovery()
106 # Create PlanetLab slivers
107 self.do_provisioning()
109 # Plan application deployment
110 self.do_spanning_deployment_plan()
112 # Configure elements per XML data
113 super(TestbedController, self).do_preconfigure()
115 def do_resource_discovery(self):
116 to_provision = self._to_provision = set()
119 # look for perfectly defined nodes
120 # (ie: those with only one candidate)
121 for guid, node in self._elements.iteritems():
122 if isinstance(node, self._node.Node) and node._node_id is None:
123 # Try existing nodes first
124 # If we have only one candidate, simply use it
125 candidates = node.find_candidates(
126 filter_slice_id = self.slice_id)
127 if len(candidates) == 1:
128 node.assign_node_id(iter(candidates).next())
130 # Try again including unassigned nodes
131 candidates = node.find_candidates()
132 if len(candidates) > 1:
134 if len(candidates) == 1:
135 node_id = iter(candidates).next()
136 node.assign_node_id(node_id)
137 to_provision.add(node_id)
139 raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
140 node.make_filter_description())
142 # Now do the backtracking search for a suitable solution
143 # First with existing slice nodes
146 for guid, node in self._elements.iteritems():
147 if isinstance(node, self._node.Node) and node._node_id is None:
148 # Try existing nodes first
149 # If we have only one candidate, simply use it
150 candidates = node.find_candidates(
151 filter_slice_id = self.slice_id)
152 reqs.append(candidates)
157 solution = resourcealloc.alloc(reqs)
158 except resourcealloc.ResourceAllocationError:
159 # Failed, try again with all nodes
162 candidates = node.find_candidates()
163 reqs.append(candidates)
165 solution = resourcealloc.alloc(reqs)
166 to_provision.update(solution)
169 for node, node_id in zip(nodes, solution):
170 node.assign_node_id(node_id)
172 def do_provisioning(self):
173 if self._to_provision:
174 # Add new nodes to the slice
175 cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
176 new_nodes = list(set(cur_nodes) | self._to_provision)
177 self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
180 del self._to_provision
182 def do_spanning_deployment_plan(self):
183 # Create application groups by collecting all applications
184 # based on their hash - the hash should contain everything that
185 # defines them and the platform they're built
189 frozenset((app.depends or "").split(' ')),
190 frozenset((app.sources or "").split(' ')),
193 app.node.architecture,
194 app.node.operatingSystem,
198 depgroups = collections.defaultdict(list)
200 for element in self._elements.itervalues():
201 if isinstance(element, self._app.Dependency):
202 depgroups[dephash(element)].append(element)
204 # Set up spanning deployment for those applications that
205 # have been deployed in several nodes.
206 for dh, group in depgroups.iteritems():
208 # Pick root (deterministically)
209 root = min(group, key=lambda app:app.node.hostname)
211 # Obtain all IPs in numeric format
212 # (which means faster distance computations)
214 dep._ip = socket.gethostbyname(dep.node.hostname)
215 dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
218 # NOTE: the plan is an iterator
221 lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
225 # Re-sign private key
227 tempprk, temppuk, tmppass = self._make_temp_private_key()
233 for slave, master in plan:
234 slave.set_master(master)
235 slave.install_keys(tempprk, temppuk, tmppass)
237 # We don't need the user's passphrase anymore
238 self.sliceSSHKeyPass = None
240 def _make_temp_private_key(self):
241 # Get the user's key's passphrase
242 if not self.sliceSSHKeyPass:
243 if 'SSH_ASKPASS' in os.environ:
244 proc = subprocess.Popen(
245 [ os.environ['SSH_ASKPASS'],
246 "Please type the passphrase for the %s SSH identity file. "
247 "The passphrase will be used to re-cipher the identity file with "
248 "a random 256-bit key for automated chain deployment on the "
249 "%s PlanetLab slice" % (
250 os.path.basename(self.sliceSSHKey),
253 stdin = open("/dev/null"),
254 stdout = subprocess.PIPE,
255 stderr = subprocess.PIPE)
256 out,err = proc.communicate()
257 self.sliceSSHKeyPass = out.strip()
259 if not self.sliceSSHKeyPass:
262 # Create temporary key files
263 prk = tempfile.NamedTemporaryFile(
264 dir = self.root_directory,
265 prefix = "pl_deploy_tmpk_",
268 puk = tempfile.NamedTemporaryFile(
269 dir = self.root_directory,
270 prefix = "pl_deploy_tmpk_",
273 # Create secure 256-bits temporary passphrase
274 passphrase = ''.join(map(chr,[rng.randint(0,255)
275 for rng in (random.SystemRandom(),)
276 for i in xrange(32)] )).encode("hex")
279 oprk = open(self.sliceSSHKey, "rb")
280 opuk = open(self.sliceSSHKey+".pub", "rb")
281 shutil.copymode(oprk.name, prk.name)
282 shutil.copymode(opuk.name, puk.name)
283 shutil.copyfileobj(oprk, prk)
284 shutil.copyfileobj(opuk, puk)
290 # A descriptive comment
291 comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
294 proc = subprocess.Popen(
297 "-P", self.sliceSSHKeyPass,
300 stdout = subprocess.PIPE,
301 stderr = subprocess.PIPE,
302 stdin = subprocess.PIPE
304 out, err = proc.communicate()
307 raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
313 # Change comment on public key
314 puklines = puk.readlines()
315 puklines[0] = puklines[0].split(' ')
316 puklines[0][-1] = comment+'\n'
317 puklines[0] = ' '.join(puklines[0])
320 puk.writelines(puklines)
324 return prk, puk, passphrase
326 def set(self, guid, name, value, time = TIME_NOW):
327 super(TestbedController, self).set(guid, name, value, time)
328 # TODO: take on account schedule time for the task
329 element = self._elements[guid]
331 setattr(element, name, value)
333 if hasattr(element, 'refresh'):
334 # invoke attribute refresh hook
337 def get(self, guid, name, time = TIME_NOW):
338 value = super(TestbedController, self).get(guid, name, time)
339 # TODO: take on account schedule time for the task
340 factory_id = self._create[guid]
341 factory = self._factories[factory_id]
342 if factory.box_attributes.is_attribute_design_only(name):
344 element = self._elements.get(guid)
346 return getattr(element, name)
347 except KeyError, AttributeError:
350 def get_address(self, guid, index, attribute='Address'):
354 iface = self._elements.get(guid)
355 if iface and index == 0:
356 if attribute == 'Address':
358 elif attribute == 'NetPrefix':
359 return iface.netprefix
360 elif attribute == 'Broadcast':
361 return iface.broadcast
363 # if all else fails, query box
364 return super(TestbedController, self).get_address(guid, index, attribute)
366 def action(self, time, guid, action):
367 raise NotImplementedError
370 for trace in self._traces.itervalues():
372 for element in self._elements.itervalues():
373 # invoke cleanup hooks
374 if hasattr(element, 'cleanup'):
376 for element in self._elements.itervalues():
377 # invoke destroy hooks
378 if hasattr(element, 'destroy'):
380 self._elements.clear()
383 def trace(self, guid, trace_id, attribute='value'):
384 app = self._elements[guid]
386 if attribute == 'value':
387 path = app.sync_trace(self.home_directory, trace_id)
394 elif attribute == 'path':
395 content = app.remote_trace_path(trace_id)
400 def follow_trace(self, trace_id, trace):
401 self._traces[trace_id] = trace
403 def _make_generic(self, parameters, kind):
404 app = kind(self.plapi)
406 # Note: there is 1-to-1 correspondence between attribute names
407 # If that changes, this has to change as well
408 for attr,val in parameters.iteritems():
409 setattr(app, attr, val)
413 def _make_node(self, parameters):
414 node = self._make_generic(parameters, self._node.Node)
416 # If emulation is enabled, we automatically need
417 # some vsys interfaces and packages
419 node.required_vsys.add('ipfw-be')
420 node.required_packages.add('ipfwslice')
424 def _make_node_iface(self, parameters):
425 return self._make_generic(parameters, self._interfaces.NodeIface)
427 def _make_tun_iface(self, parameters):
428 return self._make_generic(parameters, self._interfaces.TunIface)
430 def _make_tap_iface(self, parameters):
431 return self._make_generic(parameters, self._interfaces.TapIface)
433 def _make_netpipe(self, parameters):
434 return self._make_generic(parameters, self._interfaces.NetPipe)
436 def _make_internet(self, parameters):
437 return self._make_generic(parameters, self._interfaces.Internet)
439 def _make_application(self, parameters):
440 return self._make_generic(parameters, self._app.Application)
442 def _make_dependency(self, parameters):
443 return self._make_generic(parameters, self._app.Dependency)
445 def _make_nepi_dependency(self, parameters):
446 return self._make_generic(parameters, self._app.NepiDependency)
448 def _make_ns3_dependency(self, parameters):
449 return self._make_generic(parameters, self._app.NS3Dependency)