2 # -*- coding: utf-8 -*-
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.core.metadata import Parallel
7 from nepi.util.constants import TIME_NOW
8 from nepi.util.graphtools import mst
9 from nepi.util import ipaddr2
10 from nepi.util import environ
11 from nepi.util.parallel import ParallelRun
class TempKeyError(Exception):
    """Error involving the temporary SSH deployment key.

    NOTE(review): the class body was elided in this excerpt; a docstring
    body (equivalent to ``pass``) is supplied so the definition is complete.
    """
class TestbedController(testbed_impl.TestbedController):
    """Controller for experiments deployed on PlanetLab slices.

    NOTE(review): this excerpt elides several lines of the original; the
    ``def __init__`` header and the ``self._node = node`` binding were
    reconstructed (later methods reference ``self._node.Node``) -- confirm
    against the full source.
    """

    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None

        # Deferred imports of sibling backend modules, kept as instance
        # attributes so the rest of the controller can reach them.
        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application

        self._blacklist = set()          # node_ids known to be unusable
        self._just_provisioned = set()   # node_ids added during this session

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        # Set while recover() runs; relaxes attribute-setting errors.
        self.recovering = False
55 def home_directory(self):
56 return self._home_directory
        # --- body of a cached 'plapi' accessor; its def/@property header,
        # --- the call's closing paren and the else branch are elided in
        # --- this excerpt.
        if not hasattr(self, '_plapi'):
            # Authenticated API proxy built from credentials gathered in
            # do_setup()
            self._plapi = plcapi.PLCAPI(
                username = self.authUser,
                password = self.authString,
                hostname = self.plcHost,
                urlpattern = self.plcUrl
            # anonymous access - may not be enough for much
            self._plapi = plcapi.PLCAPI()
        # --- body of a cached 'slice_id' accessor; its header, the guard on
        # --- the query result and the final return are elided in this excerpt.
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            # Cache only on success (the guard line is elided here)
                self._slice_id = slices[0]['slice_id']
            # If it wasn't found, don't remember this failure, keep trying
        # --- body of a cached 'vsys_vnet' accessor; its header, the call's
        # --- closing paren and the result guard are elided in this excerpt.
        if not hasattr(self, '_vsys_vnet'):
            # Ask PLC for the slice's 'vsys_vnet' tag value
            slicetags = self.plapi.GetSliceTags(
                name = self.slicename,
                tagname = 'vsys_vnet',
            # Cache only on success (the guard line is elided here)
                self._vsys_vnet = slicetags[0]['value']
            # If it wasn't found, don't remember this failure, keep trying
        return self._vsys_vnet
    def _load_blacklist(self):
        # Load the persisted set of unusable node ids from the user's
        # 'plblacklist' file.
        # NOTE(review): the try/except/finally around the file access is
        # elided in this excerpt.
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "r")
        # Presumably the error-branch fallback: start empty
        self._blacklist = set()
        # Normal path: one node id per line, stripped of whitespace
        self._blacklist = set(
            map(str.strip, bl.readlines())
    def _save_blacklist(self):
        # Persist the blacklist, one '%s\n'-formatted node id per line.
        # NOTE(review): the writelines( call wrapping the map(...) line and
        # the try/finally that closes the file are elided in this excerpt.
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
            map('%s\n'.__mod__, self._blacklist))
128 self._home_directory = self._attributes.\
129 get_attribute_value("homeDirectory")
130 self.slicename = self._attributes.\
131 get_attribute_value("slice")
132 self.authUser = self._attributes.\
133 get_attribute_value("authUser")
134 self.authString = self._attributes.\
135 get_attribute_value("authPass")
136 self.sliceSSHKey = self._attributes.\
137 get_attribute_value("sliceSSHKey")
138 self.sliceSSHKeyPass = None
139 self.plcHost = self._attributes.\
140 get_attribute_value("plcHost")
141 self.plcUrl = self._attributes.\
142 get_attribute_value("plcUrl")
143 self.logLevel = self._attributes.\
144 get_attribute_value("plLogLevel")
145 self.tapPortBase = self._attributes.\
146 get_attribute_value("tapPortBase")
147 self.p2pDeployment = self._attributes.\
148 get_attribute_value("p2pDeployment")
149 self.dedicatedSlice = self._attributes.\
150 get_attribute_value("dedicatedSlice")
152 if not self.slicename:
153 raise RuntimeError, "Slice not set"
154 if not self.authUser:
155 raise RuntimeError, "PlanetLab account username not set"
156 if not self.authString:
157 raise RuntimeError, "PlanetLab account passphrase not set"
158 if not self.sliceSSHKey:
159 raise RuntimeError, "PlanetLab account key not specified"
160 if not os.path.exists(self.sliceSSHKey):
161 raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
163 self._logger.setLevel(getattr(logging,self.logLevel))
165 super(TestbedController, self).do_setup()
167 def do_post_asynclaunch(self, guid):
168 # Dependencies were launched asynchronously,
170 dep = self._elements[guid]
171 if isinstance(dep, self._app.Dependency):
172 dep.async_setup_wait()
174 # Two-phase configuration for asynchronous launch
175 do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
176 do_poststep_configure = staticmethod(do_post_asynclaunch)
    def do_preconfigure(self):
        # Discover and provision nodes before generic element configuration.
        # NOTE(review): a retry loop around discovery/provisioning appears to
        # be elided in this excerpt (an UnresponsiveNodeError handler is
        # visible below without its try).

        # Perform resource discovery if we don't have
        # specific resources assigned yet
        self.do_resource_discovery()

        # Create PlanetLab slivers
        self.do_provisioning()

        # Wait for provisioning
        except self._node.UnresponsiveNodeError:

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()
    def do_resource_discovery(self, recover = False):
        # Map abstract Node elements to concrete PlanetLab node ids.
        # NOTE(review): several structural lines (else/try headers, the
        # 'nodes'/'reqs' list setup) are elided in this excerpt.
        to_provision = self._to_provision = set()

        # Nodes that already have an id, plus blacklisted ones, are
        # off-limits for assignment
        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # look for perfectly defined nodes
        # (ie: those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                # Try again including unassigned nodes
                candidates = node.find_candidates()
                candidates -= reserved
                if len(candidates) > 1:
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    to_provision.add(node_id)
                    reserved.add(node_id)
                # NOTE(review): "sith" in the message below is a typo for
                # "with" (a string change, so not fixable in a doc-only pass).
                raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
                    node.make_filter_description())

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)

        # In recovery mode, an unassignable host is fatal
        raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

        solution = resourcealloc.alloc(reqs)
        except resourcealloc.ResourceAllocationError:
            # Failed, try again with all nodes
            candidates = node.find_candidates()
            candidates -= reserved
            reqs.append(candidates)

        solution = resourcealloc.alloc(reqs)
        to_provision.update(solution)

        # Apply the allocator's assignment back onto the node elements
        for node, node_id in zip(nodes, solution):
            node.assign_node_id(node_id)
276 def do_provisioning(self):
277 if self._to_provision:
278 # Add new nodes to the slice
279 cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
280 new_nodes = list(set(cur_nodes) | self._to_provision)
281 self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
284 self._just_provisioned = self._to_provision
285 del self._to_provision
    def do_wait_nodes(self):
        # Inject per-node configuration, then wait until every node's sliver
        # is provisioned and responsive; unresponsive nodes get blacklisted.
        # NOTE(review): try scaffolding and several interior lines are
        # elided in this excerpt.
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                # Freshly provisioned slivers get a much longer grace period
                # (20 minutes vs 1 minute)
                node.wait_provisioning(
                    (20*60 if node._node_id in self._just_provisioned else 60)

                self._logger.info("READY Node %s at %s", guid, node.hostname)

                # Prepare dependency installer now
                node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:

            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

            # Mark all dead nodes (which are unresponsive) on the blacklist
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)

            self._save_blacklist()
        traceback.print_exc()
    def do_spanning_deployment_plan(self):
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
        # NOTE(review): the 'dephash' key-function definition that wraps the
        # tuple entries below is partially elided in this excerpt.
            frozenset((app.depends or "").split(' ')),
            frozenset((app.sources or "").split(' ')),
            app.node.architecture,
            app.node.operatingSystem,

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            # Pick root (deterministically)
            root = min(group, key=lambda app:app.node.hostname)

            # Obtain all IPs in numeric format
            # (which means faster distance computations)
            dep._ip = socket.gethostbyname(dep.node.hostname)
            dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

            # NOTE: the plan is an iterator
            # (the spanning-tree call wrapping this distance lambda is
            # elided in this excerpt)
                lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),

            # Re-sign private key
            tempprk, temppuk, tmppass = self._make_temp_private_key()

            # Walk the plan's edges: each slave fetches from its master
            for slave, master in plan:
                slave.set_master(master)
                slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
    def _make_temp_private_key(self):
        """Create a temporary copy of the slice SSH key pair, re-ciphered
        with a random passphrase, for automated chain deployment.

        Returns a (private_keyfile, public_keyfile, passphrase) tuple.
        NOTE(review): several lines (ssh-keygen argv, error guards, file
        seeks/flushes) are elided in this excerpt.
        """
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                # Prompt through the user's configured askpass helper
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",

        # Create secure 256-bits temporary passphrase
        passphrase = ''.join(map(chr,[rng.randint(0,255)
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")

        # Copy the user's key pair, preserving file permissions
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher via ssh-keygen (full argv elided in this excerpt;
        # -P passes the old passphrase)
        proc = subprocess.Popen(
            "-P", self.sliceSSHKeyPass,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        out, err = proc.communicate()
        raise RuntimeError, "Problem generating keys: \n%s\n%r" % (

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.writelines(puklines)

        return prk, puk, passphrase
    def set(self, guid, name, value, time = TIME_NOW):
        """Set attribute *name* on element *guid* to *value*."""
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take on account schedule time for the task
        element = self._elements[guid]
        # NOTE(review): the try/except around setattr, the re-raise when not
        # recovering, and the refresh() invocation are elided in this excerpt.
            setattr(element, name, value)

        # We ignore these errors while recovering.
        # Some attributes are immutable, and setting
        # them is necessary (to recover the state), but
        # some are not (they throw an exception).
        if not self.recovering:

        if hasattr(element, 'refresh'):
            # invoke attribute refresh hook
    def get(self, guid, name, time = TIME_NOW):
        """Get attribute *name* of element *guid*, preferring the live
        element's current value over the stored one."""
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take on account schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        # NOTE(review): the try: header and the fallback return (presumably
        # returning 'value') are elided in this excerpt.
            return getattr(element, name)
        except (KeyError, AttributeError):
    def get_address(self, guid, index, attribute='Address'):
        # Answer address queries from the local element when possible;
        # only index 0 is served locally.
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                # (the return of the address value is elided in this excerpt)
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)
    def action(self, time, guid, action):
        """Scheduled actions are not supported by this backend."""
        raise NotImplementedError
        # --- shutdown logic; the 'def shutdown(self):' header and the body
        # --- of the trace-closing loop are elided in this excerpt.
        for trace in self._traces.itervalues():

        def invokeif(action, testbed, guid):
            # Call element.<action>() only when the element defines it
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        # Clean up, then destroy, following the metadata shutdown order
        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
    def trace(self, guid, trace_id, attribute='value'):
        """Return trace information for element *guid*: its synced contents
        ('value'), remote path ('path') or remote name ('name').

        NOTE(review): the read of the synced file into 'content' and the
        final return are elided in this excerpt.
        """
        elem = self._elements[guid]

        if attribute == 'value':
            path = elem.sync_trace(self.home_directory, trace_id)
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
    def follow_trace(self, trace_id, trace):
        # Register the trace object so the controller can manage it (it is
        # iterated during shutdown).
        self._traces[trace_id] = trace
        # --- recovery logic; the 'def recover(self):' header and a number of
        # --- interior lines (call close-parens, blanks) are elided in this
        # --- excerpt.
        # An internal flag, so we know to behave differently in
        # a few corner cases.
        self.recovering = True

        # Create and connect do not perform any real tasks against
        # the nodes, it only sets up the object hierarchy,
        # so we can run them normally
        self.do_connect_init()
        self.do_connect_compl()

        # Manually recover nodes, to mark dependencies installed
        # and clean up mutable attributes
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),

        # Assign nodes - since we're working off execute XML, nodes
        # have specific hostnames assigned and we don't need to do
        # real assignment, only find out node ids and check liveliness
        self.do_resource_discovery(recover = True)

        # Pre/post configure, however, tends to set up tunnels
        # Execute configuration steps only for those object
        # kinds that do not have side effects

        # Do the ones without side effects,
        # including nodes that need to set up home
        # folders and all that
        self._do_in_factory_order(
            "preconfigure_function",
            Parallel(metadata.NODE),

        # Tunnels require a home path that is configured
        # at this step. Since we cannot run the step itself,
        # we need to inject this homepath ourselves
        for guid, element in self._elements.iteritems():
            if isinstance(element, self._interfaces.TunIface):
                element._home_path = "tun-%s" % (guid,)

        # Manually recover tunnels, applications and
        # netpipes, negating the side effects
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),
            Parallel(metadata.NEPIDEPENDENCY),
            Parallel(metadata.NS3DEPENDENCY),
            Parallel(metadata.DEPENDENCY),
            Parallel(metadata.APPLICATION),

        # Tunnels are not harmed by configuration after
        # recovery, and some attributes get set this way
        # like external_iface
        self._do_in_factory_order(
            "preconfigure_function",
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),

        # Post-do the ones without side effects
        self._do_in_factory_order(
            "configure_function",
            Parallel(metadata.NODE),
            Parallel(metadata.TAPIFACE),
            Parallel(metadata.TUNIFACE),

        # There are no required prestart steps
        # to call upon recovery, so we're done
        # NOTE(review): BUG? recovery is complete at this point, so this flag
        # should almost certainly be reset to False here, not set True again
        # (it was already set True at the top of this method).
        self.recovering = True
    def _make_generic(self, parameters, kind):
        """Instantiate *kind* bound to the PLC API and configure it from the
        *parameters* dict.

        NOTE(review): the try/except around setattr, the re-raise when not
        recovering, and the final 'return app' are elided in this excerpt.
        """
        app = kind(self.plapi)
        # Weak backreference to the controller, avoiding a reference cycle
        app.testbed = weakref.ref(self)

        # Note: there is 1-to-1 correspondence between attribute names
        # If that changes, this has to change as well
        for attr,val in parameters.iteritems():
            setattr(app, attr, val)

        # We ignore these errors while recovering.
        # Some attributes are immutable, and setting
        # them is necessary (to recover the state), but
        # some are not (they throw an exception).
        if not self.recovering:
680 def _make_node(self, parameters):
681 node = self._make_generic(parameters, self._node.Node)
682 node.enable_cleanup = self.dedicatedSlice
685 def _make_node_iface(self, parameters):
686 return self._make_generic(parameters, self._interfaces.NodeIface)
688 def _make_tun_iface(self, parameters):
689 return self._make_generic(parameters, self._interfaces.TunIface)
691 def _make_tap_iface(self, parameters):
692 return self._make_generic(parameters, self._interfaces.TapIface)
694 def _make_netpipe(self, parameters):
695 return self._make_generic(parameters, self._interfaces.NetPipe)
697 def _make_internet(self, parameters):
698 return self._make_generic(parameters, self._interfaces.Internet)
700 def _make_application(self, parameters):
701 return self._make_generic(parameters, self._app.Application)
703 def _make_dependency(self, parameters):
704 return self._make_generic(parameters, self._app.Dependency)
706 def _make_nepi_dependency(self, parameters):
707 return self._make_generic(parameters, self._app.NepiDependency)
709 def _make_ns3_dependency(self, parameters):
710 return self._make_generic(parameters, self._app.NS3Dependency)
712 def _make_tun_filter(self, parameters):
713 return self._make_generic(parameters, self._interfaces.TunFilter)
715 def _make_class_queue_filter(self, parameters):
716 return self._make_generic(parameters, self._interfaces.ClassQueueFilter)
718 def _make_tos_queue_filter(self, parameters):
719 return self._make_generic(parameters, self._interfaces.ToSQueueFilter)