2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.core.metadata import Parallel
7 from nepi.util.constants import TIME_NOW
8 from nepi.util.graphtools import mst
9 from nepi.util import ipaddr2
10 from nepi.util import environ
11 from nepi.util.parallel import ParallelRun
32 class TempKeyError(Exception):
35 class TestbedController(testbed_impl.TestbedController):
        # NOTE(review): fragment of __init__ — the "def __init__" line and a
        # few interior lines are elided from this view; indentation reconstructed.
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        # Element implementation modules are imported lazily at construction.
        import node, interfaces, application, multicast
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast
        # Node ids known to be unusable; persisted by _load/_save_blacklist.
        self._blacklist = set()
        # Node ids added to the slice this run (see do_provisioning).
        self._just_provisioned = set()
        self._load_blacklist()
        self._logger = logging.getLogger('nepi.testbeds.planetlab')
        # Internal flag: attribute setters swallow errors while recovering.
        self.recovering = False
    def home_directory(self):
        """Home directory configured via the "homeDirectory" attribute."""
        return self._home_directory
        # NOTE(review): fragment of the "plapi" accessor — its "def" line and
        # the if/else lines around the anonymous fallback are elided.
        # Lazily builds and caches a PLCAPI client for this controller.
        if not hasattr(self, '_plapi'):
            self._plapi = plcapi.PLCAPI(
                username = self.authUser,
                password = self.authString,
                hostname = self.plcHost,
                urlpattern = self.plcUrl
            # anonymous access - may not be enough for much
            self._plapi = plcapi.PLCAPI()
        # NOTE(review): fragment of the "slice_id" accessor — its "def" line
        # is elided. Looks up and caches the PLC slice_id for self.slicename.
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            self._slice_id = slices[0]['slice_id']
        # If it wasn't found, don't remember this failure, keep trying
        # NOTE(review): fragment of the "vsys_vnet" accessor — its "def" line
        # and getVnet's argument lines are elided. Caches the vsys_vnet value.
        if not hasattr(self, '_vsys_vnet'):
            self._vsys_vnet = plutil.getVnet(
        return self._vsys_vnet
    def _load_blacklist(self):
        """Load the persisted node blacklist from the 'plblacklist' file.

        NOTE(review): several interior lines (presumably the try/except that
        tolerates a missing file) are elided from this view.
        """
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "r")
        self._blacklist = set()
        # One blacklisted node id per line; strip whitespace/newlines.
        self._blacklist = set(
            map(str.strip, bl.readlines())
    def _save_blacklist(self):
        """Persist the current blacklist, one entry per line."""
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        # NOTE(review): the writelines(...) call wrapping this map is elided
        # from this view; '%s\n'.__mod__ renders each entry as its own line.
            map('%s\n'.__mod__, self._blacklist))
        # NOTE(review): fragment of do_setup — the "def do_setup" line is
        # elided from this view. Pulls testbed attributes into instance
        # fields, then validates the required ones before delegating to the
        # base class.
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        # The key passphrase is only obtained on demand (see
        # _make_temp_private_key) and cleared as soon as possible.
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        # Fail fast on missing mandatory configuration (Python 2 raise syntax).
        if not self.slicename:
            raise RuntimeError, "Slice not set"
        if not self.authUser:
            raise RuntimeError, "PlanetLab account username not set"
        if not self.authString:
            raise RuntimeError, "PlanetLab account passphrase not set"
        if not self.sliceSSHKey:
            raise RuntimeError, "PlanetLab account key not specified"
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)

        # "plLogLevel" must name a logging level attribute (e.g. "DEBUG").
        self._logger.setLevel(getattr(logging,self.logLevel))

        super(TestbedController, self).do_setup()
    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for this element's asynchronous setup to finish here.
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()
    # Two-phase configuration for asynchronous launch
    # The same wait hook runs after both the preconfigure and configure steps.
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)
    def do_preconfigure(self):
        """Discover and provision PlanetLab resources, wait for them, then
        run the standard preconfiguration.

        NOTE(review): the try block around the node wait (whose "except"
        survives below) and its handler body are elided from this view.
        """
        # Perform resource discovery if we don't have
        # specific resources assigned yet
        self.do_resource_discovery()

        # Create PlanetLab slivers
        self.do_provisioning()

        # Wait for provisioning
        except self._node.UnresponsiveNodeError:

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()
    def do_resource_discovery(self, recover = False):
        """Assign a concrete PlanetLab node to every Node element that does
        not have one yet.

        First tries per-node unique candidates in parallel, then falls back
        to a global allocation (resourcealloc.alloc) over the remaining
        nodes, preferring slice members before unassigned hosts.

        NOTE(review): many control-flow lines (try/else/elif and some loop
        bodies) are elided from this view; indentation reconstructed.
        """
        # Nodes selected here will be added to the slice by do_provisioning.
        to_provision = self._to_provision = set()

        # Blacklisted nodes and nodes already assigned are off-limits.
        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # look for perfectly defined nodes
        # (ie: those with only one candidate)
        # Lock protects 'reserved'/'to_provision' across worker threads.
        reserve_lock = threading.RLock()
        def assignifunique(guid, node):
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = self.slice_id)
            reserve_lock.acquire()
            candidates -= reserved
            if len(candidates) == 1:
                node_id = iter(candidates).next()
                reserved.add(node_id)
                # Try again including unassigned nodes
                reserve_lock.release()
                candidates = node.find_candidates()
                reserve_lock.acquire()
                candidates -= reserved
                if len(candidates) > 1:
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    # Not a slice member yet: remember to provision it.
                    to_provision.add(node_id)
                    reserved.add(node_id)
                    # NOTE(review): typo in this message — "sith" should read
                    # "with" (cannot fix here: surrounding branch lines are
                    # elided from this view).
                    raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
                        node.make_filter_description())
                reserve_lock.release()
            if node_id is not None:
                node.assign_node_id(node_id)

        runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(assignifunique, guid, node)

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        def genreqs(node, filter_slice_id=None):
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = filter_slice_id)
            candidates -= reserved
            reqs.append(candidates)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(genreqs, node, self.slice_id)

        # In recovery mode any still-unassigned node is a hard error.
        raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

        def pickbest(fullset, nreq, node=nodes[0]):
            # Rank candidates with node.rate_nodes and keep the best nreq.
            if len(fullset) > nreq:
                fullset = zip(node.rate_nodes(fullset),fullset)
                fullset.sort(reverse=True)
            return set(map(operator.itemgetter(1),fullset))

        solution = resourcealloc.alloc(reqs, sample=pickbest)
        except resourcealloc.ResourceAllocationError:
            # Failed, try again with all nodes
            runner.put(genreqs, node)
            solution = resourcealloc.alloc(reqs, sample=pickbest)
        to_provision.update(solution)

        # Push the chosen node ids onto the Node elements in parallel.
        for node, node_id in zip(nodes, solution):
            runner.put(node.assign_node_id, node_id)
    def do_provisioning(self):
        """Add the nodes selected by do_resource_discovery to the PL slice.

        NOTE(review): one or two interior lines are elided from this view.
        """
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # Remember which nodes are freshly provisioned: do_wait_nodes grants
        # them a much longer provisioning timeout.
        self._just_provisioned = self._to_provision
        del self._to_provision
    def do_wait_nodes(self):
        """Configure every Node element and wait (in parallel) until its
        sliver is provisioned; blacklist nodes that never respond.

        NOTE(review): the try around the parallel wait (whose "except"
        survives below) and a few other lines are elided from this view.
        """
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        # maxqueue=1 keeps producers in step with the 64 waiting threads.
        runner = ParallelRun(maxthreads=64, maxqueue=1)
        def waitforit(guid, node):
            # Freshly provisioned nodes get 20 minutes; others 60 seconds.
            node.wait_provisioning(
                (20*60 if node._node_id in self._just_provisioned else 60)
            self._logger.info("READY Node %s at %s", guid, node.hostname)
            # Prepare dependency installer now
            node.prepare_dependencies()
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                runner.put(waitforit, guid, node)
        except self._node.UnresponsiveNodeError:
            self._logger.warn("UNRESPONSIVE Nodes")

            # Mark all dead nodes (which are unresponsive) on the blacklist
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
            self._save_blacklist()
        traceback.print_exc()
    def do_spanning_deployment_plan(self):
        """Plan peer-to-peer deployment: group identical dependencies, pick
        a root per group, and chain slaves to masters along a minimum
        spanning tree over IP distance.

        NOTE(review): the "dephash" helper's def line and several other
        lines are elided from this view.
        """
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
        # (fragment of the dephash key: depends/sources plus node platform)
        frozenset((app.depends or "").split(' ')),
        frozenset((app.sources or "").split(' ')),
        app.node.architecture,
        app.node.operatingSystem,

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                # Nodes contribute their implicit yum dependency bundles too.
                deps = element._yum_dependencies
                depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            # Pick root (deterministically)
            root = min(group, key=lambda app:app.node.hostname)

            # Obtain all IPs in numeric format
            # (which means faster distance computations)
            dep._ip = socket.gethostbyname(dep.node.hostname)
            dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

            # NOTE: the plan is an iterator
            # (mst over the numeric-IP distance metric — call site elided)
            lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),

            # Re-sign private key
            tempprk, temppuk, tmppass = self._make_temp_private_key()

            # Walk the plan, wiring each slave to its master and installing
            # the re-ciphered temporary keys.
            for slave, master in plan:
                slave.set_master(master)
                slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
    def _make_temp_private_key(self):
        """Create a temporary copy of the user's SSH identity re-ciphered
        with a random 256-bit passphrase, for automated chain deployment.

        Returns (private_key_file, public_key_file, passphrase).

        NOTE(review): several lines (ssh-keygen argv, error branches, seeks)
        are elided from this view.
        """
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                # Ask interactively through the user's SSH_ASKPASS helper.
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",

        # Create secure 256-bits temporary passphrase
        # (.encode("hex") is Python 2 — 32 random bytes as 64 hex chars)
        passphrase = os.urandom(32).encode("hex")

        # Copy the user's key pair (modes and contents) into the temp files.
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher with ssh-keygen: old passphrase in, new passphrase on.
        proc = subprocess.Popen(
            "-P", self.sliceSSHKeyPass,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        out, err = proc.communicate()
        raise RuntimeError, "Problem generating keys: \n%s\n%r" % (

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.writelines(puklines)

        return prk, puk, passphrase
    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute *name* on element *guid*, tolerating immutable
        attributes while recovering.

        NOTE(review): the try/except wrapping setattr and the refresh call
        body are elided from this view.
        """
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take on account schedule time for the task
        element = self._elements[guid]
        setattr(element, name, value)

        # We ignore these errors while recovering.
        # Some attributes are immutable, and setting
        # them is necessary (to recover the state), but
        # some are not (they throw an exception).
        if not self.recovering:

        if hasattr(element, 'refresh'):
            # invoke attribute refresh hook
    def get(self, guid, name, time = TIME_NOW):
        """Get attribute *name* from the live element for *guid*, falling
        back when the element or attribute is missing.

        NOTE(review): the try line and the fallback return are elided.
        """
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take on account schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        return getattr(element, name)
        except (KeyError, AttributeError):
    def get_address(self, guid, index, attribute='Address'):
        """Return the requested address attribute of interface *guid*;
        only index 0 is served locally, everything else is delegated.

        NOTE(review): the 'Address' branch's return line is elided.
        """
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)
    def action(self, time, guid, action):
        """Scheduled element actions are not supported by this backend."""
        raise NotImplementedError
        # NOTE(review): fragment of the shutdown path — the enclosing "def"
        # line and the trace-loop body are elided from this view.
        for trace in self._traces.itervalues():

        def invokeif(action, testbed, guid):
            # Call element.<action>() only if the element defines it.
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        # cleanup first, then destroy, both in shutdown order.
        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
    def trace(self, guid, trace_id, attribute='value'):
        """Fetch a trace of element *guid*: its content ('value'), remote
        path ('path') or remote name ('name').

        NOTE(review): the lines reading the synced file and the final
        return are elided from this view.
        """
        elem = self._elements[guid]

        if attribute == 'value':
            # Sync the remote trace into home_directory, then read it.
            path = elem.sync_trace(self.home_directory, trace_id)
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
    def follow_trace(self, trace_id, trace):
        """Register *trace* under *trace_id* in self._traces."""
        self._traces[trace_id] = trace
        # NOTE(review): fragment of the recovery entry point — its "def"
        # line and various step lists are elided from this view.
        # An internal flag, so we know to behave differently in
        # a few corner cases.
        self.recovering = True

        # Create and connect do not perform any real tasks against
        # the nodes, it only sets up the object hierarchy,
        # so we can run them normally
        self.do_connect_init()
        self.do_connect_compl()

        # Manually recover nodes, to mark dependencies installed
        # and clean up mutable attributes
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),

        # Assign nodes - since we're working off exeucte XML, nodes
        # have specific hostnames assigned and we don't need to do
        # real assignment, only find out node ids and check liveliness
        self.do_resource_discovery(recover = True)

        # Pre/post configure, however, tends to set up tunnels
        # Execute configuration steps only for those object
        # kinds that do not have side effects

        # Do the ones without side effects,
        # including nodes that need to set up home
        # folders and all that
        self._do_in_factory_order(
            "preconfigure_function",
            Parallel(metadata.NODE),

        # Tunnels require a home path that is configured
        # at this step. Since we cannot run the step itself,
        # we need to inject this homepath ourselves
        for guid, element in self._elements.iteritems():
            if isinstance(element, self._interfaces.TunIface):
                element._home_path = "tun-%s" % (guid,)

        # Manually recover tunnels, applications and
        # netpipes, negating the side effects
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),
            Parallel(metadata.NEPIDEPENDENCY),
            Parallel(metadata.NS3DEPENDENCY),
            Parallel(metadata.DEPENDENCY),
            Parallel(metadata.APPLICATION),

        # Tunnels are not harmed by configuration after
        # recovery, and some attributes get set this way
        # like external_iface
        self._do_in_factory_order(
            "preconfigure_function",
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),

        # Post-do the ones without side effects
        self._do_in_factory_order(
            "configure_function",
            Parallel(metadata.NODE),
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),

        # There are no required prestart steps
        # to call upon recovery, so we're done
        # NOTE(review): suspected bug — recovery is finished here, so this
        # probably should read "self.recovering = False"; left unchanged
        # because surrounding lines are elided from this view.
        self.recovering = True
    def _make_generic(self, parameters, kind):
        """Instantiate element class *kind* with the PLC API handle and
        apply *parameters* as attributes.

        NOTE(review): the try/except around setattr and the final
        "return app" are elided from this view.
        """
        app = kind(self.plapi)
        # Weak back-reference avoids a controller<->element reference cycle.
        app.testbed = weakref.ref(self)

        # Note: there is 1-to-1 correspondence between attribute names
        # If that changes, this has to change as well
        for attr,val in parameters.iteritems():
            setattr(app, attr, val)

        # We ignore these errors while recovering.
        # Some attributes are immutable, and setting
        # them is necessary (to recover the state), but
        # some are not (they throw an exception).
        if not self.recovering:
    def _make_node(self, parameters):
        """Create a Node element; cleanup is only enabled on dedicated
        slices. (NOTE(review): the trailing "return node" is elided.)"""
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
722 def _make_node_iface(self, parameters):
723 return self._make_generic(parameters, self._interfaces.NodeIface)
725 def _make_tun_iface(self, parameters):
726 return self._make_generic(parameters, self._interfaces.TunIface)
728 def _make_tap_iface(self, parameters):
729 return self._make_generic(parameters, self._interfaces.TapIface)
731 def _make_netpipe(self, parameters):
732 return self._make_generic(parameters, self._interfaces.NetPipe)
734 def _make_internet(self, parameters):
735 return self._make_generic(parameters, self._interfaces.Internet)
737 def _make_application(self, parameters):
738 return self._make_generic(parameters, self._app.Application)
740 def _make_dependency(self, parameters):
741 return self._make_generic(parameters, self._app.Dependency)
743 def _make_nepi_dependency(self, parameters):
744 return self._make_generic(parameters, self._app.NepiDependency)
746 def _make_ns3_dependency(self, parameters):
747 return self._make_generic(parameters, self._app.NS3Dependency)
749 def _make_tun_filter(self, parameters):
750 return self._make_generic(parameters, self._interfaces.TunFilter)
752 def _make_class_queue_filter(self, parameters):
753 return self._make_generic(parameters, self._interfaces.ClassQueueFilter)
755 def _make_tos_queue_filter(self, parameters):
756 return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
758 def _make_multicast_forwarder(self, parameters):
759 return self._make_generic(parameters, self._multicast.MulticastForwarder)
761 def _make_multicast_announcer(self, parameters):
762 return self._make_generic(parameters, self._multicast.MulticastAnnouncer)
764 def _make_multicast_router(self, parameters):
765 return self._make_generic(parameters, self._multicast.MulticastRouter)