Recovery policy for testbeds, and recovery implementation in PlanetLab.
nepi.git: src/nepi/testbeds/planetlab/execute.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
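"""PlanetLab testbed controller for NEPI.

Implements resource discovery, slice provisioning, node configuration,
spanning-tree (peer-to-peer) application deployment, and recovery of a
previously deployed experiment (see TestbedController.recover()).
"""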

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    def _load_blacklist(self):
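        """Load the node blacklist (one node_id per line) from the user's plblacklist file, if present."""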
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except EnvironmentError:
            # No blacklist file yet
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
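        """Read testbed attributes (slice name, credentials, PLC endpoint, logging) into instance attributes."""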
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
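        """Discover and provision nodes, retrying until all of them respond, then optionally plan the peer-to-peer deployment."""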
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes are up, we're done
                break
            except self._node.UnresponsiveNodeError:
                # Unresponsive nodes were blacklisted, retry discovery
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
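        """Assign a PlanetLab node to every Node element that lacks one.

        Nodes already in the slice are preferred; any other node chosen is
        queued in self._to_provision for do_provisioning. With recover=True
        an unassigned node is an error, since recovery must reuse the
        original assignment.
        """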
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial algorithm:
        #   look for perfectly defined nodes
        #   (i.e.: those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        continue
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description())

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Again prefer nodes already in the slice
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
                nodes.append(node)

        if nodes and reqs:
            if recover:
                raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

            try:
                solution = resourcealloc.alloc(reqs)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes
                reqs = []
                for node in nodes:
                    candidates = node.find_candidates()
                    candidates -= reserved
                    reqs.append(candidates)

                solution = resourcealloc.alloc(reqs)
                to_provision.update(solution)

            # Assign the chosen nodes
            for node, node_id in zip(nodes, solution):
                node.assign_node_id(node_id)

    def do_provisioning(self):
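        """Add newly assigned nodes to the PlanetLab slice and remember them as just provisioned."""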
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
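        """Wait until every assigned node is reachable.

        Freshly provisioned nodes get a longer timeout; nodes that remain
        unresponsive are blacklisted and unassigned, and the
        UnresponsiveNodeError is re-raised so the caller can retry.
        """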
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Show the magic
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)

                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            # At least one node timed out
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

            # Add all unresponsive (dead) nodes to the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # Failing to persist the blacklist is not fatal
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
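        """Group equivalent dependencies and plan a spanning-tree deployment.

        Dependencies with the same build/install recipe and target platform
        are grouped; for groups spanning several nodes, a minimum spanning
        tree over IP distance decides which node seeds which, and a
        re-ciphered temporary SSH key is installed on the slave nodes.
        """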
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed on several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app: app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a, b: ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

                # Re-cipher the private key with a temporary passphrase
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
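        """Create a temporary copy of the slice SSH key pair, re-ciphered with a random 256-bit passphrase.

        The original passphrase is requested through SSH_ASKPASS if needed;
        TempKeyError is raised when no passphrase can be obtained.
        Returns (private_key_file, public_key_file, passphrase).
        """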
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out, err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure 256-bit temporary passphrase
        passphrase = ''.join(map(chr, [rng.randint(0, 255)
                                       for rng in (random.SystemRandom(),)
                                       for i in xrange(32)])).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher keys
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)

        prk.seek(0)
        puk.seek(0)

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the schedule time for the task
        element = self._elements[guid]
        if element:
            setattr(element, name, value)

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # Try the live interface object first
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # If all else fails, query the box
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
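        """Close traces and run every element's cleanup and destroy hooks in parallel."""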
        for trace in self._traces.itervalues():
            trace.close()

        runner = ParallelRun(16)
        runner.start()
        for element in self._elements.itervalues():
            # invoke cleanup hooks
            if hasattr(element, 'cleanup'):
                runner.put(element.cleanup)
        runner.join()

        runner = ParallelRun(16)
        runner.start()
        for element in self._elements.itervalues():
            # invoke destroy hooks
            if hasattr(element, 'destroy'):
                runner.put(element.destroy)
        runner.join()

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
        app = self._elements[guid]

        if attribute == 'value':
            path = app.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = app.remote_trace_path(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
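        """Rebuild controller state for an experiment that is already deployed.

        Re-creates the object hierarchy, re-discovers the already assigned
        nodes, and replays only the configuration steps without side effects
        on the slivers, calling each element's recover() hook instead of
        re-deploying it.
        """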
        # Create and connect do not perform any real tasks against
        # the nodes, they only set up the object hierarchy,
        # so we can run them normally
        self.do_create()
        self.do_connect_init()
        self.do_connect_compl()

        # Assign nodes - since we're working off the execute XML, nodes
        # have specific hostnames assigned and we don't need to do
        # real assignment, only find out node ids and check liveness
        self.do_resource_discovery(recover = True)
        self.do_wait_nodes()

        # Pre/post configure, however, tends to set up tunnels.
        # Execute configuration steps only for those object
        # kinds that do not have side effects

        # Manually recover nodes, to mark dependencies installed
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
            [
                metadata.NODE,
            ])

        # Do the ones without side effects, including nodes
        # that need to set up home folders and the like
        self._do_in_factory_order(
            "preconfigure_function",
            [
                metadata.INTERNET,
                Parallel(metadata.NODE),
                metadata.NODEIFACE,
            ])

        # Tunnels require a home path that is configured
        # at this step. Since we cannot run the step itself,
        # we need to inject this home path ourselves
        for guid, element in self._elements.iteritems():
            if isinstance(element, self._interfaces.TunIface):
                element._home_path = "tun-%s" % (guid,)

        # Manually recover tunnels, applications and
        # netpipes, negating the side effects
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
            [
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
                metadata.NETPIPE,
                Parallel(metadata.NEPIDEPENDENCY),
                Parallel(metadata.NS3DEPENDENCY),
                Parallel(metadata.DEPENDENCY),
                Parallel(metadata.APPLICATION),
            ])

        # Tunnels are not harmed by configuration after
        # recovery, and some attributes, like external_iface,
        # get set this way
        self._do_in_factory_order(
            "preconfigure_function",
            [
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
            ])

        # Post-do the ones without side effects
        self._do_in_factory_order(
            "configure_function",
            [
                metadata.INTERNET,
                Parallel(metadata.NODE),
                metadata.NODEIFACE,
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
            ])

        # There are no required prestart steps
        # to call upon recovery, so we're done

    def _make_generic(self, parameters, kind):
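        """Instantiate `kind` with the PLC API handle and copy every parameter onto it as an attribute."""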
        app = kind(self.plapi)

        # Note: there is a 1-to-1 correspondence between parameter names
        #   and element attribute names. If that changes, this has to change as well
        for attr, val in parameters.iteritems():
            setattr(app, attr, val)

        return app

    def _make_node(self, parameters):
        return self._make_generic(parameters, self._node.Node)

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)