2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.core.metadata import Parallel
7 from nepi.util.constants import TIME_NOW
8 from nepi.util.graphtools import mst
9 from nepi.util import ipaddr2
10 from nepi.util import environ
11 from nepi.util.parallel import ParallelRun
# Internal exception: raised when the temporary deployment SSH key cannot be
# generated (see _make_temp_private_key). Body not visible in this view;
# presumably just `pass`.
31 class TempKeyError(Exception):
# Testbed controller for PlanetLab: discovers and reserves slice nodes,
# provisions slivers, deploys applications and supports crash recovery.
34 class TestbedController(testbed_impl.TestbedController):
# Constructor (its `def __init__` line is not visible in this view):
# initializes mutable controller state before do_setup() reads attributes.
36 super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
37 self._home_directory = None
# Deferred imports of sibling testbed modules; kept as instance attributes
# (self._node assignment presumably on a line not shown) so element
# factories can be referenced lazily without import cycles.
41 import node, interfaces, application, multicast
43 self._interfaces = interfaces
44 self._app = application
45 self._multicast = multicast
# Node ids known to be unresponsive, and nodes newly added this run.
47 self._blacklist = set()
48 self._just_provisioned = set()
50 self._load_blacklist()
52 self._logger = logging.getLogger('nepi.testbeds.planetlab')
# While True, attribute-setting errors are tolerated (see set()).
54 self.recovering = False
def home_directory(self):
    """Return the home directory configured for this testbed instance.

    Populated by do_setup() from the "homeDirectory" attribute.
    """
    home = self._home_directory
    return home
# `plapi` property (its `def`/decorator lines are not visible in this view):
# lazily construct and cache a PLCAPI client as self._plapi.
62 if not hasattr(self, '_plapi'):
# Authenticated access when credentials were configured in do_setup()...
66 self._plapi = plcapi.PLCAPI(
67 username = self.authUser,
68 password = self.authString,
69 hostname = self.plcHost,
70 urlpattern = self.plcUrl
# ...otherwise fall back to an anonymous client (limited permissions).
73 # anonymous access - may not be enough for much
74 self._plapi = plcapi.PLCAPI()
# `slice_id` property (def line not visible): resolve the numeric PLC id of
# the configured slice name via the API and cache it on success.
79 if not hasattr(self, '_slice_id'):
80 slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
# A guard (presumably `if slices:`) is on a line not shown here.
82 self._slice_id = slices[0]['slice_id']
# Deliberately not caching a miss: lookups are retried on next access.
84 # If it wasn't found, don't remember this failure, keep trying
# `vsys_vnet` property (def line not visible): fetch and cache the slice's
# 'vsys_vnet' tag (the private virtual network assigned to the slice).
90 if not hasattr(self, '_vsys_vnet'):
91 slicetags = self.plapi.GetSliceTags(
92 name = self.slicename,
93 tagname = 'vsys_vnet',
# A guard (presumably `if slicetags:`) is on a line not shown here.
96 self._vsys_vnet = slicetags[0]['value']
# Deliberately not caching a miss: lookups are retried on next access.
98 # If it wasn't found, don't remember this failure, keep trying
100 return self._vsys_vnet
102 def _load_blacklist(self):
# Load the persisted node blacklist (one entry per line) from the user's
# NEPI home directory into self._blacklist.
103 blpath = environ.homepath('plblacklist')
106 bl = open(blpath, "r")
# Error handling (try/except/finally and file close) is on lines not
# visible in this view; a missing file presumably leaves the set empty.
108 self._blacklist = set()
# Strip whitespace/newlines from each stored entry.
112 self._blacklist = set(
114 map(str.strip, bl.readlines())
120 def _save_blacklist(self):
# Persist the in-memory blacklist, one node id per line, overwriting the
# previous file.
121 blpath = environ.homepath('plblacklist')
122 bl = open(blpath, "w")
# NOTE(review): the writelines call and file close are on lines not
# visible in this view; confirm the handle is closed (e.g. try/finally).
125 map('%s\n'.__mod__, self._blacklist))
# do_setup (its `def` line is not visible in this view): copy all
# user-supplied testbed attributes into instance fields, validate the
# mandatory ones, then delegate to the base class.
130 self._home_directory = self._attributes.\
131 get_attribute_value("homeDirectory")
132 self.slicename = self._attributes.\
133 get_attribute_value("slice")
134 self.authUser = self._attributes.\
135 get_attribute_value("authUser")
136 self.authString = self._attributes.\
137 get_attribute_value("authPass")
138 self.sliceSSHKey = self._attributes.\
139 get_attribute_value("sliceSSHKey")
# The key passphrase is only requested on demand (see
# _make_temp_private_key), never read from attributes.
140 self.sliceSSHKeyPass = None
141 self.plcHost = self._attributes.\
142 get_attribute_value("plcHost")
143 self.plcUrl = self._attributes.\
144 get_attribute_value("plcUrl")
145 self.logLevel = self._attributes.\
146 get_attribute_value("plLogLevel")
147 self.tapPortBase = self._attributes.\
148 get_attribute_value("tapPortBase")
149 self.p2pDeployment = self._attributes.\
150 get_attribute_value("p2pDeployment")
151 self.dedicatedSlice = self._attributes.\
152 get_attribute_value("dedicatedSlice")
# Fail fast on missing credentials / key material (Python 2 raise syntax).
154 if not self.slicename:
155 raise RuntimeError, "Slice not set"
156 if not self.authUser:
157 raise RuntimeError, "PlanetLab account username not set"
158 if not self.authString:
159 raise RuntimeError, "PlanetLab account passphrase not set"
160 if not self.sliceSSHKey:
161 raise RuntimeError, "PlanetLab account key not specified"
162 if not os.path.exists(self.sliceSSHKey):
163 raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
# logLevel holds a level *name* (e.g. "DEBUG"); resolve it on the
# logging module.
165 self._logger.setLevel(getattr(logging,self.logLevel))
167 super(TestbedController, self).do_setup()
169 def do_post_asynclaunch(self, guid):
# Block until the asynchronously launched dependency for `guid` has
# finished its setup (continuation comment on a line not shown here).
170 # Dependencies were launched asynchronously,
172 dep = self._elements[guid]
173 if isinstance(dep, self._app.Dependency):
174 dep.async_setup_wait()
# Reuse the same barrier after both the preconfigure and configure steps.
176 # Two-phase configuration for asynchronous launch
177 do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
178 do_poststep_configure = staticmethod(do_post_asynclaunch)
180 def do_preconfigure(self):
# Orchestrates the deployment pipeline: discovery -> provisioning ->
# wait for nodes -> (optional) P2P deployment plan -> base preconfigure.
182 # Perform resource discovery if we don't have
183 # specific resources assigned yet
184 self.do_resource_discovery()
186 # Create PlanetLab slivers
187 self.do_provisioning()
# The wait loop and its retry/blacklist handling are on lines not
# visible in this view.
190 # Wait for provisioning
195 except self._node.UnresponsiveNodeError:
199 if self.p2pDeployment:
200 # Plan application deployment
201 self.do_spanning_deployment_plan()
203 # Configure elements per XML data
204 super(TestbedController, self).do_preconfigure()
206 def do_resource_discovery(self, recover = False):
# Assign a concrete PlanetLab node to every Node element that does not
# have one yet. Strategy: (1) assign nodes with exactly one candidate,
# (2) solve the rest with a backtracking resource allocator. Nodes that
# are not yet in the slice are queued in self._to_provision.
207 to_provision = self._to_provision = set()
# Start from the blacklist plus any node ids already taken by elements.
209 reserved = set(self._blacklist)
210 for guid, node in self._elements.iteritems():
211 if isinstance(node, self._node.Node) and node._node_id is not None:
212 reserved.add(node._node_id)
215 # look for perfectly defined nodes
216 # (ie: those with only one candidate)
# `reserved` is shared across worker threads, hence the lock.
217 reserve_lock = threading.Lock()
218 def assignifunique(guid, node):
219 # Try existing nodes first
220 # If we have only one candidate, simply use it
221 candidates = node.find_candidates(
222 filter_slice_id = self.slice_id)
225 reserve_lock.acquire()
227 candidates -= reserved
228 if len(candidates) == 1:
# Python 2 iterator protocol (`.next()`).
229 node_id = iter(candidates).next()
230 reserved.add(node_id)
232 # Try again including unassigned nodes
233 reserve_lock.release()
235 candidates = node.find_candidates()
237 reserve_lock.acquire()
238 candidates -= reserved
239 if len(candidates) > 1:
241 if len(candidates) == 1:
242 node_id = iter(candidates).next()
# Unassigned node: it will have to be added to the slice.
243 to_provision.add(node_id)
244 reserved.add(node_id)
# NOTE(review): "sith" in this error message looks like a typo for
# "with" — confirm and fix when editing the full file.
246 raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
247 node.make_filter_description())
249 reserve_lock.release()
251 if node_id is not None:
252 node.assign_node_id(node_id)
254 runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
256 for guid, node in self._elements.iteritems():
257 if isinstance(node, self._node.Node) and node._node_id is None:
258 runner.put(assignifunique, guid, node)
261 # Now do the backtracking search for a suitable solution
262 # First with existing slice nodes
# Collect the candidate set ("requirement") of each still-unassigned node.
265 def genreqs(node, filter_slice_id=None):
266 # Try existing nodes first
267 # If we have only one candidate, simply use it
268 candidates = node.find_candidates(
269 filter_slice_id = filter_slice_id)
270 candidates -= reserved
271 reqs.append(candidates)
273 for guid, node in self._elements.iteritems():
274 if isinstance(node, self._node.Node) and node._node_id is None:
275 runner.put(genreqs, node, self.slice_id)
# During recovery, hostnames are fixed, so an unassigned node is fatal.
280 raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
# Trim oversized candidate sets to the best-rated `nreq` nodes before
# running the allocator.
282 def pickbest(fullset, nreq, node=nodes[0]):
283 if len(fullset) > nreq:
284 fullset = zip(node.rate_nodes(fullset),fullset)
285 fullset.sort(reverse=True)
287 return set(map(operator.itemgetter(1),fullset))
292 solution = resourcealloc.alloc(reqs, sample=pickbest)
293 except resourcealloc.ResourceAllocationError:
294 # Failed, try again with all nodes
297 runner.put(genreqs, node)
299 solution = resourcealloc.alloc(reqs, sample=pickbest)
# Nodes from the fallback pass may not be in the slice yet.
300 to_provision.update(solution)
303 for node, node_id in zip(nodes, solution):
304 runner.put(node.assign_node_id, node_id)
307 def do_provisioning(self):
# Add any nodes selected by resource discovery to the PlanetLab slice,
# remembering them so do_wait_nodes can give freshly provisioned slivers
# a longer timeout.
308 if self._to_provision:
309 # Add new nodes to the slice
310 cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
# Union with current nodes: UpdateSlice replaces the node list.
311 new_nodes = list(set(cur_nodes) | self._to_provision)
312 self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
315 self._just_provisioned = self._to_provision
# Discovery state is single-use; drop it.
316 del self._to_provision
318 def do_wait_nodes(self):
# Configure per-node paths/identity, then wait in parallel for every
# sliver to come up; blacklist nodes that never respond.
319 for guid, node in self._elements.iteritems():
320 if isinstance(node, self._node.Node):
321 # Just inject configuration stuff
322 node.home_path = "nepi-node-%s" % (guid,)
323 node.ident_path = self.sliceSSHKey
324 node.slicename = self.slicename
327 self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
# Many workers, bounded queue, since this is mostly network waiting.
330 runner = ParallelRun(maxthreads=64, maxqueue=1)
332 def waitforit(guid, node):
# Freshly provisioned slivers get 20 minutes; existing ones 1 minute.
334 node.wait_provisioning(
335 (20*60 if node._node_id in self._just_provisioned else 60)
338 self._logger.info("READY Node %s at %s", guid, node.hostname)
340 # Prepare dependency installer now
341 node.prepare_dependencies()
346 for guid, node in self._elements.iteritems():
349 if isinstance(node, self._node.Node):
350 self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
351 runner.put(waitforit, guid, node)
# The surrounding try and runner.join are on lines not shown here.
354 except self._node.UnresponsiveNodeError:
356 self._logger.warn("UNRESPONSIVE Nodes")
358 # Mark all dead nodes (which are unresponsive) on the blacklist
360 for guid, node in self._elements.iteritems():
361 if isinstance(node, self._node.Node):
362 if not node.is_alive():
363 self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
364 self._blacklist.add(node._node_id)
368 self._save_blacklist()
# Best-effort diagnostics; the enclosing except is on a line not shown.
372 traceback.print_exc()
376 def do_spanning_deployment_plan(self):
# Group identical dependencies (same sources/build/platform) and deploy
# each group peer-to-peer along a minimum-spanning tree keyed on IP
# distance, so only the root downloads from the origin.
377 # Create application groups by collecting all applications
378 # based on their hash - the hash should contain everything that
379 # defines them and the platform they're built
# These frozensets are fields of the group hash (dephash); its `def`
# line is not visible in this view.
383 frozenset((app.depends or "").split(' ')),
384 frozenset((app.sources or "").split(' ')),
387 app.node.architecture,
388 app.node.operatingSystem,
392 depgroups = collections.defaultdict(list)
394 for element in self._elements.itervalues():
395 if isinstance(element, self._app.Dependency):
396 depgroups[dephash(element)].append(element)
397 elif isinstance(element, self._node.Node):
# Nodes carry an implicit yum-dependency pseudo-element.
398 deps = element._yum_dependencies
400 depgroups[dephash(deps)].append(deps)
402 # Set up spanning deployment for those applications that
403 # have been deployed in several nodes.
404 for dh, group in depgroups.iteritems():
406 # Pick root (deterministically)
407 root = min(group, key=lambda app:app.node.hostname)
409 # Obtain all IPs in numeric format
410 # (which means faster distance computations)
412 dep._ip = socket.gethostbyname(dep.node.hostname)
413 dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
416 # NOTE: the plan is an iterator
# Distance metric for the MST: numeric IP distance (mst call itself is
# on lines not shown here).
419 lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
423 # Re-sign private key
425 tempprk, temppuk, tmppass = self._make_temp_private_key()
# Wire each slave to pull from its master using the temporary key.
431 for slave, master in plan:
432 slave.set_master(master)
433 slave.install_keys(tempprk, temppuk, tmppass)
435 # We don't need the user's passphrase anymore
436 self.sliceSSHKeyPass = None
438 def _make_temp_private_key(self):
# Create a temporary copy of the user's SSH identity re-ciphered with a
# random passphrase, for automated chain deployment. Returns
# (private_key_file, public_key_file, passphrase) — see line 522.
439 # Get the user's key's passphrase
440 if not self.sliceSSHKeyPass:
# Prompt via the SSH_ASKPASS helper if one is available.
441 if 'SSH_ASKPASS' in os.environ:
442 proc = subprocess.Popen(
443 [ os.environ['SSH_ASKPASS'],
444 "Please type the passphrase for the %s SSH identity file. "
445 "The passphrase will be used to re-cipher the identity file with "
446 "a random 256-bit key for automated chain deployment on the "
447 "%s PlanetLab slice" % (
448 os.path.basename(self.sliceSSHKey),
451 stdin = open("/dev/null"),
452 stdout = subprocess.PIPE,
453 stderr = subprocess.PIPE)
454 out,err = proc.communicate()
455 self.sliceSSHKeyPass = out.strip()
# Failure handling (presumably raising TempKeyError) is on a line
# not visible in this view.
457 if not self.sliceSSHKeyPass:
460 # Create temporary key files
461 prk = tempfile.NamedTemporaryFile(
462 dir = self.root_directory,
463 prefix = "pl_deploy_tmpk_",
466 puk = tempfile.NamedTemporaryFile(
467 dir = self.root_directory,
468 prefix = "pl_deploy_tmpk_",
# 32 random bytes from the OS CSPRNG, hex-encoded (Python 2 idiom).
471 # Create secure 256-bits temporary passphrase
472 passphrase = ''.join(map(chr,[rng.randint(0,255)
473 for rng in (random.SystemRandom(),)
474 for i in xrange(32)] )).encode("hex")
# Copy the original key pair (contents and permissions) into the
# temporary files.
477 oprk = open(self.sliceSSHKey, "rb")
478 opuk = open(self.sliceSSHKey+".pub", "rb")
479 shutil.copymode(oprk.name, prk.name)
480 shutil.copymode(opuk.name, puk.name)
481 shutil.copyfileobj(oprk, prk)
482 shutil.copyfileobj(opuk, puk)
488 # A descriptive comment
489 comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
# Re-cipher with ssh-keygen: old passphrase via -P (full argv is on
# lines not shown here).
492 proc = subprocess.Popen(
495 "-P", self.sliceSSHKeyPass,
498 stdout = subprocess.PIPE,
499 stderr = subprocess.PIPE,
500 stdin = subprocess.PIPE
502 out, err = proc.communicate()
505 raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
511 # Change comment on public key
# Rewrite the comment field (last space-separated token) of the first
# public-key line.
512 puklines = puk.readlines()
513 puklines[0] = puklines[0].split(' ')
514 puklines[0][-1] = comment+'\n'
515 puklines[0] = ' '.join(puklines[0])
518 puk.writelines(puklines)
522 return prk, puk, passphrase
524 def set(self, guid, name, value, time = TIME_NOW):
# Set attribute `name` on element `guid`, mirroring it onto the live
# element object; errors are swallowed while self.recovering is True.
525 super(TestbedController, self).set(guid, name, value, time)
526 # TODO: take on account schedule time for the task
527 element = self._elements[guid]
# The try/except wrapping this setattr is on lines not shown here.
530 setattr(element, name, value)
532 # We ignore these errors while recovering.
533 # Some attributes are immutable, and setting
534 # them is necessary (to recover the state), but
535 # some are not (they throw an exception).
536 if not self.recovering:
# Elements may expose a refresh() hook to react to attribute changes.
539 if hasattr(element, 'refresh'):
540 # invoke attribute refresh hook
543 def get(self, guid, name, time = TIME_NOW):
# Read attribute `name` of element `guid`, preferring the live element's
# current value over the stored one when available.
544 value = super(TestbedController, self).get(guid, name, time)
545 # TODO: take on account schedule time for the task
546 factory_id = self._create[guid]
547 factory = self._factories[factory_id]
# .get(): the element may not have been instantiated yet.
548 element = self._elements.get(guid)
# The enclosing try is on a line not shown; missing element/attribute
# falls back (presumably to `value`) on lines not shown either.
550 return getattr(element, name)
551 except (KeyError, AttributeError):
554 def get_address(self, guid, index, attribute='Address'):
# Return an address attribute of interface `guid`. Only index 0 is
# answered locally (PlanetLab ifaces hold a single address); anything
# else is delegated to the base implementation.
558 iface = self._elements.get(guid)
559 if iface and index == 0:
560 if attribute == 'Address':
# The `return iface.address` line is not visible in this view.
562 elif attribute == 'NetPrefix':
563 return iface.netprefix
564 elif attribute == 'Broadcast':
565 return iface.broadcast
567 # if all else fails, query box
568 return super(TestbedController, self).get_address(guid, index, attribute)
def action(self, time, guid, action):
    """Scheduled element actions are not supported by this testbed."""
    raise NotImplementedError
# Shutdown sequence (enclosing method's `def` line is not visible in this
# view): close traces, then run per-element cleanup and destroy hooks in
# factory shutdown order, and finally drop all element references.
574 for trace in self._traces.itervalues():
# Call `element.<action>()` only if the element defines it.
577 def invokeif(action, testbed, guid):
578 element = self._elements[guid]
579 if hasattr(element, action):
580 getattr(element, action)()
582 self._do_in_factory_order(
583 functools.partial(invokeif, 'cleanup'),
584 metadata.shutdown_order)
586 self._do_in_factory_order(
587 functools.partial(invokeif, 'destroy'),
588 metadata.shutdown_order)
590 self._elements.clear()
593 def trace(self, guid, trace_id, attribute='value'):
# Fetch a trace of element `guid`: its content ('value', synced to the
# local home directory first), or its remote 'path'/'name'.
594 elem = self._elements[guid]
596 if attribute == 'value':
597 path = elem.sync_trace(self.home_directory, trace_id)
# Reading the synced file into `content` happens on lines not shown.
604 elif attribute == 'path':
605 content = elem.remote_trace_path(trace_id)
606 elif attribute == 'name':
607 content = elem.remote_trace_name(trace_id)
# The fallback branch and `return content` are on lines not shown here.
def follow_trace(self, trace_id, trace):
    """Register *trace* under *trace_id* so the controller tracks it."""
    self._traces[trace_id] = trace
617 # An internal flag, so we know to behave differently in
618 # a few corner cases.
619 self.recovering = True
621 # Create and connect do not perform any real tasks against
622 # the nodes, it only sets up the object hierarchy,
623 # so we can run them normally
625 self.do_connect_init()
626 self.do_connect_compl()
628 # Manually recover nodes, to mark dependencies installed
629 # and clean up mutable attributes
630 self._do_in_factory_order(
631 lambda self, guid : self._elements[guid].recover(),
636 # Assign nodes - since we're working off exeucte XML, nodes
637 # have specific hostnames assigned and we don't need to do
638 # real assignment, only find out node ids and check liveliness
639 self.do_resource_discovery(recover = True)
642 # Pre/post configure, however, tends to set up tunnels
643 # Execute configuration steps only for those object
644 # kinds that do not have side effects
646 # Do the ones without side effects,
647 # including nodes that need to set up home
648 # folders and all that
649 self._do_in_factory_order(
650 "preconfigure_function",
653 Parallel(metadata.NODE),
657 # Tunnels require a home path that is configured
658 # at this step. Since we cannot run the step itself,
659 # we need to inject this homepath ourselves
660 for guid, element in self._elements.iteritems():
661 if isinstance(element, self._interfaces.TunIface):
662 element._home_path = "tun-%s" % (guid,)
664 # Manually recover tunnels, applications and
665 # netpipes, negating the side effects
666 self._do_in_factory_order(
667 lambda self, guid : self._elements[guid].recover(),
669 Parallel(metadata.TAPIFACE),
670 Parallel(metadata.TUNIFACE),
672 Parallel(metadata.NEPIDEPENDENCY),
673 Parallel(metadata.NS3DEPENDENCY),
674 Parallel(metadata.DEPENDENCY),
675 Parallel(metadata.APPLICATION),
678 # Tunnels are not harmed by configuration after
679 # recovery, and some attributes get set this way
680 # like external_iface
681 self._do_in_factory_order(
682 "preconfigure_function",
684 Parallel(metadata.TAPIFACE),
685 Parallel(metadata.TUNIFACE),
688 # Post-do the ones without side effects
689 self._do_in_factory_order(
690 "configure_function",
693 Parallel(metadata.NODE),
695 Parallel(metadata.TAPIFACE),
696 Parallel(metadata.TUNIFACE),
699 # There are no required prestart steps
700 # to call upon recovery, so we're done
702 self.recovering = True
704 def _make_generic(self, parameters, kind):
# Shared element factory: instantiate `kind` bound to the PLC API, give
# it a weak back-reference to this testbed, and copy every parameter
# onto it as an attribute.
705 app = kind(self.plapi)
# weakref avoids a reference cycle between testbed and elements.
706 app.testbed = weakref.ref(self)
708 # Note: there is 1-to-1 correspondence between attribute names
709 # If that changes, this has to change as well
710 for attr,val in parameters.iteritems():
# The try/except around this setattr is on lines not shown here.
712 setattr(app, attr, val)
714 # We ignore these errors while recovering.
715 # Some attributes are immutable, and setting
716 # them is necessary (to recover the state), but
717 # some are not (they throw an exception).
718 if not self.recovering:
# The re-raise and `return app` are on lines not visible in this view.
723 def _make_node(self, parameters):
# Node factory: build via _make_generic, then propagate the slice's
# cleanup policy onto the node.
724 node = self._make_generic(parameters, self._node.Node)
# Only dedicated slices may be aggressively cleaned up.
725 node.enable_cleanup = self.dedicatedSlice
# The `return node` statement is on a line not visible in this view.
def _make_node_iface(self, parameters):
    """Factory: build a NodeIface element from *parameters*."""
    element_class = self._interfaces.NodeIface
    return self._make_generic(parameters, element_class)
def _make_tun_iface(self, parameters):
    """Factory: build a TunIface element from *parameters*."""
    element_class = self._interfaces.TunIface
    return self._make_generic(parameters, element_class)
def _make_tap_iface(self, parameters):
    """Factory: build a TapIface element from *parameters*."""
    element_class = self._interfaces.TapIface
    return self._make_generic(parameters, element_class)
def _make_netpipe(self, parameters):
    """Factory: build a NetPipe element from *parameters*."""
    element_class = self._interfaces.NetPipe
    return self._make_generic(parameters, element_class)
def _make_internet(self, parameters):
    """Factory: build an Internet element from *parameters*."""
    element_class = self._interfaces.Internet
    return self._make_generic(parameters, element_class)
def _make_application(self, parameters):
    """Factory: build an Application element from *parameters*."""
    element_class = self._app.Application
    return self._make_generic(parameters, element_class)
def _make_dependency(self, parameters):
    """Factory: build a Dependency element from *parameters*."""
    element_class = self._app.Dependency
    return self._make_generic(parameters, element_class)
def _make_nepi_dependency(self, parameters):
    """Factory: build a NepiDependency element from *parameters*."""
    element_class = self._app.NepiDependency
    return self._make_generic(parameters, element_class)
def _make_ns3_dependency(self, parameters):
    """Factory: build an NS3Dependency element from *parameters*."""
    element_class = self._app.NS3Dependency
    return self._make_generic(parameters, element_class)
def _make_tun_filter(self, parameters):
    """Factory: build a TunFilter element from *parameters*."""
    element_class = self._interfaces.TunFilter
    return self._make_generic(parameters, element_class)
def _make_class_queue_filter(self, parameters):
    """Factory: build a ClassQueueFilter element from *parameters*."""
    element_class = self._interfaces.ClassQueueFilter
    return self._make_generic(parameters, element_class)
def _make_tos_queue_filter(self, parameters):
    """Factory: build a ToSQueueFilter element from *parameters*."""
    element_class = self._interfaces.ToSQueueFilter
    return self._make_generic(parameters, element_class)
def _make_multicast_forwarder(self, parameters):
    """Factory: build a MulticastForwarder element from *parameters*."""
    element_class = self._multicast.MulticastForwarder
    return self._make_generic(parameters, element_class)
def _make_multicast_announcer(self, parameters):
    """Factory: build a MulticastAnnouncer element from *parameters*."""
    element_class = self._multicast.MulticastAnnouncer
    return self._make_generic(parameters, element_class)
def _make_multicast_router(self, parameters):
    """Factory: build a MulticastRouter element from *parameters*."""
    element_class = self._multicast.MulticastRouter
    return self._make_generic(parameters, element_class)