2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
25 class TempKeyError(Exception):
28 class TestbedController(testbed_impl.TestbedController):
29 def __init__(self, testbed_version):
30 super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
31 self._home_directory = None
35 import node, interfaces, application
37 self._interfaces = interfaces
38 self._app = application
40 self._blacklist = set()
41 self._just_provisioned = set()
43 self._load_blacklist()
46 def home_directory(self):
47 return self._home_directory
51 if not hasattr(self, '_plapi'):
55 self._plapi = plcapi.PLCAPI(
56 username = self.authUser,
57 password = self.authString,
58 hostname = self.plcHost,
59 urlpattern = self.plcUrl
62 # anonymous access - may not be enough for much
63 self._plapi = plcapi.PLCAPI()
68 if not hasattr(self, '_slice_id'):
69 slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
71 self._slice_id = slices[0]['slice_id']
73 # If it wasn't found, don't remember this failure, keep trying
77 def _load_blacklist(self):
78 blpath = environ.homepath('plblacklist')
81 bl = open(blpath, "r")
83 self._blacklist = set()
87 self._blacklist = set(
89 map(str.strip, bl.readlines())
95 def _save_blacklist(self):
96 blpath = environ.homepath('plblacklist')
97 bl = open(blpath, "w")
100 map('%s\n'.__mod__, self._blacklist))
105 self._home_directory = self._attributes.\
106 get_attribute_value("homeDirectory")
107 self.slicename = self._attributes.\
108 get_attribute_value("slice")
109 self.authUser = self._attributes.\
110 get_attribute_value("authUser")
111 self.authString = self._attributes.\
112 get_attribute_value("authPass")
113 self.sliceSSHKey = self._attributes.\
114 get_attribute_value("sliceSSHKey")
115 self.sliceSSHKeyPass = None
116 self.plcHost = self._attributes.\
117 get_attribute_value("plcHost")
118 self.plcUrl = self._attributes.\
119 get_attribute_value("plcUrl")
120 super(TestbedController, self).do_setup()
122 def do_post_asynclaunch(self, guid):
123 # Dependencies were launched asynchronously,
125 dep = self._elements[guid]
126 if isinstance(dep, self._app.Dependency):
127 dep.async_setup_wait()
129 # Two-phase configuration for asynchronous launch
130 do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
131 do_poststep_configure = staticmethod(do_post_asynclaunch)
133 def do_preconfigure(self):
135 # Perform resource discovery if we don't have
136 # specific resources assigned yet
137 self.do_resource_discovery()
139 # Create PlanetLab slivers
140 self.do_provisioning()
143 # Wait for provisioning
148 except self._node.UnresponsiveNodeError:
152 # Plan application deployment
153 self.do_spanning_deployment_plan()
155 # Configure elements per XML data
156 super(TestbedController, self).do_preconfigure()
158 def do_resource_discovery(self):
159 to_provision = self._to_provision = set()
161 reserved = set(self._blacklist)
162 for guid, node in self._elements.iteritems():
163 if isinstance(node, self._node.Node) and node._node_id is not None:
164 reserved.add(node._node_id)
167 # look for perfectly defined nodes
168 # (ie: those with only one candidate)
169 for guid, node in self._elements.iteritems():
170 if isinstance(node, self._node.Node) and node._node_id is None:
171 # Try existing nodes first
172 # If we have only one candidate, simply use it
173 candidates = node.find_candidates(
174 filter_slice_id = self.slice_id)
175 candidates -= reserved
176 if len(candidates) == 1:
177 node_id = iter(candidates).next()
178 node.assign_node_id(node_id)
179 reserved.add(node_id)
181 # Try again including unassigned nodes
182 candidates = node.find_candidates()
183 candidates -= reserved
184 if len(candidates) > 1:
186 if len(candidates) == 1:
187 node_id = iter(candidates).next()
188 node.assign_node_id(node_id)
189 to_provision.add(node_id)
190 reserved.add(node_id)
192 raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
193 node.make_filter_description())
195 # Now do the backtracking search for a suitable solution
196 # First with existing slice nodes
199 for guid, node in self._elements.iteritems():
200 if isinstance(node, self._node.Node) and node._node_id is None:
201 # Try existing nodes first
202 # If we have only one candidate, simply use it
203 candidates = node.find_candidates(
204 filter_slice_id = self.slice_id)
205 candidates -= reserved
206 reqs.append(candidates)
211 solution = resourcealloc.alloc(reqs)
212 except resourcealloc.ResourceAllocationError:
213 # Failed, try again with all nodes
216 candidates = node.find_candidates()
217 reqs.append(candidates)
219 solution = resourcealloc.alloc(reqs)
220 to_provision.update(solution)
223 for node, node_id in zip(nodes, solution):
224 node.assign_node_id(node_id)
226 def do_provisioning(self):
227 if self._to_provision:
228 # Add new nodes to the slice
229 cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
230 new_nodes = list(set(cur_nodes) | self._to_provision)
231 self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
234 self._just_provisioned = self._to_provision
235 del self._to_provision
237 def do_wait_nodes(self):
238 for guid, node in self._elements.iteritems():
239 if isinstance(node, self._node.Node):
240 # Just inject configuration stuff
241 node.home_path = "nepi-node-%s" % (guid,)
242 node.ident_path = self.sliceSSHKey
243 node.slicename = self.slicename
246 print "PlanetLab Node", guid, "configured at", node.hostname
249 for guid, node in self._elements.iteritems():
250 if isinstance(node, self._node.Node):
251 print "Waiting for Node", guid, "configured at", node.hostname,
254 node.wait_provisioning(
255 (20*60 if node._node_id in self._just_provisioned else 60)
259 except self._node.UnresponsiveNodeError:
263 # Mark all dead nodes (which are unresponsive) on the blacklist
265 for guid, node in self._elements.iteritems():
266 if isinstance(node, self._node.Node):
267 if not node.is_alive():
268 print "Blacklisting", node.hostname, "for unresponsiveness"
269 self._blacklist.add(node._node_id)
273 self._save_blacklist()
277 traceback.print_exc()
281 def do_spanning_deployment_plan(self):
282 # Create application groups by collecting all applications
283 # based on their hash - the hash should contain everything that
284 # defines them and the platform they're built
288 frozenset((app.depends or "").split(' ')),
289 frozenset((app.sources or "").split(' ')),
292 app.node.architecture,
293 app.node.operatingSystem,
297 depgroups = collections.defaultdict(list)
299 for element in self._elements.itervalues():
300 if isinstance(element, self._app.Dependency):
301 depgroups[dephash(element)].append(element)
303 # Set up spanning deployment for those applications that
304 # have been deployed in several nodes.
305 for dh, group in depgroups.iteritems():
307 # Pick root (deterministically)
308 root = min(group, key=lambda app:app.node.hostname)
310 # Obtain all IPs in numeric format
311 # (which means faster distance computations)
313 dep._ip = socket.gethostbyname(dep.node.hostname)
314 dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
317 # NOTE: the plan is an iterator
320 lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
324 # Re-sign private key
326 tempprk, temppuk, tmppass = self._make_temp_private_key()
332 for slave, master in plan:
333 slave.set_master(master)
334 slave.install_keys(tempprk, temppuk, tmppass)
336 # We don't need the user's passphrase anymore
337 self.sliceSSHKeyPass = None
339 def _make_temp_private_key(self):
340 # Get the user's key's passphrase
341 if not self.sliceSSHKeyPass:
342 if 'SSH_ASKPASS' in os.environ:
343 proc = subprocess.Popen(
344 [ os.environ['SSH_ASKPASS'],
345 "Please type the passphrase for the %s SSH identity file. "
346 "The passphrase will be used to re-cipher the identity file with "
347 "a random 256-bit key for automated chain deployment on the "
348 "%s PlanetLab slice" % (
349 os.path.basename(self.sliceSSHKey),
352 stdin = open("/dev/null"),
353 stdout = subprocess.PIPE,
354 stderr = subprocess.PIPE)
355 out,err = proc.communicate()
356 self.sliceSSHKeyPass = out.strip()
358 if not self.sliceSSHKeyPass:
361 # Create temporary key files
362 prk = tempfile.NamedTemporaryFile(
363 dir = self.root_directory,
364 prefix = "pl_deploy_tmpk_",
367 puk = tempfile.NamedTemporaryFile(
368 dir = self.root_directory,
369 prefix = "pl_deploy_tmpk_",
372 # Create secure 256-bits temporary passphrase
373 passphrase = ''.join(map(chr,[rng.randint(0,255)
374 for rng in (random.SystemRandom(),)
375 for i in xrange(32)] )).encode("hex")
378 oprk = open(self.sliceSSHKey, "rb")
379 opuk = open(self.sliceSSHKey+".pub", "rb")
380 shutil.copymode(oprk.name, prk.name)
381 shutil.copymode(opuk.name, puk.name)
382 shutil.copyfileobj(oprk, prk)
383 shutil.copyfileobj(opuk, puk)
389 # A descriptive comment
390 comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
393 proc = subprocess.Popen(
396 "-P", self.sliceSSHKeyPass,
399 stdout = subprocess.PIPE,
400 stderr = subprocess.PIPE,
401 stdin = subprocess.PIPE
403 out, err = proc.communicate()
406 raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
412 # Change comment on public key
413 puklines = puk.readlines()
414 puklines[0] = puklines[0].split(' ')
415 puklines[0][-1] = comment+'\n'
416 puklines[0] = ' '.join(puklines[0])
419 puk.writelines(puklines)
423 return prk, puk, passphrase
425 def set(self, guid, name, value, time = TIME_NOW):
426 super(TestbedController, self).set(guid, name, value, time)
427 # TODO: take on account schedule time for the task
428 element = self._elements[guid]
430 setattr(element, name, value)
432 if hasattr(element, 'refresh'):
433 # invoke attribute refresh hook
436 def get(self, guid, name, time = TIME_NOW):
437 value = super(TestbedController, self).get(guid, name, time)
438 # TODO: take on account schedule time for the task
439 factory_id = self._create[guid]
440 factory = self._factories[factory_id]
441 if factory.box_attributes.is_attribute_design_only(name):
443 element = self._elements.get(guid)
445 return getattr(element, name)
446 except KeyError, AttributeError:
449 def get_address(self, guid, index, attribute='Address'):
453 iface = self._elements.get(guid)
454 if iface and index == 0:
455 if attribute == 'Address':
457 elif attribute == 'NetPrefix':
458 return iface.netprefix
459 elif attribute == 'Broadcast':
460 return iface.broadcast
462 # if all else fails, query box
463 return super(TestbedController, self).get_address(guid, index, attribute)
465 def action(self, time, guid, action):
466 raise NotImplementedError
469 for trace in self._traces.itervalues():
471 for element in self._elements.itervalues():
472 # invoke cleanup hooks
473 if hasattr(element, 'cleanup'):
475 for element in self._elements.itervalues():
476 # invoke destroy hooks
477 if hasattr(element, 'destroy'):
479 self._elements.clear()
482 def trace(self, guid, trace_id, attribute='value'):
483 app = self._elements[guid]
485 if attribute == 'value':
486 path = app.sync_trace(self.home_directory, trace_id)
493 elif attribute == 'path':
494 content = app.remote_trace_path(trace_id)
499 def follow_trace(self, trace_id, trace):
500 self._traces[trace_id] = trace
502 def _make_generic(self, parameters, kind):
503 app = kind(self.plapi)
505 # Note: there is 1-to-1 correspondence between attribute names
506 # If that changes, this has to change as well
507 for attr,val in parameters.iteritems():
508 setattr(app, attr, val)
512 def _make_node(self, parameters):
513 node = self._make_generic(parameters, self._node.Node)
515 # If emulation is enabled, we automatically need
516 # some vsys interfaces and packages
518 node.required_vsys.add('ipfw-be')
519 node.required_packages.add('ipfwslice')
523 def _make_node_iface(self, parameters):
524 return self._make_generic(parameters, self._interfaces.NodeIface)
526 def _make_tun_iface(self, parameters):
527 return self._make_generic(parameters, self._interfaces.TunIface)
529 def _make_tap_iface(self, parameters):
530 return self._make_generic(parameters, self._interfaces.TapIface)
532 def _make_netpipe(self, parameters):
533 return self._make_generic(parameters, self._interfaces.NetPipe)
535 def _make_internet(self, parameters):
536 return self._make_generic(parameters, self._interfaces.Internet)
538 def _make_application(self, parameters):
539 return self._make_generic(parameters, self._app.Application)
541 def _make_dependency(self, parameters):
542 return self._make_generic(parameters, self._app.Dependency)
544 def _make_nepi_dependency(self, parameters):
545 return self._make_generic(parameters, self._app.NepiDependency)
547 def _make_ns3_dependency(self, parameters):
548 return self._make_generic(parameters, self._app.NS3Dependency)