2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.core.metadata import Parallel
7 from nepi.util.constants import TIME_NOW
8 from nepi.util.graphtools import mst
9 from nepi.util import ipaddr2
10 from nepi.util import environ
11 from nepi.util.parallel import ParallelRun
29 class TempKeyError(Exception):
32 class TestbedController(testbed_impl.TestbedController):
34 super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
35 self._home_directory = None
39 import node, interfaces, application
41 self._interfaces = interfaces
42 self._app = application
44 self._blacklist = set()
45 self._just_provisioned = set()
47 self._load_blacklist()
49 self._logger = logging.getLogger('nepi.testbeds.planetlab')
def home_directory(self):
    """Return the home directory configured for this testbed.

    The value is None until setup reads the "homeDirectory" attribute.
    """
    configured_home = self._home_directory
    return configured_home
57 if not hasattr(self, '_plapi'):
61 self._plapi = plcapi.PLCAPI(
62 username = self.authUser,
63 password = self.authString,
64 hostname = self.plcHost,
65 urlpattern = self.plcUrl
68 # anonymous access - may not be enough for much
69 self._plapi = plcapi.PLCAPI()
74 if not hasattr(self, '_slice_id'):
75 slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
77 self._slice_id = slices[0]['slice_id']
79 # If it wasn't found, don't remember this failure, keep trying
85 if not hasattr(self, '_vsys_vnet'):
86 slicetags = self.plapi.GetSliceTags(
87 name = self.slicename,
88 tagname = 'vsys_vnet',
91 self._vsys_vnet = slicetags[0]['value']
93 # If it wasn't found, don't remember this failure, keep trying
95 return self._vsys_vnet
97 def _load_blacklist(self):
98 blpath = environ.homepath('plblacklist')
101 bl = open(blpath, "r")
103 self._blacklist = set()
107 self._blacklist = set(
109 map(str.strip, bl.readlines())
115 def _save_blacklist(self):
116 blpath = environ.homepath('plblacklist')
117 bl = open(blpath, "w")
120 map('%s\n'.__mod__, self._blacklist))
125 self._home_directory = self._attributes.\
126 get_attribute_value("homeDirectory")
127 self.slicename = self._attributes.\
128 get_attribute_value("slice")
129 self.authUser = self._attributes.\
130 get_attribute_value("authUser")
131 self.authString = self._attributes.\
132 get_attribute_value("authPass")
133 self.sliceSSHKey = self._attributes.\
134 get_attribute_value("sliceSSHKey")
135 self.sliceSSHKeyPass = None
136 self.plcHost = self._attributes.\
137 get_attribute_value("plcHost")
138 self.plcUrl = self._attributes.\
139 get_attribute_value("plcUrl")
140 self.logLevel = self._attributes.\
141 get_attribute_value("plLogLevel")
142 self.tapPortBase = self._attributes.\
143 get_attribute_value("tapPortBase")
144 self.p2pDeployment = self._attributes.\
145 get_attribute_value("p2pDeployment")
146 self.dedicatedSlice = self._attributes.\
147 get_attribute_value("dedicatedSlice")
149 self._logger.setLevel(getattr(logging,self.logLevel))
151 super(TestbedController, self).do_setup()
def do_post_asynclaunch(self, guid):
    """Wait on an asynchronously launched Dependency element.

    Looks up the element registered under *guid*. If it is a Dependency,
    calls its ``async_setup_wait()``; any other element kind is left
    untouched. Always returns None.
    """
    element = self._elements[guid]
    # Only Dependency elements are launched asynchronously, so nothing
    # else needs to be waited for here.
    if not isinstance(element, self._app.Dependency):
        return
    element.async_setup_wait()

# Two-phase configuration for asynchronous launch: the same wait hook is
# exposed as the post-step for both the preconfigure and configure phases.
do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
do_poststep_configure = staticmethod(do_post_asynclaunch)
164 def do_preconfigure(self):
166 # Perform resource discovery if we don't have
167 # specific resources assigned yet
168 self.do_resource_discovery()
170 # Create PlanetLab slivers
171 self.do_provisioning()
174 # Wait for provisioning
179 except self._node.UnresponsiveNodeError:
183 if self.p2pDeployment:
184 # Plan application deployment
185 self.do_spanning_deployment_plan()
187 # Configure elements per XML data
188 super(TestbedController, self).do_preconfigure()
190 def do_resource_discovery(self, recover = False):
191 to_provision = self._to_provision = set()
193 reserved = set(self._blacklist)
194 for guid, node in self._elements.iteritems():
195 if isinstance(node, self._node.Node) and node._node_id is not None:
196 reserved.add(node._node_id)
199 # look for perfectly defined nodes
200 # (ie: those with only one candidate)
201 for guid, node in self._elements.iteritems():
202 if isinstance(node, self._node.Node) and node._node_id is None:
203 # Try existing nodes first
204 # If we have only one candidate, simply use it
205 candidates = node.find_candidates(
206 filter_slice_id = self.slice_id)
207 candidates -= reserved
208 if len(candidates) == 1:
209 node_id = iter(candidates).next()
210 node.assign_node_id(node_id)
211 reserved.add(node_id)
213 # Try again including unassigned nodes
214 candidates = node.find_candidates()
215 candidates -= reserved
216 if len(candidates) > 1:
218 if len(candidates) == 1:
219 node_id = iter(candidates).next()
220 node.assign_node_id(node_id)
221 to_provision.add(node_id)
222 reserved.add(node_id)
224 raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
225 node.make_filter_description())
227 # Now do the backtracking search for a suitable solution
228 # First with existing slice nodes
231 for guid, node in self._elements.iteritems():
232 if isinstance(node, self._node.Node) and node._node_id is None:
233 # Try existing nodes first
234 # If we have only one candidate, simply use it
235 candidates = node.find_candidates(
236 filter_slice_id = self.slice_id)
237 candidates -= reserved
238 reqs.append(candidates)
243 raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
246 solution = resourcealloc.alloc(reqs)
247 except resourcealloc.ResourceAllocationError:
248 # Failed, try again with all nodes
251 candidates = node.find_candidates()
252 candidates -= reserved
253 reqs.append(candidates)
255 solution = resourcealloc.alloc(reqs)
256 to_provision.update(solution)
259 for node, node_id in zip(nodes, solution):
260 node.assign_node_id(node_id)
262 def do_provisioning(self):
263 if self._to_provision:
264 # Add new nodes to the slice
265 cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
266 new_nodes = list(set(cur_nodes) | self._to_provision)
267 self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
270 self._just_provisioned = self._to_provision
271 del self._to_provision
273 def do_wait_nodes(self):
274 for guid, node in self._elements.iteritems():
275 if isinstance(node, self._node.Node):
276 # Just inject configuration stuff
277 node.home_path = "nepi-node-%s" % (guid,)
278 node.ident_path = self.sliceSSHKey
279 node.slicename = self.slicename
282 self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
285 for guid, node in self._elements.iteritems():
286 if isinstance(node, self._node.Node):
287 self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
289 node.wait_provisioning(
290 (20*60 if node._node_id in self._just_provisioned else 60)
293 self._logger.info("READY Node %s at %s", guid, node.hostname)
295 # Prepare dependency installer now
296 node.prepare_dependencies()
297 except self._node.UnresponsiveNodeError:
299 self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
301 # Mark all dead nodes (which are unresponsive) on the blacklist
303 for guid, node in self._elements.iteritems():
304 if isinstance(node, self._node.Node):
305 if not node.is_alive():
306 self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
307 self._blacklist.add(node._node_id)
311 self._save_blacklist()
315 traceback.print_exc()
319 def do_spanning_deployment_plan(self):
320 # Create application groups by collecting all applications
321 # based on their hash - the hash should contain everything that
322 # defines them and the platform they're built
326 frozenset((app.depends or "").split(' ')),
327 frozenset((app.sources or "").split(' ')),
330 app.node.architecture,
331 app.node.operatingSystem,
335 depgroups = collections.defaultdict(list)
337 for element in self._elements.itervalues():
338 if isinstance(element, self._app.Dependency):
339 depgroups[dephash(element)].append(element)
340 elif isinstance(element, self._node.Node):
341 deps = element._yum_dependencies
343 depgroups[dephash(deps)].append(deps)
345 # Set up spanning deployment for those applications that
346 # have been deployed in several nodes.
347 for dh, group in depgroups.iteritems():
349 # Pick root (deterministically)
350 root = min(group, key=lambda app:app.node.hostname)
352 # Obtain all IPs in numeric format
353 # (which means faster distance computations)
355 dep._ip = socket.gethostbyname(dep.node.hostname)
356 dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
359 # NOTE: the plan is an iterator
362 lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
366 # Re-sign private key
368 tempprk, temppuk, tmppass = self._make_temp_private_key()
374 for slave, master in plan:
375 slave.set_master(master)
376 slave.install_keys(tempprk, temppuk, tmppass)
378 # We don't need the user's passphrase anymore
379 self.sliceSSHKeyPass = None
381 def _make_temp_private_key(self):
382 # Get the user's key's passphrase
383 if not self.sliceSSHKeyPass:
384 if 'SSH_ASKPASS' in os.environ:
385 proc = subprocess.Popen(
386 [ os.environ['SSH_ASKPASS'],
387 "Please type the passphrase for the %s SSH identity file. "
388 "The passphrase will be used to re-cipher the identity file with "
389 "a random 256-bit key for automated chain deployment on the "
390 "%s PlanetLab slice" % (
391 os.path.basename(self.sliceSSHKey),
394 stdin = open("/dev/null"),
395 stdout = subprocess.PIPE,
396 stderr = subprocess.PIPE)
397 out,err = proc.communicate()
398 self.sliceSSHKeyPass = out.strip()
400 if not self.sliceSSHKeyPass:
403 # Create temporary key files
404 prk = tempfile.NamedTemporaryFile(
405 dir = self.root_directory,
406 prefix = "pl_deploy_tmpk_",
409 puk = tempfile.NamedTemporaryFile(
410 dir = self.root_directory,
411 prefix = "pl_deploy_tmpk_",
414 # Create secure 256-bits temporary passphrase
415 passphrase = ''.join(map(chr,[rng.randint(0,255)
416 for rng in (random.SystemRandom(),)
417 for i in xrange(32)] )).encode("hex")
420 oprk = open(self.sliceSSHKey, "rb")
421 opuk = open(self.sliceSSHKey+".pub", "rb")
422 shutil.copymode(oprk.name, prk.name)
423 shutil.copymode(opuk.name, puk.name)
424 shutil.copyfileobj(oprk, prk)
425 shutil.copyfileobj(opuk, puk)
431 # A descriptive comment
432 comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
435 proc = subprocess.Popen(
438 "-P", self.sliceSSHKeyPass,
441 stdout = subprocess.PIPE,
442 stderr = subprocess.PIPE,
443 stdin = subprocess.PIPE
445 out, err = proc.communicate()
448 raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
454 # Change comment on public key
455 puklines = puk.readlines()
456 puklines[0] = puklines[0].split(' ')
457 puklines[0][-1] = comment+'\n'
458 puklines[0] = ' '.join(puklines[0])
461 puk.writelines(puklines)
465 return prk, puk, passphrase
467 def set(self, guid, name, value, time = TIME_NOW):
468 super(TestbedController, self).set(guid, name, value, time)
469 # TODO: take on account schedule time for the task
470 element = self._elements[guid]
472 setattr(element, name, value)
474 if hasattr(element, 'refresh'):
475 # invoke attribute refresh hook
478 def get(self, guid, name, time = TIME_NOW):
479 value = super(TestbedController, self).get(guid, name, time)
480 # TODO: take on account schedule time for the task
481 factory_id = self._create[guid]
482 factory = self._factories[factory_id]
483 element = self._elements.get(guid)
485 return getattr(element, name)
486 except (KeyError, AttributeError):
489 def get_address(self, guid, index, attribute='Address'):
493 iface = self._elements.get(guid)
494 if iface and index == 0:
495 if attribute == 'Address':
497 elif attribute == 'NetPrefix':
498 return iface.netprefix
499 elif attribute == 'Broadcast':
500 return iface.broadcast
502 # if all else fails, query box
503 return super(TestbedController, self).get_address(guid, index, attribute)
def action(self, time, guid, action):
    """Reject scheduled actions; this controller does not implement them.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
509 for trace in self._traces.itervalues():
512 def invokeif(action, testbed, guid):
513 element = self._elements[guid]
514 if hasattr(element, action):
515 getattr(element, action)()
517 self._do_in_factory_order(
518 functools.partial(invokeif, 'cleanup'),
519 metadata.shutdown_order)
521 self._do_in_factory_order(
522 functools.partial(invokeif, 'destroy'),
523 metadata.shutdown_order)
525 self._elements.clear()
528 def trace(self, guid, trace_id, attribute='value'):
529 app = self._elements[guid]
531 if attribute == 'value':
532 path = app.sync_trace(self.home_directory, trace_id)
539 elif attribute == 'path':
540 content = app.remote_trace_path(trace_id)
def follow_trace(self, trace_id, trace):
    """Register *trace* under *trace_id*, replacing any earlier entry."""
    registry = self._traces
    registry[trace_id] = trace
549 # Create and connect do not perform any real tasks against
550 # the nodes, it only sets up the object hierarchy,
551 # so we can run them normally
553 self.do_connect_init()
554 self.do_connect_compl()
556 # Manually recover nodes, to mark dependencies installed
557 # and clean up mutable attributes
558 self._do_in_factory_order(
559 lambda self, guid : self._elements[guid].recover(),
564 # Assign nodes - since we're working off exeucte XML, nodes
565 # have specific hostnames assigned and we don't need to do
566 # real assignment, only find out node ids and check liveliness
567 self.do_resource_discovery(recover = True)
570 # Pre/post configure, however, tends to set up tunnels
571 # Execute configuration steps only for those object
572 # kinds that do not have side effects
574 # Do the ones without side effects,
575 # including nodes that need to set up home
576 # folders and all that
577 self._do_in_factory_order(
578 "preconfigure_function",
581 Parallel(metadata.NODE),
585 # Tunnels require a home path that is configured
586 # at this step. Since we cannot run the step itself,
587 # we need to inject this homepath ourselves
588 for guid, element in self._elements.iteritems():
589 if isinstance(element, self._interfaces.TunIface):
590 element._home_path = "tun-%s" % (guid,)
592 # Manually recover tunnels, applications and
593 # netpipes, negating the side effects
594 self._do_in_factory_order(
595 lambda self, guid : self._elements[guid].recover(),
597 Parallel(metadata.TAPIFACE),
598 Parallel(metadata.TUNIFACE),
600 Parallel(metadata.NEPIDEPENDENCY),
601 Parallel(metadata.NS3DEPENDENCY),
602 Parallel(metadata.DEPENDENCY),
603 Parallel(metadata.APPLICATION),
606 # Tunnels are not harmed by configuration after
607 # recovery, and some attributes get set this way
608 # like external_iface
609 self._do_in_factory_order(
610 "preconfigure_function",
612 Parallel(metadata.TAPIFACE),
613 Parallel(metadata.TUNIFACE),
616 # Post-do the ones without side effects
617 self._do_in_factory_order(
618 "configure_function",
621 Parallel(metadata.NODE),
623 Parallel(metadata.TAPIFACE),
624 Parallel(metadata.TUNIFACE),
627 # There are no required prestart steps
628 # to call upon recovery, so we're done
631 def _make_generic(self, parameters, kind):
632 app = kind(self.plapi)
634 # Note: there is 1-to-1 correspondence between attribute names
635 # If that changes, this has to change as well
636 for attr,val in parameters.iteritems():
637 setattr(app, attr, val)
641 def _make_node(self, parameters):
642 node = self._make_generic(parameters, self._node.Node)
643 node.enable_cleanup = self.dedicatedSlice
def _make_node_iface(self, parameters):
    """Create a NodeIface element by delegating to _make_generic."""
    element_kind = self._interfaces.NodeIface
    return self._make_generic(parameters, element_kind)
def _make_tun_iface(self, parameters):
    """Create a TunIface element by delegating to _make_generic."""
    element_kind = self._interfaces.TunIface
    return self._make_generic(parameters, element_kind)
def _make_tap_iface(self, parameters):
    """Create a TapIface element by delegating to _make_generic."""
    element_kind = self._interfaces.TapIface
    return self._make_generic(parameters, element_kind)
def _make_netpipe(self, parameters):
    """Create a NetPipe element by delegating to _make_generic."""
    element_kind = self._interfaces.NetPipe
    return self._make_generic(parameters, element_kind)
def _make_internet(self, parameters):
    """Create an Internet element by delegating to _make_generic."""
    element_kind = self._interfaces.Internet
    return self._make_generic(parameters, element_kind)
def _make_application(self, parameters):
    """Create an Application element by delegating to _make_generic."""
    element_kind = self._app.Application
    return self._make_generic(parameters, element_kind)
def _make_dependency(self, parameters):
    """Create a Dependency element by delegating to _make_generic."""
    element_kind = self._app.Dependency
    return self._make_generic(parameters, element_kind)
def _make_nepi_dependency(self, parameters):
    """Create a NepiDependency element by delegating to _make_generic."""
    element_kind = self._app.NepiDependency
    return self._make_generic(parameters, element_kind)
def _make_ns3_dependency(self, parameters):
    """Create an NS3Dependency element by delegating to _make_generic."""
    element_kind = self._app.NS3Dependency
    return self._make_generic(parameters, element_kind)
def _make_tun_filter(self, parameters):
    """Create a TunFilter element by delegating to _make_generic."""
    element_kind = self._interfaces.TunFilter
    return self._make_generic(parameters, element_kind)