3a1b65ab0f12bd44448c3d10d153e288fa90ad21
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 import sys
10 import os
11 import os.path
12 import time
13 import resourcealloc
14 import collections
15 import operator
16 import functools
17 import socket
18 import struct
19 import tempfile
20 import subprocess
21 import random
22 import shutil
23
24 from nepi.util.constants import TESTBED_STATUS_CONFIGURED
25
class TempKeyError(Exception):
    """Signals that a temporary deployment key could not be produced.

    Raised by ``TestbedController._make_temp_private_key`` when no
    passphrase for the slice SSH key is available.
    """
28
29 class TestbedController(testbed_impl.TestbedController):
30     def __init__(self, testbed_version):
31         super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
32         self._home_directory = None
33         self.slicename = None
34         self._traces = dict()
35
36         import node, interfaces, application
37         self._node = node
38         self._interfaces = interfaces
39         self._app = application
40         
41         self._blacklist = set()
42
43     @property
44     def home_directory(self):
45         return self._home_directory
46
47     @property
48     def plapi(self):
49         if not hasattr(self, '_plapi'):
50             import plcapi
51
52             if self.authUser:
53                 self._plapi = plcapi.PLCAPI(
54                     username = self.authUser,
55                     password = self.authString,
56                     hostname = self.plcHost,
57                     urlpattern = self.plcUrl
58                     )
59             else:
60                 # anonymous access - may not be enough for much
61                 self._plapi = plcapi.PLCAPI()
62         return self._plapi
63
64     @property
65     def slice_id(self):
66         if not hasattr(self, '_slice_id'):
67             slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
68             if slices:
69                 self._slice_id = slices[0]['slice_id']
70             else:
71                 # If it wasn't found, don't remember this failure, keep trying
72                 return None
73         return self._slice_id
74
75     def do_setup(self):
76         self._home_directory = self._attributes.\
77             get_attribute_value("homeDirectory")
78         self.slicename = self._attributes.\
79             get_attribute_value("slice")
80         self.authUser = self._attributes.\
81             get_attribute_value("authUser")
82         self.authString = self._attributes.\
83             get_attribute_value("authPass")
84         self.sliceSSHKey = self._attributes.\
85             get_attribute_value("sliceSSHKey")
86         self.sliceSSHKeyPass = None
87         self.plcHost = self._attributes.\
88             get_attribute_value("plcHost")
89         self.plcUrl = self._attributes.\
90             get_attribute_value("plcUrl")
91         super(TestbedController, self).do_setup()
92
93     def do_post_asynclaunch(self, guid):
94         # Dependencies were launched asynchronously,
95         # so wait for them
96         dep = self._elements[guid]
97         if isinstance(dep, self._app.Dependency):
98             dep.async_setup_wait()
99     
100     # Two-phase configuration for asynchronous launch
101     do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
102     do_poststep_configure = staticmethod(do_post_asynclaunch)
103
104     def do_preconfigure(self):
105         while True:
106             # Perform resource discovery if we don't have
107             # specific resources assigned yet
108             self.do_resource_discovery()
109
110             # Create PlanetLab slivers
111             self.do_provisioning()
112             
113             try:
114                 # Wait for provisioning
115                 self.do_wait_nodes()
116                 
117                 # Okkey...
118                 break
119             except self._node.UnresponsiveNodeError:
120                 # Oh... retry...
121                 pass
122         
123         # Plan application deployment
124         self.do_spanning_deployment_plan()
125
126         # Configure elements per XML data
127         super(TestbedController, self).do_preconfigure()
128
129     def do_resource_discovery(self):
130         to_provision = self._to_provision = set()
131         
132         reserved = set(self._blacklist)
133         for guid, node in self._elements.iteritems():
134             if isinstance(node, self._node.Node) and node._node_id is not None:
135                 reserved.add(node._node_id)
136         
137         # Initial algo:
138         #   look for perfectly defined nodes
139         #   (ie: those with only one candidate)
140         for guid, node in self._elements.iteritems():
141             if isinstance(node, self._node.Node) and node._node_id is None:
142                 # Try existing nodes first
143                 # If we have only one candidate, simply use it
144                 candidates = node.find_candidates(
145                     filter_slice_id = self.slice_id)
146                 candidates -= reserved
147                 if len(candidates) == 1:
148                     node_id = iter(candidates).next()
149                     node.assign_node_id(node_id)
150                     reserved.add(node_id)
151                 elif not candidates:
152                     # Try again including unassigned nodes
153                     candidates = node.find_candidates()
154                     if len(candidates) > 1:
155                         continue
156                     if len(candidates) == 1:
157                         node_id = iter(candidates).next()
158                         node.assign_node_id(node_id)
159                         to_provision.add(node_id)
160                         reserved.add(node_id)
161                     elif not candidates:
162                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
163                             node.make_filter_description())
164         
165         # Now do the backtracking search for a suitable solution
166         # First with existing slice nodes
167         reqs = []
168         nodes = []
169         for guid, node in self._elements.iteritems():
170             if isinstance(node, self._node.Node) and node._node_id is None:
171                 # Try existing nodes first
172                 # If we have only one candidate, simply use it
173                 candidates = node.find_candidates(
174                     filter_slice_id = self.slice_id)
175                 candidates -= reserved
176                 reqs.append(candidates)
177                 nodes.append(node)
178         
179         if nodes and reqs:
180             try:
181                 solution = resourcealloc.alloc(reqs)
182             except resourcealloc.ResourceAllocationError:
183                 # Failed, try again with all nodes
184                 reqs = []
185                 for node in nodes:
186                     candidates = node.find_candidates()
187                     reqs.append(candidates)
188                 
189                 solution = resourcealloc.alloc(reqs)
190                 to_provision.update(solution)
191             
192             # Do assign nodes
193             for node, node_id in zip(nodes, solution):
194                 node.assign_node_id(node_id)
195
196     def do_provisioning(self):
197         if self._to_provision:
198             # Add new nodes to the slice
199             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
200             new_nodes = list(set(cur_nodes) | self._to_provision)
201             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
202
203         # cleanup
204         del self._to_provision
205     
206     def do_wait_nodes(self):
207         for guid, node in self._elements.iteritems():
208             if isinstance(node, self._node.Node):
209                 # Just inject configuration stuff
210                 node.home_path = "nepi-node-%s" % (guid,)
211                 node.ident_path = self.sliceSSHKey
212                 node.slicename = self.slicename
213             
214                 # Show the magic
215                 print "PlanetLab Node", guid, "configured at", node.hostname
216             
217         try:
218             for guid, node in self._elements.iteritems():
219                 if isinstance(node, self._node.Node):
220                     print "Waiting for Node", guid, "configured at", node.hostname,
221                     sys.stdout.flush()
222                     
223                     node.wait_provisioning()
224                     
225                     print "READY"
226         except self._node.UnresponsiveNodeError:
227             # Uh... 
228             print "UNRESPONSIVE"
229             
230             # Mark all dead nodes (which are unresponsive) on the blacklist
231             # and re-raise
232             for guid, node in self._elements.iteritems():
233                 if isinstance(node, self._node.Node):
234                     if not node.is_alive():
235                         print "Blacklisting", node.hostname, "for unresponsiveness"
236                         self._blacklist.add(node._node_id)
237                         node.unassign_node()
238             raise
239     
240     def do_spanning_deployment_plan(self):
241         # Create application groups by collecting all applications
242         # based on their hash - the hash should contain everything that
243         # defines them and the platform they're built
244         
245         def dephash(app):
246             return (
247                 frozenset((app.depends or "").split(' ')),
248                 frozenset((app.sources or "").split(' ')),
249                 app.build,
250                 app.install,
251                 app.node.architecture,
252                 app.node.operatingSystem,
253                 app.node.pl_distro,
254             )
255         
256         depgroups = collections.defaultdict(list)
257         
258         for element in self._elements.itervalues():
259             if isinstance(element, self._app.Dependency):
260                 depgroups[dephash(element)].append(element)
261         
262         # Set up spanning deployment for those applications that
263         # have been deployed in several nodes.
264         for dh, group in depgroups.iteritems():
265             if len(group) > 1:
266                 # Pick root (deterministically)
267                 root = min(group, key=lambda app:app.node.hostname)
268                 
269                 # Obtain all IPs in numeric format
270                 # (which means faster distance computations)
271                 for dep in group:
272                     dep._ip = socket.gethostbyname(dep.node.hostname)
273                     dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
274                 
275                 # Compute plan
276                 # NOTE: the plan is an iterator
277                 plan = mst.mst(
278                     group,
279                     lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
280                     root = root,
281                     maxbranching = 2)
282                 
283                 # Re-sign private key
284                 try:
285                     tempprk, temppuk, tmppass = self._make_temp_private_key()
286                 except TempKeyError:
287                     continue
288                 
289                 # Set up slaves
290                 plan = list(plan)
291                 for slave, master in plan:
292                     slave.set_master(master)
293                     slave.install_keys(tempprk, temppuk, tmppass)
294                     
295         # We don't need the user's passphrase anymore
296         self.sliceSSHKeyPass = None
297     
298     def _make_temp_private_key(self):
299         # Get the user's key's passphrase
300         if not self.sliceSSHKeyPass:
301             if 'SSH_ASKPASS' in os.environ:
302                 proc = subprocess.Popen(
303                     [ os.environ['SSH_ASKPASS'],
304                       "Please type the passphrase for the %s SSH identity file. "
305                       "The passphrase will be used to re-cipher the identity file with "
306                       "a random 256-bit key for automated chain deployment on the "
307                       "%s PlanetLab slice" % ( 
308                         os.path.basename(self.sliceSSHKey), 
309                         self.slicename
310                     ) ],
311                     stdin = open("/dev/null"),
312                     stdout = subprocess.PIPE,
313                     stderr = subprocess.PIPE)
314                 out,err = proc.communicate()
315                 self.sliceSSHKeyPass = out.strip()
316         
317         if not self.sliceSSHKeyPass:
318             raise TempKeyError
319         
320         # Create temporary key files
321         prk = tempfile.NamedTemporaryFile(
322             dir = self.root_directory,
323             prefix = "pl_deploy_tmpk_",
324             suffix = "")
325
326         puk = tempfile.NamedTemporaryFile(
327             dir = self.root_directory,
328             prefix = "pl_deploy_tmpk_",
329             suffix = ".pub")
330             
331         # Create secure 256-bits temporary passphrase
332         passphrase = ''.join(map(chr,[rng.randint(0,255) 
333                                       for rng in (random.SystemRandom(),)
334                                       for i in xrange(32)] )).encode("hex")
335                 
336         # Copy keys
337         oprk = open(self.sliceSSHKey, "rb")
338         opuk = open(self.sliceSSHKey+".pub", "rb")
339         shutil.copymode(oprk.name, prk.name)
340         shutil.copymode(opuk.name, puk.name)
341         shutil.copyfileobj(oprk, prk)
342         shutil.copyfileobj(opuk, puk)
343         prk.flush()
344         puk.flush()
345         oprk.close()
346         opuk.close()
347         
348         # A descriptive comment
349         comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
350         
351         # Recipher keys
352         proc = subprocess.Popen(
353             ["ssh-keygen", "-p",
354              "-f", prk.name,
355              "-P", self.sliceSSHKeyPass,
356              "-N", passphrase,
357              "-C", comment ],
358             stdout = subprocess.PIPE,
359             stderr = subprocess.PIPE,
360             stdin = subprocess.PIPE
361         )
362         out, err = proc.communicate()
363         
364         if err:
365             raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
366                 out, err)
367         
368         prk.seek(0)
369         puk.seek(0)
370         
371         # Change comment on public key
372         puklines = puk.readlines()
373         puklines[0] = puklines[0].split(' ')
374         puklines[0][-1] = comment+'\n'
375         puklines[0] = ' '.join(puklines[0])
376         puk.seek(0)
377         puk.truncate()
378         puk.writelines(puklines)
379         del puklines
380         puk.flush()
381         
382         return prk, puk, passphrase
383     
384     def set(self, guid, name, value, time = TIME_NOW):
385         super(TestbedController, self).set(guid, name, value, time)
386         # TODO: take on account schedule time for the task
387         element = self._elements[guid]
388         if element:
389             setattr(element, name, value)
390
391             if hasattr(element, 'refresh'):
392                 # invoke attribute refresh hook
393                 element.refresh()
394
395     def get(self, guid, name, time = TIME_NOW):
396         value = super(TestbedController, self).get(guid, name, time)
397         # TODO: take on account schedule time for the task
398         factory_id = self._create[guid]
399         factory = self._factories[factory_id]
400         if factory.box_attributes.is_attribute_design_only(name):
401             return value
402         element = self._elements.get(guid)
403         try:
404             return getattr(element, name)
405         except KeyError, AttributeError:
406             return value
407
408     def get_address(self, guid, index, attribute='Address'):
409         index = int(index)
410
411         # try the real stuff
412         iface = self._elements.get(guid)
413         if iface and index == 0:
414             if attribute == 'Address':
415                 return iface.address
416             elif attribute == 'NetPrefix':
417                 return iface.netprefix
418             elif attribute == 'Broadcast':
419                 return iface.broadcast
420
421         # if all else fails, query box
422         return super(TestbedController, self).get_address(guid, index, attribute)
423
424     def action(self, time, guid, action):
425         raise NotImplementedError
426
427     def shutdown(self):
428         for trace in self._traces.itervalues():
429             trace.close()
430         for element in self._elements.itervalues():
431             # invoke cleanup hooks
432             if hasattr(element, 'cleanup'):
433                 element.cleanup()
434         for element in self._elements.itervalues():
435             # invoke destroy hooks
436             if hasattr(element, 'destroy'):
437                 element.destroy()
438         self._elements.clear()
439         self._traces.clear()
440
441     def trace(self, guid, trace_id, attribute='value'):
442         app = self._elements[guid]
443
444         if attribute == 'value':
445             path = app.sync_trace(self.home_directory, trace_id)
446             if path:
447                 fd = open(path, "r")
448                 content = fd.read()
449                 fd.close()
450             else:
451                 content = None
452         elif attribute == 'path':
453             content = app.remote_trace_path(trace_id)
454         elif attribute == 'size':
455             # TODO
456             raise NotImplementedError
457         else:
458             content = None
459         return content
460
461     def follow_trace(self, trace_id, trace):
462         self._traces[trace_id] = trace
463     
464     def _make_generic(self, parameters, kind):
465         app = kind(self.plapi)
466
467         # Note: there is 1-to-1 correspondence between attribute names
468         #   If that changes, this has to change as well
469         for attr,val in parameters.iteritems():
470             setattr(app, attr, val)
471
472         return app
473
474     def _make_node(self, parameters):
475         node = self._make_generic(parameters, self._node.Node)
476
477         # If emulation is enabled, we automatically need
478         # some vsys interfaces and packages
479         if node.emulation:
480             node.required_vsys.add('ipfw-be')
481             node.required_packages.add('ipfwslice')
482
483         return node
484
485     def _make_node_iface(self, parameters):
486         return self._make_generic(parameters, self._interfaces.NodeIface)
487
488     def _make_tun_iface(self, parameters):
489         return self._make_generic(parameters, self._interfaces.TunIface)
490
491     def _make_tap_iface(self, parameters):
492         return self._make_generic(parameters, self._interfaces.TapIface)
493
494     def _make_netpipe(self, parameters):
495         return self._make_generic(parameters, self._interfaces.NetPipe)
496
497     def _make_internet(self, parameters):
498         return self._make_generic(parameters, self._interfaces.Internet)
499
500     def _make_application(self, parameters):
501         return self._make_generic(parameters, self._app.Application)
502
503     def _make_dependency(self, parameters):
504         return self._make_generic(parameters, self._app.Dependency)
505
506     def _make_nepi_dependency(self, parameters):
507         return self._make_generic(parameters, self._app.NepiDependency)
508
509     def _make_ns3_dependency(self, parameters):
510         return self._make_generic(parameters, self._app.NS3Dependency)
511