#!/usr/bin/env python
# -*- coding: utf-8 -*-

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata
import weakref

class TempKeyError(Exception):
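    """Raised when a temporary re-ciphered copy of the slice SSH key
    cannot be created (no passphrase could be obtained)."""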
    pass

class TestbedController(testbed_impl.TestbedController):
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application, multicast
        self._node = node
        self._interfaces = interfaces
        self._app = application
        self._multicast = multicast
        
        self._blacklist = set()
        self._just_provisioned = set()
        
        self._load_blacklist()
        
        self._logger = logging.getLogger('nepi.testbeds.planetlab')
        
        self.recovering = False

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
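        """Lazily-built PLCAPI client: authenticated when credentials are
        configured, anonymous otherwise."""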
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be sufficient for most operations
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id
    
    @property
    def vsys_vnet(self):
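        """Value of the slice's 'vsys_vnet' tag, fetched lazily from the PLC API."""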
        if not hasattr(self, '_vsys_vnet'):
            slicetags = self.plapi.GetSliceTags(
                name = self.slicename,
                tagname = 'vsys_vnet',
                fields=('value',))
            if slicetags:
                self._vsys_vnet = slicetags[0]['value']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._vsys_vnet
    
    def _load_blacklist(self):
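        """Load the set of blacklisted node ids from the user's 'plblacklist'
        file (one integer node_id per line); a missing file means no blacklist."""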
        blpath = environ.homepath('plblacklist')
        
        try:
            bl = open(blpath, "r")
        except EnvironmentError:
            self._blacklist = set()
            return
            
        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()
    
    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()
    
    def do_setup(self):
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")
        
        if not self.slicename:
            raise RuntimeError("Slice not set")
        if not self.authUser:
            raise RuntimeError("PlanetLab account username not set")
        if not self.authString:
            raise RuntimeError("PlanetLab account passphrase not set")
        if not self.sliceSSHKey:
            raise RuntimeError("PlanetLab account key not specified")
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError("PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,))
        
        self._logger.setLevel(getattr(logging, self.logLevel))
        
        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()
    
    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()
            
            try:
                # Wait for provisioning
                self.do_wait_nodes()
                
                # All nodes provisioned and responsive: we're done
                break
            except self._node.UnresponsiveNodeError:
                # Some nodes were unresponsive - retry the whole cycle
                pass
        
        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
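        """Assign a PlanetLab host to every Node element that doesn't have one yet.
        
        First pin down nodes with a single viable candidate, then let
        resourcealloc.alloc search for an assignment for the rest, preferring
        hosts already in the slice. Hosts that must be added to the slice are
        queued in self._to_provision for do_provisioning.
        """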
        to_provision = self._to_provision = set()
        
        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)
        
        # Initial algo:
        #   look for perfectly defined nodes
        #   (ie: those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        continue
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError("Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description()))
        
        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
                nodes.append(node)
        
        if nodes and reqs:
            if recover:
                raise RuntimeError("Impossible to recover: unassigned host for Nodes %r" % (nodes,))
            
            try:
                solution = resourcealloc.alloc(reqs)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes
                reqs = []
                for node in nodes:
                    candidates = node.find_candidates()
                    candidates -= reserved
                    reqs.append(candidates)
                
                solution = resourcealloc.alloc(reqs)
                to_provision.update(solution)
            
            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                node.assign_node_id(node_id)

    def do_provisioning(self):
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision
    
    def do_wait_nodes(self):
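        """Wait until all assigned nodes are up, blacklisting unresponsive ones."""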
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Inject access configuration (home path, SSH identity, slice name)
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename
            
                # Report the hostname each node was assigned
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
            
        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                    
                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )
                    
                    self._logger.info("READY Node %s at %s", guid, node.hostname)
                    
                    # Prepare dependency installer now
                    node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            # A node timed out without becoming reachable
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
            
            # Blacklist all dead (unresponsive) nodes and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()
            
            try:
                self._save_blacklist()
            except:
                # not important...
                import traceback
                traceback.print_exc()
            
            raise
    
    def do_spanning_deployment_plan(self):
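        """Group identical dependency setups and chain their deployment along
        a spanning tree (built over IP distance, branching factor 2), so
        nodes can fetch from an already-deployed peer rather than each
        deploying independently."""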
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for
        
        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )
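        # For illustration only (hypothetical attribute values): an application
        # with depends="gcc make", no sources, on a 64-bit Fedora 8 node would
        # hash to something like:
        #   (frozenset(['gcc', 'make']), frozenset(['']), None, None,
        #    'x86_64', 'f8', 'f8')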
        
        depgroups = collections.defaultdict(list)
        
        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)
        
        # Set up spanning deployment for those applications that
        # have been deployed on several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)
                
                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
                
                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)
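                # For illustration only: with root A and group members B..E,
                # iterating the plan yields (slave, master) edges such as
                #   [(B, A), (C, A), (D, B), (E, B)]
                # i.e. a tree rooted at A where no node feeds more than two slaves.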
                
                # Re-cipher the private key under a temporary passphrase
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue
                
                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)
                    
        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
    
    def _make_temp_private_key(self):
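        """Make a temporary copy of the slice SSH key pair, re-ciphered under
        a fresh random passphrase (via ssh-keygen -p).
        
        Returns (private_key_file, public_key_file, passphrase); the key files
        are NamedTemporaryFile objects living under the root directory.
        """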
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out, err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()
        
        if not self.sliceSSHKeyPass:
            raise TempKeyError
        
        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")
            
        # Create a secure 256-bit temporary passphrase
        passphrase = ''.join(map(chr,[rng.randint(0,255)
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")
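        # (32 SystemRandom bytes = 256 bits, hex-encoded into a 64-character string)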
                
        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()
        
        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
        
        # Re-cipher keys
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()
        
        if err:
            raise RuntimeError("Problem generating keys: \n%s\n%r" % (
                out, err))
        
        prk.seek(0)
        puk.seek(0)
        
        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()
        
        return prk, puk, passphrase
    
    def set(self, guid, name, value, time = TIME_NOW):
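        """Record the new attribute value and mirror it onto the live element."""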
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the schedule time for the task
        element = self._elements[guid]
        if element:
            try:
                setattr(element, name, value)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
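        """Return the live element's attribute when available, falling back
        to the recorded value."""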
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
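        """Return an address attribute (Address/NetPrefix/Broadcast) for the
        interface `guid`, preferring the live element's values."""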
        index = int(index)

        # first, try to read the live value from the element
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, fall back to the recorded box data
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
        for trace in self._traces.itervalues():
            trace.close()
        
        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()
        
        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)
            
        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
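        """Return a trace's content ('value'), remote path ('path') or
        remote file name ('name')."""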
        elem = self._elements[guid]

        if attribute == 'value':
            path = elem.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
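        """Rebuild controller state against an already-deployed testbed,
        running only configuration steps that have no side effects on the
        live nodes."""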
        try:
            # An internal flag, so we know to behave differently in
            # a few corner cases.
            self.recovering = True
            
            # Create and connect do not perform any real tasks against
            # the nodes; they only set up the object hierarchy,
            # so we can run them normally
            self.do_create()
            self.do_connect_init()
            self.do_connect_compl()
            
            # Manually recover nodes, to mark dependencies installed
            # and clean up mutable attributes
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(), 
                [
                    metadata.NODE,
                ])
            
            # Assign nodes - since we're working off the execute XML, nodes
            # have specific hostnames assigned and we don't need to do
            # real assignment, only find out node ids and check liveness
            self.do_resource_discovery(recover = True)
            self.do_wait_nodes()
            
            # Pre/post configure, however, tends to set up tunnels
            # Execute configuration steps only for those object
            # kinds that do not have side effects
            
            # Do the ones without side effects,
            # including nodes that need to set up home
            # folders and all that
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                ])
            
            # Tunnels require a home path that is configured
            # at this step. Since we cannot run the step itself,
            # we need to inject this homepath ourselves
            for guid, element in self._elements.iteritems():
                if isinstance(element, self._interfaces.TunIface):
                    element._home_path = "tun-%s" % (guid,)
            
            # Manually recover tunnels, applications and
            # netpipes, negating the side effects
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(), 
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                    metadata.NETPIPE,
                    Parallel(metadata.NEPIDEPENDENCY),
                    Parallel(metadata.NS3DEPENDENCY),
                    Parallel(metadata.DEPENDENCY),
                    Parallel(metadata.APPLICATION),
                ])

            # Tunnels are not harmed by configuration after
            # recovery, and some attributes get set this way
            # like external_iface
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # Post-do the ones without side effects
            self._do_in_factory_order(
                "configure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])
            
            # There are no required prestart steps
            # to call upon recovery, so we're done
        finally:
            self.recovering = False
    
    def _make_generic(self, parameters, kind):
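        """Instantiate `kind` with the PLC API, attach a weakref back to this
        testbed, and apply `parameters` as element attributes."""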
        app = kind(self.plapi)
        app.testbed = weakref.ref(self)

        # Note: there is a 1-to-1 correspondence between attribute names
        #   and element property names. If that changes, this has to change as well.
        for attr,val in parameters.iteritems():
            try:
                setattr(app, attr, val)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

        return app

    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)

    def _make_tun_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunFilter)

    def _make_class_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ClassQueueFilter)

    def _make_tos_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ToSQueueFilter)

    def _make_multicast_forwarder(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastForwarder)

    def _make_multicast_announcer(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastAnnouncer)

    def _make_multicast_router(self, parameters):
        return self._make_generic(parameters, self._multicast.MulticastRouter)
