2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 from nepi.util.parallel import ParallelRun
26 class TempKeyError(Exception):
29 class TestbedController(testbed_impl.TestbedController):
31 super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
32 self._home_directory = None
36 import node, interfaces, application
38 self._interfaces = interfaces
39 self._app = application
41 self._blacklist = set()
42 self._just_provisioned = set()
44 self._load_blacklist()
def home_directory(self):
    """Directory taken from the ``homeDirectory`` attribute at setup time.

    ``None`` until the attribute has been read into ``_home_directory``.
    """
    directory = self._home_directory
    return directory
52 if not hasattr(self, '_plapi'):
56 self._plapi = plcapi.PLCAPI(
57 username = self.authUser,
58 password = self.authString,
59 hostname = self.plcHost,
60 urlpattern = self.plcUrl
63 # anonymous access - may not be enough for much
64 self._plapi = plcapi.PLCAPI()
69 if not hasattr(self, '_slice_id'):
70 slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
72 self._slice_id = slices[0]['slice_id']
74 # If it wasn't found, don't remember this failure, keep trying
78 def _load_blacklist(self):
79 blpath = environ.homepath('plblacklist')
82 bl = open(blpath, "r")
84 self._blacklist = set()
88 self._blacklist = set(
90 map(str.strip, bl.readlines())
96 def _save_blacklist(self):
97 blpath = environ.homepath('plblacklist')
98 bl = open(blpath, "w")
101 map('%s\n'.__mod__, self._blacklist))
106 self._home_directory = self._attributes.\
107 get_attribute_value("homeDirectory")
108 self.slicename = self._attributes.\
109 get_attribute_value("slice")
110 self.authUser = self._attributes.\
111 get_attribute_value("authUser")
112 self.authString = self._attributes.\
113 get_attribute_value("authPass")
114 self.sliceSSHKey = self._attributes.\
115 get_attribute_value("sliceSSHKey")
116 self.sliceSSHKeyPass = None
117 self.plcHost = self._attributes.\
118 get_attribute_value("plcHost")
119 self.plcUrl = self._attributes.\
120 get_attribute_value("plcUrl")
121 super(TestbedController, self).do_setup()
def do_post_asynclaunch(self, guid):
    """Wait for an asynchronously launched dependency to finish setting up.

    Looks up the element registered under *guid*. If it is an instance of
    the application module's ``Dependency`` class, its
    ``async_setup_wait()`` is called. Going by the name, that method waits
    for the background setup to complete. Any other kind of element is
    left untouched. Always returns None.
    """
    # Dependencies were launched asynchronously, so their setup may still
    # be in progress when this hook runs.
    element = self._elements[guid]
    if not isinstance(element, self._app.Dependency):
        return
    element.async_setup_wait()

# Two-phase configuration for asynchronous launch: the same wait hook is
# installed for both the preconfigure and the configure post-steps.
# staticmethod() makes attribute access yield the plain function instead of
# a bound method. Presumably the base controller calls the hook with the
# controller passed explicitly, e.g. hook(controller, guid).
# NOTE(review): confirm against nepi.core.testbed_impl.
do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
do_poststep_configure = staticmethod(do_post_asynclaunch)
134 def do_preconfigure(self):
136 # Perform resource discovery if we don't have
137 # specific resources assigned yet
138 self.do_resource_discovery()
140 # Create PlanetLab slivers
141 self.do_provisioning()
144 # Wait for provisioning
149 except self._node.UnresponsiveNodeError:
153 # Plan application deployment
154 self.do_spanning_deployment_plan()
156 # Configure elements per XML data
157 super(TestbedController, self).do_preconfigure()
159 def do_resource_discovery(self):
160 to_provision = self._to_provision = set()
162 reserved = set(self._blacklist)
163 for guid, node in self._elements.iteritems():
164 if isinstance(node, self._node.Node) and node._node_id is not None:
165 reserved.add(node._node_id)
168 # look for perfectly defined nodes
169 # (ie: those with only one candidate)
170 for guid, node in self._elements.iteritems():
171 if isinstance(node, self._node.Node) and node._node_id is None:
172 # Try existing nodes first
173 # If we have only one candidate, simply use it
174 candidates = node.find_candidates(
175 filter_slice_id = self.slice_id)
176 candidates -= reserved
177 if len(candidates) == 1:
178 node_id = iter(candidates).next()
179 node.assign_node_id(node_id)
180 reserved.add(node_id)
182 # Try again including unassigned nodes
183 candidates = node.find_candidates()
184 candidates -= reserved
185 if len(candidates) > 1:
187 if len(candidates) == 1:
188 node_id = iter(candidates).next()
189 node.assign_node_id(node_id)
190 to_provision.add(node_id)
191 reserved.add(node_id)
193 raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
194 node.make_filter_description())
196 # Now do the backtracking search for a suitable solution
197 # First with existing slice nodes
200 for guid, node in self._elements.iteritems():
201 if isinstance(node, self._node.Node) and node._node_id is None:
202 # Try existing nodes first
203 # If we have only one candidate, simply use it
204 candidates = node.find_candidates(
205 filter_slice_id = self.slice_id)
206 candidates -= reserved
207 reqs.append(candidates)
212 solution = resourcealloc.alloc(reqs)
213 except resourcealloc.ResourceAllocationError:
214 # Failed, try again with all nodes
217 candidates = node.find_candidates()
218 candidates -= reserved
219 reqs.append(candidates)
221 solution = resourcealloc.alloc(reqs)
222 to_provision.update(solution)
225 for node, node_id in zip(nodes, solution):
226 node.assign_node_id(node_id)
228 def do_provisioning(self):
229 if self._to_provision:
230 # Add new nodes to the slice
231 cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
232 new_nodes = list(set(cur_nodes) | self._to_provision)
233 self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
236 self._just_provisioned = self._to_provision
237 del self._to_provision
239 def do_wait_nodes(self):
240 for guid, node in self._elements.iteritems():
241 if isinstance(node, self._node.Node):
242 # Just inject configuration stuff
243 node.home_path = "nepi-node-%s" % (guid,)
244 node.ident_path = self.sliceSSHKey
245 node.slicename = self.slicename
248 print >>sys.stderr, "PlanetLab Node", guid, "configured at", node.hostname
251 for guid, node in self._elements.iteritems():
252 if isinstance(node, self._node.Node):
253 print >>sys.stderr, "Waiting for Node", guid, "configured at", node.hostname,
256 node.wait_provisioning(
257 (20*60 if node._node_id in self._just_provisioned else 60)
260 print >>sys.stderr, "READY"
261 except self._node.UnresponsiveNodeError:
263 print >>sys.stderr, "UNRESPONSIVE"
265 # Mark all dead nodes (which are unresponsive) on the blacklist
267 for guid, node in self._elements.iteritems():
268 if isinstance(node, self._node.Node):
269 if not node.is_alive():
270 print >>sys.stderr, "Blacklisting", node.hostname, "for unresponsiveness"
271 self._blacklist.add(node._node_id)
275 self._save_blacklist()
279 traceback.print_exc()
283 def do_spanning_deployment_plan(self):
284 # Create application groups by collecting all applications
285 # based on their hash - the hash should contain everything that
286 # defines them and the platform they're built
290 frozenset((app.depends or "").split(' ')),
291 frozenset((app.sources or "").split(' ')),
294 app.node.architecture,
295 app.node.operatingSystem,
299 depgroups = collections.defaultdict(list)
301 for element in self._elements.itervalues():
302 if isinstance(element, self._app.Dependency):
303 depgroups[dephash(element)].append(element)
305 # Set up spanning deployment for those applications that
306 # have been deployed in several nodes.
307 for dh, group in depgroups.iteritems():
309 # Pick root (deterministically)
310 root = min(group, key=lambda app:app.node.hostname)
312 # Obtain all IPs in numeric format
313 # (which means faster distance computations)
315 dep._ip = socket.gethostbyname(dep.node.hostname)
316 dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
319 # NOTE: the plan is an iterator
322 lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
326 # Re-sign private key
328 tempprk, temppuk, tmppass = self._make_temp_private_key()
334 for slave, master in plan:
335 slave.set_master(master)
336 slave.install_keys(tempprk, temppuk, tmppass)
338 # We don't need the user's passphrase anymore
339 self.sliceSSHKeyPass = None
341 def _make_temp_private_key(self):
342 # Get the user's key's passphrase
343 if not self.sliceSSHKeyPass:
344 if 'SSH_ASKPASS' in os.environ:
345 proc = subprocess.Popen(
346 [ os.environ['SSH_ASKPASS'],
347 "Please type the passphrase for the %s SSH identity file. "
348 "The passphrase will be used to re-cipher the identity file with "
349 "a random 256-bit key for automated chain deployment on the "
350 "%s PlanetLab slice" % (
351 os.path.basename(self.sliceSSHKey),
354 stdin = open("/dev/null"),
355 stdout = subprocess.PIPE,
356 stderr = subprocess.PIPE)
357 out,err = proc.communicate()
358 self.sliceSSHKeyPass = out.strip()
360 if not self.sliceSSHKeyPass:
363 # Create temporary key files
364 prk = tempfile.NamedTemporaryFile(
365 dir = self.root_directory,
366 prefix = "pl_deploy_tmpk_",
369 puk = tempfile.NamedTemporaryFile(
370 dir = self.root_directory,
371 prefix = "pl_deploy_tmpk_",
374 # Create secure 256-bits temporary passphrase
375 passphrase = ''.join(map(chr,[rng.randint(0,255)
376 for rng in (random.SystemRandom(),)
377 for i in xrange(32)] )).encode("hex")
380 oprk = open(self.sliceSSHKey, "rb")
381 opuk = open(self.sliceSSHKey+".pub", "rb")
382 shutil.copymode(oprk.name, prk.name)
383 shutil.copymode(opuk.name, puk.name)
384 shutil.copyfileobj(oprk, prk)
385 shutil.copyfileobj(opuk, puk)
391 # A descriptive comment
392 comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
395 proc = subprocess.Popen(
398 "-P", self.sliceSSHKeyPass,
401 stdout = subprocess.PIPE,
402 stderr = subprocess.PIPE,
403 stdin = subprocess.PIPE
405 out, err = proc.communicate()
408 raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
414 # Change comment on public key
415 puklines = puk.readlines()
416 puklines[0] = puklines[0].split(' ')
417 puklines[0][-1] = comment+'\n'
418 puklines[0] = ' '.join(puklines[0])
421 puk.writelines(puklines)
425 return prk, puk, passphrase
427 def set(self, guid, name, value, time = TIME_NOW):
428 super(TestbedController, self).set(guid, name, value, time)
429 # TODO: take on account schedule time for the task
430 element = self._elements[guid]
432 setattr(element, name, value)
434 if hasattr(element, 'refresh'):
435 # invoke attribute refresh hook
438 def get(self, guid, name, time = TIME_NOW):
439 value = super(TestbedController, self).get(guid, name, time)
440 # TODO: take on account schedule time for the task
441 factory_id = self._create[guid]
442 factory = self._factories[factory_id]
443 element = self._elements.get(guid)
445 return getattr(element, name)
446 except (KeyError, AttributeError):
449 def get_address(self, guid, index, attribute='Address'):
453 iface = self._elements.get(guid)
454 if iface and index == 0:
455 if attribute == 'Address':
457 elif attribute == 'NetPrefix':
458 return iface.netprefix
459 elif attribute == 'Broadcast':
460 return iface.broadcast
462 # if all else fails, query box
463 return super(TestbedController, self).get_address(guid, index, attribute)
def action(self, time, guid, action):
    """Scheduled actions are not supported by this controller.

    Raises:
        NotImplementedError: always, with no message.
    """
    raise NotImplementedError()
469 for trace in self._traces.itervalues():
472 runner = ParallelRun(16)
474 for element in self._elements.itervalues():
475 # invoke cleanup hooks
476 if hasattr(element, 'cleanup'):
477 runner.put(element.cleanup)
480 runner = ParallelRun(16)
482 for element in self._elements.itervalues():
483 # invoke destroy hooks
484 if hasattr(element, 'destroy'):
485 runner.put(element.destroy)
488 self._elements.clear()
491 def trace(self, guid, trace_id, attribute='value'):
492 app = self._elements[guid]
494 if attribute == 'value':
495 path = app.sync_trace(self.home_directory, trace_id)
502 elif attribute == 'path':
503 content = app.remote_trace_path(trace_id)
def follow_trace(self, trace_id, trace):
    """Register *trace* in ``self._traces`` under the key *trace_id*.

    A trace previously stored under the same id is replaced.
    """
    registry = self._traces
    registry[trace_id] = trace
511 def _make_generic(self, parameters, kind):
512 app = kind(self.plapi)
514 # Note: there is 1-to-1 correspondence between attribute names
515 # If that changes, this has to change as well
516 for attr,val in parameters.iteritems():
517 setattr(app, attr, val)
521 def _make_node(self, parameters):
522 return self._make_generic(parameters, self._node.Node)
524 def _make_node_iface(self, parameters):
525 return self._make_generic(parameters, self._interfaces.NodeIface)
527 def _make_tun_iface(self, parameters):
528 return self._make_generic(parameters, self._interfaces.TunIface)
530 def _make_tap_iface(self, parameters):
531 return self._make_generic(parameters, self._interfaces.TapIface)
533 def _make_netpipe(self, parameters):
534 return self._make_generic(parameters, self._interfaces.NetPipe)
536 def _make_internet(self, parameters):
537 return self._make_generic(parameters, self._interfaces.Internet)
539 def _make_application(self, parameters):
540 return self._make_generic(parameters, self._app.Application)
542 def _make_dependency(self, parameters):
543 return self._make_generic(parameters, self._app.Dependency)
545 def _make_nepi_dependency(self, parameters):
546 return self._make_generic(parameters, self._app.NepiDependency)
548 def _make_ns3_dependency(self, parameters):
549 return self._make_generic(parameters, self._app.NS3Dependency)