Slice cleanup: dedicated slices can be cleaned up thoroughly.
[nepi.git] src/nepi/testbeds/planetlab/execute.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

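    # The blacklist persists node ids of unresponsive hosts between runs.
    # Illustrative format (the actual location is whatever environ.homepath
    # resolves 'plblacklist' to): one numeric PlanetLab node_id per line, e.g.
    #
    #   1234
    #   5678
    #
    # _load_blacklist() parses each line with int(); _save_blacklist() writes
    # the set back in the same one-id-per-line form.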
    def _load_blacklist(self):
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except IOError:
            # No blacklist file yet - start with an empty set
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
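        # Pull testbed attributes (slice name, PLC credentials, SSH identity,
        # deployment options) into instance fields and set the log level.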
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
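        # Discover and provision PlanetLab nodes, retrying the whole cycle
        # whenever some node turns out to be unresponsive, then (optionally)
        # plan peer-to-peer deployment before the regular preconfigure steps.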
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # Okay...
                break
            except self._node.UnresponsiveNodeError:
                # Oh... retry...
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
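        # Assign a PlanetLab host to every Node element that lacks one:
        # first assign nodes whose filters leave a single candidate, then
        # solve the rest together with resourcealloc.alloc, preferring hosts
        # already in the slice and falling back to any matching host. Hosts
        # not yet in the slice are queued in self._to_provision. With
        # recover=True an unassigned node is an error instead.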
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial algo:
        #   look for perfectly defined nodes
        #   (ie: those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        continue
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description())

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Collect each node's candidate set,
                # preferring hosts already in the slice
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
                nodes.append(node)

        if nodes and reqs:
            if recover:
                raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

            try:
                solution = resourcealloc.alloc(reqs)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes
                reqs = []
                for node in nodes:
                    candidates = node.find_candidates()
                    candidates -= reserved
                    reqs.append(candidates)

                solution = resourcealloc.alloc(reqs)
                to_provision.update(solution)

            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                node.assign_node_id(node_id)
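            # Illustration (hypothetical ids): for reqs = [set([101, 102]), set([102])]
            # the allocator yields one node_id per requirement set, e.g. (101, 102),
            # which zip() above pairs with the corresponding Node objects.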

    def do_provisioning(self):
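        # Add any newly selected hosts to the slice via the PLC API, and
        # remember them so do_wait_nodes can grant them extra boot time.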
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
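        # Wait until every assigned node's sliver is reachable, giving freshly
        # provisioned hosts a longer grace period. Unresponsive hosts are
        # blacklisted, unassigned, and the error is re-raised so that
        # do_preconfigure can retry discovery.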
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Show the magic
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)

                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            # Uh...
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

            # Mark all dead nodes (which are unresponsive) on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # not important...
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
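        # Group identical dependencies (same sources, build recipe and target
        # platform) and, for each group present on more than one host, compute
        # a bounded-branching minimum spanning tree over inter-node IP
        # distance. Each slave is pointed at its tree parent and given a
        # temporary, re-ciphered copy of the slice key so deployment can be
        # chained from master to slave.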
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed on several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

                # Re-cipher the private key with a temporary passphrase
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
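        # Copy the slice SSH identity into temporary files and re-encrypt the
        # private key with a freshly generated random 256-bit passphrase
        # (asking for the user's passphrase via SSH_ASKPASS if not cached).
        # Returns (private_key_file, public_key_file, passphrase).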
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure 256-bit temporary passphrase
        # (32 bytes from the OS CSPRNG, hex-encoded)
        passphrase = ''.join(map(chr,[rng.randint(0,255) 
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Recipher keys
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)

        prk.seek(0)
        puk.seek(0)

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the scheduled time for the task
        element = self._elements[guid]
        if element:
            setattr(element, name, value)

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the scheduled time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # try the real stuff
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
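        # Close any followed traces, then invoke each element's cleanup() and
        # destroy() hooks (when present) in the factory shutdown order.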
        for trace in self._traces.itervalues():
            trace.close()

        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
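        # With attribute='value' the trace is synced to the local home
        # directory and its contents returned; with attribute='path' the
        # remote path of the trace is returned instead.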
        app = self._elements[guid]

        if attribute == 'value':
            path = app.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = app.remote_trace_path(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
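        # Rebuild controller state for an already-deployed experiment:
        # recreate the object hierarchy, re-discover the nodes already
        # assigned to the slice, and re-run only those configuration steps
        # that have no side effects on the slivers.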
        # Create and connect do not perform any real tasks against
        # the nodes, they only set up the object hierarchy,
        # so we can run them normally
        self.do_create()
        self.do_connect_init()
        self.do_connect_compl()

        # Manually recover nodes, to mark dependencies installed
        # and clean up mutable attributes
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
            [
                metadata.NODE,
            ])

        # Assign nodes - since we're working off the execute XML, nodes
        # have specific hostnames assigned and we don't need to do
        # real assignment, only find out node ids and check liveness
        self.do_resource_discovery(recover = True)
        self.do_wait_nodes()

        # Pre/post configure, however, tend to set up tunnels.
        # Execute configuration steps only for those object
        # kinds that do not have side effects

        # Do the ones without side effects,
        # including nodes that need to set up home
        # folders and all that
        self._do_in_factory_order(
            "preconfigure_function",
            [
                metadata.INTERNET,
                Parallel(metadata.NODE),
                metadata.NODEIFACE,
            ])

        # Tunnels require a home path that is configured
        # at this step. Since we cannot run the step itself,
        # we need to inject this homepath ourselves
        for guid, element in self._elements.iteritems():
            if isinstance(element, self._interfaces.TunIface):
                element._home_path = "tun-%s" % (guid,)

        # Manually recover tunnels, applications and
        # netpipes, negating the side effects
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(),
            [
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
                metadata.NETPIPE,
                Parallel(metadata.NEPIDEPENDENCY),
                Parallel(metadata.NS3DEPENDENCY),
                Parallel(metadata.DEPENDENCY),
                Parallel(metadata.APPLICATION),
            ])

        # Tunnels are not harmed by configuration after
        # recovery, and some attributes get set this way,
        # like external_iface
        self._do_in_factory_order(
            "preconfigure_function",
            [
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
            ])

        # Post-do the ones without side effects
        self._do_in_factory_order(
            "configure_function",
            [
                metadata.INTERNET,
                Parallel(metadata.NODE),
                metadata.NODEIFACE,
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
            ])

        # There are no required prestart steps
        # to call upon recovery, so we're done


    def _make_generic(self, parameters, kind):
        app = kind(self.plapi)

        # Note: there is a 1-to-1 correspondence between parameter names
        #   and element attribute names. If that changes, this has to change as well
        for attr,val in parameters.iteritems():
            setattr(app, attr, val)

        return app

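    # The factory helpers below show how element instances are built from the
    # experiment description; e.g. (hypothetical parameters)
    #   node = self._make_node({'hostname': 'planetlab1.example.org'})
    # instantiates Node(self.plapi) and setattr()s each parameter on it.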
    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        # Thorough cleanup is only safe on dedicated slices
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)