log "Connected" after succefull handshake in tunchannel_impl.py
[nepi.git] src/nepi/testbeds/planetlab/execute.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

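# Execution controller for the PlanetLab testbed: discovers and provisions
# slice nodes, waits for them to come up, optionally plans peer-to-peer
# dependency deployment over a spanning tree, and drives configuration,
# tracing, recovery and shutdown of the experiment's elements.
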
from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.core.metadata import Parallel
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging
import metadata
import weakref

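# Raised when the passphrase for the user's SSH key cannot be obtained,
# so no re-ciphered temporary deployment key can be produced
# (see _make_temp_private_key below).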
class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

        self.recovering = False

    @property
    def home_directory(self):
        return self._home_directory

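    # The PLCAPI handle is created lazily on first use: authenticated with
    # the credentials given in do_setup when available, anonymous otherwise.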
    @property
    def plapi(self):
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    @property
    def vsys_vnet(self):
        if not hasattr(self, '_vsys_vnet'):
            slicetags = self.plapi.GetSliceTags(
                name = self.slicename,
                tagname = 'vsys_vnet',
                fields=('value',))
            if slicetags:
                self._vsys_vnet = slicetags[0]['value']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._vsys_vnet

    def _load_blacklist(self):
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except:
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

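    # The blacklist file holds one PlanetLab node_id per line, e.g.
    # (illustrative values):
    #   1234
    #   5678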
    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

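    # Testbed attributes are read here; "slice", "authUser", "authPass"
    # and "sliceSSHKey" are mandatory, the remaining ones are optional.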
    def do_setup(self):
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")
        self.dedicatedSlice = self._attributes.\
            get_attribute_value("dedicatedSlice")

        if not self.slicename:
            raise RuntimeError, "Slice not set"
        if not self.authUser:
            raise RuntimeError, "PlanetLab account username not set"
        if not self.authString:
            raise RuntimeError, "PlanetLab account passphrase not set"
        if not self.sliceSSHKey:
            raise RuntimeError, "PlanetLab account key not specified"
        if not os.path.exists(self.sliceSSHKey):
            raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes provisioned and responsive: we're done
                break
            except self._node.UnresponsiveNodeError:
                # Some nodes were unresponsive and got blacklisted;
                # retry discovery and provisioning with new candidates
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

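    # Resource discovery runs in two passes: nodes with a single candidate
    # are pinned first, then the rest are assigned through a backtracking
    # search (resourcealloc.alloc), preferring hosts already in the slice.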
    def do_resource_discovery(self, recover = False):
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial algorithm:
        #   look for perfectly defined nodes
        #   (i.e. those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        continue
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description())

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
                nodes.append(node)

        if nodes and reqs:
            if recover:
                raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)

            try:
                solution = resourcealloc.alloc(reqs)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes
                reqs = []
                for node in nodes:
                    candidates = node.find_candidates()
                    candidates -= reserved
                    reqs.append(candidates)

                solution = resourcealloc.alloc(reqs)
                to_provision.update(solution)

            # Do assign nodes
            for node, node_id in zip(nodes, solution):
                node.assign_node_id(node_id)

    def do_provisioning(self):
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Show the magic
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)

                    # Give just-provisioned nodes extra time to instantiate
                    # their sliver; already-running slivers should answer fast
                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

            # Mark all dead nodes (which are unresponsive) on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # Not critical: losing the blacklist only costs us
                # re-discovering dead nodes later
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )

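        # A group key might look like this (illustrative values only):
        #   (frozenset(['gcc', 'make']), frozenset(['app.tar.gz']),
        #    'make', 'make install', 'x86_64', 'linux', 'f12')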
        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

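                # Each edge of the plan is a (slave, master) pair: the
                # slave will be deployed from its master rather than
                # building everything on its own.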
                # Re-cipher the private key
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure 256-bit temporary passphrase
        passphrase = ''.join(map(chr, [rng.randint(0,255)
                                       for rng in (random.SystemRandom(),)
                                       for i in xrange(32)] )).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Recipher keys
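        # ssh-keygen -p re-ciphers the key in place: -P gives the current
        # passphrase, -N the new one, and -C sets the new comment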
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)

        prk.seek(0)
        puk.seek(0)

        # Change the comment on the public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

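        # NOTE: callers must keep the returned NamedTemporaryFile objects
        # referenced; the files are deleted as soon as they are closed.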
        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the schedule time for the task
        element = self._elements[guid]
        if element:
            try:
                setattr(element, name, value)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # try the real stuff
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query the box
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
        for trace in self._traces.itervalues():
            trace.close()

        def invokeif(action, testbed, guid):
            element = self._elements[guid]
            if hasattr(element, action):
                getattr(element, action)()

        self._do_in_factory_order(
            functools.partial(invokeif, 'cleanup'),
            metadata.shutdown_order)

        self._do_in_factory_order(
            functools.partial(invokeif, 'destroy'),
            metadata.shutdown_order)

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
        elem = self._elements[guid]

        if attribute == 'value':
            path = elem.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = elem.remote_trace_path(trace_id)
        elif attribute == 'name':
            content = elem.remote_trace_name(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def recover(self):
        try:
            # An internal flag, so we know to behave differently in
            # a few corner cases.
            self.recovering = True

            # Create and connect do not perform any real tasks against
            # the nodes; they only set up the object hierarchy,
            # so we can run them normally
            self.do_create()
            self.do_connect_init()
            self.do_connect_compl()

            # Manually recover nodes, to mark dependencies installed
            # and clean up mutable attributes
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    metadata.NODE,
                ])

            # Assign nodes - since we're working off the execute XML, nodes
            # have specific hostnames assigned and we don't need to do
            # real assignment, only find out node ids and check liveness
            self.do_resource_discovery(recover = True)
            self.do_wait_nodes()

            # Pre/post configure, however, tends to set up tunnels.
            # Execute configuration steps only for those object
            # kinds that do not have side effects.

            # Do the ones without side effects,
            # including nodes that need to set up home
            # folders and all that
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                ])

            # Tunnels require a home path that is configured
            # at this step. Since we cannot run the step itself,
            # we need to inject this homepath ourselves
            for guid, element in self._elements.iteritems():
                if isinstance(element, self._interfaces.TunIface):
                    element._home_path = "tun-%s" % (guid,)

            # Manually recover tunnels, applications and
            # netpipes, negating the side effects
            self._do_in_factory_order(
                lambda self, guid : self._elements[guid].recover(),
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                    metadata.NETPIPE,
                    Parallel(metadata.NEPIDEPENDENCY),
                    Parallel(metadata.NS3DEPENDENCY),
                    Parallel(metadata.DEPENDENCY),
                    Parallel(metadata.APPLICATION),
                ])

            # Tunnels are not harmed by configuration after
            # recovery, and some attributes get set this way,
            # like external_iface
            self._do_in_factory_order(
                "preconfigure_function",
                [
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # Post-do the ones without side effects
            self._do_in_factory_order(
                "configure_function",
                [
                    metadata.INTERNET,
                    Parallel(metadata.NODE),
                    metadata.NODEIFACE,
                    Parallel(metadata.TAPIFACE),
                    Parallel(metadata.TUNIFACE),
                ])

            # There are no required prestart steps
            # to call upon recovery, so we're done
        finally:
            # Recovery is done, resume normal behavior
            self.recovering = False

    def _make_generic(self, parameters, kind):
        app = kind(self.plapi)
        app.testbed = weakref.ref(self)

        # Note: there is a 1-to-1 correspondence between attribute names
        #   and element properties; if that changes, this has to change too
        for attr,val in parameters.iteritems():
            try:
                setattr(app, attr, val)
            except:
                # We ignore these errors while recovering.
                # Some attributes are immutable, and setting
                # them is necessary (to recover the state), but
                # some are not (they throw an exception).
                if not self.recovering:
                    raise

        return app

    def _make_node(self, parameters):
        node = self._make_generic(parameters, self._node.Node)
        node.enable_cleanup = self.dedicatedSlice
        return node

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)

    def _make_tun_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunFilter)

    def _make_class_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ClassQueueFilter)

    def _make_tos_queue_filter(self, parameters):
        return self._make_generic(parameters, self._interfaces.ToSQueueFilter)