1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID, TESTBED_VERSION
4 from nepi.core import testbed_impl
5 from nepi.core.metadata import Parallel
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 from nepi.util.parallel import ParallelRun
class TempKeyError(Exception):
    """Presumably raised when handling the temporary deployment SSH key
    fails (see _make_temp_private_key).  NOTE(review): the class body and
    all raise sites are elided from this listing — confirm against the
    full source."""
class TestbedController(testbed_impl.TestbedController):
    """Controller for the PlanetLab testbed.

    Drives resource discovery, sliver provisioning, node configuration
    and (optionally) peer-to-peer spanning deployment of applications.

    NOTE(review): this is an elided listing — several original lines are
    missing here, including the ``def __init__(self):`` header for the
    body below and, presumably, a ``self._node = node`` binding (other
    methods read ``self._node``) — confirm against the full source.
    """
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None         # filled in do_setup from "homeDirectory"
        # Sibling testbed modules imported at construction time and bound
        # to attributes so other methods can reach them.
        import node, interfaces, application, multicast
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast
        self._blacklist = set()             # hostnames never to allocate
        self._just_provisioned = set()      # node ids added to the slice this run
        self._load_blacklist()              # load persisted blacklist from disk
        self._vsys_vnet = None              # lazy cache (see vsys_vnet accessor)
        self._logger = logging.getLogger('nepi.testbeds.planetlab')
        self.recovering = False             # True while recover() replays state
    def home_directory(self):
        # Read-only accessor for the controller's home directory.
        # NOTE(review): probably decorated with @property in the original;
        # the decorator line is elided from this listing.
        return self._home_directory
    # --- lazy accessor bodies; their enclosing def/@property lines are
    # elided from this listing ---

        # plcapi accessor: build the PLC API client on first use
        # (constructor arguments are truncated in this listing).
        self._plcapi = plcapi.plcapi(
        # sliceapi accessor: use the PLC API directly, or an SFI wrapper —
        # presumably selected by the "sfa" attribute (confirm; the branch
        # condition is elided).
        if not self._sliceapi:
                self._sliceapi = self.plcapi
                from nepi.util import sfiapi
                self._sliceapi = sfiapi.sfiapi(self.slice_id)
        # slice_id accessor: resolved once from the slice API by slice name.
        if not self._slice_id:
            self._slice_id = self.sliceapi.GetSliceId(self.slicename)
        # vsys_vnet accessor: cached vsys_vnet system tag of the slice.
        if not self._vsys_vnet:
            self._vsys_vnet = self.sliceapi.GetSliceVnetSysTag(self.slicename)
        return self._vsys_vnet
    def _load_blacklist(self):
        """Load the persisted node blacklist (one hostname per line) from
        the user's 'plblacklist' file into self._blacklist.

        NOTE(review): the original wraps the open/read in try/finally or
        try/except blocks that are elided from this listing (the file may
        not exist on first run).
        """
        blpath = environ.homepath('plblacklist')
            bl = open(blpath, "r")
            # Default to empty, then replace with stripped file lines.
            self._blacklist = set()
            self._blacklist = set(
                map(str.strip, bl.readlines())
    def _save_blacklist(self):
        """Persist the current blacklist, one hostname per line.

        NOTE(review): the enclosing write call (presumably
        ``bl.writelines(...)``) and the file close are elided from this
        listing.
        """
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
            map('%s\n'.__mod__, self._blacklist))
    # --- body of do_setup (the ``def do_setup(self):`` header is elided
    # from this listing).  Pulls controller configuration out of the
    # attribute store, validates the mandatory credentials, sets the log
    # level, and defers to the base class.  Note the file is Python 2
    # (``raise E, "msg"`` syntax, ``iteritems`` elsewhere). ---
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        # Passphrase is only requested on demand (see _make_temp_private_key).
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.cleanProc = self._attributes.\
            get_attribute_value("cleanProc")
        self.cleanHome = self._attributes.\
            get_attribute_value("cleanHome")
        self.sfa = self._attributes.\
            get_attribute_value("sfa")
        self._slice_id = self._attributes.\
            get_attribute_value("sliceHrn")

        # Fail fast on missing mandatory configuration.
        if not self.slicename:
            raise RuntimeError, "Slice not set"
        if not self.authUser:
            raise RuntimeError, "PlanetLab account username not set"
        if not self.authString:
            raise RuntimeError, "PlanetLab account passphrase not set"
        if not self.sliceSSHKey:
            raise RuntimeError, "PlanetLab account key not specified"
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)

        # plLogLevel holds a logging level *name* (e.g. "DEBUG").
        self._logger.setLevel(getattr(logging,self.logLevel))

        super(TestbedController, self).do_setup()
171 def do_post_asynclaunch(self, guid):
172 # Dependencies were launched asynchronously,
174 dep = self._elements[guid]
175 if isinstance(dep, self._app.Dependency):
176 dep.async_setup_wait()
178 # Two-phase configuration for asynchronous launch
179 do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
180 do_poststep_configure = staticmethod(do_post_asynclaunch)
    def do_preconfigure(self):
        """Discover resources, provision slivers, wait for the nodes, plan
        p2p deployment if enabled, then run the normal per-element
        preconfiguration.

        NOTE(review): elided listing — the loop/try scaffolding around the
        node wait (the ``do_wait_nodes()`` call itself and the retry logic
        in the ``except`` handler) is not visible here.
        """
        # Perform resource discovery if we don't have
        # specific resources assigned yet
        self.do_resource_discovery()

        # Create PlanetLab slivers
        self.do_provisioning()

        # Wait for provisioning
        except self._node.UnresponsiveNodeError:

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()
    def do_resource_discovery(self, recover = False):
        """Assign a concrete PlanetLab host to every Node element.

        Two phases (NOTE(review): elided listing — several control-flow
        lines inside both phases are missing, so the branch structure
        shown here is incomplete):
          1. parallel quick pass: nodes with exactly one candidate host
             are assigned directly, preferring hosts already in the slice;
          2. backtracking allocation (resourcealloc.alloc) over the
             remaining nodes' candidate sets.
        Blacklisted hosts and hosts already pinned to a node are excluded.
        Hosts not yet in the slice are accumulated in self._to_provision
        for do_provisioning.
        """
        to_provision = self._to_provision = set()   # node ids to add to the slice

        # Start from blacklisted hosts plus hosts already pinned to a node.
        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node.hostname)

        # look for perfectly defined nodes
        # (ie: those with only one candidate)
        reserve_lock = threading.RLock()    # guards `reserved` across worker threads
        def assignifunique(guid, node):
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = self.slice_id)
            candidate_hosts = set(candidates.keys() if candidates else [])
            reserve_lock.acquire()
            candidate_hosts -= reserved
            if len(candidate_hosts) == 1:
                hostname = iter(candidate_hosts).next()
                node_id = candidates[hostname]
                reserved.add(hostname)
            elif not candidate_hosts:
                # Try again including unassigned nodes
                reserve_lock.release()
                candidates = node.find_candidates()
                reserve_lock.acquire()
                candidate_hosts = set(candidates.keys() if candidates else [])
                candidate_hosts -= reserved
                # NOTE(review): the bodies of the following branches are
                # partially elided (the >1 case presumably defers to the
                # backtracking phase, and the raise below presumably sits
                # in an else branch) — confirm against the full source.
                if len(candidate_hosts) > 1:
                if len(candidate_hosts) == 1:
                    hostname = iter(candidate_hosts).next()
                    node_id = candidates[hostname]
                    to_provision.add(node_id)   # host not in the slice yet
                    reserved.add(hostname)
                    raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                        node.make_filter_description())
            reserve_lock.release()
            if node_id is not None:
                node.assign_node_id(node_id)

        runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(assignifunique, guid, node)

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        def genreqs(node, filter_slice_id=None):
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = filter_slice_id)
            # NOTE(review): the loop over ranking entries (`r`) and the
            # container setup for `reqs` are elided from this listing.
            if candidates.has_key(r):
            reqs.append(candidates.values())

        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(genreqs, node, self.slice_id)
            # NOTE(review): this raise presumably belongs to an elided
            # `recover` guard, not to this loop — confirm.
            raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

        def pickbest(fullset, nreq, node=nodes[0]):
            # Rank candidates with node.rate_nodes and keep the best nreq
            # (sort is descending on the rating).
            if len(fullset) > nreq:
                fullset = zip(node.rate_nodes(fullset),fullset)
                fullset.sort(reverse=True)
                return set(map(operator.itemgetter(1),fullset))

        # First attempt: restricted to slice nodes; on failure, regenerate
        # requirements over all nodes and retry (try/except lines elided).
        solution = resourcealloc.alloc(reqs, sample=pickbest)
        except resourcealloc.ResourceAllocationError:
            # Failed, try again with all nodes
            runner.put(genreqs, node)
            solution = resourcealloc.alloc(reqs, sample=pickbest)
        to_provision.update(solution)

        for node, node_id in zip(nodes, solution):
            runner.put(node.assign_node_id, node_id)
    def do_provisioning(self):
        """Add the hosts selected during discovery to the slice.

        NOTE(review): one or two lines between the API call and the
        bookkeeping below are elided from this listing.
        """
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.sliceapi.GetSliceNodes(self.slice_id)
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.sliceapi.AddSliceNodes(self.slice_id, nodes=new_nodes)

        # Remember which node ids were just added: do_wait_nodes gives
        # them a much longer provisioning timeout.
        self._just_provisioned = self._to_provision
        del self._to_provision
    def do_wait_nodes(self):
        """Configure every Node element and wait (in parallel) until its
        sliver is provisioned and reachable; blacklist dead hosts.

        NOTE(review): elided listing — the ``try:`` lines around the wait
        loop, the runner join, and the re-raise at the end of the except
        handler are not visible here.
        """
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        # maxqueue=1 keeps the producer loop in step with the workers.
        runner = ParallelRun(maxthreads=64, maxqueue=1)
        def waitforit(guid, node):
            # 20 minutes for freshly provisioned slivers, 60s otherwise.
            node.wait_provisioning(
                (20*60 if node._node_id in self._just_provisioned else 60)
            self._logger.info("READY Node %s at %s", guid, node.hostname)

            # Prepare dependency installer now
            node.prepare_dependencies()

        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                runner.put(waitforit, guid, node)
        except self._node.UnresponsiveNodeError:
            self._logger.warn("UNRESPONSIVE Nodes")

            # Mark all dead nodes (which are unresponsive) on the blacklist
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node.hostname)
            self._save_blacklist()
            traceback.print_exc()
    def do_spanning_deployment_plan(self):
        """Group equivalent dependencies and set up master/slave chains so
        each group is built once and fanned out peer-to-peer.

        NOTE(review): elided listing — the ``dephash`` helper definition,
        the loop header over group members when resolving IPs, and the
        spanning-tree call (presumably ``mst.mst(...)`` producing ``plan``)
        are not visible here.
        """
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
            frozenset((app.depends or "").split(' ')),
            frozenset((app.sources or "").split(' ')),
            app.node.architecture,
            app.node.operatingSystem,

        depgroups = collections.defaultdict(list)
        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                # Nodes contribute their yum dependency bundles too.
                deps = element._yum_dependencies
                depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            # Pick root (deterministically)
            root = min(group, key=lambda app:app.node.hostname)

            # Obtain all IPs in numeric format
            # (which means faster distance computations)
                dep._ip = socket.gethostbyname(dep.node.hostname)
                dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

            # NOTE: the plan is an iterator
                lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),

            # Re-sign private key
            tempprk, temppuk, tmppass = self._make_temp_private_key()

            # Wire each slave to its master and install the temporary keys
            # so slaves can pull builds from their master.
            for slave, master in plan:
                slave.set_master(master)
                slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
    def _make_temp_private_key(self):
        """Copy the user's SSH identity into temporary files and re-cipher
        it with a random 256-bit passphrase for automated chain deployment.

        Returns (private key tempfile, public key tempfile, passphrase).

        NOTE(review): elided listing — the ssh-keygen argument vector, the
        tempfile keyword tails, the failure branch after SSH_ASKPASS, and
        several seek/flush/close lines are not visible here.
        """
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                # Delegate passphrase prompting to the user's askpass helper.
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",

        # Create secure 256-bits temporary passphrase
        passphrase = os.urandom(32).encode("hex")

        # Copy the user's key pair into the temp files, preserving modes.
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher the temp private key (presumably ssh-keygen -p with
        # the old passphrase -P and the new one — argv elided).
        proc = subprocess.Popen(
            "-P", self.sliceSSHKeyPass,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        out, err = proc.communicate()
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.writelines(puklines)

        return prk, puk, passphrase
    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute *name* on element *guid*, mirroring it onto the
        live element object.

        NOTE(review): elided listing — the ``try:`` line before setattr,
        the re-raise under the recovering guard, and the refresh() call
        are not visible here.
        """
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take on account schedule time for the task
        element = self._elements[guid]
            setattr(element, name, value)
            # We ignore these errors while recovering.
            # Some attributes are immutable, and setting
            # them is necessary (to recover the state), but
            # some are not (they throw an exception).
            if not self.recovering:
        if hasattr(element, 'refresh'):
            # invoke attribute refresh hook
    def get(self, guid, name, time = TIME_NOW):
        """Get attribute *name* for element *guid*, preferring the live
        element's value over the stored one.

        NOTE(review): elided listing — the ``try:`` line and the fallback
        (presumably ``return value``) in the except branch are not
        visible here.
        """
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take on account schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
            return getattr(element, name)
        except (KeyError, AttributeError):
    def get_address(self, guid, index, attribute='Address'):
        """Return the requested address attribute of interface *guid*,
        answering locally for index 0 and deferring to the base class
        otherwise.

        NOTE(review): the return for the 'Address' branch (presumably
        ``return iface.address``) is elided from this listing.
        """
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)
    def action(self, time, guid, action):
        """Scheduled element actions are not supported by this testbed."""
        raise NotImplementedError
    # --- body of shutdown (the ``def shutdown(self):`` header is elided
    # from this listing).  Closes traces, then invokes cleanup and destroy
    # hooks on every element in shutdown order, then drops all elements. ---
        # NOTE(review): the loop body (presumably trace.close()) is elided.
        for trace in self._traces.itervalues():

        def invokeif(action, testbed, guid):
            # Call element.<action>() only if the element defines it.
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
    def trace(self, guid, trace_id, attribute='value'):
        """Return trace data for element *guid*: its content ('value'),
        remote path ('path') or remote file name ('name').

        NOTE(review): elided listing — the read of the synced file at
        ``path`` into ``content`` and the final ``return content`` are
        not visible here.
        """
        elem = self._elements[guid]

        if attribute == 'value':
            # Sync the remote trace file to the local home directory first.
            path = elem.sync_trace(self.home_directory, trace_id)
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
    def follow_trace(self, trace_id, trace):
        """Register *trace* under *trace_id* so shutdown can close it."""
        self._traces[trace_id] = trace
    # --- body of recover (the ``def recover(self):`` header is elided
    # from this listing).  Rebuilds controller state over an already
    # deployed slice without re-running side-effectful steps. ---
        # An internal flag, so we know to behave differently in
        # a few corner cases.
        self.recovering = True

        # Create and connect do not perform any real tasks against
        # the nodes, it only sets up the object hierarchy,
        # so we can run them normally
        self.do_connect_init()
        self.do_connect_compl()

        # Manually recover nodes, to mark dependencies installed
        # and clean up mutable attributes
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),

        # Assign nodes - since we're working off execute XML, nodes
        # have specific hostnames assigned and we don't need to do
        # real assignment, only find out node ids and check liveliness
        self.do_resource_discovery(recover = True)

        # Pre/post configure, however, tends to set up tunnels
        # Execute configuration steps only for those object
        # kinds that do not have side effects

        # Do the ones without side effects,
        # including nodes that need to set up home
        # folders and all that
        self._do_in_factory_order(
            "preconfigure_function",
            Parallel(metadata.NODE),

        # Tunnels require a home path that is configured
        # at this step. Since we cannot run the step itself,
        # we need to inject this homepath ourselves
        for guid, element in self._elements.iteritems():
            if isinstance(element, self._interfaces.TunIface):
                element._home_path = "tun-%s" % (guid,)

        # Manually recover tunnels, applications and
        # netpipes, negating the side effects
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),
            Parallel(metadata.NEPIDEPENDENCY),
            Parallel(metadata.NS3DEPENDENCY),
            Parallel(metadata.DEPENDENCY),
            Parallel(metadata.APPLICATION),
            Parallel(metadata.CCNXDAEMON),

        # Tunnels are not harmed by configuration after
        # recovery, and some attributes get set this way
        # like external_iface
        self._do_in_factory_order(
            "preconfigure_function",
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),

        # Post-do the ones without side effects
        self._do_in_factory_order(
            "configure_function",
            Parallel(metadata.NODE),
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),

        # There are no required prestart steps
        # to call upon recovery, so we're done
        # NOTE(review): suspected bug — recovery is finished here, so this
        # should presumably be ``self.recovering = False``; as written the
        # controller stays in recovery mode and set() keeps swallowing
        # attribute errors.  Confirm against the full source.
        self.recovering = True
    def _make_generic(self, parameters, kind, **kwargs):
        """Instantiate element class *kind*, wire it to this testbed via a
        weakref, and copy *parameters* onto it attribute-by-attribute.

        NOTE(review): elided listing — the actual construction (presumably
        ``app = kind(**args)``), the ``try:`` line before setattr, the
        re-raise under the recovering guard, and the final ``return app``
        are not visible here.
        """
        args = dict({'api': self.plcapi})
        # Weak back-reference avoids a reference cycle with the controller.
        app.testbed = weakref.ref(self)

        # Note: there is 1-to-1 correspondence between attribute names
        # If that changes, this has to change as well
        for attr,val in parameters.iteritems():
                setattr(app, attr, val)
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
    def _make_node(self, parameters):
        """Create a Node element bound to the slice API and apply the
        configured process/home cleanup flags.

        NOTE(review): the trailing ``return node`` line is elided from
        this listing.
        """
        args = dict({'sliceapi': self.sliceapi})
        node = self._make_generic(parameters, self._node.Node, **args)
        node.enable_proc_cleanup = self.cleanProc
        node.enable_home_cleanup = self.cleanHome
746 def _make_node_iface(self, parameters):
747 return self._make_generic(parameters, self._interfaces.NodeIface)
749 def _make_tun_iface(self, parameters):
750 return self._make_generic(parameters, self._interfaces.TunIface)
752 def _make_tap_iface(self, parameters):
753 return self._make_generic(parameters, self._interfaces.TapIface)
755 def _make_netpipe(self, parameters):
756 return self._make_generic(parameters, self._interfaces.NetPipe)
758 def _make_internet(self, parameters):
759 return self._make_generic(parameters, self._interfaces.Internet)
    def _make_application(self, parameters, clazz = None):
        """Create an Application element (or an instance of *clazz* when a
        subclass is requested by the caller).

        NOTE(review): the guard line before the default assignment
        (presumably ``if not clazz:``) is elided from this listing.
        """
            clazz = self._app.Application
        return self._make_generic(parameters, clazz)
766 def _make_dependency(self, parameters):
767 return self._make_generic(parameters, self._app.Dependency)
769 def _make_nepi_dependency(self, parameters):
770 return self._make_generic(parameters, self._app.NepiDependency)
772 def _make_ns3_dependency(self, parameters):
773 return self._make_generic(parameters, self._app.NS3Dependency)
775 def _make_tun_filter(self, parameters):
776 return self._make_generic(parameters, self._interfaces.TunFilter)
778 def _make_class_queue_filter(self, parameters):
779 return self._make_generic(parameters, self._interfaces.ClassQueueFilter)
781 def _make_tos_queue_filter(self, parameters):
782 return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
784 def _make_multicast_forwarder(self, parameters):
785 return self._make_generic(parameters, self._multicast.MulticastForwarder)
787 def _make_multicast_announcer(self, parameters):
788 return self._make_generic(parameters, self._multicast.MulticastAnnouncer)
790 def _make_multicast_router(self, parameters):
791 return self._make_generic(parameters, self._multicast.MulticastRouter)