1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID, TESTBED_VERSION
4 from nepi.core import testbed_impl
5 from nepi.core.metadata import Parallel
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 from nepi.util.parallel import ParallelRun
# Error type for temporary-deployment-key handling. Its body is not visible
# in this chunk (presumably `pass`); the name suggests it is raised when the
# temporary re-ciphered SSH key cannot be produced — TODO confirm where raised.
class TempKeyError(Exception):
class TestbedController(testbed_impl.TestbedController):
# --- constructor fragment: the `def __init__(self):` line is not visible in this chunk ---
super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
self._home_directory = None
# Late imports of sibling testbed modules, kept on the instance so other
# methods can reference e.g. self._app.Dependency, self._interfaces.TunIface.
# (a line between these two is not visible here — presumably `self._node = node`)
import node, interfaces, application, multicast
self._interfaces = interfaces
self._app = application
self._multicast = multicast
# Hostnames known to be dead/unusable; persisted by _load/_save_blacklist.
self._blacklist = set()
# node_ids added to the slice this run; they get a longer provisioning timeout.
self._just_provisioned = set()
self._load_blacklist()
# Slice metadata and API handles are resolved lazily (see the properties below).
self._vsys_vnet = None
self._logger = logging.getLogger('nepi.testbeds.planetlab')
# While True, attribute-setting errors are tolerated (see set/_make_generic).
self.recovering = False
def home_directory(self):
# Read-only accessor for the home directory configured in do_setup.
# (The decorator line is not visible here — presumably @property; TODO confirm.)
return self._home_directory
# --- lazily-built API handles (the property def lines are not visible here) ---
# plcapi: construct the PLC API client on first use (argument lines invisible).
self._plcapi = plcapi.plcapi(
# sliceapi: defaults to the PLC API; in SFA mode an SFI client is used instead.
if not self._sliceapi:
self._sliceapi = self.plcapi
from nepi.util import sfiapi
self._sliceapi = sfiapi.sfiapi(self.slice_id)
# slice_id: resolved once from the slice name via the slice API, then cached.
if not self._slice_id:
self._slice_id = self.sliceapi.GetSliceId(self.slicename)
# vsys_vnet: cached vsys_vnet tag of the slice (consumer not visible in this chunk).
if not self._vsys_vnet:
self._vsys_vnet = self.sliceapi.GetSliceVnetSysTag(self.slicename)
return self._vsys_vnet
def _load_blacklist(self):
# Load the persisted blacklist (one hostname per line, whitespace-stripped)
# from the path given by environ.homepath('plblacklist') into self._blacklist.
blpath = environ.homepath('plblacklist')
# (error-handling lines are not visible here) — if the file cannot be
# opened, the blacklist apparently stays empty.
bl = open(blpath, "r")
self._blacklist = set()
self._blacklist = set(
map(str.strip, bl.readlines())
def _save_blacklist(self):
# Persist self._blacklist, one hostname per line (note the '%s\n' formatting),
# to the plblacklist home file. The writelines/close lines are not visible here.
blpath = environ.homepath('plblacklist')
bl = open(blpath, "w")
map('%s\n'.__mod__, self._blacklist))
# --- do_setup fragment: the `def do_setup(self):` line is not visible here ---
# Copy the user-supplied testbed attributes into plain instance fields.
self._home_directory = self._attributes.\
get_attribute_value("homeDirectory")
self.slicename = self._attributes.\
get_attribute_value("slice")
self.authUser = self._attributes.\
get_attribute_value("authUser")
self.authString = self._attributes.\
get_attribute_value("authPass")
self.sliceSSHKey = self._attributes.\
get_attribute_value("sliceSSHKey")
# The key passphrase is only obtained when needed; see _make_temp_private_key.
self.sliceSSHKeyPass = None
self.plcHost = self._attributes.\
get_attribute_value("plcHost")
self.plcUrl = self._attributes.\
get_attribute_value("plcUrl")
self.logLevel = self._attributes.\
get_attribute_value("plLogLevel")
self.proxy = self._attributes.\
get_attribute_value("proxy")
self.tapPortBase = self._attributes.\
get_attribute_value("tapPortBase")
self.p2pDeployment = self._attributes.\
get_attribute_value("p2pDeployment")
self.cleanProc = self._attributes.\
get_attribute_value("cleanProc")
self.cleanHome = self._attributes.\
get_attribute_value("cleanHome")
self.sfa = self._attributes.\
get_attribute_value("sfa")
# (a guarding line is not visible here — presumably only taken in SFA mode)
# the slice HRN is stored as the slice identifier.
self._slice_id = self._attributes.\
get_attribute_value("sliceHrn")
# Fail fast on missing mandatory credentials (Python 2 raise syntax).
if not self.slicename:
raise RuntimeError, "Slice not set"
if not self.authUser:
raise RuntimeError, "PlanetLab account username not set"
if not self.authString:
raise RuntimeError, "PlanetLab account passphrase not set"
if not self.sliceSSHKey:
raise RuntimeError, "PlanetLab account key not specified"
if not os.path.exists(self.sliceSSHKey):
raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
# plLogLevel must name a stdlib logging level attribute (e.g. "DEBUG", "INFO");
# an unknown name would raise AttributeError here.
self._logger.setLevel(getattr(logging,self.logLevel))
super(TestbedController, self).do_setup()
def do_post_asynclaunch(self, guid):
# Block until the asynchronously-launched Dependency element identified by
# `guid` finishes its setup; non-Dependency elements are ignored.
# Dependencies were launched asynchronously,
dep = self._elements[guid]
if isinstance(dep, self._app.Dependency):
dep.async_setup_wait()
# Two-phase configuration for asynchronous launch
do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
do_poststep_configure = staticmethod(do_post_asynclaunch)
def do_preconfigure(self):
# Resolve hosts, create slivers, wait for them to come up, optionally plan
# peer-to-peer deployment, then run the generic preconfiguration.
# Perform resource discovery if we don't have
# specific resources assigned yet
self.do_resource_discovery()
# Create PlanetLab slivers
self.do_provisioning()
# Wait for provisioning
# (the try/retry lines around the wait are not visible in this chunk)
except self._node.UnresponsiveNodeError:
if self.p2pDeployment:
# Plan application deployment
self.do_spanning_deployment_plan()
# Configure elements per XML data
super(TestbedController, self).do_preconfigure()
def do_resource_discovery(self, recover = False):
# Assign a concrete PlanetLab host (node_id) to every Node element that
# does not have one yet. Newly chosen node_ids are collected in
# self._to_provision for do_provisioning. Several interior lines of this
# method are not visible in this chunk; gap comments mark them below.
to_provision = self._to_provision = set()
# Hosts that may not be (re)assigned: blacklisted plus already-taken ones.
reserved = set(self._blacklist)
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node) and node._node_id is not None:
reserved.add(node.hostname)
# look for perfectly defined nodes
# (ie: those with only one candidate)
# The lock serializes updates to `reserved` across ParallelRun workers.
reserve_lock = threading.RLock()
def assignifunique(guid, node):
# Try existing nodes first
# If we have only one candidate, simply use it
candidates = node.find_candidates(
filter_slice_id = self.slice_id)
candidate_hosts = set(candidates.keys() if candidates else [])
reserve_lock.acquire()
candidate_hosts -= reserved
if len(candidate_hosts) == 1:
hostname = iter(candidate_hosts).next()
node_id = candidates[hostname]
reserved.add(hostname)
elif not candidate_hosts:
# Try again including unassigned nodes
# (lock is dropped while re-querying candidates, re-acquired below)
reserve_lock.release()
candidates = node.find_candidates()
reserve_lock.acquire()
candidate_hosts = set(candidates.keys() if candidates else [])
candidate_hosts -= reserved
# (the body of this branch is not visible here)
if len(candidate_hosts) > 1:
if len(candidate_hosts) == 1:
hostname = iter(candidate_hosts).next()
node_id = candidates[hostname]
to_provision.add(node_id)
reserved.add(hostname)
# (the `else:` introducing this raise is not visible here)
raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
node.make_filter_description())
reserve_lock.release()
if node_id is not None:
node.assign_node_id(node_id)
runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node) and node._node_id is None:
runner.put(assignifunique, guid, node)
# Now do the backtracking search for a suitable solution
# First with existing slice nodes
def genreqs(node, filter_slice_id=None):
# Try existing nodes first
# If we have only one candidate, simply use it
candidates = node.find_candidates(
filter_slice_id = filter_slice_id)
# (lines not visible here — `nodes`/`reqs` are built in the hidden part)
if candidates.has_key(r):
reqs.append(candidates.values())
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node) and node._node_id is None:
runner.put(genreqs, node, self.slice_id)
# (the recover-mode guard around this raise is not visible here)
raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
# Sampling callback for the allocator: keep the nreq best-rated hosts.
def pickbest(fullset, nreq, node=nodes[0]):
if len(fullset) > nreq:
fullset = zip(node.rate_nodes(fullset),fullset)
fullset.sort(reverse=True)
return set(map(operator.itemgetter(1),fullset))
# (the `try:` around this allocation is not visible here)
solution = resourcealloc.alloc(reqs, sample=pickbest)
except resourcealloc.ResourceAllocationError:
# Failed, try again with all nodes
runner.put(genreqs, node)
solution = resourcealloc.alloc(reqs, sample=pickbest)
to_provision.update(solution)
for node, node_id in zip(nodes, solution):
runner.put(node.assign_node_id, node_id)
def do_provisioning(self):
# Add the node_ids selected by do_resource_discovery to the slice, and
# remember them so do_wait_nodes grants them a longer provisioning timeout.
if self._to_provision:
# Add new nodes to the slice
cur_nodes = self.sliceapi.GetSliceNodes(self.slice_id)
new_nodes = list(set(cur_nodes) | self._to_provision)
self.sliceapi.AddSliceNodes(self.slice_id, nodes=new_nodes)
# (two lines are not visible here)
self._just_provisioned = self._to_provision
del self._to_provision
def do_wait_nodes(self):
# Inject per-node configuration, then wait in parallel for each node to
# finish provisioning; unresponsive hosts are added to the blacklist.
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node):
# Just inject configuration stuff
node.home_path = "nepi-node-%s" % (guid,)
node.ident_path = self.sliceSSHKey
node.slicename = self.slicename
# (additional per-node settings are not visible here)
self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
runner = ParallelRun(maxthreads=64, maxqueue=1)
def waitforit(guid, node):
node.wait_provisioning(
# Freshly-added nodes may take much longer: 20 minutes vs 1 minute.
(20*60 if node._node_id in self._just_provisioned else 60)
self._logger.info("READY Node %s at %s", guid, node.hostname)
# Prepare dependency installer now
node.prepare_dependencies()
# (the `try:` around the wait loop is not visible here)
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node):
self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
runner.put(waitforit, guid, node)
except self._node.UnresponsiveNodeError:
self._logger.warn("UNRESPONSIVE Nodes")
# Mark all dead nodes (which are unresponsive) on the blacklist
# (an inner try is presumably not visible here; see print_exc below)
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node):
if not node.is_alive():
self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
self._blacklist.add(node.hostname)
self._save_blacklist()
# Best-effort blacklist persistence: failures are logged, not raised.
traceback.print_exc()
def do_spanning_deployment_plan(self):
# Group identical dependencies/applications, pick a deterministic root per
# group, and chain deployment along a minimum-spanning-tree keyed on IP
# distance, so build artifacts propagate peer-to-peer instead of from the
# controller. Several interior lines are not visible in this chunk.
# Create application groups by collecting all applications
# based on their hash - the hash should contain everything that
# defines them and the platform they're built
# (the enclosing dephash definition is not visible — these are its components)
frozenset((app.depends or "").split(' ')),
frozenset((app.sources or "").split(' ')),
app.node.architecture,
app.node.operatingSystem,
depgroups = collections.defaultdict(list)
for element in self._elements.itervalues():
if isinstance(element, self._app.Dependency):
depgroups[dephash(element)].append(element)
elif isinstance(element, self._node.Node):
deps = element._yum_dependencies
# (a guard on `deps` is presumably not visible here)
depgroups[dephash(deps)].append(deps)
# Set up spanning deployment for those applications that
# have been deployed in several nodes.
for dh, group in depgroups.iteritems():
# Pick root (deterministically)
root = min(group, key=lambda app:app.node.hostname)
# Obtain all IPs in numeric format
# (which means faster distance computations)
# (the loop header over `dep` is not visible here)
dep._ip = socket.gethostbyname(dep.node.hostname)
dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
# NOTE: the plan is an iterator
# (mst invocation lines are partly invisible; the distance metric follows)
lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
# Re-sign private key
tempprk, temppuk, tmppass = self._make_temp_private_key()
# Wire each slave to its master and hand it the temporary key material.
for slave, master in plan:
slave.set_master(master)
slave.install_keys(tempprk, temppuk, tmppass)
# We don't need the user's passphrase anymore
self.sliceSSHKeyPass = None
def _make_temp_private_key(self):
# Produce a temporary copy of the user's SSH identity, re-ciphered with a
# random 256-bit passphrase, for chained (peer-to-peer) deployment.
# Returns (private_keyfile, public_keyfile, passphrase); the files are
# NamedTemporaryFile objects under self.root_directory. Several interior
# lines (ssh-keygen argv, returncode check) are not visible in this chunk.
# Get the user's key's passphrase
if not self.sliceSSHKeyPass:
if 'SSH_ASKPASS' in os.environ:
# Ask interactively through the user's SSH_ASKPASS helper.
proc = subprocess.Popen(
[ os.environ['SSH_ASKPASS'],
"Please type the passphrase for the %s SSH identity file. "
"The passphrase will be used to re-cipher the identity file with "
"a random 256-bit key for automated chain deployment on the "
"%s PlanetLab slice" % (
os.path.basename(self.sliceSSHKey),
stdin = open("/dev/null"),
stdout = subprocess.PIPE,
stderr = subprocess.PIPE)
out,err = proc.communicate()
self.sliceSSHKeyPass = out.strip()
if not self.sliceSSHKeyPass:
# (the raise is not visible here — presumably TempKeyError; TODO confirm)
# Create temporary key files
prk = tempfile.NamedTemporaryFile(
dir = self.root_directory,
prefix = "pl_deploy_tmpk_",
puk = tempfile.NamedTemporaryFile(
dir = self.root_directory,
prefix = "pl_deploy_tmpk_",
# Create secure 256-bits temporary passphrase
# (Python 2 idiom: str.encode("hex") hex-encodes the random bytes)
passphrase = os.urandom(32).encode("hex")
# Copy the user's identity — both permissions and contents — into the temp files.
oprk = open(self.sliceSSHKey, "rb")
opuk = open(self.sliceSSHKey+".pub", "rb")
shutil.copymode(oprk.name, prk.name)
shutil.copymode(opuk.name, puk.name)
shutil.copyfileobj(oprk, prk)
shutil.copyfileobj(opuk, puk)
# A descriptive comment
comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
# Re-cipher the key in place (ssh-keygen invocation; argv lines partly invisible).
proc = subprocess.Popen(
"-P", self.sliceSSHKeyPass,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
stdin = subprocess.PIPE
out, err = proc.communicate()
# (the returncode check introducing this raise is not visible here)
raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
# Change comment on public key
puklines = puk.readlines()
puklines[0] = puklines[0].split(' ')
puklines[0][-1] = comment+'\n'
puklines[0] = ' '.join(puklines[0])
puk.writelines(puklines)
return prk, puk, passphrase
def set(self, guid, name, value, time = TIME_NOW):
# Apply an attribute change to the element behind `guid`. While
# self.recovering is true, failures from immutable attributes are
# deliberately swallowed. (The try/except lines are not visible here.)
super(TestbedController, self).set(guid, name, value, time)
# TODO: take on account schedule time for the task
element = self._elements[guid]
setattr(element, name, value)
# We ignore these errors while recovering.
# Some attributes are immutable, and setting
# them is necessary (to recover the state), but
# some are not (they throw an exception).
if not self.recovering:
# (the re-raise is not visible here)
if hasattr(element, 'refresh'):
# invoke attribute refresh hook
# (the refresh() call itself is not visible here)
def get(self, guid, name, time = TIME_NOW):
# Read an attribute: prefer the live element's current value, falling back
# to the stored value on KeyError/AttributeError. (The `try:` and the
# fallback `return value` are not visible in this chunk.)
value = super(TestbedController, self).get(guid, name, time)
# TODO: take on account schedule time for the task
factory_id = self._create[guid]
factory = self._factories[factory_id]
element = self._elements.get(guid)
return getattr(element, name)
except (KeyError, AttributeError):
def get_address(self, guid, index, attribute='Address'):
# Answer address queries for interface index 0 straight from the element;
# anything else is delegated to the generic implementation.
iface = self._elements.get(guid)
if iface and index == 0:
if attribute == 'Address':
# (the `return iface.address` line is not visible in this chunk)
elif attribute == 'NetPrefix':
return iface.netprefix
elif attribute == 'Broadcast':
return iface.broadcast
# if all else fails, query box
return super(TestbedController, self).get_address(guid, index, attribute)
def action(self, time, guid, action):
# Scheduled element actions are not supported by this testbed backend.
raise NotImplementedError
# --- shutdown fragment: the `def shutdown(self):` line is not visible here ---
# Close every registered trace first (the loop body is not visible here).
for trace in self._traces.itervalues():
# Give each element a chance to clean up, then destroy, following the
# metadata shutdown order; invokeif calls the named method only if present.
def invokeif(action, testbed, guid):
element = self._elements[guid]
if hasattr(element, action):
getattr(element, action)()
self._do_in_factory_order(
functools.partial(invokeif, 'cleanup'),
metadata.shutdown_order)
self._do_in_factory_order(
functools.partial(invokeif, 'destroy'),
metadata.shutdown_order)
self._elements.clear()
def trace(self, guid, trace_id, attribute='value'):
# Fetch trace data for an element: synced content ('value'), or the remote
# path/name of the trace file. (The lines reading the synced file's content
# and the final return are not visible in this chunk.)
elem = self._elements[guid]
if attribute == 'value':
path = elem.sync_trace(self.home_directory, trace_id)
elif attribute == 'path':
content = elem.remote_trace_path(trace_id)
elif attribute == 'name':
content = elem.remote_trace_name(trace_id)
def follow_trace(self, trace_id, trace):
# Register `trace` under `trace_id`; self._traces is iterated at shutdown.
self._traces[trace_id] = trace
# --- recover fragment: the `def recover(self):` line is not visible here ---
# An internal flag, so we know to behave differently in
# a few corner cases.
self.recovering = True
# Create and connect do not perform any real tasks against
# the nodes, it only sets up the object hierarchy,
# so we can run them normally
self.do_connect_init()
self.do_connect_compl()
# Manually recover nodes, to mark dependencies installed
# and clean up mutable attributes
self._do_in_factory_order(
lambda self, guid : self._elements[guid].recover(),
# Assign nodes - since we're working off execute XML, nodes
# have specific hostnames assigned and we don't need to do
# real assignment, only find out node ids and check liveliness
self.do_resource_discovery(recover = True)
# Pre/post configure, however, tends to set up tunnels
# Execute configuration steps only for those object
# kinds that do not have side effects
# Do the ones without side effects,
# including nodes that need to set up home
# folders and all that
self._do_in_factory_order(
"preconfigure_function",
Parallel(metadata.NODE),
# Tunnels require a home path that is configured
# at this step. Since we cannot run the step itself,
# we need to inject this homepath ourselves
for guid, element in self._elements.iteritems():
if isinstance(element, self._interfaces.TunIface):
element._home_path = "tun-%s" % (guid,)
# Manually recover tunnels, applications and
# netpipes, negating the side effects
self._do_in_factory_order(
lambda self, guid : self._elements[guid].recover(),
Parallel(metadata.TAPIFACE),
Parallel(metadata.TUNIFACE),
Parallel(metadata.NEPIDEPENDENCY),
Parallel(metadata.NS3DEPENDENCY),
Parallel(metadata.DEPENDENCY),
Parallel(metadata.APPLICATION),
Parallel(metadata.CCNXDAEMON),
# Tunnels are not harmed by configuration after
# recovery, and some attributes get set this way
# like external_iface
self._do_in_factory_order(
"preconfigure_function",
Parallel(metadata.TAPIFACE),
Parallel(metadata.TUNIFACE),
# Post-do the ones without side effects
self._do_in_factory_order(
"configure_function",
Parallel(metadata.NODE),
Parallel(metadata.TAPIFACE),
Parallel(metadata.TUNIFACE),
# There are no required prestart steps
# to call upon recovery, so we're done
# NOTE(review): likely bug — this re-asserts True at the END of recovery.
# It was already set True at the top of this method; it should almost
# certainly be `self.recovering = False` here, otherwise set() and
# _make_generic keep silently swallowing attribute errors after recovery
# has completed. Confirm against upstream before changing.
self.recovering = True
def _make_generic(self, parameters, kind, **kwargs):
# Factory helper: instantiate `kind` with an `api` handle plus kwargs,
# apply `parameters` as plain attributes, and return the element. (The
# instantiation and the final `return` are not visible in this chunk.)
args = dict({'api': self.plcapi})
# Elements keep a weak back-reference to the controller to avoid ref cycles.
app.testbed = weakref.ref(self)
# Note: there is 1-to-1 correspondence between attribute names
# If that changes, this has to change as well
for attr,val in parameters.iteritems():
setattr(app, attr, val)
# We ignore these errors while recovering.
# Some attributes are immutable, and setting
# them is necessary (to recover the state), but
# some are not (they throw an exception).
if not self.recovering:
# (the re-raise is not visible here)
def _make_node(self, parameters):
# Build a Node element wired to the slice API and honoring the cleanup
# flags read in do_setup. (The `return node` line is not visible here.)
args = dict({'sliceapi': self.sliceapi})
node = self._make_generic(parameters, self._node.Node, **args)
node.enable_proc_cleanup = self.cleanProc
node.enable_home_cleanup = self.cleanHome
def _make_node_iface(self, parameters):
    """Build a NodeIface element configured from *parameters*."""
    iface_cls = self._interfaces.NodeIface
    return self._make_generic(parameters, iface_cls)
def _make_tun_iface(self, parameters):
    """Build a TunIface element configured from *parameters*."""
    iface_cls = self._interfaces.TunIface
    return self._make_generic(parameters, iface_cls)
def _make_tap_iface(self, parameters):
    """Build a TapIface element configured from *parameters*."""
    iface_cls = self._interfaces.TapIface
    return self._make_generic(parameters, iface_cls)
def _make_netpipe(self, parameters):
    """Build a NetPipe element configured from *parameters*."""
    pipe_cls = self._interfaces.NetPipe
    return self._make_generic(parameters, pipe_cls)
def _make_internet(self, parameters):
    """Build an Internet element configured from *parameters*."""
    inet_cls = self._interfaces.Internet
    return self._make_generic(parameters, inet_cls)
def _make_application(self, parameters, clazz = None):
# Build an application element; callers may supply an alternative class via
# `clazz`. (The conditional guarding this default — presumably
# `if not clazz:` — is not visible in this chunk.)
clazz = self._app.Application
return self._make_generic(parameters, clazz)
def _make_dependency(self, parameters):
    """Build a Dependency element configured from *parameters*."""
    dep_cls = self._app.Dependency
    return self._make_generic(parameters, dep_cls)
def _make_nepi_dependency(self, parameters):
    """Build a NepiDependency element configured from *parameters*."""
    dep_cls = self._app.NepiDependency
    return self._make_generic(parameters, dep_cls)
def _make_ns3_dependency(self, parameters):
    """Build an NS3Dependency element configured from *parameters*."""
    dep_cls = self._app.NS3Dependency
    return self._make_generic(parameters, dep_cls)
def _make_tun_filter(self, parameters):
    """Build a TunFilter element configured from *parameters*."""
    filter_cls = self._interfaces.TunFilter
    return self._make_generic(parameters, filter_cls)
def _make_class_queue_filter(self, parameters):
    """Build a ClassQueueFilter element configured from *parameters*."""
    filter_cls = self._interfaces.ClassQueueFilter
    return self._make_generic(parameters, filter_cls)
def _make_logging_class_queue_filter(self, parameters):
    """Build a LoggingClassQueueFilter element configured from *parameters*."""
    filter_cls = self._interfaces.LoggingClassQueueFilter
    return self._make_generic(parameters, filter_cls)
def _make_tos_queue_filter(self, parameters):
    """Build a ToSQueueFilter element configured from *parameters*."""
    filter_cls = self._interfaces.ToSQueueFilter
    return self._make_generic(parameters, filter_cls)
def _make_multicast_forwarder(self, parameters):
    """Build a MulticastForwarder element configured from *parameters*."""
    mcast_cls = self._multicast.MulticastForwarder
    return self._make_generic(parameters, mcast_cls)
def _make_multicast_announcer(self, parameters):
    """Build a MulticastAnnouncer element configured from *parameters*."""
    mcast_cls = self._multicast.MulticastAnnouncer
    return self._make_generic(parameters, mcast_cls)
def _make_multicast_router(self, parameters):
    """Build a MulticastRouter element configured from *parameters*."""
    mcast_cls = self._multicast.MulticastRouter
    return self._make_generic(parameters, mcast_cls)