#!/usr/bin/env python
# -*- coding: utf-8 -*-

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata
import weakref

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application, multicast
        self._node = node
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        self.recovering = False

    @property
    def home_directory(self):
        return self._home_directory

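    # PLCAPI proxy, created lazily on first use. When credentials are
    # available an authenticated session is opened; otherwise it falls
    # back to anonymous access.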
    @property
    def plapi(self):
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # Anonymous access - may not be sufficient for most operations
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    @property
    def vsys_vnet(self):
        if not hasattr(self, '_vsys_vnet'):
            slicetags = self.plapi.GetSliceTags(
                name = self.slicename,
                tagname = 'vsys_vnet',
                fields=('value',))
            if slicetags:
                self._vsys_vnet = slicetags[0]['value']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._vsys_vnet

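    # The node blacklist is persisted as one PlanetLab node id per line in
    # the file returned by environ.homepath('plblacklist'), so unresponsive
    # nodes are remembered across runs.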
    def _load_blacklist(self):
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except:
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        if not self.slicename:
            raise RuntimeError, "Slice not set"
        if not self.authUser:
            raise RuntimeError, "PlanetLab account username not set"
        if not self.authString:
            raise RuntimeError, "PlanetLab account passphrase not set"
        if not self.sliceSSHKey:
            raise RuntimeError, "PlanetLab account key not specified"
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

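    # Preconfiguration loops over resource discovery, provisioning and the
    # node wait until a fully responsive node set is obtained: unresponsive
    # nodes are blacklisted and unassigned by do_wait_nodes, so the next
    # iteration picks different candidates.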
    def do_preconfigure(self):
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes are up, we're done
                break
            except self._node.UnresponsiveNodeError:
                # Some nodes were unresponsive and got blacklisted; retry
                # discovery and provisioning with a fresh candidate set
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial algorithm:
        #   look for perfectly defined nodes
        #   (i.e. those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        continue
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description())

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
                nodes.append(node)

        if nodes and reqs:
            if recover:
                raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

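            # pickbest: when a candidate set is larger than required, keep
            # only the nreq best candidates according to node.rate_nodes(),
            # to keep the backtracking search small.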
            def pickbest(fullset, nreq, node=nodes[0]):
                if len(fullset) > nreq:
                    fullset = zip(node.rate_nodes(fullset),fullset)
                    fullset.sort(reverse=True)
                    del fullset[nreq:]
                    return set(map(operator.itemgetter(1),fullset))
                else:
                    return fullset

            try:
                solution = resourcealloc.alloc(reqs, sample=pickbest)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes
                reqs = []
                for node in nodes:
                    candidates = node.find_candidates()
                    candidates -= reserved
                    reqs.append(candidates)

                solution = resourcealloc.alloc(reqs, sample=pickbest)
                to_provision.update(solution)

            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                node.assign_node_id(node_id)

    def do_provisioning(self):
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Inject the per-node configuration
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Log the node assignment
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)

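                    # Freshly provisioned slivers can take a while to
                    # instantiate, so give them up to 20 minutes; nodes that
                    # were already in the slice only get 1 minute.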
                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            # At least one node did not come up in time
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

            # Put all unresponsive (dead) nodes on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # Not critical: failing to persist the blacklist should not
                # mask the original error
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed on several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
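                # It yields (slave, master) pairs describing a spanning tree
                # over the group, weighted by IP distance, rooted at the
                # chosen node, with at most two slaves per master.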
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

                # Re-cipher the private key with a temporary passphrase
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create secure 256-bit temporary passphrase
        passphrase = ''.join(map(chr,[rng.randint(0,255)
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")
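        # (32 bytes from the OS CSPRNG, hex-encoded: 256 bits of entropy)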

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher the keys with the new passphrase
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)

        prk.seek(0)
        puk.seek(0)

        # Change comment on public key
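        # (the comment is the last space-separated field of the key's single line)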
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the schedule time for the task
        element = self._elements[guid]
        if element:
            try:
                setattr(element, name, value)
            except:
                # While recovering, we ignore these errors: setting the
                # attributes is necessary to restore the state, but some of
                # them are immutable and raise an exception when set.
                if not self.recovering:
                    raise

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # Try the live interface data first
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # If all else fails, query the design box
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
        for trace in self._traces.itervalues():
            trace.close()

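        # invokeif: call the named method on the element only if it defines it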
        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
        elem = self._elements[guid]

        if attribute == 'value':
            path = elem.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

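    # Recovery rebuilds the controller state from the execute XML of a
    # previous run: the object hierarchy is re-created, nodes are matched to
    # their already-assigned hostnames, side-effect-free configuration steps
    # are re-run, and stateful elements (tunnels, applications, netpipes)
    # are recovered manually instead of being re-deployed.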
    def recover(self):
        try:
            # An internal flag, so we know to behave differently in
            # a few corner cases.
            self.recovering = True

            # Create and connect do not perform any real tasks against
            # the nodes; they only set up the object hierarchy,
            # so we can run them normally
            self.do_create()
            self.do_connect_init()
            self.do_connect_compl()

            # Manually recover nodes, to mark dependencies installed
            # and clean up mutable attributes
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    metadata.NODE,
                ])

            # Assign nodes - since we're working off the execute XML, nodes
            # have specific hostnames assigned and we don't need to do
            # real assignment, only find out node ids and check liveness
            self.do_resource_discovery(recover = True)
            self.do_wait_nodes()

            # Pre/post configure, however, tends to set up tunnels
            # Execute configuration steps only for those object
            # kinds that do not have side effects

            # Do the ones without side effects,
            # including nodes that need to set up home
            # folders and the like
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                ])

            # Tunnels require a home path that is configured
            # at this step. Since we cannot run the step itself,
            # we need to inject this homepath ourselves
            for guid, element in self._elements.iteritems():
                if isinstance(element, self._interfaces.TunIface):
                    element._home_path = "tun-%s" % (guid,)

            # Manually recover tunnels, applications and
            # netpipes, negating the side effects
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                    metadata.NETPIPE,
                    Parallel(metadata.NEPIDEPENDENCY),
                    Parallel(metadata.NS3DEPENDENCY),
                    Parallel(metadata.DEPENDENCY),
                    Parallel(metadata.APPLICATION),
                ])

            # Tunnels are not harmed by configuration after
            # recovery, and some attributes get set this way,
            # like external_iface
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # Post-do the ones without side effects
            self._do_in_factory_order(
                "configure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # There are no required prestart steps
            # to call upon recovery, so we're done
        finally:
            # Recovery is done, resume normal error handling
            self.recovering = False

    def _make_generic(self, parameters, kind):
        app = kind(self.plapi)
        app.testbed = weakref.ref(self)

        # Note: there is a 1-to-1 correspondence between parameter names
        #   and element attribute names.
        #   If that changes, this has to change as well
        for attr,val in parameters.iteritems():
            try:
                setattr(app, attr, val)
            except:
                # While recovering, we ignore these errors: setting the
                # attributes is necessary to restore the state, but some of
                # them are immutable and raise an exception when set.
                if not self.recovering:
                    raise

        return app

    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)

    def _make_tun_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunFilter)

    def _make_class_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ClassQueueFilter)

    def _make_tos_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ToSQueueFilter)

    def _make_multicast_forwarder(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastForwarder)

    def _make_multicast_announcer(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastAnnouncer)

    def _make_multicast_router(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastRouter)