Merging with HEAD
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 import os
10 import os.path
11 import time
12 import resourcealloc
13 import collections
14 import operator
15 import functools
16 import socket
17 import struct
18 import tempfile
19 import subprocess
20 import random
21 import shutil
22
23 from nepi.util.constants import TESTBED_STATUS_CONFIGURED
24
class TempKeyError(Exception):
    """Signals that a temporary deployment key could not be created.

    Raised by TestbedController._make_temp_private_key when no passphrase
    for the slice SSH key is available.
    """
27
28 class TestbedController(testbed_impl.TestbedController):
29     def __init__(self, testbed_version):
30         super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
31         self._home_directory = None
32         self.slicename = None
33         self._traces = dict()
34
35         import node, interfaces, application
36         self._node = node
37         self._interfaces = interfaces
38         self._app = application
39
    @property
    def home_directory(self):
        """Value of the "homeDirectory" attribute (None until do_setup runs)."""
        return self._home_directory
43
44     @property
45     def plapi(self):
46         if not hasattr(self, '_plapi'):
47             import plcapi
48
49             if self.authUser:
50                 self._plapi = plcapi.PLCAPI(
51                     username = self.authUser,
52                     password = self.authString,
53                     hostname = self.plcHost,
54                     urlpattern = self.plcUrl
55                     )
56             else:
57                 # anonymous access - may not be enough for much
58                 self._plapi = plcapi.PLCAPI()
59         return self._plapi
60
61     @property
62     def slice_id(self):
63         if not hasattr(self, '_slice_id'):
64             slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
65             if slices:
66                 self._slice_id = slices[0]['slice_id']
67             else:
68                 # If it wasn't found, don't remember this failure, keep trying
69                 return None
70         return self._slice_id
71
72     def do_setup(self):
73         self._home_directory = self._attributes.\
74             get_attribute_value("homeDirectory")
75         self.slicename = self._attributes.\
76             get_attribute_value("slice")
77         self.authUser = self._attributes.\
78             get_attribute_value("authUser")
79         self.authString = self._attributes.\
80             get_attribute_value("authPass")
81         self.sliceSSHKey = self._attributes.\
82             get_attribute_value("sliceSSHKey")
83         self.sliceSSHKeyPass = None
84         self.plcHost = self._attributes.\
85             get_attribute_value("plcHost")
86         self.plcUrl = self._attributes.\
87             get_attribute_value("plcUrl")
88         super(TestbedController, self).do_setup()
89
90     def do_post_asynclaunch(self, guid):
91         # Dependencies were launched asynchronously,
92         # so wait for them
93         dep = self._elements[guid]
94         if isinstance(dep, self._app.Dependency):
95             dep.async_setup_wait()
96     
    # Two-phase configuration for asynchronous launch
    # Both post-step hooks reuse do_post_asynclaunch. Because they are wrapped
    # in staticmethod, attribute lookup does not bind `self`, so the caller
    # has to pass the controller explicitly as the first argument.
    # NOTE(review): presumably the base class calls these as
    # poststep(self, guid) - confirm in testbed_impl.
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)
100
    def do_preconfigure(self):
        """Run discovery, provisioning and deployment planning, in that
        order, before the base class preconfiguration step."""
        # Perform resource discovery if we don't have
        # specific resources assigned yet
        self.do_resource_discovery()

        # Create PlanetLab slivers
        self.do_provisioning()
        
        # Plan application deployment
        self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()
114
115     def do_resource_discovery(self):
116         to_provision = self._to_provision = set()
117         
118         # Initial algo:
119         #   look for perfectly defined nodes
120         #   (ie: those with only one candidate)
121         for guid, node in self._elements.iteritems():
122             if isinstance(node, self._node.Node) and node._node_id is None:
123                 # Try existing nodes first
124                 # If we have only one candidate, simply use it
125                 candidates = node.find_candidates(
126                     filter_slice_id = self.slice_id)
127                 if len(candidates) == 1:
128                     node.assign_node_id(iter(candidates).next())
129                 else:
130                     # Try again including unassigned nodes
131                     candidates = node.find_candidates()
132                     if len(candidates) > 1:
133                         continue
134                     if len(candidates) == 1:
135                         node_id = iter(candidates).next()
136                         node.assign_node_id(node_id)
137                         to_provision.add(node_id)
138                     elif not candidates:
139                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
140                             node.make_filter_description())
141         
142         # Now do the backtracking search for a suitable solution
143         # First with existing slice nodes
144         reqs = []
145         nodes = []
146         for guid, node in self._elements.iteritems():
147             if isinstance(node, self._node.Node) and node._node_id is None:
148                 # Try existing nodes first
149                 # If we have only one candidate, simply use it
150                 candidates = node.find_candidates(
151                     filter_slice_id = self.slice_id)
152                 reqs.append(candidates)
153                 nodes.append(node)
154         
155         if nodes and reqs:
156             try:
157                 solution = resourcealloc.alloc(reqs)
158             except resourcealloc.ResourceAllocationError:
159                 # Failed, try again with all nodes
160                 reqs = []
161                 for node in nodes:
162                     candidates = node.find_candidates()
163                     reqs.append(candidates)
164                 
165                 solution = resourcealloc.alloc(reqs)
166                 to_provision.update(solution)
167             
168             # Do assign nodes
169             for node, node_id in zip(nodes, solution):
170                 node.assign_node_id(node_id)
171
172     def do_provisioning(self):
173         if self._to_provision:
174             # Add new nodes to the slice
175             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
176             new_nodes = list(set(cur_nodes) | self._to_provision)
177             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
178
179         # cleanup
180         del self._to_provision
181     
182     def do_spanning_deployment_plan(self):
183         # Create application groups by collecting all applications
184         # based on their hash - the hash should contain everything that
185         # defines them and the platform they're built
186         
187         def dephash(app):
188             return (
189                 frozenset((app.depends or "").split(' ')),
190                 frozenset((app.sources or "").split(' ')),
191                 app.build,
192                 app.install,
193                 app.node.architecture,
194                 app.node.operatingSystem,
195                 app.node.pl_distro,
196             )
197         
198         depgroups = collections.defaultdict(list)
199         
200         for element in self._elements.itervalues():
201             if isinstance(element, self._app.Dependency):
202                 depgroups[dephash(element)].append(element)
203         
204         # Set up spanning deployment for those applications that
205         # have been deployed in several nodes.
206         for dh, group in depgroups.iteritems():
207             if len(group) > 1:
208                 # Pick root (deterministically)
209                 root = min(group, key=lambda app:app.node.hostname)
210                 
211                 # Obtain all IPs in numeric format
212                 # (which means faster distance computations)
213                 for dep in group:
214                     dep._ip = socket.gethostbyname(dep.node.hostname)
215                     dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
216                 
217                 # Compute plan
218                 # NOTE: the plan is an iterator
219                 plan = mst.mst(
220                     group,
221                     lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
222                     root = root,
223                     maxbranching = 2)
224                 
225                 # Re-sign private key
226                 try:
227                     tempprk, temppuk, tmppass = self._make_temp_private_key()
228                 except TempKeyError:
229                     continue
230                 
231                 # Set up slaves
232                 plan = list(plan)
233                 for slave, master in plan:
234                     slave.set_master(master)
235                     slave.install_keys(tempprk, temppuk, tmppass)
236                     
237         # We don't need the user's passphrase anymore
238         self.sliceSSHKeyPass = None
239     
240     def _make_temp_private_key(self):
241         # Get the user's key's passphrase
242         if not self.sliceSSHKeyPass:
243             if 'SSH_ASKPASS' in os.environ:
244                 proc = subprocess.Popen(
245                     [ os.environ['SSH_ASKPASS'],
246                       "Please type the passphrase for the %s SSH identity file. "
247                       "The passphrase will be used to re-cipher the identity file with "
248                       "a random 256-bit key for automated chain deployment on the "
249                       "%s PlanetLab slice" % ( 
250                         os.path.basename(self.sliceSSHKey), 
251                         self.slicename
252                     ) ],
253                     stdin = open("/dev/null"),
254                     stdout = subprocess.PIPE,
255                     stderr = subprocess.PIPE)
256                 out,err = proc.communicate()
257                 self.sliceSSHKeyPass = out.strip()
258         
259         if not self.sliceSSHKeyPass:
260             raise TempKeyError
261         
262         # Create temporary key files
263         prk = tempfile.NamedTemporaryFile(
264             dir = self.root_directory,
265             prefix = "pl_deploy_tmpk_",
266             suffix = "")
267
268         puk = tempfile.NamedTemporaryFile(
269             dir = self.root_directory,
270             prefix = "pl_deploy_tmpk_",
271             suffix = ".pub")
272             
273         # Create secure 256-bits temporary passphrase
274         passphrase = ''.join(map(chr,[rng.randint(0,255) 
275                                       for rng in (random.SystemRandom(),)
276                                       for i in xrange(32)] )).encode("hex")
277                 
278         # Copy keys
279         oprk = open(self.sliceSSHKey, "rb")
280         opuk = open(self.sliceSSHKey+".pub", "rb")
281         shutil.copymode(oprk.name, prk.name)
282         shutil.copymode(opuk.name, puk.name)
283         shutil.copyfileobj(oprk, prk)
284         shutil.copyfileobj(opuk, puk)
285         prk.flush()
286         puk.flush()
287         oprk.close()
288         opuk.close()
289         
290         # A descriptive comment
291         comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
292         
293         # Recipher keys
294         proc = subprocess.Popen(
295             ["ssh-keygen", "-p",
296              "-f", prk.name,
297              "-P", self.sliceSSHKeyPass,
298              "-N", passphrase,
299              "-C", comment ],
300             stdout = subprocess.PIPE,
301             stderr = subprocess.PIPE,
302             stdin = subprocess.PIPE
303         )
304         out, err = proc.communicate()
305         
306         if err:
307             raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
308                 out, err)
309         
310         prk.seek(0)
311         puk.seek(0)
312         
313         # Change comment on public key
314         puklines = puk.readlines()
315         puklines[0] = puklines[0].split(' ')
316         puklines[0][-1] = comment+'\n'
317         puklines[0] = ' '.join(puklines[0])
318         puk.seek(0)
319         puk.truncate()
320         puk.writelines(puklines)
321         del puklines
322         puk.flush()
323         
324         return prk, puk, passphrase
325     
326     def set(self, guid, name, value, time = TIME_NOW):
327         super(TestbedController, self).set(guid, name, value, time)
328         # TODO: take on account schedule time for the task
329         element = self._elements[guid]
330         if element:
331             setattr(element, name, value)
332
333             if hasattr(element, 'refresh'):
334                 # invoke attribute refresh hook
335                 element.refresh()
336
337     def get(self, guid, name, time = TIME_NOW):
338         value = super(TestbedController, self).get(guid, name, time)
339         # TODO: take on account schedule time for the task
340         factory_id = self._create[guid]
341         factory = self._factories[factory_id]
342         if factory.box_attributes.is_attribute_design_only(name):
343             return value
344         element = self._elements.get(guid)
345         try:
346             return getattr(element, name)
347         except KeyError, AttributeError:
348             return value
349
350     def get_address(self, guid, index, attribute='Address'):
351         index = int(index)
352
353         # try the real stuff
354         iface = self._elements.get(guid)
355         if iface and index == 0:
356             if attribute == 'Address':
357                 return iface.address
358             elif attribute == 'NetPrefix':
359                 return iface.netprefix
360             elif attribute == 'Broadcast':
361                 return iface.broadcast
362
363         # if all else fails, query box
364         return super(TestbedController, self).get_address(guid, index, attribute)
365
    def action(self, time, guid, action):
        """Not supported by this testbed: always raises NotImplementedError."""
        raise NotImplementedError
368
369     def shutdown(self):
370         for trace in self._traces.itervalues():
371             trace.close()
372         for element in self._elements.itervalues():
373             # invoke cleanup hooks
374             if hasattr(element, 'cleanup'):
375                 element.cleanup()
376         for element in self._elements.itervalues():
377             # invoke destroy hooks
378             if hasattr(element, 'destroy'):
379                 element.destroy()
380         self._elements.clear()
381         self._traces.clear()
382
383     def trace(self, guid, trace_id, attribute='value'):
384         app = self._elements[guid]
385
386         if attribute == 'value':
387             path = app.sync_trace(self.home_directory, trace_id)
388             if path:
389                 fd = open(path, "r")
390                 content = fd.read()
391                 fd.close()
392             else:
393                 content = None
394         elif attribute == 'path':
395             content = app.remote_trace_path(trace_id)
396         elif attribute == 'size':
397             # TODO
398             raise NotImplementedError
399         else:
400             content = None
401         return content
402
    def follow_trace(self, trace_id, trace):
        """Register `trace` under `trace_id`; shutdown() closes it later."""
        self._traces[trace_id] = trace
405     
406     def _make_generic(self, parameters, kind):
407         app = kind(self.plapi)
408
409         # Note: there is 1-to-1 correspondence between attribute names
410         #   If that changes, this has to change as well
411         for attr,val in parameters.iteritems():
412             setattr(app, attr, val)
413
414         return app
415
416     def _make_node(self, parameters):
417         node = self._make_generic(parameters, self._node.Node)
418
419         # If emulation is enabled, we automatically need
420         # some vsys interfaces and packages
421         if node.emulation:
422             node.required_vsys.add('ipfw-be')
423             node.required_packages.add('ipfwslice')
424
425         return node
426
    def _make_node_iface(self, parameters):
        """Build an interfaces.NodeIface element from `parameters`."""
        return self._make_generic(parameters, self._interfaces.NodeIface)
429
    def _make_tun_iface(self, parameters):
        """Build an interfaces.TunIface element from `parameters`."""
        return self._make_generic(parameters, self._interfaces.TunIface)
432
    def _make_tap_iface(self, parameters):
        """Build an interfaces.TapIface element from `parameters`."""
        return self._make_generic(parameters, self._interfaces.TapIface)
435
    def _make_netpipe(self, parameters):
        """Build an interfaces.NetPipe element from `parameters`."""
        return self._make_generic(parameters, self._interfaces.NetPipe)
438
    def _make_internet(self, parameters):
        """Build an interfaces.Internet element from `parameters`."""
        return self._make_generic(parameters, self._interfaces.Internet)
441
    def _make_application(self, parameters):
        """Build an application.Application element from `parameters`."""
        return self._make_generic(parameters, self._app.Application)
444
    def _make_dependency(self, parameters):
        """Build an application.Dependency element from `parameters`."""
        return self._make_generic(parameters, self._app.Dependency)
447
    def _make_nepi_dependency(self, parameters):
        """Build an application.NepiDependency element from `parameters`."""
        return self._make_generic(parameters, self._app.NepiDependency)
450
    def _make_ns3_dependency(self, parameters):
        """Build an application.NS3Dependency element from `parameters`."""
        return self._make_generic(parameters, self._app.NS3Dependency)
453