2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 from nepi.util.parallel import ParallelRun
# Raised internally when the temporary deployment SSH key cannot be
# generated (see _make_temp_private_key).
# NOTE(review): the class body ("pass") is missing from this view.
class TempKeyError(Exception):
class TestbedController(testbed_impl.TestbedController):
    # Controller for the PlanetLab testbed backend: discovers and
    # provisions slice nodes, configures interfaces/applications, and
    # collects traces.
    # NOTE(review): the __init__ signature line is missing from this
    # view; the statements below are its body, kept verbatim.
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        # Late imports of sibling backend modules (node/interfaces/app)
        import node, interfaces, application
        # ... (self._node = node presumably assigned here — confirm) ...
        self._interfaces = interfaces
        self._app = application
        # Node ids known to be unusable; persisted via _save_blacklist.
        self._blacklist = set()
        # Node ids added to the slice this run (get longer wait timeouts).
        self._just_provisioned = set()
        self._load_blacklist()
        self._logger = logging.getLogger('nepi.testbeds.planetlab')
    # NOTE(review): a @property decorator on the preceding (missing)
    # line is presumed — confirm against the full source.
    def home_directory(self):
        # Local base directory used to stage per-experiment files.
        return self._home_directory
    # Lazily construct and cache the PLCAPI client.
    # NOTE(review): the property header ("def plapi(self)") and the
    # credential check are missing from this view; code kept verbatim.
        if not hasattr(self, '_plapi'):
            # ... (guard on self.authUser presumably here — confirm) ...
                # Authenticated access using the configured credentials.
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                # ... (closing paren and else: branch missing from this view) ...
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        # ... (return self._plapi presumably follows — confirm) ...
    # Lazily resolve and cache the numeric slice_id for self.slicename.
    # NOTE(review): the property header is missing from this view.
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            # ... (guard on empty result missing from this view) ...
            self._slice_id = slices[0]['slice_id']
        # If it wasn't found, don't remember this failure, keep trying
        # ... (return self._slice_id presumably follows — confirm) ...
    def _load_blacklist(self):
        # Load the persisted node blacklist (one entry per line) from
        # the user's NEPI home directory.
        # NOTE(review): the existence check, branch structure and file
        # close are missing from this view; code kept verbatim.
        blpath = environ.homepath('plblacklist')
        # ... (lines missing from this view) ...
        bl = open(blpath, "r")
        # ... (lines missing from this view) ...
        self._blacklist = set()
        # ... (lines missing; likely the no-file branch vs. read branch) ...
        self._blacklist = set(
            map(str.strip, bl.readlines())
        # ... (closing paren and bl.close() missing from this view) ...
    def _save_blacklist(self):
        # Persist the in-memory blacklist, one node id per line.
        # NOTE(review): the enclosing bl.writelines(...) call and the
        # file close are missing from this view; code kept verbatim.
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        # ... (bl.writelines( presumably opens here — confirm) ...
            map('%s\n'.__mod__, self._blacklist))
        # ... (bl.close() presumably follows — confirm) ...
        # NOTE(review): the "def do_setup(self):" header is missing from
        # this view.  The statements below copy configured testbed
        # attributes into controller state, then delegate to the
        # superclass.
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        # The key passphrase is collected lazily (_make_temp_private_key).
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        # Apply configured level name (e.g. "DEBUG") to the logger.
        self._logger.setLevel(getattr(logging,self.logLevel))
        super(TestbedController, self).do_setup()
135 def do_post_asynclaunch(self, guid):
136 # Dependencies were launched asynchronously,
138 dep = self._elements[guid]
139 if isinstance(dep, self._app.Dependency):
140 dep.async_setup_wait()
    # Two-phase configuration for asynchronous launch: the same wait
    # hook runs after both the preconfigure and the configure steps.
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)
    def do_preconfigure(self):
        # Pre-configuration phase: resource discovery, slice
        # provisioning, waiting for slivers, and (optionally) planning
        # peer-to-peer application deployment.
        # NOTE(review): several lines are missing from this view,
        # including the try: around the node-wait step; code verbatim.
        # Perform resource discovery if we don't have
        # specific resources assigned yet
        self.do_resource_discovery()
        # Create PlanetLab slivers
        self.do_provisioning()
        # Wait for provisioning
        # ... (try: self.do_wait_nodes() presumably here — confirm) ...
        except self._node.UnresponsiveNodeError:
            # ... (recovery/retry lines missing from this view) ...
        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()
        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()
    def do_resource_discovery(self):
        # Assign concrete PlanetLab node ids to every Node element that
        # has none yet, excluding blacklisted/reserved ids.  Ids not yet
        # in the slice are collected in self._to_provision for
        # do_provisioning.  NOTE(review): many lines are missing from
        # this view; gaps are marked and indentation is approximate.
        to_provision = self._to_provision = set()
        # Reserve ids already taken by explicit assignments or blacklist.
        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)
        # ... (lines missing from this view) ...
        # look for perfectly defined nodes
        # (ie: those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                # ... (else/elif lines missing from this view) ...
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        # ... (body missing; likely "continue") ...
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    # ... (elif/else lines missing from this view) ...
                        # NOTE(review): "sith" below is a typo for "with"
                        # in the user-visible error message.
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
                            node.make_filter_description())
        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        # ... (lines missing; presumably reqs = [] and nodes = []) ...
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
        # ... (lines missing; presumably nodes.append(node) and try:) ...
            solution = resourcealloc.alloc(reqs)
        except resourcealloc.ResourceAllocationError:
            # Failed, try again with all nodes
            # ... (loop header lines missing from this view) ...
                candidates = node.find_candidates()
                candidates -= reserved
                reqs.append(candidates)
            # ... (lines missing from this view) ...
            solution = resourcealloc.alloc(reqs)
        to_provision.update(solution)
        # ... (lines missing from this view) ...
        # Apply the computed assignment pairwise to the pending nodes.
        for node, node_id in zip(nodes, solution):
            node.assign_node_id(node_id)
    def do_provisioning(self):
        # Add any newly-assigned node ids to the PlanetLab slice via the
        # PLC API, then remember them so do_wait_nodes grants them the
        # longer provisioning timeout.
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
        # Keep the set for do_wait_nodes; the attribute itself is
        # consumed (deleted) so provisioning runs at most once.
        self._just_provisioned = self._to_provision
        del self._to_provision
    def do_wait_nodes(self):
        # Inject per-node configuration, then wait until every node in
        # the experiment is provisioned and reachable; unresponsive
        # nodes are added to the persistent blacklist.
        # NOTE(review): several lines are missing from this view (incl.
        # the try: matching the except below and the closing of the
        # wait_provisioning call); code kept verbatim, gaps marked.
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename
                # ... (lines missing from this view) ...
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
        # ... (try: presumably opens here — confirm) ...
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                # Freshly-provisioned nodes get 20 minutes, others 60s.
                node.wait_provisioning(
                    (20*60 if node._node_id in self._just_provisioned else 60)
                # ... (closing paren missing from this view) ...
                self._logger.info("READY Node %s at %s", guid, node.hostname)
                # Prepare dependency installer now
                node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            # ... (lines missing from this view) ...
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
            # Mark all dead nodes (which are unresponsive) on the blacklist
            # ... (lines missing from this view) ...
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
            # ... (lines missing from this view) ...
            self._save_blacklist()
            # ... (lines missing; likely re-raise handling) ...
            traceback.print_exc()
    def do_spanning_deployment_plan(self):
        # Group equivalent dependencies (same build inputs + platform)
        # and deploy each group along a minimum-spanning-tree plan so
        # only the root builds; slaves copy from their master.
        # NOTE(review): many lines are missing from this view (the
        # dephash definition, the group-size guard, the mst call);
        # code kept verbatim, gaps marked, indentation approximate.
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
        # ... (dephash helper definition lines missing from this view) ...
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                # ... (lines missing from this view) ...
                app.node.architecture,
                app.node.operatingSystem,
        # ... (lines missing from this view) ...
        depgroups = collections.defaultdict(list)
        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                # ... (guard on deps presumably here — confirm) ...
                depgroups[dephash(deps)].append(deps)
        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            # ... (group-size guard missing from this view) ...
            # Pick root (deterministically)
            root = min(group, key=lambda app:app.node.hostname)
            # Obtain all IPs in numeric format
            # (which means faster distance computations)
            # ... (loop over group members presumably here) ...
                dep._ip = socket.gethostbyname(dep.node.hostname)
                dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
            # NOTE: the plan is an iterator
            # ... (mst.mst(...) call lines missing from this view) ...
                lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
            # ... (remaining mst arguments missing from this view) ...
            # Re-sign private key
            tempprk, temppuk, tmppass = self._make_temp_private_key()
            # ... (lines missing from this view) ...
            for slave, master in plan:
                slave.set_master(master)
                slave.install_keys(tempprk, temppuk, tmppass)
        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
    def _make_temp_private_key(self):
        # Create a temporary copy of the user's SSH identity re-ciphered
        # with a random 256-bit passphrase, for automated chain
        # deployment on the slice.  Returns (private_file, public_file,
        # passphrase) — the files are NamedTemporaryFile objects.
        # NOTE(review): many lines are missing from this view; gaps are
        # marked, indentation approximate, code kept verbatim.
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                # Prompt the user through the SSH_ASKPASS helper.
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        # ... (slicename arg, list close, missing lines) ...
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()
        if not self.sliceSSHKeyPass:
            # ... (raise TempKeyError presumably here — confirm) ...
        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            # ... (remaining args and close paren missing) ...
        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            # ... (remaining args and close paren missing) ...
        # Create secure 256-bits temporary passphrase
        passphrase = ''.join(map(chr,[rng.randint(0,255)
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")
        # ... (lines missing from this view) ...
        # Copy the on-disk identity (and mode bits) into the temp files.
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        # ... (flush/close lines missing from this view) ...
        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
        # ... (lines missing; presumably ssh-keygen -p invocation) ...
        proc = subprocess.Popen(
            # ... (argv lines missing from this view) ...
            "-P", self.sliceSSHKeyPass,
            # ... (argv lines missing from this view) ...
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
            # ... (closing paren missing from this view) ...
        out, err = proc.communicate()
        # ... (returncode check presumably here — confirm) ...
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
            # ... (message arguments and close missing from this view) ...
        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        # ... (seek/truncate lines missing from this view) ...
        puk.writelines(puklines)
        # ... (flush lines missing from this view) ...
        return prk, puk, passphrase
    def set(self, guid, name, value, time = TIME_NOW):
        # Set attribute *name* on element *guid*, mirroring the change
        # onto the live element object when it exists.
        # NOTE(review): a guard line and the element.refresh() call are
        # missing from this view; code kept verbatim.
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take on account schedule time for the task
        element = self._elements[guid]
        # ... (guard, likely "if element:", missing from this view) ...
            setattr(element, name, value)
            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                # ... (element.refresh() presumably here — confirm) ...
    def get(self, guid, name, time = TIME_NOW):
        # Read attribute *name*, preferring the live element's current
        # value over the cached one from the superclass.
        # NOTE(review): the try: header and the fallback return are
        # missing from this view; code kept verbatim.
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take on account schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        # ... (guard and try: presumably here — confirm) ...
            return getattr(element, name)
        except (KeyError, AttributeError):
            # ... (return value — the cached fallback — missing) ...
    def get_address(self, guid, index, attribute='Address'):
        # Resolve an address attribute for interface *guid*; only a
        # single address (index 0) is supported, everything else is
        # delegated to the superclass.
        # NOTE(review): lines are missing from this view, including the
        # return for the 'Address' branch; code kept verbatim.
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                # ... (return iface.address presumably here — confirm) ...
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast
        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)
    def action(self, time, guid, action):
        # Scheduled element actions are not supported by this backend.
        raise NotImplementedError
        # NOTE(review): the enclosing method header(s) (likely stop/
        # shutdown) are missing from this view; code kept verbatim.
        # First pass: stop/clean traces.
        for trace in self._traces.itervalues():
        # ... (trace cleanup body missing from this view) ...
        # Run element cleanup hooks in parallel (16 workers).
        runner = ParallelRun(16)
        # ... (runner.start() presumably here — confirm) ...
        for element in self._elements.itervalues():
            # invoke cleanup hooks
            if hasattr(element, 'cleanup'):
                runner.put(element.cleanup)
        # ... (runner.join() presumably here — confirm) ...
        # Second pass: run destroy hooks, also in parallel.
        runner = ParallelRun(16)
        # ... (runner.start() presumably here — confirm) ...
        for element in self._elements.itervalues():
            # invoke destroy hooks
            if hasattr(element, 'destroy'):
                runner.put(element.destroy)
        # ... (runner.join() presumably here — confirm) ...
        self._elements.clear()
    def trace(self, guid, trace_id, attribute='value'):
        # Fetch trace data for element *guid*: the synced local trace
        # content for 'value', or the remote file path for 'path'.
        # NOTE(review): the read/return lines are missing from this
        # view; code kept verbatim.
        app = self._elements[guid]
        # ... (lines missing from this view) ...
        if attribute == 'value':
            path = app.sync_trace(self.home_directory, trace_id)
            # ... (file read into content missing from this view) ...
        elif attribute == 'path':
            content = app.remote_trace_path(trace_id)
        # ... (else branch and return content missing from this view) ...
527 def follow_trace(self, trace_id, trace):
528 self._traces[trace_id] = trace
    def _make_generic(self, parameters, kind):
        # Generic element factory: instantiate *kind* bound to the PLC
        # API client, then copy every parameter onto it as an attribute.
        app = kind(self.plapi)
        # Note: there is 1-to-1 correspondence between attribute names
        # If that changes, this has to change as well
        for attr,val in parameters.iteritems():
            setattr(app, attr, val)
        # NOTE(review): the trailing "return app" appears to be missing
        # from this view — confirm against the full source.
    def _make_node(self, parameters):
        """Create a Node element from *parameters*."""
        return self._make_generic(parameters, self._node.Node)

    def _make_node_iface(self, parameters):
        """Create a NodeIface element from *parameters*."""
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        """Create a TunIface element from *parameters*."""
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        """Create a TapIface element from *parameters*."""
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        """Create a NetPipe element from *parameters*."""
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        """Create an Internet element from *parameters*."""
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        """Create an Application element from *parameters*."""
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        """Create a Dependency element from *parameters*."""
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        """Create a NepiDependency element from *parameters*."""
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        """Create an NS3Dependency element from *parameters*."""
        return self._make_generic(parameters, self._app.NS3Dependency)