Ticket #71: inner parallelization of setup phases
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 from nepi.util.parallel import ParallelRun
11 import sys
12 import os
13 import os.path
14 import time
15 import resourcealloc
16 import collections
17 import operator
18 import functools
19 import socket
20 import struct
21 import tempfile
22 import subprocess
23 import random
24 import shutil
25
class TempKeyError(Exception):
    """Raised when a temporary deployment SSH key pair cannot be created
    (typically because the user's key passphrase could not be obtained)."""
28
class TestbedController(testbed_impl.TestbedController):
    def __init__(self):
        """Initialize an unconfigured PlanetLab testbed controller.

        Attribute configuration is deferred to do_setup(); this only
        sets up internal bookkeeping and loads the persistent node
        blacklist.
        """
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)

        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        # Deferred imports: bind the sibling modules as attributes so
        # the rest of the class can reach them uniformly.
        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application

        # Node ids known to be unusable, persisted across runs
        self._blacklist = set()
        # Node ids added to the slice during this run
        self._just_provisioned = set()

        self._load_blacklist()
45
46     @property
47     def home_directory(self):
48         return self._home_directory
49
50     @property
51     def plapi(self):
52         if not hasattr(self, '_plapi'):
53             import plcapi
54
55             if self.authUser:
56                 self._plapi = plcapi.PLCAPI(
57                     username = self.authUser,
58                     password = self.authString,
59                     hostname = self.plcHost,
60                     urlpattern = self.plcUrl
61                     )
62             else:
63                 # anonymous access - may not be enough for much
64                 self._plapi = plcapi.PLCAPI()
65         return self._plapi
66
67     @property
68     def slice_id(self):
69         if not hasattr(self, '_slice_id'):
70             slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
71             if slices:
72                 self._slice_id = slices[0]['slice_id']
73             else:
74                 # If it wasn't found, don't remember this failure, keep trying
75                 return None
76         return self._slice_id
77     
78     def _load_blacklist(self):
79         blpath = environ.homepath('plblacklist')
80         
81         try:
82             bl = open(blpath, "r")
83         except:
84             self._blacklist = set()
85             return
86             
87         try:
88             self._blacklist = set(
89                 map(int,
90                     map(str.strip, bl.readlines())
91                 )
92             )
93         finally:
94             bl.close()
95     
96     def _save_blacklist(self):
97         blpath = environ.homepath('plblacklist')
98         bl = open(blpath, "w")
99         try:
100             bl.writelines(
101                 map('%s\n'.__mod__, self._blacklist))
102         finally:
103             bl.close()
104     
105     def do_setup(self):
106         self._home_directory = self._attributes.\
107             get_attribute_value("homeDirectory")
108         self.slicename = self._attributes.\
109             get_attribute_value("slice")
110         self.authUser = self._attributes.\
111             get_attribute_value("authUser")
112         self.authString = self._attributes.\
113             get_attribute_value("authPass")
114         self.sliceSSHKey = self._attributes.\
115             get_attribute_value("sliceSSHKey")
116         self.sliceSSHKeyPass = None
117         self.plcHost = self._attributes.\
118             get_attribute_value("plcHost")
119         self.plcUrl = self._attributes.\
120             get_attribute_value("plcUrl")
121         super(TestbedController, self).do_setup()
122
123     def do_post_asynclaunch(self, guid):
124         # Dependencies were launched asynchronously,
125         # so wait for them
126         dep = self._elements[guid]
127         if isinstance(dep, self._app.Dependency):
128             dep.async_setup_wait()
129     
130     # Two-phase configuration for asynchronous launch
131     do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
132     do_poststep_configure = staticmethod(do_post_asynclaunch)
133
134     def do_preconfigure(self):
135         while True:
136             # Perform resource discovery if we don't have
137             # specific resources assigned yet
138             self.do_resource_discovery()
139
140             # Create PlanetLab slivers
141             self.do_provisioning()
142             
143             try:
144                 # Wait for provisioning
145                 self.do_wait_nodes()
146                 
147                 # Okkey...
148                 break
149             except self._node.UnresponsiveNodeError:
150                 # Oh... retry...
151                 pass
152         
153         # Plan application deployment
154         self.do_spanning_deployment_plan()
155
156         # Configure elements per XML data
157         super(TestbedController, self).do_preconfigure()
158
159     def do_resource_discovery(self):
160         to_provision = self._to_provision = set()
161         
162         reserved = set(self._blacklist)
163         for guid, node in self._elements.iteritems():
164             if isinstance(node, self._node.Node) and node._node_id is not None:
165                 reserved.add(node._node_id)
166         
167         # Initial algo:
168         #   look for perfectly defined nodes
169         #   (ie: those with only one candidate)
170         for guid, node in self._elements.iteritems():
171             if isinstance(node, self._node.Node) and node._node_id is None:
172                 # Try existing nodes first
173                 # If we have only one candidate, simply use it
174                 candidates = node.find_candidates(
175                     filter_slice_id = self.slice_id)
176                 candidates -= reserved
177                 if len(candidates) == 1:
178                     node_id = iter(candidates).next()
179                     node.assign_node_id(node_id)
180                     reserved.add(node_id)
181                 elif not candidates:
182                     # Try again including unassigned nodes
183                     candidates = node.find_candidates()
184                     candidates -= reserved
185                     if len(candidates) > 1:
186                         continue
187                     if len(candidates) == 1:
188                         node_id = iter(candidates).next()
189                         node.assign_node_id(node_id)
190                         to_provision.add(node_id)
191                         reserved.add(node_id)
192                     elif not candidates:
193                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
194                             node.make_filter_description())
195         
196         # Now do the backtracking search for a suitable solution
197         # First with existing slice nodes
198         reqs = []
199         nodes = []
200         for guid, node in self._elements.iteritems():
201             if isinstance(node, self._node.Node) and node._node_id is None:
202                 # Try existing nodes first
203                 # If we have only one candidate, simply use it
204                 candidates = node.find_candidates(
205                     filter_slice_id = self.slice_id)
206                 candidates -= reserved
207                 reqs.append(candidates)
208                 nodes.append(node)
209         
210         if nodes and reqs:
211             try:
212                 solution = resourcealloc.alloc(reqs)
213             except resourcealloc.ResourceAllocationError:
214                 # Failed, try again with all nodes
215                 reqs = []
216                 for node in nodes:
217                     candidates = node.find_candidates()
218                     candidates -= reserved
219                     reqs.append(candidates)
220                 
221                 solution = resourcealloc.alloc(reqs)
222                 to_provision.update(solution)
223             
224             # Do assign nodes
225             for node, node_id in zip(nodes, solution):
226                 node.assign_node_id(node_id)
227
228     def do_provisioning(self):
229         if self._to_provision:
230             # Add new nodes to the slice
231             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
232             new_nodes = list(set(cur_nodes) | self._to_provision)
233             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
234
235         # cleanup
236         self._just_provisioned = self._to_provision
237         del self._to_provision
238     
239     def do_wait_nodes(self):
240         for guid, node in self._elements.iteritems():
241             if isinstance(node, self._node.Node):
242                 # Just inject configuration stuff
243                 node.home_path = "nepi-node-%s" % (guid,)
244                 node.ident_path = self.sliceSSHKey
245                 node.slicename = self.slicename
246             
247                 # Show the magic
248                 print >>sys.stderr, "PlanetLab Node", guid, "configured at", node.hostname
249             
250         try:
251             for guid, node in self._elements.iteritems():
252                 if isinstance(node, self._node.Node):
253                     print >>sys.stderr, "Waiting for Node", guid, "configured at", node.hostname,
254                     sys.stdout.flush()
255                     
256                     node.wait_provisioning(
257                         (20*60 if node._node_id in self._just_provisioned else 60)
258                     )
259                     
260                     print >>sys.stderr, "READY"
261         except self._node.UnresponsiveNodeError:
262             # Uh... 
263             print >>sys.stderr, "UNRESPONSIVE"
264             
265             # Mark all dead nodes (which are unresponsive) on the blacklist
266             # and re-raise
267             for guid, node in self._elements.iteritems():
268                 if isinstance(node, self._node.Node):
269                     if not node.is_alive():
270                         print >>sys.stderr, "Blacklisting", node.hostname, "for unresponsiveness"
271                         self._blacklist.add(node._node_id)
272                         node.unassign_node()
273             
274             try:
275                 self._save_blacklist()
276             except:
277                 # not important...
278                 import traceback
279                 traceback.print_exc()
280             
281             raise
282     
    def do_spanning_deployment_plan(self):
        """Set up spanning-tree distribution of repeated dependencies.

        Dependencies that are identical (same sources, build recipe and
        node platform) and deployed on several nodes are arranged in a
        minimum spanning tree rooted at a deterministically-chosen node,
        so each node fetches the build from a nearby peer instead of
        every node building/downloading it independently.
        """
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
        
        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )
        
        depgroups = collections.defaultdict(list)
        
        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
        
        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically: lowest hostname)
                root = min(group, key=lambda app:app.node.hostname)
                
                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
                
                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)
                
                # Re-sign private key: a temporary key pair re-ciphered
                # with a random passphrase, so the user's own passphrase
                # never travels to the nodes
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    # Couldn't obtain the passphrase - skip spanning
                    # deployment for this group (each node deploys alone)
                    continue
                
                # Set up slaves: each (slave, master) edge of the tree
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)
                    
        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
340     
341     def _make_temp_private_key(self):
342         # Get the user's key's passphrase
343         if not self.sliceSSHKeyPass:
344             if 'SSH_ASKPASS' in os.environ:
345                 proc = subprocess.Popen(
346                     [ os.environ['SSH_ASKPASS'],
347                       "Please type the passphrase for the %s SSH identity file. "
348                       "The passphrase will be used to re-cipher the identity file with "
349                       "a random 256-bit key for automated chain deployment on the "
350                       "%s PlanetLab slice" % ( 
351                         os.path.basename(self.sliceSSHKey), 
352                         self.slicename
353                     ) ],
354                     stdin = open("/dev/null"),
355                     stdout = subprocess.PIPE,
356                     stderr = subprocess.PIPE)
357                 out,err = proc.communicate()
358                 self.sliceSSHKeyPass = out.strip()
359         
360         if not self.sliceSSHKeyPass:
361             raise TempKeyError
362         
363         # Create temporary key files
364         prk = tempfile.NamedTemporaryFile(
365             dir = self.root_directory,
366             prefix = "pl_deploy_tmpk_",
367             suffix = "")
368
369         puk = tempfile.NamedTemporaryFile(
370             dir = self.root_directory,
371             prefix = "pl_deploy_tmpk_",
372             suffix = ".pub")
373             
374         # Create secure 256-bits temporary passphrase
375         passphrase = ''.join(map(chr,[rng.randint(0,255) 
376                                       for rng in (random.SystemRandom(),)
377                                       for i in xrange(32)] )).encode("hex")
378                 
379         # Copy keys
380         oprk = open(self.sliceSSHKey, "rb")
381         opuk = open(self.sliceSSHKey+".pub", "rb")
382         shutil.copymode(oprk.name, prk.name)
383         shutil.copymode(opuk.name, puk.name)
384         shutil.copyfileobj(oprk, prk)
385         shutil.copyfileobj(opuk, puk)
386         prk.flush()
387         puk.flush()
388         oprk.close()
389         opuk.close()
390         
391         # A descriptive comment
392         comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
393         
394         # Recipher keys
395         proc = subprocess.Popen(
396             ["ssh-keygen", "-p",
397              "-f", prk.name,
398              "-P", self.sliceSSHKeyPass,
399              "-N", passphrase,
400              "-C", comment ],
401             stdout = subprocess.PIPE,
402             stderr = subprocess.PIPE,
403             stdin = subprocess.PIPE
404         )
405         out, err = proc.communicate()
406         
407         if err:
408             raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
409                 out, err)
410         
411         prk.seek(0)
412         puk.seek(0)
413         
414         # Change comment on public key
415         puklines = puk.readlines()
416         puklines[0] = puklines[0].split(' ')
417         puklines[0][-1] = comment+'\n'
418         puklines[0] = ' '.join(puklines[0])
419         puk.seek(0)
420         puk.truncate()
421         puk.writelines(puklines)
422         del puklines
423         puk.flush()
424         
425         return prk, puk, passphrase
426     
427     def set(self, guid, name, value, time = TIME_NOW):
428         super(TestbedController, self).set(guid, name, value, time)
429         # TODO: take on account schedule time for the task
430         element = self._elements[guid]
431         if element:
432             setattr(element, name, value)
433
434             if hasattr(element, 'refresh'):
435                 # invoke attribute refresh hook
436                 element.refresh()
437
438     def get(self, guid, name, time = TIME_NOW):
439         value = super(TestbedController, self).get(guid, name, time)
440         # TODO: take on account schedule time for the task
441         factory_id = self._create[guid]
442         factory = self._factories[factory_id]
443         element = self._elements.get(guid)
444         try:
445             return getattr(element, name)
446         except (KeyError, AttributeError):
447             return value
448
449     def get_address(self, guid, index, attribute='Address'):
450         index = int(index)
451
452         # try the real stuff
453         iface = self._elements.get(guid)
454         if iface and index == 0:
455             if attribute == 'Address':
456                 return iface.address
457             elif attribute == 'NetPrefix':
458                 return iface.netprefix
459             elif attribute == 'Broadcast':
460                 return iface.broadcast
461
462         # if all else fails, query box
463         return super(TestbedController, self).get_address(guid, index, attribute)
464
    def action(self, time, guid, action):
        # Scheduled actions are not supported by this testbed
        raise NotImplementedError
467
468     def shutdown(self):
469         for trace in self._traces.itervalues():
470             trace.close()
471         
472         runner = ParallelRun(16)
473         runner.start()
474         for element in self._elements.itervalues():
475             # invoke cleanup hooks
476             if hasattr(element, 'cleanup'):
477                 runner.put(element.cleanup)
478         runner.join()
479         
480         runner = ParallelRun(16)
481         runner.start()
482         for element in self._elements.itervalues():
483             # invoke destroy hooks
484             if hasattr(element, 'destroy'):
485                 runner.put(element.destroy)
486         runner.join()
487         
488         self._elements.clear()
489         self._traces.clear()
490
491     def trace(self, guid, trace_id, attribute='value'):
492         app = self._elements[guid]
493
494         if attribute == 'value':
495             path = app.sync_trace(self.home_directory, trace_id)
496             if path:
497                 fd = open(path, "r")
498                 content = fd.read()
499                 fd.close()
500             else:
501                 content = None
502         elif attribute == 'path':
503             content = app.remote_trace_path(trace_id)
504         else:
505             content = None
506         return content
507
508     def follow_trace(self, trace_id, trace):
509         self._traces[trace_id] = trace
510     
511     def _make_generic(self, parameters, kind):
512         app = kind(self.plapi)
513
514         # Note: there is 1-to-1 correspondence between attribute names
515         #   If that changes, this has to change as well
516         for attr,val in parameters.iteritems():
517             setattr(app, attr, val)
518
519         return app
520
    # --- Element factories -----------------------------------------------
    # One thin wrapper per element type, all delegating to _make_generic,
    # which instantiates the class and applies the XML parameters.

    def _make_node(self, parameters):
        return self._make_generic(parameters, self._node.Node)

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)
550