1 # -*- coding: utf-8 -*-
3 from constants import TESTBED_ID, TESTBED_VERSION
4 from nepi.core import testbed_impl
5 from nepi.core.metadata import Parallel
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 from nepi.util.parallel import ParallelRun
# Raised when the temporary re-ciphered deploy key cannot be produced.
# NOTE(review): class body (original line 32) is elided from this view.
class TempKeyError(Exception):
class TestbedController(testbed_impl.TestbedController):
    """PlanetLab testbed controller for NEPI.

    Discovers and provisions slice nodes, deploys dependencies, and
    recovers state after a crash.
    """
    # NOTE(review): the "def __init__(self):" line (original line 35) is
    # elided from this view; the statements below are its body.
    super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
    self._home_directory = None
    # Python 2 implicit-relative imports of sibling testbed modules,
    # stashed on the instance so other methods can reach them.
    import node, interfaces, application, multicast
    self._interfaces = interfaces
    self._app = application
    self._multicast = multicast
    # Hostnames known to be unresponsive; never assigned to nodes again.
    self._blacklist = set()
    self._just_provisioned = set()
    self._load_blacklist()
    self._vsys_vnet = None
    self._logger = logging.getLogger('nepi.testbeds.planetlab')
    # Internal flag: recover() sets this so attribute-setting errors are
    # tolerated while rebuilding state.
    self.recovering = False
# Lazily-initialized accessors.  NOTE(review): these are presumably
# @property-decorated in the original (decorator lines and several
# statements are elided from this view) — confirm against the full file.

def home_directory(self):
    # Plain read accessor for the configured home directory.
    return self._home_directory

# Fragment of the plcapi accessor: build the PLC API proxy on first use
# (constructor arguments elided in this view).
self._plcapi = plcapi.plcapi(

# Fragment of the sliceapi accessor: defaults to the PLC API, or an SFI
# API wrapper when SFA mode is enabled (branch condition elided).
if not self._sliceapi:
    self._sliceapi = self.plcapi
    from nepi.util import sfiapi
    self._sliceapi = sfiapi.sfiapi(self.slice_id)

# Fragment of the slice_id accessor: resolve and cache the slice id.
if not self._slice_id:
    self._slice_id = self.sliceapi.GetSliceId(self.slicename)

# Fragment of the vsys_vnet accessor: resolve and cache the slice's
# vsys_vnet tag (the private subnet assigned to the slice).
if not self._vsys_vnet:
    self._vsys_vnet = self.sliceapi.GetSliceVnetSysTag(self.slicename)
return self._vsys_vnet
def _load_blacklist(self):
    """Load the persisted host blacklist (one hostname per line)."""
    blpath = environ.homepath('plblacklist')
    # NOTE(review): try/except around the open and the file close are
    # elided from this view — presumably a missing file yields an empty
    # blacklist; confirm against the full file.
    bl = open(blpath, "r")
    self._blacklist = set()
    self._blacklist = set(
        map(str.strip, bl.readlines())
def _save_blacklist(self):
    """Persist the current blacklist, one hostname per line."""
    blpath = environ.homepath('plblacklist')
    bl = open(blpath, "w")
    # NOTE(review): the enclosing writelines(...) call and the file close
    # are elided from this view.
    map('%s\n'.__mod__, self._blacklist))
# NOTE(review): interior of do_setup() (the "def" line is elided from
# this view).  Pulls every configuration attribute into plain instance
# attributes, validates the mandatory ones, then delegates to the base
# implementation.
self._home_directory = self._attributes.\
    get_attribute_value("homeDirectory")
self.slicename = self._attributes.\
    get_attribute_value("slice")
self.authUser = self._attributes.\
    get_attribute_value("authUser")
self.authString = self._attributes.\
    get_attribute_value("authPass")
self.sliceSSHKey = self._attributes.\
    get_attribute_value("sliceSSHKey")
# Passphrase is obtained lazily (see _make_temp_private_key), never
# from attributes.
self.sliceSSHKeyPass = None
self.plcHost = self._attributes.\
    get_attribute_value("plcHost")
self.plcUrl = self._attributes.\
    get_attribute_value("plcUrl")
self.logLevel = self._attributes.\
    get_attribute_value("plLogLevel")
self.tapPortBase = self._attributes.\
    get_attribute_value("tapPortBase")
self.p2pDeployment = self._attributes.\
    get_attribute_value("p2pDeployment")
self.dedicatedSlice = self._attributes.\
    get_attribute_value("dedicatedSlice")
self.sfa = self._attributes.\
    get_attribute_value("sfa")
# In SFA mode the slice is identified by its HRN rather than a PLC id.
self._slice_id = self._attributes.\
    get_attribute_value("sliceHrn")

# Fail fast on missing mandatory credentials (Python 2 raise syntax).
if not self.slicename:
    raise RuntimeError, "Slice not set"
if not self.authUser:
    raise RuntimeError, "PlanetLab account username not set"
if not self.authString:
    raise RuntimeError, "PlanetLab account passphrase not set"
if not self.sliceSSHKey:
    raise RuntimeError, "PlanetLab account key not specified"
if not os.path.exists(self.sliceSSHKey):
    raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)

# logLevel is a string attribute naming a logging level (e.g. "DEBUG").
self._logger.setLevel(getattr(logging,self.logLevel))

super(TestbedController, self).do_setup()
def do_post_asynclaunch(self, guid):
    """Barrier for one element: wait for its asynchronous setup to end."""
    # Dependencies were launched asynchronously,
    # so block here until this element's setup has completed.
    dep = self._elements[guid]
    if isinstance(dep, self._app.Dependency):
        dep.async_setup_wait()

# Two-phase configuration for asynchronous launch
# (the same barrier runs after both the preconfigure and configure steps).
do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
do_poststep_configure = staticmethod(do_post_asynclaunch)
def do_preconfigure(self):
    """Discover/provision slice nodes, then run standard preconfiguration."""
    # Perform resource discovery if we don't have
    # specific resources assigned yet
    self.do_resource_discovery()

    # Create PlanetLab slivers
    self.do_provisioning()

    # Wait for provisioning
    # NOTE(review): the matching "try:" and the retry/handling logic of
    # this except clause are elided from this view.
    except self._node.UnresponsiveNodeError:

    if self.p2pDeployment:
        # Plan application deployment
        self.do_spanning_deployment_plan()

    # Configure elements per XML data
    super(TestbedController, self).do_preconfigure()
def do_resource_discovery(self, recover = False):
    """Assign a concrete PlanetLab node to every Node element.

    First assigns nodes that have exactly one candidate host; the rest
    are solved with a backtracking allocator over the remaining
    candidate sets.  Hosts already taken and blacklisted hosts are
    never reused.  Nodes needing a new sliver are collected in
    self._to_provision for do_provisioning().
    NOTE(review): several lines are elided from this view.
    """
    to_provision = self._to_provision = set()

    # Start from the blacklist so bad hosts are never candidates.
    reserved = set(self._blacklist)
    for guid, node in self._elements.iteritems():
        if isinstance(node, self._node.Node) and node._node_id is not None:
            reserved.add(node.hostname)

    # look for perfectly defined nodes
    # (ie: those with only one candidate)
    reserve_lock = threading.RLock()
    def assignifunique(guid, node):
        # Try existing nodes first
        # If we have only one candidate, simply use it
        candidates = node.find_candidates(
            filter_slice_id = self.slice_id)

        candidate_hosts = set(candidates.keys() if candidates else [])
        # Guard "reserved" — this closure runs on worker threads.
        reserve_lock.acquire()
        candidate_hosts -= reserved
        if len(candidate_hosts) == 1:
            hostname = iter(candidate_hosts).next()
            node_id = candidates[hostname]
            reserved.add(hostname)
        elif not candidate_hosts:
            # Try again including unassigned nodes
            reserve_lock.release()
            candidates = node.find_candidates()
            reserve_lock.acquire()
            candidate_hosts = set(candidates.keys() if candidates else [])
            candidate_hosts -= reserved
            if len(candidate_hosts) > 1:
            if len(candidate_hosts) == 1:
                hostname = iter(candidate_hosts).next()
                node_id = candidates[hostname]
                # Host not in the slice yet: remember to provision it.
                to_provision.add(node_id)
                reserved.add(hostname)
                # NOTE(review): "sith" in the message below is a typo for
                # "with" (runtime string, deliberately left untouched here).
                raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
                    node.make_filter_description())
        reserve_lock.release()

        if node_id is not None:
            node.assign_node_id(node_id)

    runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
    for guid, node in self._elements.iteritems():
        if isinstance(node, self._node.Node) and node._node_id is None:
            runner.put(assignifunique, guid, node)

    # Now do the backtracking search for a suitable solution
    # First with existing slice nodes
    def genreqs(node, filter_slice_id=None):
        # Try existing nodes first
        # If we have only one candidate, simply use it
        candidates = node.find_candidates(
            filter_slice_id = filter_slice_id)
        if candidates.has_key(r):
        reqs.append(candidates.values())

    for guid, node in self._elements.iteritems():
        if isinstance(node, self._node.Node) and node._node_id is None:
            runner.put(genreqs, node, self.slice_id)

    # In recovery mode hosts are already fixed — anything unassigned
    # at this point is unrecoverable.
    raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

    def pickbest(fullset, nreq, node=nodes[0]):
        # Rank candidate hosts and keep the best nreq of them.
        if len(fullset) > nreq:
            fullset = zip(node.rate_nodes(fullset),fullset)
            fullset.sort(reverse=True)
        return set(map(operator.itemgetter(1),fullset))

    solution = resourcealloc.alloc(reqs, sample=pickbest)
    except resourcealloc.ResourceAllocationError:
        # Failed, try again with all nodes
        runner.put(genreqs, node)
        solution = resourcealloc.alloc(reqs, sample=pickbest)
        to_provision.update(solution)

    for node, node_id in zip(nodes, solution):
        runner.put(node.assign_node_id, node_id)
def do_provisioning(self):
    """Add newly-chosen hosts to the slice and record them as fresh.

    do_wait_nodes() gives freshly-provisioned hosts a much longer
    readiness timeout than hosts that were already in the slice.
    """
    if self._to_provision:
        # Add new nodes to the slice
        cur_nodes = self.sliceapi.GetSliceNodes(self.slice_id)
        new_nodes = list(set(cur_nodes) | self._to_provision)
        self.sliceapi.AddSliceNodes(self.slice_id, nodes=new_nodes)

    # Once provisioning is done, the list is no longer needed as such.
    self._just_provisioned = self._to_provision
    del self._to_provision
def do_wait_nodes(self):
    """Configure every Node element and wait until its host is usable.

    Unresponsive hosts are added to the persistent blacklist.
    NOTE(review): several lines (the try: around the waits, runner
    join, etc.) are elided from this view.
    """
    for guid, node in self._elements.iteritems():
        if isinstance(node, self._node.Node):
            # Just inject configuration stuff
            node.home_path = "nepi-node-%s" % (guid,)
            node.ident_path = self.sliceSSHKey
            node.slicename = self.slicename
            self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

    # Many threads: the work is network-bound SSH probing.
    runner = ParallelRun(maxthreads=64, maxqueue=1)
    def waitforit(guid, node):
        # Freshly provisioned slivers may take up to 20 minutes to come
        # up; pre-existing ones get only 60 seconds.
        node.wait_provisioning(
            (20*60 if node._node_id in self._just_provisioned else 60)
        self._logger.info("READY Node %s at %s", guid, node.hostname)
        # Prepare dependency installer now
        node.prepare_dependencies()

    for guid, node in self._elements.iteritems():
        if isinstance(node, self._node.Node):
            self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
            runner.put(waitforit, guid, node)

    # NOTE(review): the matching "try:" for this clause is elided.
    except self._node.UnresponsiveNodeError:
        self._logger.warn("UNRESPONSIVE Nodes")

        # Mark all dead nodes (which are unresponsive) on the blacklist
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                if not node.is_alive():
                    self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                    self._blacklist.add(node.hostname)

        self._save_blacklist()
        traceback.print_exc()
def do_spanning_deployment_plan(self):
    """Plan P2P dependency deployment along a minimum spanning tree.

    Dependencies that are identical (same hash of depends/sources/
    platform) are grouped; within each group one root is installed
    normally and the rest chain-copy from their nearest (by IP
    distance) already-installed peer, using a temporary re-ciphered
    SSH key.  NOTE(review): several lines (the dephash definition,
    the mst call, inner loops) are elided from this view.
    """
    # Create application groups by collecting all applications
    # based on their hash - the hash should contain everything that
    # defines them and the platform they're built
    # Fragment of the dephash key tuple:
    frozenset((app.depends or "").split(' ')),
    frozenset((app.sources or "").split(' ')),
    app.node.architecture,
    app.node.operatingSystem,

    # Group dependencies by their hash.
    depgroups = collections.defaultdict(list)

    for element in self._elements.itervalues():
        if isinstance(element, self._app.Dependency):
            depgroups[dephash(element)].append(element)
        elif isinstance(element, self._node.Node):
            deps = element._yum_dependencies
            depgroups[dephash(deps)].append(deps)

    # Set up spanning deployment for those applications that
    # have been deployed in several nodes.
    for dh, group in depgroups.iteritems():
        # Pick root (deterministically)
        root = min(group, key=lambda app:app.node.hostname)

        # Obtain all IPs in numeric format
        # (which means faster distance computations)
        dep._ip = socket.gethostbyname(dep.node.hostname)
        dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

        # NOTE: the plan is an iterator
        # Fragment of the mst/plan construction — edge weight is the
        # numeric IP distance between the two endpoints:
        lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),

        # Re-sign private key
        tempprk, temppuk, tmppass = self._make_temp_private_key()

        for slave, master in plan:
            slave.set_master(master)
            slave.install_keys(tempprk, temppuk, tmppass)

    # We don't need the user's passphrase anymore
    self.sliceSSHKeyPass = None
def _make_temp_private_key(self):
    """Re-cipher the user's SSH identity with a random throwaway passphrase.

    Returns (private_key_file, public_key_file, passphrase) — the two
    NamedTemporaryFile handles keep the files alive while chain
    deployment uses them.  NOTE(review): several lines (ssh-keygen
    argv, error branches, temp-file suffixes) are elided from this view.
    """
    # Get the user's key's passphrase
    if not self.sliceSSHKeyPass:
        if 'SSH_ASKPASS' in os.environ:
            # Delegate the prompt to the user's SSH_ASKPASS helper.
            proc = subprocess.Popen(
                [ os.environ['SSH_ASKPASS'],
                  "Please type the passphrase for the %s SSH identity file. "
                  "The passphrase will be used to re-cipher the identity file with "
                  "a random 256-bit key for automated chain deployment on the "
                  "%s PlanetLab slice" % (
                    os.path.basename(self.sliceSSHKey),
                stdin = open("/dev/null"),
                stdout = subprocess.PIPE,
                stderr = subprocess.PIPE)
            out,err = proc.communicate()
            self.sliceSSHKeyPass = out.strip()

    # NOTE(review): the body of this guard (raising TempKeyError,
    # presumably) is elided from this view.
    if not self.sliceSSHKeyPass:

    # Create temporary key files
    prk = tempfile.NamedTemporaryFile(
        dir = self.root_directory,
        prefix = "pl_deploy_tmpk_",
    puk = tempfile.NamedTemporaryFile(
        dir = self.root_directory,
        prefix = "pl_deploy_tmpk_",

    # Create secure 256-bits temporary passphrase
    # (Python 2 str.encode("hex") — 32 random bytes -> 64 hex chars).
    passphrase = os.urandom(32).encode("hex")

    # Copy the user's keypair into the temp files, preserving modes.
    oprk = open(self.sliceSSHKey, "rb")
    opuk = open(self.sliceSSHKey+".pub", "rb")
    shutil.copymode(oprk.name, prk.name)
    shutil.copymode(opuk.name, puk.name)
    shutil.copyfileobj(oprk, prk)
    shutil.copyfileobj(opuk, puk)

    # A descriptive comment
    comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

    # Re-key the copied private key: old passphrase -P, new one -N
    # (full ssh-keygen argv elided from this view).
    proc = subprocess.Popen(
        "-P", self.sliceSSHKeyPass,
        stdout = subprocess.PIPE,
        stderr = subprocess.PIPE,
        stdin = subprocess.PIPE
    out, err = proc.communicate()
    raise RuntimeError, "Problem generating keys: \n%s\n%r" % (

    # Change comment on public key
    puklines = puk.readlines()
    puklines[0] = puklines[0].split(' ')
    puklines[0][-1] = comment+'\n'
    puklines[0] = ' '.join(puklines[0])
    puk.writelines(puklines)

    return prk, puk, passphrase
def set(self, guid, name, value, time = TIME_NOW):
    """Set attribute *name* on element *guid*, mirroring it onto the object."""
    super(TestbedController, self).set(guid, name, value, time)
    # TODO: take on account schedule time for the task
    element = self._elements[guid]
    # NOTE(review): the try/except wrapping this setattr is elided from
    # this view.
    setattr(element, name, value)
    # We ignore these errors while recovering.
    # Some attributes are immutable, and setting
    # them is necessary (to recover the state), but
    # some are not (they throw an exception).
    if not self.recovering:
    if hasattr(element, 'refresh'):
        # invoke attribute refresh hook
def get(self, guid, name, time = TIME_NOW):
    """Get attribute *name*, preferring the live element's value."""
    value = super(TestbedController, self).get(guid, name, time)
    # TODO: take on account schedule time for the task
    factory_id = self._create[guid]
    factory = self._factories[factory_id]
    element = self._elements.get(guid)
    # NOTE(review): the matching "try:" is elided; on a missing element
    # or attribute this falls back (presumably to the stored value).
    return getattr(element, name)
    except (KeyError, AttributeError):
def get_address(self, guid, index, attribute='Address'):
    """Return an address attribute of interface *guid*.

    Answers from the live interface object when possible; otherwise
    defers to the base implementation.  Only index 0 is served locally.
    """
    # NOTE(review): the return for the 'Address' branch (original line
    # 566) is elided from this view.
    iface = self._elements.get(guid)
    if iface and index == 0:
        if attribute == 'Address':
        elif attribute == 'NetPrefix':
            return iface.netprefix
        elif attribute == 'Broadcast':
            return iface.broadcast

    # if all else fails, query box
    return super(TestbedController, self).get_address(guid, index, attribute)
def action(self, time, guid, action):
    """Scheduled element actions are not supported by this testbed."""
    raise NotImplementedError
# NOTE(review): interior of shutdown() (the "def" line is elided from
# this view).  Closes traces, then runs per-element cleanup and destroy
# hooks in shutdown order, and finally drops all element references.
for trace in self._traces.itervalues():

def invokeif(action, testbed, guid):
    # Call element.<action>() only when the element defines it.
    element = self._elements[guid]
    if hasattr(element, action):
        getattr(element, action)()

self._do_in_factory_order(
    functools.partial(invokeif, 'cleanup'),
    metadata.shutdown_order)

self._do_in_factory_order(
    functools.partial(invokeif, 'destroy'),
    metadata.shutdown_order)

self._elements.clear()
def trace(self, guid, trace_id, attribute='value'):
    """Return trace content ('value'), or its remote path/name.

    NOTE(review): the lines that read the synced file into `content`
    and the final return are elided from this view.
    """
    elem = self._elements[guid]

    if attribute == 'value':
        # Sync the remote trace into the local home directory first.
        path = elem.sync_trace(self.home_directory, trace_id)
    elif attribute == 'path':
        content = elem.remote_trace_path(trace_id)
    elif attribute == 'name':
        content = elem.remote_trace_name(trace_id)
def follow_trace(self, trace_id, trace):
    """Register *trace* under *trace_id* for later lookup/synchronization."""
    registry = self._traces
    registry[trace_id] = trace
# NOTE(review): interior of recover() (the "def" line is elided from
# this view).  Rebuilds controller state after a crash without
# re-running steps that have remote side effects.
# An internal flag, so we know to behave differently in
# a few corner cases.
self.recovering = True

# Create and connect do not perform any real tasks against
# the nodes, it only sets up the object hierarchy,
# so we can run them normally
self.do_connect_init()
self.do_connect_compl()

# Manually recover nodes, to mark dependencies installed
# and clean up mutable attributes
self._do_in_factory_order(
    lambda self, guid : self._elements[guid].recover(),

# Assign nodes - since we're working off exeucte XML, nodes
# have specific hostnames assigned and we don't need to do
# real assignment, only find out node ids and check liveliness
self.do_resource_discovery(recover = True)

# Pre/post configure, however, tends to set up tunnels
# Execute configuration steps only for those object
# kinds that do not have side effects

# Do the ones without side effects,
# including nodes that need to set up home
# folders and all that
self._do_in_factory_order(
    "preconfigure_function",
    Parallel(metadata.NODE),

# Tunnels require a home path that is configured
# at this step. Since we cannot run the step itself,
# we need to inject this homepath ourselves
for guid, element in self._elements.iteritems():
    if isinstance(element, self._interfaces.TunIface):
        element._home_path = "tun-%s" % (guid,)

# Manually recover tunnels, applications and
# netpipes, negating the side effects
self._do_in_factory_order(
    lambda self, guid : self._elements[guid].recover(),
    Parallel(metadata.TAPIFACE),
    Parallel(metadata.TUNIFACE),
    Parallel(metadata.NEPIDEPENDENCY),
    Parallel(metadata.NS3DEPENDENCY),
    Parallel(metadata.DEPENDENCY),
    Parallel(metadata.APPLICATION),

# Tunnels are not harmed by configuration after
# recovery, and some attributes get set this way
# like external_iface
self._do_in_factory_order(
    "preconfigure_function",
    Parallel(metadata.TAPIFACE),
    Parallel(metadata.TUNIFACE),

# Post-do the ones without side effects
self._do_in_factory_order(
    "configure_function",
    Parallel(metadata.NODE),
    Parallel(metadata.TAPIFACE),
    Parallel(metadata.TUNIFACE),

# There are no required prestart steps
# to call upon recovery, so we're done
# NOTE(review): suspected bug — recovery is finished here, so this
# should almost certainly be "self.recovering = False"; as written the
# flag set at the top of recover() is never cleared and set()/
# _make_generic() will keep swallowing attribute errors.  Left
# untouched because surrounding lines are elided; confirm and fix
# against the full file.
self.recovering = True
def _make_generic(self, parameters, kind, **kwargs):
    """Instantiate *kind* and populate it from the *parameters* dict.

    NOTE(review): several lines (the kind(**args) construction, the
    try/except around setattr, the return) are elided from this view.
    """
    args = dict({'api': self.plcapi})
    # Weak back-reference so elements can reach the controller without
    # creating a reference cycle.
    app.testbed = weakref.ref(self)

    # Note: there is 1-to-1 correspondence between attribute names
    # If that changes, this has to change as well
    for attr,val in parameters.iteritems():
        setattr(app, attr, val)
        # We ignore these errors while recovering.
        # Some attributes are immutable, and setting
        # them is necessary (to recover the state), but
        # some are not (they throw an exception).
        if not self.recovering:
def _make_node(self, parameters):
    """Build a Node element; cleanup is enabled only on dedicated slices."""
    # Nodes talk to the slice API rather than the raw PLC API.
    args = dict({'sliceapi': self.sliceapi})
    node = self._make_generic(parameters, self._node.Node, **args)
    node.enable_cleanup = self.dedicatedSlice
    # NOTE(review): the trailing "return node" is elided from this view.
def _make_node_iface(self, parameters):
    """Build a NodeIface element from *parameters*."""
    factory = self._interfaces.NodeIface
    return self._make_generic(parameters, factory)
def _make_tun_iface(self, parameters):
    """Build a TunIface element from *parameters*."""
    factory = self._interfaces.TunIface
    return self._make_generic(parameters, factory)
def _make_tap_iface(self, parameters):
    """Build a TapIface element from *parameters*."""
    factory = self._interfaces.TapIface
    return self._make_generic(parameters, factory)
def _make_netpipe(self, parameters):
    """Build a NetPipe element from *parameters*."""
    factory = self._interfaces.NetPipe
    return self._make_generic(parameters, factory)
def _make_internet(self, parameters):
    """Build an Internet element from *parameters*."""
    factory = self._interfaces.Internet
    return self._make_generic(parameters, factory)
def _make_application(self, parameters):
    """Build an Application element from *parameters*."""
    factory = self._app.Application
    return self._make_generic(parameters, factory)
def _make_dependency(self, parameters):
    """Build a Dependency element from *parameters*."""
    factory = self._app.Dependency
    return self._make_generic(parameters, factory)
def _make_nepi_dependency(self, parameters):
    """Build a NepiDependency element from *parameters*."""
    factory = self._app.NepiDependency
    return self._make_generic(parameters, factory)
def _make_ns3_dependency(self, parameters):
    """Build an NS3Dependency element from *parameters*."""
    factory = self._app.NS3Dependency
    return self._make_generic(parameters, factory)
def _make_tun_filter(self, parameters):
    """Build a TunFilter element from *parameters*."""
    factory = self._interfaces.TunFilter
    return self._make_generic(parameters, factory)
def _make_class_queue_filter(self, parameters):
    """Build a ClassQueueFilter element from *parameters*."""
    factory = self._interfaces.ClassQueueFilter
    return self._make_generic(parameters, factory)
def _make_tos_queue_filter(self, parameters):
    """Build a ToSQueueFilter element from *parameters*."""
    factory = self._interfaces.ToSQueueFilter
    return self._make_generic(parameters, factory)
def _make_multicast_forwarder(self, parameters):
    """Build a MulticastForwarder element from *parameters*."""
    factory = self._multicast.MulticastForwarder
    return self._make_generic(parameters, factory)
def _make_multicast_announcer(self, parameters):
    """Build a MulticastAnnouncer element from *parameters*."""
    factory = self._multicast.MulticastAnnouncer
    return self._make_generic(parameters, factory)
def _make_multicast_router(self, parameters):
    """Build a MulticastRouter element from *parameters*."""
    factory = self._multicast.MulticastRouter
    return self._make_generic(parameters, factory)