#!/usr/bin/env python
# -*- coding: utf-8 -*-

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata
import weakref

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
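    """
    Testbed controller for PlanetLab experiments: discovers and provisions
    slice nodes, deploys applications (optionally along a peer-to-peer
    spanning tree), and can recover a running experiment from its
    serialized description.
    """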
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        self.recovering = False

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    @property
    def vsys_vnet(self):
        if not hasattr(self, '_vsys_vnet'):
            slicetags = self.plapi.GetSliceTags(
                name = self.slicename,
                tagname = 'vsys_vnet',
                fields=('value',))
            if slicetags:
                self._vsys_vnet = slicetags[0]['value']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._vsys_vnet

    def _load_blacklist(self):
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except EnvironmentError:
            # No blacklist file yet - start with an empty blacklist
            self._blacklist = set()
            return

        try:
            # Skip blank lines so a trailing newline doesn't break int()
            self._blacklist = set(
                int(line)
                for line in map(str.strip, bl.readlines())
                if line
            )
        finally:
            bl.close()

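    # On-disk format shared by _load_blacklist/_save_blacklist: one integer
    # PlanetLab node_id per line. Hypothetical example file contents:
    #
    #   14279
    #   8032
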
    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                "%s\n" % (node_id,) for node_id in self._blacklist)
        finally:
            bl.close()

    def do_setup(self):
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes provisioned and responsive: we're done
                break
            except self._node.UnresponsiveNodeError:
                # Retry discovery - the unresponsive nodes have been
                # blacklisted and unassigned by do_wait_nodes
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self, recover = False):
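        """
        Assign a PlanetLab node_id to every Node element that lacks one.

        A first pass pins nodes whose filters leave exactly one candidate.
        The rest are handed to the backtracking allocator
        (resourcealloc.alloc), first restricted to nodes already in the
        slice, then falling back to the full node set; nodes taken from
        the fallback are queued for provisioning.
        """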
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial pass:
        #   look for perfectly defined nodes
        #   (ie: those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing slice nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        continue
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError("Cannot assign resources for node %s, "
                            "no candidates with %s" % (guid,
                            node.make_filter_description()))

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Collect the remaining requirements, restricted to
                # existing slice nodes for the first allocation attempt
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
                nodes.append(node)

        if nodes and reqs:
            if recover:
                raise RuntimeError("Impossible to recover: unassigned host for Nodes %r" % (nodes,))

            try:
                solution = resourcealloc.alloc(reqs)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes
                reqs = []
                for node in nodes:
                    candidates = node.find_candidates()
                    candidates -= reserved
                    reqs.append(candidates)

                solution = resourcealloc.alloc(reqs)
                to_provision.update(solution)

            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                node.assign_node_id(node_id)

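    # A minimal sketch of the allocator contract assumed above (node ids
    # hypothetical): given one candidate set per pending node,
    # resourcealloc.alloc returns one chosen node id per requirement
    # (presumably all distinct), or raises ResourceAllocationError when
    # no such assignment exists:
    #
    #   >>> resourcealloc.alloc([set([10, 20]), set([20])])
    #   [10, 20]
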
    def do_provisioning(self):
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
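            # Worked example of the union above (hypothetical ids): with
            # the slice currently on nodes [10, 20] and _to_provision ==
            # set([20, 30]), the slice is updated to cover {10, 20, 30} -
            # nodes already in the slice are never dropped here.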

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
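        """
        Wait until every assigned node is provisioned and reachable.

        Freshly provisioned nodes get a 20-minute timeout versus 1 minute
        for pre-existing slivers. Unresponsive nodes are blacklisted and
        unassigned, and UnresponsiveNodeError is re-raised so that
        do_preconfigure can retry resource discovery.
        """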
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Show the magic
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)

                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            # At least one node timed out
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

            # Mark all dead nodes (which are unresponsive) on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except Exception:
                # Not critical - log it and continue with the re-raise
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )

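        # Hypothetical illustration: two Dependency objects with identical
        # depends/sources/build/install specs, hosted on nodes of the same
        # (architecture, operatingSystem, pl_distro), hash equal and land
        # in the same depgroup below, so their builds can be shared.
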
        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app: app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a, b: ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

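                # The materialized plan is a sequence of (slave, master)
                # pairs along a spanning tree rooted at `root`: each slave
                # is chained to its master, so deployment fans out with at
                # most two branches per node (maxbranching = 2).
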
                # Re-cipher the private key for automated deployment
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
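        """
        Create a temporary copy of the slice SSH identity, re-ciphered
        with a random one-shot passphrase, for automated chain deployment.

        Returns (private_key, public_key, passphrase), where the key files
        are NamedTemporaryFile objects living under root_directory.
        """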
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out, err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure random 256-bit temporary passphrase
        # (32 random bytes, hex-encoded to 64 characters)
        rng = random.SystemRandom()
        passphrase = ''.join(
            chr(rng.randint(0, 255)) for i in xrange(32)).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher the key in place: -P gives the old passphrase,
        # -N the new random one, -C the new comment
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError("Problem generating keys: \n%s\n%r" % (
                out, err))

        prk.seek(0)
        puk.seek(0)

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the schedule time for the task
        element = self._elements[guid]
        if element:
            try:
                setattr(element, name, value)
            except Exception:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the schedule time for the task
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # try the real stuff
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
        for trace in self._traces.itervalues():
            trace.close()

        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
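        """
        Return trace data for an element.

        With attribute == 'value', the trace file is synced from the node
        and its contents returned; with attribute == 'path', the remote
        path of the trace is returned; anything else yields None.
        """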
        app = self._elements[guid]

        if attribute == 'value':
            path = app.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = app.remote_trace_path(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
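        """
        Rebuild controller state from a serialized experiment description
        without disturbing the running testbed: object creation and
        connection run normally, node and application state is recovered
        manually, and only side-effect-free configuration steps are
        re-executed.
        """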
        try:
            # An internal flag, so we know to behave differently in
            # a few corner cases.
            self.recovering = True

            # Create and connect do not perform any real tasks against
            # the nodes, it only sets up the object hierarchy,
            # so we can run them normally
            self.do_create()
            self.do_connect_init()
            self.do_connect_compl()

            # Manually recover nodes, to mark dependencies installed
            # and clean up mutable attributes
            self._do_in_factory_order(
                lambda self, guid: self._elements[guid].recover(),
                [
                    metadata.NODE,
                ])

            # Assign nodes - since we're working off the execute XML, nodes
            # have specific hostnames assigned and we don't need to do
            # real assignment, only find out node ids and check liveness
            self.do_resource_discovery(recover = True)
            self.do_wait_nodes()

            # Pre/post configure, however, tends to set up tunnels
            # Execute configuration steps only for those object
            # kinds that do not have side effects

            # Do the ones without side effects,
            # including nodes that need to set up home
            # folders and all that
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                ])

            # Tunnels require a home path that is configured
            # at this step. Since we cannot run the step itself,
            # we need to inject this homepath ourselves
            for guid, element in self._elements.iteritems():
                if isinstance(element, self._interfaces.TunIface):
                    element._home_path = "tun-%s" % (guid,)

            # Manually recover tunnels, applications and
            # netpipes, negating the side effects
            self._do_in_factory_order(
                lambda self, guid: self._elements[guid].recover(),
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                    metadata.NETPIPE,
                    Parallel(metadata.NEPIDEPENDENCY),
                    Parallel(metadata.NS3DEPENDENCY),
                    Parallel(metadata.DEPENDENCY),
                    Parallel(metadata.APPLICATION),
                ])

            # Tunnels are not harmed by configuration after
            # recovery, and some attributes get set this way,
            # like external_iface
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # Post-do the ones without side effects
            self._do_in_factory_order(
                "configure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # There are no required prestart steps
            # to call upon recovery, so we're done
        finally:
            self.recovering = False

    def _make_generic(self, parameters, kind):
        app = kind(self.plapi)
        app.testbed = weakref.ref(self)

        # Note: parameter names are assumed to map 1-to-1 to element
        #   attribute names. If that changes, this has to change as well
        for attr, val in parameters.iteritems():
            try:
                setattr(app, attr, val)
            except Exception:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

        return app

    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)

    def _make_tun_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunFilter)