TUN/TAP filters, initial version, with tests.
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.core.metadata import Parallel
7 from nepi.util.constants import TIME_NOW
8 from nepi.util.graphtools import mst
9 from nepi.util import ipaddr2
10 from nepi.util import environ
11 from nepi.util.parallel import ParallelRun
12 import sys
13 import os
14 import os.path
15 import time
16 import resourcealloc
17 import collections
18 import operator
19 import functools
20 import socket
21 import struct
22 import tempfile
23 import subprocess
24 import random
25 import shutil
26 import logging
27 import metadata
28
class TempKeyError(Exception):
    """Raised when a temporary deployment SSH key cannot be created,
    chiefly because no passphrase for the slice identity was obtained."""
    pass
31
class TestbedController(testbed_impl.TestbedController):
    """PlanetLab testbed controller.

    Drives resource discovery, slice provisioning, spanning (P2P)
    application deployment, tracing and recovery for experiments running
    on PlanetLab nodes.
    """

    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        # Project-local modules imported lazily at construction time
        # (NOTE(review): presumably to avoid import cycles - confirm)
        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application
        
        # node_ids of hosts known to be dead / unresponsive
        self._blacklist = set()
        # node_ids added to the slice during this session
        self._just_provisioned = set()
        
        self._load_blacklist()
        
        self._logger = logging.getLogger('nepi.testbeds.planetlab')
50
    @property
    def home_directory(self):
        """Read-only accessor for the controller's home directory path."""
        return self._home_directory
54
    @property
    def plapi(self):
        """Lazily-constructed, cached PLCAPI client.

        Authenticated with authUser/authString when an auth user is
        configured, anonymous otherwise. Cached in self._plapi.
        """
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi
71
72     @property
73     def slice_id(self):
74         if not hasattr(self, '_slice_id'):
75             slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
76             if slices:
77                 self._slice_id = slices[0]['slice_id']
78             else:
79                 # If it wasn't found, don't remember this failure, keep trying
80                 return None
81         return self._slice_id
82     
83     @property
84     def vsys_vnet(self):
85         if not hasattr(self, '_vsys_vnet'):
86             slicetags = self.plapi.GetSliceTags(
87                 name = self.slicename,
88                 tagname = 'vsys_vnet',
89                 fields=('value',))
90             if slicetags:
91                 self._vsys_vnet = slicetags[0]['value']
92             else:
93                 # If it wasn't found, don't remember this failure, keep trying
94                 return None
95         return self._vsys_vnet
96     
97     def _load_blacklist(self):
98         blpath = environ.homepath('plblacklist')
99         
100         try:
101             bl = open(blpath, "r")
102         except:
103             self._blacklist = set()
104             return
105             
106         try:
107             self._blacklist = set(
108                 map(int,
109                     map(str.strip, bl.readlines())
110                 )
111             )
112         finally:
113             bl.close()
114     
115     def _save_blacklist(self):
116         blpath = environ.homepath('plblacklist')
117         bl = open(blpath, "w")
118         try:
119             bl.writelines(
120                 map('%s\n'.__mod__, self._blacklist))
121         finally:
122             bl.close()
123     
124     def do_setup(self):
125         self._home_directory = self._attributes.\
126             get_attribute_value("homeDirectory")
127         self.slicename = self._attributes.\
128             get_attribute_value("slice")
129         self.authUser = self._attributes.\
130             get_attribute_value("authUser")
131         self.authString = self._attributes.\
132             get_attribute_value("authPass")
133         self.sliceSSHKey = self._attributes.\
134             get_attribute_value("sliceSSHKey")
135         self.sliceSSHKeyPass = None
136         self.plcHost = self._attributes.\
137             get_attribute_value("plcHost")
138         self.plcUrl = self._attributes.\
139             get_attribute_value("plcUrl")
140         self.logLevel = self._attributes.\
141             get_attribute_value("plLogLevel")
142         self.tapPortBase = self._attributes.\
143             get_attribute_value("tapPortBase")
144         self.p2pDeployment = self._attributes.\
145             get_attribute_value("p2pDeployment")
146         self.dedicatedSlice = self._attributes.\
147             get_attribute_value("dedicatedSlice")
148         
149         self._logger.setLevel(getattr(logging,self.logLevel))
150         
151         super(TestbedController, self).do_setup()
152
    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()
    
    # Two-phase configuration for asynchronous launch:
    # the same dependency wait runs after both the preconfigure and the
    # configure steps (staticmethod so the aliases share one function).
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)
163
164     def do_preconfigure(self):
165         while True:
166             # Perform resource discovery if we don't have
167             # specific resources assigned yet
168             self.do_resource_discovery()
169
170             # Create PlanetLab slivers
171             self.do_provisioning()
172             
173             try:
174                 # Wait for provisioning
175                 self.do_wait_nodes()
176                 
177                 # Okkey...
178                 break
179             except self._node.UnresponsiveNodeError:
180                 # Oh... retry...
181                 pass
182         
183         if self.p2pDeployment:
184             # Plan application deployment
185             self.do_spanning_deployment_plan()
186
187         # Configure elements per XML data
188         super(TestbedController, self).do_preconfigure()
189
190     def do_resource_discovery(self, recover = False):
191         to_provision = self._to_provision = set()
192         
193         reserved = set(self._blacklist)
194         for guid, node in self._elements.iteritems():
195             if isinstance(node, self._node.Node) and node._node_id is not None:
196                 reserved.add(node._node_id)
197         
198         # Initial algo:
199         #   look for perfectly defined nodes
200         #   (ie: those with only one candidate)
201         for guid, node in self._elements.iteritems():
202             if isinstance(node, self._node.Node) and node._node_id is None:
203                 # Try existing nodes first
204                 # If we have only one candidate, simply use it
205                 candidates = node.find_candidates(
206                     filter_slice_id = self.slice_id)
207                 candidates -= reserved
208                 if len(candidates) == 1:
209                     node_id = iter(candidates).next()
210                     node.assign_node_id(node_id)
211                     reserved.add(node_id)
212                 elif not candidates:
213                     # Try again including unassigned nodes
214                     candidates = node.find_candidates()
215                     candidates -= reserved
216                     if len(candidates) > 1:
217                         continue
218                     if len(candidates) == 1:
219                         node_id = iter(candidates).next()
220                         node.assign_node_id(node_id)
221                         to_provision.add(node_id)
222                         reserved.add(node_id)
223                     elif not candidates:
224                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
225                             node.make_filter_description())
226         
227         # Now do the backtracking search for a suitable solution
228         # First with existing slice nodes
229         reqs = []
230         nodes = []
231         for guid, node in self._elements.iteritems():
232             if isinstance(node, self._node.Node) and node._node_id is None:
233                 # Try existing nodes first
234                 # If we have only one candidate, simply use it
235                 candidates = node.find_candidates(
236                     filter_slice_id = self.slice_id)
237                 candidates -= reserved
238                 reqs.append(candidates)
239                 nodes.append(node)
240         
241         if nodes and reqs:
242             if recover:
243                 raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
244             
245             try:
246                 solution = resourcealloc.alloc(reqs)
247             except resourcealloc.ResourceAllocationError:
248                 # Failed, try again with all nodes
249                 reqs = []
250                 for node in nodes:
251                     candidates = node.find_candidates()
252                     candidates -= reserved
253                     reqs.append(candidates)
254                 
255                 solution = resourcealloc.alloc(reqs)
256                 to_provision.update(solution)
257             
258             # Do assign nodes
259             for node, node_id in zip(nodes, solution):
260                 node.assign_node_id(node_id)
261
262     def do_provisioning(self):
263         if self._to_provision:
264             # Add new nodes to the slice
265             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
266             new_nodes = list(set(cur_nodes) | self._to_provision)
267             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
268
269         # cleanup
270         self._just_provisioned = self._to_provision
271         del self._to_provision
272     
    def do_wait_nodes(self):
        """Configure each Node element and wait for its sliver to be ready.

        Slivers provisioned this session get a 20-minute grace period,
        pre-existing ones 60 seconds. On timeout, every node that fails
        is_alive() is blacklisted and unassigned, the blacklist is saved
        best-effort, and UnresponsiveNodeError is re-raised so the caller
        can retry discovery.
        """
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename
            
                # Show the magic
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
            
        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
                    
                    # Longer timeout for slivers created in this session
                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )
                    
                    self._logger.info("READY Node %s at %s", guid, node.hostname)
                    
                    # Prepare dependency installer now
                    node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            # NOTE: 'node' is the loop variable leaked from the for-loop
            # above, i.e. the node whose wait timed out.
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
            
            # Mark all dead nodes (which are unresponsive) on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()
            
            try:
                self._save_blacklist()
            except:
                # Best-effort persistence - failure to save is not fatal
                import traceback
                traceback.print_exc()
            
            raise
318     
    def do_spanning_deployment_plan(self):
        """Plan peer-to-peer (spanning-tree) deployment of dependencies.

        Groups identical dependencies (same packages, sources, build and
        install recipes, and node platform), and for each group present on
        more than one node builds a minimum-spanning-tree push plan rooted
        at a deterministically-chosen node. A re-ciphered temporary SSH
        key is installed on the slaves so they can pull from their master.
        """
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
        
        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )
        
        depgroups = collections.defaultdict(list)
        
        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)
        
        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)
                
                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
                
                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)
                
                # Re-sign private key; if no passphrase could be obtained,
                # skip spanning deployment for this group
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue
                
                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)
                    
        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
380     
    def _make_temp_private_key(self):
        """Create a temporary copy of the slice SSH key pair, re-ciphered
        with a random 256-bit passphrase, for automated chain deployment.

        When the identity's passphrase is unknown it is requested through
        the SSH_ASKPASS helper. Returns a tuple
        (private_keyfile, public_keyfile, passphrase) where the key files
        are open NamedTemporaryFile objects (deleted when closed).

        Raises TempKeyError when no passphrase could be obtained, and
        RuntimeError when ssh-keygen reports an error.
        """
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % ( 
                        os.path.basename(self.sliceSSHKey), 
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()
        
        if not self.sliceSSHKeyPass:
            raise TempKeyError
        
        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")
            
        # Create secure 256-bits temporary passphrase:
        # 32 random bytes from SystemRandom, hex-encoded.
        # (the single-element 'for rng in' clause binds SystemRandom once)
        passphrase = ''.join(map(chr,[rng.randint(0,255) 
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")
                
        # Copy keys, preserving the originals' permission bits
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()
        
        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
        
        # Recipher keys: change the passphrase from the user's to the
        # random temporary one
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()
        
        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)
        
        prk.seek(0)
        puk.seek(0)
        
        # Change comment on public key by rewriting its last field
        # (NOTE(review): presumably because ssh-keygen -p doesn't update
        # the .pub file - confirm)
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()
        
        return prk, puk, passphrase
466     
467     def set(self, guid, name, value, time = TIME_NOW):
468         super(TestbedController, self).set(guid, name, value, time)
469         # TODO: take on account schedule time for the task
470         element = self._elements[guid]
471         if element:
472             setattr(element, name, value)
473
474             if hasattr(element, 'refresh'):
475                 # invoke attribute refresh hook
476                 element.refresh()
477
478     def get(self, guid, name, time = TIME_NOW):
479         value = super(TestbedController, self).get(guid, name, time)
480         # TODO: take on account schedule time for the task
481         factory_id = self._create[guid]
482         factory = self._factories[factory_id]
483         element = self._elements.get(guid)
484         try:
485             return getattr(element, name)
486         except (KeyError, AttributeError):
487             return value
488
489     def get_address(self, guid, index, attribute='Address'):
490         index = int(index)
491
492         # try the real stuff
493         iface = self._elements.get(guid)
494         if iface and index == 0:
495             if attribute == 'Address':
496                 return iface.address
497             elif attribute == 'NetPrefix':
498                 return iface.netprefix
499             elif attribute == 'Broadcast':
500                 return iface.broadcast
501
502         # if all else fails, query box
503         return super(TestbedController, self).get_address(guid, index, attribute)
504
    def action(self, time, guid, action):
        # Scheduled actions are not supported by this testbed
        raise NotImplementedError
507
508     def shutdown(self):
509         for trace in self._traces.itervalues():
510             trace.close()
511         
512         def invokeif(action, testbed, guid):
513             element = self._elements[guid]
514             if hasattr(element, action):
515                 getattr(element, action)()
516         
517         self._do_in_factory_order(
518             functools.partial(invokeif, 'cleanup'),
519             metadata.shutdown_order)
520
521         self._do_in_factory_order(
522             functools.partial(invokeif, 'destroy'),
523             metadata.shutdown_order)
524             
525         self._elements.clear()
526         self._traces.clear()
527
528     def trace(self, guid, trace_id, attribute='value'):
529         app = self._elements[guid]
530
531         if attribute == 'value':
532             path = app.sync_trace(self.home_directory, trace_id)
533             if path:
534                 fd = open(path, "r")
535                 content = fd.read()
536                 fd.close()
537             else:
538                 content = None
539         elif attribute == 'path':
540             content = app.remote_trace_path(trace_id)
541         else:
542             content = None
543         return content
544
    def follow_trace(self, trace_id, trace):
        """Register an open trace object; it will be closed on shutdown."""
        self._traces[trace_id] = trace
547
    def recover(self):
        """Rebuild controller state from execute XML without re-deploying.

        Re-runs only the side-effect-free parts of the normal launch
        sequence, manually recovering nodes, tunnels, dependencies and
        applications in a carefully ordered series of steps.
        """
        # Create and connect do not perform any real tasks against
        # the nodes, it only sets up the object hierarchy,
        # so we can run them normally
        self.do_create()
        self.do_connect_init()
        self.do_connect_compl()
        
        # Manually recover nodes, to mark dependencies installed
        # and clean up mutable attributes
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(), 
            [
                metadata.NODE,
            ])
        
        # Assign nodes - since we're working off execute XML, nodes
        # have specific hostnames assigned and we don't need to do
        # real assignment, only find out node ids and check liveliness
        self.do_resource_discovery(recover = True)
        self.do_wait_nodes()
        
        # Pre/post configure, however, tends to set up tunnels
        # Execute configuration steps only for those object
        # kinds that do not have side effects
        
        # Do the ones without side effects,
        # including nodes that need to set up home 
        # folders and all that
        self._do_in_factory_order(
            "preconfigure_function", 
            [
                metadata.INTERNET,
                Parallel(metadata.NODE),
                metadata.NODEIFACE,
            ])
        
        # Tunnels require a home path that is configured
        # at this step. Since we cannot run the step itself,
        # we need to inject this homepath ourselves
        for guid, element in self._elements.iteritems():
            if isinstance(element, self._interfaces.TunIface):
                element._home_path = "tun-%s" % (guid,)
        
        # Manually recover tunnels, applications and
        # netpipes, negating the side effects
        self._do_in_factory_order(
            lambda self, guid : self._elements[guid].recover(), 
            [
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
                metadata.NETPIPE,
                Parallel(metadata.NEPIDEPENDENCY),
                Parallel(metadata.NS3DEPENDENCY),
                Parallel(metadata.DEPENDENCY),
                Parallel(metadata.APPLICATION),
            ])

        # Tunnels are not harmed by configuration after
        # recovery, and some attributes get set this way
        # like external_iface
        self._do_in_factory_order(
            "preconfigure_function", 
            [
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
            ])

        # Post-do the ones without side effects
        self._do_in_factory_order(
            "configure_function", 
            [
                metadata.INTERNET,
                Parallel(metadata.NODE),
                metadata.NODEIFACE,
                Parallel(metadata.TAPIFACE),
                Parallel(metadata.TUNIFACE),
            ])
        
        # There are no required prestart steps
        # to call upon recovery, so we're done
        
    
630     
631     def _make_generic(self, parameters, kind):
632         app = kind(self.plapi)
633
634         # Note: there is 1-to-1 correspondence between attribute names
635         #   If that changes, this has to change as well
636         for attr,val in parameters.iteritems():
637             setattr(app, attr, val)
638
639         return app
640
    def _make_node(self, parameters):
        """Factory for Node elements."""
        node = self._make_generic(parameters, self._node.Node)
        # Only dedicated slices may be cleaned up on teardown
        node.enable_cleanup = self.dedicatedSlice
        return node
645
    def _make_node_iface(self, parameters):
        """Factory for NodeIface elements."""
        return self._make_generic(parameters, self._interfaces.NodeIface)
648
    def _make_tun_iface(self, parameters):
        """Factory for TunIface elements."""
        return self._make_generic(parameters, self._interfaces.TunIface)
651
    def _make_tap_iface(self, parameters):
        """Factory for TapIface elements."""
        return self._make_generic(parameters, self._interfaces.TapIface)
654
    def _make_netpipe(self, parameters):
        """Factory for NetPipe elements."""
        return self._make_generic(parameters, self._interfaces.NetPipe)
657
    def _make_internet(self, parameters):
        """Factory for Internet elements."""
        return self._make_generic(parameters, self._interfaces.Internet)
660
    def _make_application(self, parameters):
        """Factory for Application elements."""
        return self._make_generic(parameters, self._app.Application)
663
    def _make_dependency(self, parameters):
        """Factory for Dependency elements."""
        return self._make_generic(parameters, self._app.Dependency)
666
    def _make_nepi_dependency(self, parameters):
        """Factory for NepiDependency elements."""
        return self._make_generic(parameters, self._app.NepiDependency)
669
    def _make_ns3_dependency(self, parameters):
        """Factory for NS3Dependency elements."""
        return self._make_generic(parameters, self._app.NS3Dependency)
672
    def _make_tun_filter(self, parameters):
        """Factory for TunFilter elements."""
        return self._make_generic(parameters, self._interfaces.TunFilter)
675