#!/usr/bin/env python
# -*- coding: utf-8 -*-

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata
import weakref

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        self.recovering = False

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
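        """
        Lazily-constructed PLCAPI proxy.

        A minimal usage sketch (the hostname/urlpattern come from the
        plcHost/plcUrl testbed attributes):

            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))

        When no authUser is configured, an anonymous proxy is returned,
        which supports only a restricted subset of the API.
        """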
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    @property
    def vsys_vnet(self):
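        """
        Value of the slice's 'vsys_vnet' tag, or None if the tag is not
        (yet) set. On PlanetLab this tag typically holds the private
        network prefix assigned to the slice for tunnel addressing.
        """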
        if not hasattr(self, '_vsys_vnet'):
            slicetags = self.plapi.GetSliceTags(
                name = self.slicename,
                tagname = 'vsys_vnet',
                fields=('value',))
            if slicetags:
                self._vsys_vnet = slicetags[0]['value']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._vsys_vnet

    def _load_blacklist(self):
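        """
        Load the persistent node blacklist from the location given by
        environ.homepath('plblacklist'). The file holds one numeric
        PlanetLab node id per line, e.g.:

            1234
            5678

        A missing or unreadable file yields an empty blacklist.
        """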
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except EnvironmentError:
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
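        """
        Read testbed attributes (slice name, PLC credentials, SSH key,
        deployment options) and validate that the mandatory ones are set
        before delegating to the generic setup step.
        """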
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        if not self.slicename:
            raise RuntimeError("Slice not set")
        if not self.authUser:
            raise RuntimeError("PlanetLab account username not set")
        if not self.authString:
            raise RuntimeError("PlanetLab account passphrase not set")

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
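        """
        Discover and provision nodes, retrying the whole
        discovery/provisioning cycle whenever some node turns out to be
        unresponsive, then (optionally) compute a peer-to-peer deployment
        plan before running the generic preconfiguration steps.
        """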
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes responded, we're done
                break
            except self._node.UnresponsiveNodeError:
                # Some nodes were blacklisted - retry with new ones
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
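        """
        Assign a PlanetLab node to every Node element that doesn't have
        one yet, in two passes:

          1. Nodes with exactly one candidate (first among nodes already
             in the slice, then among all nodes) are assigned directly.
          2. The remaining nodes are solved jointly through
             resourcealloc.alloc(), which searches for an assignment
             satisfying every node's candidate set; if no solution exists
             using slice nodes only, the search is repeated over all
             available nodes.

        Nodes assigned from outside the slice are queued in
        self._to_provision for do_provisioning to add to the slice.
        """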
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial pass:
        #   look for perfectly defined nodes
        #   (ie: those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        continue
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError("Cannot assign resources for node %s, "
                            "no candidates with %s" % (guid,
                            node.make_filter_description()))

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Prefer nodes already in the slice
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
                nodes.append(node)

        if nodes and reqs:
            if recover:
                raise RuntimeError("Impossible to recover: unassigned host for Nodes %r" % (nodes,))

            try:
                solution = resourcealloc.alloc(reqs)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes
                reqs = []
                for node in nodes:
                    candidates = node.find_candidates()
                    candidates -= reserved
                    reqs.append(candidates)

                solution = resourcealloc.alloc(reqs)
                to_provision.update(solution)

            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                node.assign_node_id(node_id)

    def do_provisioning(self):
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
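        """
        Inject per-node configuration (home path, SSH identity, slice
        name) and wait for every sliver to come up, allowing a longer
        timeout for nodes that were just added to the slice. Unresponsive
        nodes are blacklisted and unassigned, and the error is re-raised
        so do_preconfigure can retry with different nodes.
        """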
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Inject configuration attributes
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)

                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

            # Mark all dead nodes (which are unresponsive) on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # failure to persist the blacklist is not fatal
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
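        """
        Plan peer-to-peer distribution of builds: dependencies that are
        identical (same sources, build/install commands and target
        platform) and deployed on more than one node are arranged into a
        minimum spanning tree over inter-node IP distance, so each node
        fetches the build from a nearby already-provisioned peer instead
        of every node building or downloading independently.
        """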
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app: app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a, b: ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

                # Re-cipher the private key for automated deployment
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
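        """
        Produce a temporary copy of the user's SSH identity, re-ciphered
        with a random 256-bit passphrase so it can be handed to slave
        nodes without exposing the user's own passphrase.

        The user's passphrase is obtained through the SSH_ASKPASS helper
        if one is configured, and the re-ciphering itself is delegated
        to ssh-keygen, roughly:

            ssh-keygen -p -f <tempkey> -P <old pass> -N <new pass> -C <comment>

        Returns a (private key file, public key file, passphrase) tuple,
        or raises TempKeyError when no passphrase could be obtained.
        """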
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out, err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure random 256-bit temporary passphrase
        passphrase = ''.join(map(chr,[rng.randint(0,255)
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Recipher keys
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if proc.returncode or err:
            raise RuntimeError("Problem generating keys: \n%s\n%r" % (
                out, err))

        prk.seek(0)
        puk.seek(0)

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
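        """
        Record the attribute change and mirror it onto the live element,
        if one exists, invoking its refresh() hook afterwards. During
        recovery, failures to set immutable attributes are ignored.
        """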
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the task's scheduled time
        element = self._elements[guid]
        if element:
            try:
                setattr(element, name, value)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the task's scheduled time
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # Ask the live element first
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # Fall back to the generic implementation
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
        for trace in self._traces.itervalues():
            trace.close()

        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
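        """
        Return trace information for an application: the full trace
        content for attribute 'value' (after syncing it to the local
        home directory), the remote path for 'path', or the remote file
        name for 'name'. Unknown attributes yield None.
        """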
        app = self._elements[guid]

        if attribute == 'value':
            path = app.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = app.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = app.remote_trace_name(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
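        """
        Rebuild the controller's state from an existing deployment
        described by the execute XML: recreate the object hierarchy,
        re-discover the already-assigned nodes, and re-run only those
        configuration steps that have no side effects on the slivers.
        """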
        try:
            # An internal flag, so we know to behave differently in
            # a few corner cases.
            self.recovering = True

            # Create and connect do not perform any real tasks against
            # the nodes, it only sets up the object hierarchy,
            # so we can run them normally
            self.do_create()
            self.do_connect_init()
            self.do_connect_compl()

            # Manually recover nodes, to mark dependencies installed
            # and clean up mutable attributes
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    metadata.NODE,
                ])

            # Assign nodes - since we're working off the execute XML, nodes
            # have specific hostnames assigned and we don't need to do
            # real assignment, only find out node ids and check liveness
            self.do_resource_discovery(recover = True)
            self.do_wait_nodes()

            # Pre/post configure, however, tends to set up tunnels
            # Execute configuration steps only for those object
            # kinds that do not have side effects

            # Do the ones without side effects,
            # including nodes that need to set up home
            # folders and all that
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                ])

            # Tunnels require a home path that is configured
            # at this step. Since we cannot run the step itself,
            # we need to inject this homepath ourselves
            for guid, element in self._elements.iteritems():
                if isinstance(element, self._interfaces.TunIface):
                    element._home_path = "tun-%s" % (guid,)

            # Manually recover tunnels, applications and
            # netpipes, negating the side effects
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                    metadata.NETPIPE,
                    Parallel(metadata.NEPIDEPENDENCY),
                    Parallel(metadata.NS3DEPENDENCY),
                    Parallel(metadata.DEPENDENCY),
                    Parallel(metadata.APPLICATION),
                ])

            # Tunnels are not harmed by configuration after
            # recovery, and some attributes get set this way,
            # like external_iface
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # Post-do the ones without side effects
            self._do_in_factory_order(
                "configure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # There are no required prestart steps
            # to call upon recovery, so we're done
        finally:
            self.recovering = False

    def _make_generic(self, parameters, kind):
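        """
        Instantiate an element of the given kind, bind it to this
        testbed through a weak reference, and copy all creation
        parameters onto same-named attributes of the new instance.
        """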
        app = kind(self.plapi)
        app.testbed = weakref.ref(self)

        # Note: there is a 1-to-1 correspondence between parameter names
        #   and element attribute names.
        #   If that changes, this has to change as well.
        for attr, val in parameters.iteritems():
            try:
                setattr(app, attr, val)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

        return app

    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)

    def _make_tun_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunFilter)