code clean up
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 import sys
11 import os
12 import os.path
13 import time
14 import resourcealloc
15 import collections
16 import operator
17 import functools
18 import socket
19 import struct
20 import tempfile
21 import subprocess
22 import random
23 import shutil
24
class TempKeyError(Exception):
    """Raised when a temporary deployment key cannot be produced.

    In this file it is raised by ``_make_temp_private_key`` when no
    passphrase for the slice SSH key is available.
    """
27
28 class TestbedController(testbed_impl.TestbedController):
29     def __init__(self, testbed_version):
30         super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
31         self._home_directory = None
32         self.slicename = None
33         self._traces = dict()
34
35         import node, interfaces, application
36         self._node = node
37         self._interfaces = interfaces
38         self._app = application
39         
40         self._blacklist = set()
41         self._just_provisioned = set()
42         
43         self._load_blacklist()
44
45     @property
46     def home_directory(self):
47         return self._home_directory
48
49     @property
50     def plapi(self):
51         if not hasattr(self, '_plapi'):
52             import plcapi
53
54             if self.authUser:
55                 self._plapi = plcapi.PLCAPI(
56                     username = self.authUser,
57                     password = self.authString,
58                     hostname = self.plcHost,
59                     urlpattern = self.plcUrl
60                     )
61             else:
62                 # anonymous access - may not be enough for much
63                 self._plapi = plcapi.PLCAPI()
64         return self._plapi
65
66     @property
67     def slice_id(self):
68         if not hasattr(self, '_slice_id'):
69             slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
70             if slices:
71                 self._slice_id = slices[0]['slice_id']
72             else:
73                 # If it wasn't found, don't remember this failure, keep trying
74                 return None
75         return self._slice_id
76     
77     def _load_blacklist(self):
78         blpath = environ.homepath('plblacklist')
79         
80         try:
81             bl = open(blpath, "r")
82         except:
83             self._blacklist = set()
84             return
85             
86         try:
87             self._blacklist = set(
88                 map(int,
89                     map(str.strip, bl.readlines())
90                 )
91             )
92         finally:
93             bl.close()
94     
95     def _save_blacklist(self):
96         blpath = environ.homepath('plblacklist')
97         bl = open(blpath, "w")
98         try:
99             bl.writelines(
100                 map('%s\n'.__mod__, self._blacklist))
101         finally:
102             bl.close()
103     
104     def do_setup(self):
105         self._home_directory = self._attributes.\
106             get_attribute_value("homeDirectory")
107         self.slicename = self._attributes.\
108             get_attribute_value("slice")
109         self.authUser = self._attributes.\
110             get_attribute_value("authUser")
111         self.authString = self._attributes.\
112             get_attribute_value("authPass")
113         self.sliceSSHKey = self._attributes.\
114             get_attribute_value("sliceSSHKey")
115         self.sliceSSHKeyPass = None
116         self.plcHost = self._attributes.\
117             get_attribute_value("plcHost")
118         self.plcUrl = self._attributes.\
119             get_attribute_value("plcUrl")
120         super(TestbedController, self).do_setup()
121
122     def do_post_asynclaunch(self, guid):
123         # Dependencies were launched asynchronously,
124         # so wait for them
125         dep = self._elements[guid]
126         if isinstance(dep, self._app.Dependency):
127             dep.async_setup_wait()
128     
129     # Two-phase configuration for asynchronous launch
130     do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
131     do_poststep_configure = staticmethod(do_post_asynclaunch)
132
133     def do_preconfigure(self):
134         while True:
135             # Perform resource discovery if we don't have
136             # specific resources assigned yet
137             self.do_resource_discovery()
138
139             # Create PlanetLab slivers
140             self.do_provisioning()
141             
142             try:
143                 # Wait for provisioning
144                 self.do_wait_nodes()
145                 
146                 # Okkey...
147                 break
148             except self._node.UnresponsiveNodeError:
149                 # Oh... retry...
150                 pass
151         
152         # Plan application deployment
153         self.do_spanning_deployment_plan()
154
155         # Configure elements per XML data
156         super(TestbedController, self).do_preconfigure()
157
158     def do_resource_discovery(self):
159         to_provision = self._to_provision = set()
160         
161         reserved = set(self._blacklist)
162         for guid, node in self._elements.iteritems():
163             if isinstance(node, self._node.Node) and node._node_id is not None:
164                 reserved.add(node._node_id)
165         
166         # Initial algo:
167         #   look for perfectly defined nodes
168         #   (ie: those with only one candidate)
169         for guid, node in self._elements.iteritems():
170             if isinstance(node, self._node.Node) and node._node_id is None:
171                 # Try existing nodes first
172                 # If we have only one candidate, simply use it
173                 candidates = node.find_candidates(
174                     filter_slice_id = self.slice_id)
175                 candidates -= reserved
176                 if len(candidates) == 1:
177                     node_id = iter(candidates).next()
178                     node.assign_node_id(node_id)
179                     reserved.add(node_id)
180                 elif not candidates:
181                     # Try again including unassigned nodes
182                     candidates = node.find_candidates()
183                     candidates -= reserved
184                     if len(candidates) > 1:
185                         continue
186                     if len(candidates) == 1:
187                         node_id = iter(candidates).next()
188                         node.assign_node_id(node_id)
189                         to_provision.add(node_id)
190                         reserved.add(node_id)
191                     elif not candidates:
192                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
193                             node.make_filter_description())
194         
195         # Now do the backtracking search for a suitable solution
196         # First with existing slice nodes
197         reqs = []
198         nodes = []
199         for guid, node in self._elements.iteritems():
200             if isinstance(node, self._node.Node) and node._node_id is None:
201                 # Try existing nodes first
202                 # If we have only one candidate, simply use it
203                 candidates = node.find_candidates(
204                     filter_slice_id = self.slice_id)
205                 candidates -= reserved
206                 reqs.append(candidates)
207                 nodes.append(node)
208         
209         if nodes and reqs:
210             try:
211                 solution = resourcealloc.alloc(reqs)
212             except resourcealloc.ResourceAllocationError:
213                 # Failed, try again with all nodes
214                 reqs = []
215                 for node in nodes:
216                     candidates = node.find_candidates()
217                     candidates -= reserved
218                     reqs.append(candidates)
219                 
220                 solution = resourcealloc.alloc(reqs)
221                 to_provision.update(solution)
222             
223             # Do assign nodes
224             for node, node_id in zip(nodes, solution):
225                 node.assign_node_id(node_id)
226
227     def do_provisioning(self):
228         if self._to_provision:
229             # Add new nodes to the slice
230             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
231             new_nodes = list(set(cur_nodes) | self._to_provision)
232             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
233
234         # cleanup
235         self._just_provisioned = self._to_provision
236         del self._to_provision
237     
238     def do_wait_nodes(self):
239         for guid, node in self._elements.iteritems():
240             if isinstance(node, self._node.Node):
241                 # Just inject configuration stuff
242                 node.home_path = "nepi-node-%s" % (guid,)
243                 node.ident_path = self.sliceSSHKey
244                 node.slicename = self.slicename
245             
246                 # Show the magic
247                 print "PlanetLab Node", guid, "configured at", node.hostname
248             
249         try:
250             for guid, node in self._elements.iteritems():
251                 if isinstance(node, self._node.Node):
252                     print "Waiting for Node", guid, "configured at", node.hostname,
253                     sys.stdout.flush()
254                     
255                     node.wait_provisioning(
256                         (20*60 if node._node_id in self._just_provisioned else 60)
257                     )
258                     
259                     print "READY"
260         except self._node.UnresponsiveNodeError:
261             # Uh... 
262             print "UNRESPONSIVE"
263             
264             # Mark all dead nodes (which are unresponsive) on the blacklist
265             # and re-raise
266             for guid, node in self._elements.iteritems():
267                 if isinstance(node, self._node.Node):
268                     if not node.is_alive():
269                         print "Blacklisting", node.hostname, "for unresponsiveness"
270                         self._blacklist.add(node._node_id)
271                         node.unassign_node()
272             
273             try:
274                 self._save_blacklist()
275             except:
276                 # not important...
277                 import traceback
278                 traceback.print_exc()
279             
280             raise
281     
282     def do_spanning_deployment_plan(self):
283         # Create application groups by collecting all applications
284         # based on their hash - the hash should contain everything that
285         # defines them and the platform they're built
286         
287         def dephash(app):
288             return (
289                 frozenset((app.depends or "").split(' ')),
290                 frozenset((app.sources or "").split(' ')),
291                 app.build,
292                 app.install,
293                 app.node.architecture,
294                 app.node.operatingSystem,
295                 app.node.pl_distro,
296             )
297         
298         depgroups = collections.defaultdict(list)
299         
300         for element in self._elements.itervalues():
301             if isinstance(element, self._app.Dependency):
302                 depgroups[dephash(element)].append(element)
303         
304         # Set up spanning deployment for those applications that
305         # have been deployed in several nodes.
306         for dh, group in depgroups.iteritems():
307             if len(group) > 1:
308                 # Pick root (deterministically)
309                 root = min(group, key=lambda app:app.node.hostname)
310                 
311                 # Obtain all IPs in numeric format
312                 # (which means faster distance computations)
313                 for dep in group:
314                     dep._ip = socket.gethostbyname(dep.node.hostname)
315                     dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
316                 
317                 # Compute plan
318                 # NOTE: the plan is an iterator
319                 plan = mst.mst(
320                     group,
321                     lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
322                     root = root,
323                     maxbranching = 2)
324                 
325                 # Re-sign private key
326                 try:
327                     tempprk, temppuk, tmppass = self._make_temp_private_key()
328                 except TempKeyError:
329                     continue
330                 
331                 # Set up slaves
332                 plan = list(plan)
333                 for slave, master in plan:
334                     slave.set_master(master)
335                     slave.install_keys(tempprk, temppuk, tmppass)
336                     
337         # We don't need the user's passphrase anymore
338         self.sliceSSHKeyPass = None
339     
340     def _make_temp_private_key(self):
341         # Get the user's key's passphrase
342         if not self.sliceSSHKeyPass:
343             if 'SSH_ASKPASS' in os.environ:
344                 proc = subprocess.Popen(
345                     [ os.environ['SSH_ASKPASS'],
346                       "Please type the passphrase for the %s SSH identity file. "
347                       "The passphrase will be used to re-cipher the identity file with "
348                       "a random 256-bit key for automated chain deployment on the "
349                       "%s PlanetLab slice" % ( 
350                         os.path.basename(self.sliceSSHKey), 
351                         self.slicename
352                     ) ],
353                     stdin = open("/dev/null"),
354                     stdout = subprocess.PIPE,
355                     stderr = subprocess.PIPE)
356                 out,err = proc.communicate()
357                 self.sliceSSHKeyPass = out.strip()
358         
359         if not self.sliceSSHKeyPass:
360             raise TempKeyError
361         
362         # Create temporary key files
363         prk = tempfile.NamedTemporaryFile(
364             dir = self.root_directory,
365             prefix = "pl_deploy_tmpk_",
366             suffix = "")
367
368         puk = tempfile.NamedTemporaryFile(
369             dir = self.root_directory,
370             prefix = "pl_deploy_tmpk_",
371             suffix = ".pub")
372             
373         # Create secure 256-bits temporary passphrase
374         passphrase = ''.join(map(chr,[rng.randint(0,255) 
375                                       for rng in (random.SystemRandom(),)
376                                       for i in xrange(32)] )).encode("hex")
377                 
378         # Copy keys
379         oprk = open(self.sliceSSHKey, "rb")
380         opuk = open(self.sliceSSHKey+".pub", "rb")
381         shutil.copymode(oprk.name, prk.name)
382         shutil.copymode(opuk.name, puk.name)
383         shutil.copyfileobj(oprk, prk)
384         shutil.copyfileobj(opuk, puk)
385         prk.flush()
386         puk.flush()
387         oprk.close()
388         opuk.close()
389         
390         # A descriptive comment
391         comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
392         
393         # Recipher keys
394         proc = subprocess.Popen(
395             ["ssh-keygen", "-p",
396              "-f", prk.name,
397              "-P", self.sliceSSHKeyPass,
398              "-N", passphrase,
399              "-C", comment ],
400             stdout = subprocess.PIPE,
401             stderr = subprocess.PIPE,
402             stdin = subprocess.PIPE
403         )
404         out, err = proc.communicate()
405         
406         if err:
407             raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
408                 out, err)
409         
410         prk.seek(0)
411         puk.seek(0)
412         
413         # Change comment on public key
414         puklines = puk.readlines()
415         puklines[0] = puklines[0].split(' ')
416         puklines[0][-1] = comment+'\n'
417         puklines[0] = ' '.join(puklines[0])
418         puk.seek(0)
419         puk.truncate()
420         puk.writelines(puklines)
421         del puklines
422         puk.flush()
423         
424         return prk, puk, passphrase
425     
426     def set(self, guid, name, value, time = TIME_NOW):
427         super(TestbedController, self).set(guid, name, value, time)
428         # TODO: take on account schedule time for the task
429         element = self._elements[guid]
430         if element:
431             setattr(element, name, value)
432
433             if hasattr(element, 'refresh'):
434                 # invoke attribute refresh hook
435                 element.refresh()
436
437     def get(self, guid, name, time = TIME_NOW):
438         value = super(TestbedController, self).get(guid, name, time)
439         # TODO: take on account schedule time for the task
440         factory_id = self._create[guid]
441         factory = self._factories[factory_id]
442         element = self._elements.get(guid)
443         try:
444             return getattr(element, name)
445         except KeyError, AttributeError:
446             return value
447
448     def get_address(self, guid, index, attribute='Address'):
449         index = int(index)
450
451         # try the real stuff
452         iface = self._elements.get(guid)
453         if iface and index == 0:
454             if attribute == 'Address':
455                 return iface.address
456             elif attribute == 'NetPrefix':
457                 return iface.netprefix
458             elif attribute == 'Broadcast':
459                 return iface.broadcast
460
461         # if all else fails, query box
462         return super(TestbedController, self).get_address(guid, index, attribute)
463
464     def action(self, time, guid, action):
465         raise NotImplementedError
466
467     def shutdown(self):
468         for trace in self._traces.itervalues():
469             trace.close()
470         for element in self._elements.itervalues():
471             # invoke cleanup hooks
472             if hasattr(element, 'cleanup'):
473                 element.cleanup()
474         for element in self._elements.itervalues():
475             # invoke destroy hooks
476             if hasattr(element, 'destroy'):
477                 element.destroy()
478         self._elements.clear()
479         self._traces.clear()
480
481     def trace(self, guid, trace_id, attribute='value'):
482         app = self._elements[guid]
483
484         if attribute == 'value':
485             path = app.sync_trace(self.home_directory, trace_id)
486             if path:
487                 fd = open(path, "r")
488                 content = fd.read()
489                 fd.close()
490             else:
491                 content = None
492         elif attribute == 'path':
493             content = app.remote_trace_path(trace_id)
494         else:
495             content = None
496         return content
497
498     def follow_trace(self, trace_id, trace):
499         self._traces[trace_id] = trace
500     
501     def _make_generic(self, parameters, kind):
502         app = kind(self.plapi)
503
504         # Note: there is 1-to-1 correspondence between attribute names
505         #   If that changes, this has to change as well
506         for attr,val in parameters.iteritems():
507             setattr(app, attr, val)
508
509         return app
510
511     def _make_node(self, parameters):
512         node = self._make_generic(parameters, self._node.Node)
513
514         # If emulation is enabled, we automatically need
515         # some vsys interfaces and packages
516         if node.emulation:
517             node.required_vsys.add('ipfw-be')
518             node.required_packages.add('ipfwslice')
519
520         return node
521
522     def _make_node_iface(self, parameters):
523         return self._make_generic(parameters, self._interfaces.NodeIface)
524
525     def _make_tun_iface(self, parameters):
526         return self._make_generic(parameters, self._interfaces.TunIface)
527
528     def _make_tap_iface(self, parameters):
529         return self._make_generic(parameters, self._interfaces.TapIface)
530
531     def _make_netpipe(self, parameters):
532         return self._make_generic(parameters, self._interfaces.NetPipe)
533
534     def _make_internet(self, parameters):
535         return self._make_generic(parameters, self._interfaces.Internet)
536
537     def _make_application(self, parameters):
538         return self._make_generic(parameters, self._app.Application)
539
540     def _make_dependency(self, parameters):
541         return self._make_generic(parameters, self._app.Dependency)
542
543     def _make_nepi_dependency(self, parameters):
544         return self._make_generic(parameters, self._app.NepiDependency)
545
546     def _make_ns3_dependency(self, parameters):
547         return self._make_generic(parameters, self._app.NS3Dependency)
548