1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID, TESTBED_VERSION
4 from nepi.core import testbed_impl
5 from nepi.core.metadata import Parallel
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 from nepi.util.parallel import ParallelRun
class TempKeyError(Exception):
    """Presumably raised when temporary deployment-key handling fails
    (see _make_temp_private_key) -- body not visible here; confirm."""
class TestbedController(testbed_impl.TestbedController):
        # (__init__, def line not shown) Register this testbed with the
        # generic controller machinery under its ID/version constants.
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        # Late import of the testbed's own element modules, kept as
        # instance attributes so element factories below can reach them.
        import node, interfaces, application, multicast
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast
        # Node ids known to be unusable; persisted across runs via
        # _load_blacklist/_save_blacklist.
        self._blacklist = set()
        # Node ids added to the slice during this run -- they get a much
        # longer provisioning timeout in do_wait_nodes.
        self._just_provisioned = set()
        self._load_blacklist()
        self._logger = logging.getLogger('nepi.testbeds.planetlab')
        # Flag consulted by set()/_make_generic: while recovering a
        # previous session, attribute-setting errors are tolerated.
        self.recovering = False
    def home_directory(self):
        """Return the configured home directory for this controller.

        NOTE(review): presumably decorated with @property in the full
        file (decorator line not visible here) -- confirm.
        """
        return self._home_directory
        # (plapi accessor, def line not shown) Lazily build and cache the
        # PLCAPI client on first use.
        if not hasattr(self, '_plapi'):
            # Authenticated session using the credentials gathered in
            # do_setup (branch condition not visible here).
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
            # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        # (slice_id accessor, def line not shown) Resolve the numeric
        # slice_id for self.slicename through the PLC API and cache it.
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
                # Only cached on a successful lookup (guard not visible).
                self._slice_id = slices[0]['slice_id']
            # If it wasn't found, don't remember this failure, keep trying
        # (vsys_vnet accessor, def line not shown) Look up and cache the
        # slice's vsys_vnet value via plutil (call arguments truncated).
        if not hasattr(self, '_vsys_vnet'):
            self._vsys_vnet = plutil.getVnet(
        return self._vsys_vnet
    def _load_blacklist(self):
        """Load the persisted node blacklist from the 'plblacklist' file.

        Best effort: the surrounding try/except lines are not visible
        here; on failure the blacklist is left as an empty set.
        """
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "r")
            # Fallback branch: no usable file content.
            self._blacklist = set()
            # Normal branch: one node id per line, whitespace-stripped.
            self._blacklist = set(
                map(str.strip, bl.readlines())
    def _save_blacklist(self):
        """Persist the current node blacklist, one node id per line.

        The writelines/close lines are not visible in this chunk.
        """
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
            # Format each blacklisted id as "<id>\n" for writelines.
            map('%s\n'.__mod__, self._blacklist))
        # (do_setup, def line not shown) Pull every configuration value
        # from the testbed attributes, validate the mandatory ones, then
        # delegate to the base class.
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        # The key passphrase is only collected lazily, by
        # _make_temp_private_key, when it is actually needed.
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")
        # Fail fast on missing mandatory configuration.
        if not self.slicename:
            raise RuntimeError, "Slice not set"
        if not self.authUser:
            raise RuntimeError, "PlanetLab account username not set"
        if not self.authString:
            raise RuntimeError, "PlanetLab account passphrase not set"
        if not self.sliceSSHKey:
            raise RuntimeError, "PlanetLab account key not specified"
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
        # plLogLevel holds a logging level name (e.g. "DEBUG").
        self._logger.setLevel(getattr(logging,self.logLevel))
        super(TestbedController, self).do_setup()
163 def do_post_asynclaunch(self, guid):
164 # Dependencies were launched asynchronously,
166 dep = self._elements[guid]
167 if isinstance(dep, self._app.Dependency):
168 dep.async_setup_wait()
    # Two-phase configuration for asynchronous launch
    # Both post-steps perform the same wait: each asynchronously launched
    # Dependency must finish its setup before the next phase runs.
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)
    def do_preconfigure(self):
        """Discover/provision slice nodes, wait for them, then run the
        normal preconfigure step (error-handling lines not all visible)."""
        # Perform resource discovery if we don't have
        # specific resources assigned yet
        self.do_resource_discovery()
        # Create PlanetLab slivers
        self.do_provisioning()
        # Wait for provisioning
        # (try/retry logic around do_wait_nodes is not visible here)
        except self._node.UnresponsiveNodeError:
        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()
        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()
    def do_resource_discovery(self, recover = False):
        """Assign a concrete PlanetLab node to every Node element.

        First assigns nodes with a unique candidate (in parallel), then
        solves the remaining assignments with a backtracking resource
        allocator. Nodes not yet in the slice are collected in
        self._to_provision for do_provisioning. Several interior lines
        (try/finally, loop headers) are not visible in this chunk.
        """
        to_provision = self._to_provision = set()
        # Nodes already taken: blacklisted ones plus already-assigned ones.
        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)
        # look for perfectly defined nodes
        # (ie: those with only one candidate)
        reserve_lock = threading.RLock()
        def assignifunique(guid, node):
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = self.slice_id)
            reserve_lock.acquire()
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    reserved.add(node_id)
                    # Try again including unassigned nodes
                    reserve_lock.release()
                    candidates = node.find_candidates()
                    reserve_lock.acquire()
                    candidates -= reserved
                    if len(candidates) > 1:
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        to_provision.add(node_id)
                        reserved.add(node_id)
                        # NOTE(review): "sith" below is a typo for "with"
                        # in this error message (string kept verbatim).
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
                            node.make_filter_description())
                reserve_lock.release()
            if node_id is not None:
                node.assign_node_id(node_id)
        runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(assignifunique, guid, node)
        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        def genreqs(node, filter_slice_id=None):
            # Try existing nodes first
            # If we have only one candidate, simply use it
            candidates = node.find_candidates(
                filter_slice_id = filter_slice_id)
            candidates -= reserved
            reqs.append(candidates)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                runner.put(genreqs, node, self.slice_id)
            # During recovery there is nothing to search for; an
            # unassigned host at this point is fatal.
            raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
        def pickbest(fullset, nreq, node=nodes[0]):
            # Rank candidates with the node's own rating and keep the
            # best nreq of them for the allocator to sample from.
            if len(fullset) > nreq:
                fullset = zip(node.rate_nodes(fullset),fullset)
                fullset.sort(reverse=True)
            return set(map(operator.itemgetter(1),fullset))
            solution = resourcealloc.alloc(reqs, sample=pickbest)
        except resourcealloc.ResourceAllocationError:
            # Failed, try again with all nodes
            runner.put(genreqs, node)
            solution = resourcealloc.alloc(reqs, sample=pickbest)
        # Newly allocated node ids must be added to the slice later.
        to_provision.update(solution)
        for node, node_id in zip(nodes, solution):
            runner.put(node.assign_node_id, node_id)
    def do_provisioning(self):
        """Add all nodes collected in _to_provision to the PlanetLab slice.

        Afterwards the provisioned set is remembered as
        _just_provisioned (used by do_wait_nodes to grant those nodes a
        longer boot timeout) and _to_provision is discarded.
        """
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
        self._just_provisioned = self._to_provision
        del self._to_provision
    def do_wait_nodes(self):
        """Configure each Node element and wait (in parallel) until its
        sliver is provisioned and reachable.

        Unresponsive nodes are blacklisted and the blacklist persisted;
        several interior lines (try/join, re-raise) are not visible here.
        """
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
        # maxqueue=1 keeps at most one pending task per worker slot.
        runner = ParallelRun(maxthreads=64, maxqueue=1)
        def waitforit(guid, node):
            # Freshly provisioned slivers may take much longer to boot,
            # hence the 20-minute timeout vs 60 seconds.
            node.wait_provisioning(
                (20*60 if node._node_id in self._just_provisioned else 60)
            self._logger.info("READY Node %s at %s", guid, node.hostname)
            # Prepare dependency installer now
            node.prepare_dependencies()
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                runner.put(waitforit, guid, node)
        except self._node.UnresponsiveNodeError:
            self._logger.warn("UNRESPONSIVE Nodes")
            # Mark all dead nodes (which are unresponsive) on the blacklist
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
            self._save_blacklist()
            traceback.print_exc()
    def do_spanning_deployment_plan(self):
        """Group equivalent dependencies and build a minimum-spanning-tree
        deployment plan so builds propagate peer-to-peer between nodes.

        Several interior lines (the dephash definition, per-group loops,
        the mst call arguments) are not visible in this chunk.
        """
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
            frozenset((app.depends or "").split(' ')),
            frozenset((app.sources or "").split(' ')),
            app.node.architecture,
            app.node.operatingSystem,
        depgroups = collections.defaultdict(list)
        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                depgroups[dephash(deps)].append(deps)
        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            # Pick root (deterministically)
            root = min(group, key=lambda app:app.node.hostname)
            # Obtain all IPs in numeric format
            # (which means faster distance computations)
                dep._ip = socket.gethostbyname(dep.node.hostname)
                dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
            # NOTE: the plan is an iterator
                # Edge weight: numeric IP distance between the two hosts.
                lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
            # Re-sign private key
            tempprk, temppuk, tmppass = self._make_temp_private_key()
            # Walk the plan, wiring each slave to its master and handing
            # it the temporary key material for chained deployment.
            for slave, master in plan:
                slave.set_master(master)
                slave.install_keys(tempprk, temppuk, tmppass)
        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
    def _make_temp_private_key(self):
        """Copy the user's SSH identity and re-cipher it with a random
        one-off passphrase for automated chain deployment.

        Returns (private_key_file, public_key_file, passphrase) where
        the files are NamedTemporaryFile objects under root_directory.
        The ssh-keygen argv and several error-handling lines are not
        visible in this chunk.
        """
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                # Delegate passphrase prompting to the user's askpass
                # helper; its stdout is the passphrase.
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()
        if not self.sliceSSHKeyPass:
        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
        # Create secure 256-bits temporary passphrase
        passphrase = os.urandom(32).encode("hex")
        # Clone the user's key pair (mode bits and contents) into the
        # temporary files before re-ciphering.
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
        # Re-cipher with ssh-keygen: -P is the old passphrase (full argv
        # not visible in this chunk).
        proc = subprocess.Popen(
            "-P", self.sliceSSHKeyPass,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        out, err = proc.communicate()
        raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.writelines(puklines)
        return prk, puk, passphrase
    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute *name* on element *guid*, tolerating failures on
        immutable attributes while recovering (try/except lines not all
        visible here)."""
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take on account schedule time for the task
        element = self._elements[guid]
            setattr(element, name, value)
            # We ignore these errors while recovering.
            # Some attributes are immutable, and setting
            # them is necessary (to recover the state), but
            # some are not (they throw an exception).
            if not self.recovering:
            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
    def get(self, guid, name, time = TIME_NOW):
        """Return attribute *name* of element *guid*, preferring the live
        element's value over the stored one (fallback return lines not
        visible here)."""
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take on account schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
            return getattr(element, name)
        except (KeyError, AttributeError):
    def get_address(self, guid, index, attribute='Address'):
        """Answer address queries for interface elements directly when
        possible; otherwise defer to the base implementation.

        Only index 0 is answered locally (the 'Address' return line is
        not visible in this chunk).
        """
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast
        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)
    def action(self, time, guid, action):
        """Scheduled actions are not supported by this testbed."""
        raise NotImplementedError
        # (shutdown, def line not shown) Finalize traces, then run each
        # element's optional cleanup/destroy hook in shutdown order.
        for trace in self._traces.itervalues():
        def invokeif(action, testbed, guid):
            # Call element.<action>() only if the element defines it.
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()
        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)
        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)
        # Drop all element references once everything is torn down.
        self._elements.clear()
586 def trace(self, guid, trace_id, attribute='value'):
587 elem = self._elements[guid]
589 if attribute == 'value':
590 path = elem.sync_trace(self.home_directory, trace_id)
597 elif attribute == 'path':
598 content = elem.remote_trace_path(trace_id)
599 elif attribute == 'name':
600 content = elem.remote_trace_name(trace_id)
605 def follow_trace(self, trace_id, trace):
606 self._traces[trace_id] = trace
        # (recover, def line not shown) Rebuild controller state from a
        # previously-deployed experiment without re-running side effects.
        # An internal flag, so we know to behave differently in
        # a few corner cases.
        self.recovering = True
        # Create and connect do not perform any real tasks against
        # the nodes, it only sets up the object hierarchy,
        # so we can run them normally
        self.do_connect_init()
        self.do_connect_compl()
        # Manually recover nodes, to mark dependencies installed
        # and clean up mutable attributes
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
        # Assign nodes - since we're working off execute XML, nodes
        # have specific hostnames assigned and we don't need to do
        # real assignment, only find out node ids and check liveliness
        self.do_resource_discovery(recover = True)
        # Pre/post configure, however, tends to set up tunnels
        # Execute configuration steps only for those object
        # kinds that do not have side effects
        # Do the ones without side effects,
        # including nodes that need to set up home
        # folders and all that
        self._do_in_factory_order(
            "preconfigure_function",
            Parallel(metadata.NODE),
        # Tunnels require a home path that is configured
        # at this step. Since we cannot run the step itself,
        # we need to inject this homepath ourselves
        for guid, element in self._elements.iteritems():
            if isinstance(element, self._interfaces.TunIface):
                element._home_path = "tun-%s" % (guid,)
        # Manually recover tunnels, applications and
        # netpipes, negating the side effects
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),
            Parallel(metadata.NEPIDEPENDENCY),
            Parallel(metadata.NS3DEPENDENCY),
            Parallel(metadata.DEPENDENCY),
            Parallel(metadata.APPLICATION),
        # Tunnels are not harmed by configuration after
        # recovery, and some attributes get set this way
        # like external_iface
        self._do_in_factory_order(
            "preconfigure_function",
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),
        # Post-do the ones without side effects
        self._do_in_factory_order(
            "configure_function",
            Parallel(metadata.NODE),
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),
        # There are no required prestart steps
        # to call upon recovery, so we're done
        # NOTE(review): this re-assigns True, but the comment above says
        # recovery is finished -- this likely should be False so that
        # set()/_make_generic stop suppressing attribute errors; confirm.
        self.recovering = True
    def _make_generic(self, parameters, kind):
        """Instantiate element class *kind* with the PLC API client and
        populate it from the XML *parameters* dict (the try/except and
        return lines are not visible in this chunk)."""
        app = kind(self.plapi)
        # Weak back-reference avoids a reference cycle with the testbed.
        app.testbed = weakref.ref(self)
        # Note: there is 1-to-1 correspondence between attribute names
        # If that changes, this has to change as well
        for attr,val in parameters.iteritems():
            setattr(app, attr, val)
            # We ignore these errors while recovering.
            # Some attributes are immutable, and setting
            # them is necessary (to recover the state), but
            # some are not (they throw an exception).
            if not self.recovering:
    def _make_node(self, parameters):
        """Create a Node element; cleanup is only enabled on dedicated
        slices (return line not visible in this chunk)."""
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
721 def _make_node_iface(self, parameters):
722 return self._make_generic(parameters, self._interfaces.NodeIface)
724 def _make_tun_iface(self, parameters):
725 return self._make_generic(parameters, self._interfaces.TunIface)
727 def _make_tap_iface(self, parameters):
728 return self._make_generic(parameters, self._interfaces.TapIface)
730 def _make_netpipe(self, parameters):
731 return self._make_generic(parameters, self._interfaces.NetPipe)
733 def _make_internet(self, parameters):
734 return self._make_generic(parameters, self._interfaces.Internet)
736 def _make_application(self, parameters):
737 return self._make_generic(parameters, self._app.Application)
739 def _make_dependency(self, parameters):
740 return self._make_generic(parameters, self._app.Dependency)
742 def _make_nepi_dependency(self, parameters):
743 return self._make_generic(parameters, self._app.NepiDependency)
745 def _make_ns3_dependency(self, parameters):
746 return self._make_generic(parameters, self._app.NS3Dependency)
748 def _make_tun_filter(self, parameters):
749 return self._make_generic(parameters, self._interfaces.TunFilter)
751 def _make_class_queue_filter(self, parameters):
752 return self._make_generic(parameters, self._interfaces.ClassQueueFilter)
754 def _make_tos_queue_filter(self, parameters):
755 return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
757 def _make_multicast_forwarder(self, parameters):
758 return self._make_generic(parameters, self._multicast.MulticastForwarder)
760 def _make_multicast_announcer(self, parameters):
761 return self._make_generic(parameters, self._multicast.MulticastAnnouncer)
763 def _make_multicast_router(self, parameters):
764 return self._make_generic(parameters, self._multicast.MulticastRouter)