2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.core.metadata import Parallel
7 from nepi.util.constants import TIME_NOW
8 from nepi.util.graphtools import mst
9 from nepi.util import ipaddr2
10 from nepi.util import environ
11 from nepi.util.parallel import ParallelRun
class TempKeyError(Exception):
    """Error involving the temporary deployment SSH key.

    NOTE(review): the class body (likely ``pass``) and the raise sites are
    elided in this excerpt — confirm against the full source.
    """
class TestbedController(testbed_impl.TestbedController):
    # PlanetLab testbed controller: drives slice provisioning, node
    # assignment and application deployment through the PLC API.
    # NOTE(review): the `def __init__(self):` header and a few statements
    # (e.g. the `self._node = node` assignment that later code relies on)
    # are elided in this excerpt.
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None

        # Testbed-local submodules, imported here (not at module level),
        # presumably to avoid import cycles — TODO confirm
        import node, interfaces, application, multicast
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast

        # Node ids known to be unusable; persisted across runs via
        # _load_blacklist/_save_blacklist
        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        # Internal flag: while True, attribute-setting errors are tolerated
        # (see set() and _make_generic())
        self.recovering = False
    def home_directory(self):
        # Read-only accessor for the home directory configured in do_setup
        # from the "homeDirectory" attribute.
        # NOTE(review): the @property decorator line appears elided in this
        # excerpt — confirm against the full source.
        return self._home_directory
        # NOTE(review): the accessor header (likely `@property` +
        # `def plapi(self):`, given the hasattr-based lazy init), the
        # credential check choosing between the two branches, the closing
        # paren of the first PLCAPI(...) call and the final return are all
        # elided in this excerpt.
        if not hasattr(self, '_plapi'):
            # Authenticated PLC API session built from the credentials
            # gathered in do_setup
            self._plapi = plcapi.PLCAPI(
                username = self.authUser,
                password = self.authString,
                hostname = self.plcHost,
                urlpattern = self.plcUrl
            # anonymous access - may not be enough for much
            self._plapi = plcapi.PLCAPI()
        # NOTE(review): the accessor header (lazy `slice_id` lookup), the
        # guard checking `slices` is non-empty, and the final return are
        # elided in this excerpt.
        if not hasattr(self, '_slice_id'):
            # Resolve the numeric slice_id for the configured slice name
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            self._slice_id = slices[0]['slice_id']
        # If it wasn't found, don't remember this failure, keep trying
        # NOTE(review): the accessor header (lazy `vsys_vnet` lookup), the
        # closing paren of GetSliceTags(...) and the guard checking that a
        # tag was returned are elided in this excerpt.
        if not hasattr(self, '_vsys_vnet'):
            # The vsys_vnet slice tag holds the private subnet assigned to
            # the slice for vsys-based tunnels
            slicetags = self.plapi.GetSliceTags(
                name = self.slicename,
                tagname = 'vsys_vnet',
            self._vsys_vnet = slicetags[0]['value']
        # If it wasn't found, don't remember this failure, keep trying
        return self._vsys_vnet
    def _load_blacklist(self):
        # Load the persisted set of unusable node ids from the user's
        # 'plblacklist' file (one entry per line).
        blpath = environ.homepath('plblacklist')
        # NOTE(review): the try/except (or existence check) around the open,
        # the branch structure, and the file close are elided in this
        # excerpt — only the two assignments below survive.
        bl = open(blpath, "r")
        self._blacklist = set()
        self._blacklist = set(
            map(str.strip, bl.readlines())
    def _save_blacklist(self):
        # Persist the current blacklist, one node id per line.
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        # NOTE(review): the `bl.writelines(` call head and the file close are
        # elided in this excerpt; only the map() argument survives below.
            map('%s\n'.__mod__, self._blacklist))
        # NOTE(review): the `def do_setup(self):` header is elided in this
        # excerpt. The statements below copy user-supplied testbed
        # attributes into plain instance fields.
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        # Key passphrase is collected lazily (see _make_temp_private_key)
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        # Fail fast on missing mandatory configuration
        # (Python 2 raise syntax, consistent with the rest of the file)
        if not self.slicename:
            raise RuntimeError, "Slice not set"
        if not self.authUser:
            raise RuntimeError, "PlanetLab account username not set"
        if not self.authString:
            raise RuntimeError, "PlanetLab account passphrase not set"
        if not self.sliceSSHKey:
            raise RuntimeError, "PlanetLab account key not specified"
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)

        # Honor the user-selected log level for this testbed's logger
        self._logger.setLevel(getattr(logging,self.logLevel))

        super(TestbedController, self).do_setup()
168 def do_post_asynclaunch(self, guid):
169 # Dependencies were launched asynchronously,
171 dep = self._elements[guid]
172 if isinstance(dep, self._app.Dependency):
173 dep.async_setup_wait()
175 # Two-phase configuration for asynchronous launch
176 do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
177 do_poststep_configure = staticmethod(do_post_asynclaunch)
    def do_preconfigure(self):
        """Discover/provision resources and wait for nodes, then delegate
        to the generic preconfigure step."""
        # Perform resource discovery if we don't have
        # specific resources assigned yet
        self.do_resource_discovery()

        # Create PlanetLab slivers
        self.do_provisioning()

        # Wait for provisioning
        # NOTE(review): the retry loop / `try:` around do_wait_nodes() is
        # elided in this excerpt; the orphan `except` below belongs to it.
        except self._node.UnresponsiveNodeError:

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()
205 def do_resource_discovery(self, recover = False):
206 to_provision = self._to_provision = set()
208 reserved = set(self._blacklist)
209 for guid, node in self._elements.iteritems():
210 if isinstance(node, self._node.Node) and node._node_id is not None:
211 reserved.add(node._node_id)
214 # look for perfectly defined nodes
215 # (ie: those with only one candidate)
216 for guid, node in self._elements.iteritems():
217 if isinstance(node, self._node.Node) and node._node_id is None:
218 # Try existing nodes first
219 # If we have only one candidate, simply use it
220 candidates = node.find_candidates(
221 filter_slice_id = self.slice_id)
222 candidates -= reserved
223 if len(candidates) == 1:
224 node_id = iter(candidates).next()
225 node.assign_node_id(node_id)
226 reserved.add(node_id)
228 # Try again including unassigned nodes
229 candidates = node.find_candidates()
230 candidates -= reserved
231 if len(candidates) > 1:
233 if len(candidates) == 1:
234 node_id = iter(candidates).next()
235 node.assign_node_id(node_id)
236 to_provision.add(node_id)
237 reserved.add(node_id)
239 raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
240 node.make_filter_description())
242 # Now do the backtracking search for a suitable solution
243 # First with existing slice nodes
246 for guid, node in self._elements.iteritems():
247 if isinstance(node, self._node.Node) and node._node_id is None:
248 # Try existing nodes first
249 # If we have only one candidate, simply use it
250 candidates = node.find_candidates(
251 filter_slice_id = self.slice_id)
252 candidates -= reserved
253 reqs.append(candidates)
258 raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
260 def pickbest(fullset, nreq, node=nodes[0]):
261 if len(fullset) > nreq:
262 fullset = zip(node.rate_nodes(fullset),fullset)
263 fullset.sort(reverse=True)
265 return set(map(operator.itemgetter(1),fullset))
270 solution = resourcealloc.alloc(reqs, sample=pickbest)
271 except resourcealloc.ResourceAllocationError:
272 # Failed, try again with all nodes
275 candidates = node.find_candidates()
276 candidates -= reserved
277 reqs.append(candidates)
279 solution = resourcealloc.alloc(reqs, sample=pickbest)
280 to_provision.update(solution)
283 for node, node_id in zip(nodes, solution):
284 node.assign_node_id(node_id)
286 def do_provisioning(self):
287 if self._to_provision:
288 # Add new nodes to the slice
289 cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
290 new_nodes = list(set(cur_nodes) | self._to_provision)
291 self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
294 self._just_provisioned = self._to_provision
295 del self._to_provision
    def do_wait_nodes(self):
        """Configure node elements and wait until their slivers respond."""
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        # NOTE(review): the `try:` pairing with the `except` below, the
        # closing paren of wait_provisioning(...), and a few other lines are
        # elided in this excerpt.
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)

                # Freshly provisioned slivers get a much longer grace period
                # (20 min vs 1 min) before being declared unresponsive
                node.wait_provisioning(
                    (20*60 if node._node_id in self._just_provisioned else 60)

                self._logger.info("READY Node %s at %s", guid, node.hostname)

                # Prepare dependency installer now
                node.prepare_dependencies()
            except self._node.UnresponsiveNodeError:
                self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

                # Mark all dead nodes (which are unresponsive) on the blacklist
                for guid, node in self._elements.iteritems():
                    if isinstance(node, self._node.Node):
                        if not node.is_alive():
                            self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                            self._blacklist.add(node._node_id)

                self._save_blacklist()

                # NOTE(review): the surrounding handler (likely a re-raise or
                # retry path) is elided; only the traceback dump survives.
                traceback.print_exc()
    def do_spanning_deployment_plan(self):
        """Plan peer-to-peer distribution of dependency setup across nodes,
        so identical builds are pushed node-to-node instead of from the
        experiment host to every node."""
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
        # NOTE(review): the head of the `dephash` helper is elided in this
        # excerpt; only part of its tuple body survives below.
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.node.architecture,
                app.node.operatingSystem,

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                # NOTE(review): a guard (possibly `if deps:`) appears elided
                # before this append.
                depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            # Pick root (deterministically)
            root = min(group, key=lambda app:app.node.hostname)

            # Obtain all IPs in numeric format
            # (which means faster distance computations)
            # NOTE(review): the loop header (likely iterating `group`) is
            # elided before these two assignments.
            dep._ip = socket.gethostbyname(dep.node.hostname)
            dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

            # NOTE: the plan is an iterator
            # NOTE(review): the spanning-tree call head (mst.mst(...)) is
            # elided; this lambda is its distance metric.
                lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),

            # Re-sign private key
            tempprk, temppuk, tmppass = self._make_temp_private_key()

            # Chain deployment: each slave fetches from its assigned master
            for slave, master in plan:
                slave.set_master(master)
                slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
    def _make_temp_private_key(self):
        """Create a temporary copy of the user's SSH identity re-ciphered
        with a random passphrase for automated chain deployment.

        Returns (private_key_file, public_key_file, passphrase), where the
        files are NamedTemporaryFile objects and the passphrase is a hex
        string derived from 32 cryptographically random bytes.
        """
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                # Delegate passphrase prompting to the user's askpass helper
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                    # NOTE(review): the second format argument and the
                    # list/call closers are elided in this excerpt.
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            # NOTE(review): the raise under this guard (plausibly
            # TempKeyError) is elided in this excerpt.

        # Create temporary key files
        # NOTE(review): trailing arguments/closers of both
        # NamedTemporaryFile(...) calls are elided.
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",

        # Create secure 256-bits temporary passphrase
        passphrase = ''.join(map(chr,[rng.randint(0,255)
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")

        # Copy the user's key pair into the temp files, preserving modes
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher the private copy (apparently via ssh-keygen, judging by
        # the -P old-passphrase flag — TODO confirm; most of the argv is
        # elided in this excerpt)
        proc = subprocess.Popen(
                "-P", self.sliceSSHKeyPass,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        out, err = proc.communicate()
        # NOTE(review): the returncode check guarding this raise is elided.
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.writelines(puklines)

        return prk, puk, passphrase
    def set(self, guid, name, value, time = TIME_NOW):
        """Record the attribute change, then mirror it onto the live element."""
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take on account schedule time for the task
        element = self._elements[guid]
        # NOTE(review): the guard/`try:` wrapping this setattr is elided in
        # this excerpt.
        setattr(element, name, value)
        # We ignore these errors while recovering.
        # Some attributes are immutable, and setting
        # them is necessary (to recover the state), but
        # some are not (they throw an exception).
        if not self.recovering:
            # NOTE(review): the statement under this guard (plausibly a
            # re-raise) is elided.
        if hasattr(element, 'refresh'):
            # invoke attribute refresh hook
            # NOTE(review): the element.refresh() call is elided.
    def get(self, guid, name, time = TIME_NOW):
        """Get an attribute, preferring the live element's current value
        over the recorded one."""
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take on account schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        # NOTE(review): a `try:` (and possibly a factory-based guard) is
        # elided before this lookup.
        return getattr(element, name)
        except (KeyError, AttributeError):
            # NOTE(review): the fallback under this handler (plausibly
            # `return value`) is elided.
    def get_address(self, guid, index, attribute='Address'):
        """Return address info for interface element `guid`; falls back to
        the generic implementation when unavailable locally."""
        # Only index 0 is supported per interface
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                # NOTE(review): the return for this branch (plausibly
                # `return iface.address`) is elided in this excerpt.
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)
    def action(self, time, guid, action):
        """Scheduled element actions are not supported by this testbed."""
        raise NotImplementedError
        # NOTE(review): the `def shutdown(self):` header is elided in this
        # excerpt. The statements below finalize traces and tear elements
        # down in factory shutdown order.
        for trace in self._traces.itervalues():
            # NOTE(review): the per-trace finalization call is elided.

        def invokeif(action, testbed, guid):
            # Call element.<action>() only on elements that define it
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        # First a 'cleanup' pass, then a 'destroy' pass
        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
    def trace(self, guid, trace_id, attribute='value'):
        """Fetch a trace's content, remote path or remote name for element
        `guid`, depending on `attribute`."""
        elem = self._elements[guid]

        if attribute == 'value':
            # Sync the remote trace file into the local home directory
            path = elem.sync_trace(self.home_directory, trace_id)
            # NOTE(review): reading the synced file into `content` is elided
            # in this excerpt.
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
        # NOTE(review): the fallback branch and the `return content` are
        # elided in this excerpt.
    def follow_trace(self, trace_id, trace):
        # Register the trace object so shutdown() can iterate over it later.
        self._traces[trace_id] = trace
584 # An internal flag, so we know to behave differently in
585 # a few corner cases.
586 self.recovering = True
588 # Create and connect do not perform any real tasks against
589 # the nodes, it only sets up the object hierarchy,
590 # so we can run them normally
592 self.do_connect_init()
593 self.do_connect_compl()
595 # Manually recover nodes, to mark dependencies installed
596 # and clean up mutable attributes
597 self._do_in_factory_order(
598 lambda self, guid : self._elements[guid].recover(),
603 # Assign nodes - since we're working off exeucte XML, nodes
604 # have specific hostnames assigned and we don't need to do
605 # real assignment, only find out node ids and check liveliness
606 self.do_resource_discovery(recover = True)
609 # Pre/post configure, however, tends to set up tunnels
610 # Execute configuration steps only for those object
611 # kinds that do not have side effects
613 # Do the ones without side effects,
614 # including nodes that need to set up home
615 # folders and all that
616 self._do_in_factory_order(
617 "preconfigure_function",
620 Parallel(metadata.NODE),
624 # Tunnels require a home path that is configured
625 # at this step. Since we cannot run the step itself,
626 # we need to inject this homepath ourselves
627 for guid, element in self._elements.iteritems():
628 if isinstance(element, self._interfaces.TunIface):
629 element._home_path = "tun-%s" % (guid,)
631 # Manually recover tunnels, applications and
632 # netpipes, negating the side effects
633 self._do_in_factory_order(
634 lambda self, guid : self._elements[guid].recover(),
636 Parallel(metadata.TAPIFACE),
637 Parallel(metadata.TUNIFACE),
639 Parallel(metadata.NEPIDEPENDENCY),
640 Parallel(metadata.NS3DEPENDENCY),
641 Parallel(metadata.DEPENDENCY),
642 Parallel(metadata.APPLICATION),
645 # Tunnels are not harmed by configuration after
646 # recovery, and some attributes get set this way
647 # like external_iface
648 self._do_in_factory_order(
649 "preconfigure_function",
651 Parallel(metadata.TAPIFACE),
652 Parallel(metadata.TUNIFACE),
655 # Post-do the ones without side effects
656 self._do_in_factory_order(
657 "configure_function",
660 Parallel(metadata.NODE),
662 Parallel(metadata.TAPIFACE),
663 Parallel(metadata.TUNIFACE),
666 # There are no required prestart steps
667 # to call upon recovery, so we're done
669 self.recovering = True
    def _make_generic(self, parameters, kind):
        """Instantiate element class `kind` with the PLC API handle and
        apply the XML creation parameters as attributes."""
        app = kind(self.plapi)
        # Weak back-reference to the controller (avoids keeping the
        # controller alive through its elements — TODO confirm intent)
        app.testbed = weakref.ref(self)

        # Note: there is 1-to-1 correspondence between attribute names
        # If that changes, this has to change as well
        for attr,val in parameters.iteritems():
            # NOTE(review): the `try:` wrapping this setattr, the re-raise
            # under the guard below, and the final `return app` are elided
            # in this excerpt.
            setattr(app, attr, val)
            # We ignore these errors while recovering.
            # Some attributes are immutable, and setting
            # them is necessary (to recover the state), but
            # some are not (they throw an exception).
            if not self.recovering:
690 def _make_node(self, parameters):
691 node = self._make_generic(parameters, self._node.Node)
692 node.enable_cleanup = self.dedicatedSlice
695 def _make_node_iface(self, parameters):
696 return self._make_generic(parameters, self._interfaces.NodeIface)
698 def _make_tun_iface(self, parameters):
699 return self._make_generic(parameters, self._interfaces.TunIface)
701 def _make_tap_iface(self, parameters):
702 return self._make_generic(parameters, self._interfaces.TapIface)
704 def _make_netpipe(self, parameters):
705 return self._make_generic(parameters, self._interfaces.NetPipe)
707 def _make_internet(self, parameters):
708 return self._make_generic(parameters, self._interfaces.Internet)
710 def _make_application(self, parameters):
711 return self._make_generic(parameters, self._app.Application)
713 def _make_dependency(self, parameters):
714 return self._make_generic(parameters, self._app.Dependency)
716 def _make_nepi_dependency(self, parameters):
717 return self._make_generic(parameters, self._app.NepiDependency)
719 def _make_ns3_dependency(self, parameters):
720 return self._make_generic(parameters, self._app.NS3Dependency)
722 def _make_tun_filter(self, parameters):
723 return self._make_generic(parameters, self._interfaces.TunFilter)
725 def _make_class_queue_filter(self, parameters):
726 return self._make_generic(parameters, self._interfaces.ClassQueueFilter)
728 def _make_tos_queue_filter(self, parameters):
729 return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
731 def _make_multicast_forwarder(self, parameters):
732 return self._make_generic(parameters, self._multicast.MulticastForwarder)
734 def _make_multicast_announcer(self, parameters):
735 return self._make_generic(parameters, self._multicast.MulticastAnnouncer)
737 def _make_multicast_router(self, parameters):
738 return self._make_generic(parameters, self._multicast.MulticastRouter)