Added attribute MaxAddresses for interface factories to design
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 import sys
11 import os
12 import os.path
13 import time
14 import resourcealloc
15 import collections
16 import operator
17 import functools
18 import socket
19 import struct
20 import tempfile
21 import subprocess
22 import random
23 import shutil
24
class TempKeyError(Exception):
    """Signals that a temporary deployment key could not be produced.

    Raised by TestbedController._make_temp_private_key when no passphrase
    for the slice SSH key is available.
    """
27
28 class TestbedController(testbed_impl.TestbedController):
29     def __init__(self, testbed_version):
30         super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
31         self._home_directory = None
32         self.slicename = None
33         self._traces = dict()
34
35         import node, interfaces, application
36         self._node = node
37         self._interfaces = interfaces
38         self._app = application
39         
40         self._blacklist = set()
41         self._just_provisioned = set()
42         
43         self._load_blacklist()
44
    @property
    def home_directory(self):
        """Value of the "homeDirectory" attribute; None until do_setup() ran."""
        return self._home_directory
48
49     @property
50     def plapi(self):
51         if not hasattr(self, '_plapi'):
52             import plcapi
53
54             if self.authUser:
55                 self._plapi = plcapi.PLCAPI(
56                     username = self.authUser,
57                     password = self.authString,
58                     hostname = self.plcHost,
59                     urlpattern = self.plcUrl
60                     )
61             else:
62                 # anonymous access - may not be enough for much
63                 self._plapi = plcapi.PLCAPI()
64         return self._plapi
65
66     @property
67     def slice_id(self):
68         if not hasattr(self, '_slice_id'):
69             slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
70             if slices:
71                 self._slice_id = slices[0]['slice_id']
72             else:
73                 # If it wasn't found, don't remember this failure, keep trying
74                 return None
75         return self._slice_id
76     
77     def _load_blacklist(self):
78         blpath = environ.homepath('plblacklist')
79         
80         try:
81             bl = open(blpath, "r")
82         except:
83             self._blacklist = set()
84             return
85             
86         try:
87             self._blacklist = set(
88                 map(int,
89                     map(str.strip, bl.readlines())
90                 )
91             )
92         finally:
93             bl.close()
94     
95     def _save_blacklist(self):
96         blpath = environ.homepath('plblacklist')
97         bl = open(blpath, "w")
98         try:
99             bl.writelines(
100                 map('%s\n'.__mod__, self._blacklist))
101         finally:
102             bl.close()
103     
104     def do_setup(self):
105         self._home_directory = self._attributes.\
106             get_attribute_value("homeDirectory")
107         self.slicename = self._attributes.\
108             get_attribute_value("slice")
109         self.authUser = self._attributes.\
110             get_attribute_value("authUser")
111         self.authString = self._attributes.\
112             get_attribute_value("authPass")
113         self.sliceSSHKey = self._attributes.\
114             get_attribute_value("sliceSSHKey")
115         self.sliceSSHKeyPass = None
116         self.plcHost = self._attributes.\
117             get_attribute_value("plcHost")
118         self.plcUrl = self._attributes.\
119             get_attribute_value("plcUrl")
120         super(TestbedController, self).do_setup()
121
122     def do_post_asynclaunch(self, guid):
123         # Dependencies were launched asynchronously,
124         # so wait for them
125         dep = self._elements[guid]
126         if isinstance(dep, self._app.Dependency):
127             dep.async_setup_wait()
128     
    # Two-phase configuration for asynchronous launch: both post-step
    # hooks reuse do_post_asynclaunch.
    # NOTE(review): the function is wrapped in staticmethod, so no
    # instance is bound automatically; the caller presumably passes the
    # controller explicitly as first argument - confirm against
    # testbed_impl before changing this.
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)
132
133     def do_preconfigure(self):
134         while True:
135             # Perform resource discovery if we don't have
136             # specific resources assigned yet
137             self.do_resource_discovery()
138
139             # Create PlanetLab slivers
140             self.do_provisioning()
141             
142             try:
143                 # Wait for provisioning
144                 self.do_wait_nodes()
145                 
146                 # Okkey...
147                 break
148             except self._node.UnresponsiveNodeError:
149                 # Oh... retry...
150                 pass
151         
152         # Plan application deployment
153         self.do_spanning_deployment_plan()
154
155         # Configure elements per XML data
156         super(TestbedController, self).do_preconfigure()
157
158     def do_resource_discovery(self):
159         to_provision = self._to_provision = set()
160         
161         reserved = set(self._blacklist)
162         for guid, node in self._elements.iteritems():
163             if isinstance(node, self._node.Node) and node._node_id is not None:
164                 reserved.add(node._node_id)
165         
166         # Initial algo:
167         #   look for perfectly defined nodes
168         #   (ie: those with only one candidate)
169         for guid, node in self._elements.iteritems():
170             if isinstance(node, self._node.Node) and node._node_id is None:
171                 # Try existing nodes first
172                 # If we have only one candidate, simply use it
173                 candidates = node.find_candidates(
174                     filter_slice_id = self.slice_id)
175                 candidates -= reserved
176                 if len(candidates) == 1:
177                     node_id = iter(candidates).next()
178                     node.assign_node_id(node_id)
179                     reserved.add(node_id)
180                 elif not candidates:
181                     # Try again including unassigned nodes
182                     candidates = node.find_candidates()
183                     candidates -= reserved
184                     if len(candidates) > 1:
185                         continue
186                     if len(candidates) == 1:
187                         node_id = iter(candidates).next()
188                         node.assign_node_id(node_id)
189                         to_provision.add(node_id)
190                         reserved.add(node_id)
191                     elif not candidates:
192                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
193                             node.make_filter_description())
194         
195         # Now do the backtracking search for a suitable solution
196         # First with existing slice nodes
197         reqs = []
198         nodes = []
199         for guid, node in self._elements.iteritems():
200             if isinstance(node, self._node.Node) and node._node_id is None:
201                 # Try existing nodes first
202                 # If we have only one candidate, simply use it
203                 candidates = node.find_candidates(
204                     filter_slice_id = self.slice_id)
205                 candidates -= reserved
206                 reqs.append(candidates)
207                 nodes.append(node)
208         
209         if nodes and reqs:
210             try:
211                 solution = resourcealloc.alloc(reqs)
212             except resourcealloc.ResourceAllocationError:
213                 # Failed, try again with all nodes
214                 reqs = []
215                 for node in nodes:
216                     candidates = node.find_candidates()
217                     reqs.append(candidates)
218                 
219                 solution = resourcealloc.alloc(reqs)
220                 to_provision.update(solution)
221             
222             # Do assign nodes
223             for node, node_id in zip(nodes, solution):
224                 node.assign_node_id(node_id)
225
226     def do_provisioning(self):
227         if self._to_provision:
228             # Add new nodes to the slice
229             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
230             new_nodes = list(set(cur_nodes) | self._to_provision)
231             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
232
233         # cleanup
234         self._just_provisioned = self._to_provision
235         del self._to_provision
236     
237     def do_wait_nodes(self):
238         for guid, node in self._elements.iteritems():
239             if isinstance(node, self._node.Node):
240                 # Just inject configuration stuff
241                 node.home_path = "nepi-node-%s" % (guid,)
242                 node.ident_path = self.sliceSSHKey
243                 node.slicename = self.slicename
244             
245                 # Show the magic
246                 print "PlanetLab Node", guid, "configured at", node.hostname
247             
248         try:
249             for guid, node in self._elements.iteritems():
250                 if isinstance(node, self._node.Node):
251                     print "Waiting for Node", guid, "configured at", node.hostname,
252                     sys.stdout.flush()
253                     
254                     node.wait_provisioning(
255                         (20*60 if node._node_id in self._just_provisioned else 60)
256                     )
257                     
258                     print "READY"
259         except self._node.UnresponsiveNodeError:
260             # Uh... 
261             print "UNRESPONSIVE"
262             
263             # Mark all dead nodes (which are unresponsive) on the blacklist
264             # and re-raise
265             for guid, node in self._elements.iteritems():
266                 if isinstance(node, self._node.Node):
267                     if not node.is_alive():
268                         print "Blacklisting", node.hostname, "for unresponsiveness"
269                         self._blacklist.add(node._node_id)
270                         node.unassign_node()
271             
272             try:
273                 self._save_blacklist()
274             except:
275                 # not important...
276                 import traceback
277                 traceback.print_exc()
278             
279             raise
280     
281     def do_spanning_deployment_plan(self):
282         # Create application groups by collecting all applications
283         # based on their hash - the hash should contain everything that
284         # defines them and the platform they're built
285         
286         def dephash(app):
287             return (
288                 frozenset((app.depends or "").split(' ')),
289                 frozenset((app.sources or "").split(' ')),
290                 app.build,
291                 app.install,
292                 app.node.architecture,
293                 app.node.operatingSystem,
294                 app.node.pl_distro,
295             )
296         
297         depgroups = collections.defaultdict(list)
298         
299         for element in self._elements.itervalues():
300             if isinstance(element, self._app.Dependency):
301                 depgroups[dephash(element)].append(element)
302         
303         # Set up spanning deployment for those applications that
304         # have been deployed in several nodes.
305         for dh, group in depgroups.iteritems():
306             if len(group) > 1:
307                 # Pick root (deterministically)
308                 root = min(group, key=lambda app:app.node.hostname)
309                 
310                 # Obtain all IPs in numeric format
311                 # (which means faster distance computations)
312                 for dep in group:
313                     dep._ip = socket.gethostbyname(dep.node.hostname)
314                     dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
315                 
316                 # Compute plan
317                 # NOTE: the plan is an iterator
318                 plan = mst.mst(
319                     group,
320                     lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
321                     root = root,
322                     maxbranching = 2)
323                 
324                 # Re-sign private key
325                 try:
326                     tempprk, temppuk, tmppass = self._make_temp_private_key()
327                 except TempKeyError:
328                     continue
329                 
330                 # Set up slaves
331                 plan = list(plan)
332                 for slave, master in plan:
333                     slave.set_master(master)
334                     slave.install_keys(tempprk, temppuk, tmppass)
335                     
336         # We don't need the user's passphrase anymore
337         self.sliceSSHKeyPass = None
338     
339     def _make_temp_private_key(self):
340         # Get the user's key's passphrase
341         if not self.sliceSSHKeyPass:
342             if 'SSH_ASKPASS' in os.environ:
343                 proc = subprocess.Popen(
344                     [ os.environ['SSH_ASKPASS'],
345                       "Please type the passphrase for the %s SSH identity file. "
346                       "The passphrase will be used to re-cipher the identity file with "
347                       "a random 256-bit key for automated chain deployment on the "
348                       "%s PlanetLab slice" % ( 
349                         os.path.basename(self.sliceSSHKey), 
350                         self.slicename
351                     ) ],
352                     stdin = open("/dev/null"),
353                     stdout = subprocess.PIPE,
354                     stderr = subprocess.PIPE)
355                 out,err = proc.communicate()
356                 self.sliceSSHKeyPass = out.strip()
357         
358         if not self.sliceSSHKeyPass:
359             raise TempKeyError
360         
361         # Create temporary key files
362         prk = tempfile.NamedTemporaryFile(
363             dir = self.root_directory,
364             prefix = "pl_deploy_tmpk_",
365             suffix = "")
366
367         puk = tempfile.NamedTemporaryFile(
368             dir = self.root_directory,
369             prefix = "pl_deploy_tmpk_",
370             suffix = ".pub")
371             
372         # Create secure 256-bits temporary passphrase
373         passphrase = ''.join(map(chr,[rng.randint(0,255) 
374                                       for rng in (random.SystemRandom(),)
375                                       for i in xrange(32)] )).encode("hex")
376                 
377         # Copy keys
378         oprk = open(self.sliceSSHKey, "rb")
379         opuk = open(self.sliceSSHKey+".pub", "rb")
380         shutil.copymode(oprk.name, prk.name)
381         shutil.copymode(opuk.name, puk.name)
382         shutil.copyfileobj(oprk, prk)
383         shutil.copyfileobj(opuk, puk)
384         prk.flush()
385         puk.flush()
386         oprk.close()
387         opuk.close()
388         
389         # A descriptive comment
390         comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
391         
392         # Recipher keys
393         proc = subprocess.Popen(
394             ["ssh-keygen", "-p",
395              "-f", prk.name,
396              "-P", self.sliceSSHKeyPass,
397              "-N", passphrase,
398              "-C", comment ],
399             stdout = subprocess.PIPE,
400             stderr = subprocess.PIPE,
401             stdin = subprocess.PIPE
402         )
403         out, err = proc.communicate()
404         
405         if err:
406             raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
407                 out, err)
408         
409         prk.seek(0)
410         puk.seek(0)
411         
412         # Change comment on public key
413         puklines = puk.readlines()
414         puklines[0] = puklines[0].split(' ')
415         puklines[0][-1] = comment+'\n'
416         puklines[0] = ' '.join(puklines[0])
417         puk.seek(0)
418         puk.truncate()
419         puk.writelines(puklines)
420         del puklines
421         puk.flush()
422         
423         return prk, puk, passphrase
424     
425     def set(self, guid, name, value, time = TIME_NOW):
426         super(TestbedController, self).set(guid, name, value, time)
427         # TODO: take on account schedule time for the task
428         element = self._elements[guid]
429         if element:
430             setattr(element, name, value)
431
432             if hasattr(element, 'refresh'):
433                 # invoke attribute refresh hook
434                 element.refresh()
435
436     def get(self, guid, name, time = TIME_NOW):
437         value = super(TestbedController, self).get(guid, name, time)
438         # TODO: take on account schedule time for the task
439         factory_id = self._create[guid]
440         factory = self._factories[factory_id]
441         if factory.box_attributes.is_attribute_design_only(name):
442             return value
443         element = self._elements.get(guid)
444         try:
445             return getattr(element, name)
446         except KeyError, AttributeError:
447             return value
448
449     def get_address(self, guid, index, attribute='Address'):
450         index = int(index)
451
452         # try the real stuff
453         iface = self._elements.get(guid)
454         if iface and index == 0:
455             if attribute == 'Address':
456                 return iface.address
457             elif attribute == 'NetPrefix':
458                 return iface.netprefix
459             elif attribute == 'Broadcast':
460                 return iface.broadcast
461
462         # if all else fails, query box
463         return super(TestbedController, self).get_address(guid, index, attribute)
464
    def action(self, time, guid, action):
        """Not implemented by this testbed; always raises NotImplementedError."""
        raise NotImplementedError
467
468     def shutdown(self):
469         for trace in self._traces.itervalues():
470             trace.close()
471         for element in self._elements.itervalues():
472             # invoke cleanup hooks
473             if hasattr(element, 'cleanup'):
474                 element.cleanup()
475         for element in self._elements.itervalues():
476             # invoke destroy hooks
477             if hasattr(element, 'destroy'):
478                 element.destroy()
479         self._elements.clear()
480         self._traces.clear()
481
482     def trace(self, guid, trace_id, attribute='value'):
483         app = self._elements[guid]
484
485         if attribute == 'value':
486             path = app.sync_trace(self.home_directory, trace_id)
487             if path:
488                 fd = open(path, "r")
489                 content = fd.read()
490                 fd.close()
491             else:
492                 content = None
493         elif attribute == 'path':
494             content = app.remote_trace_path(trace_id)
495         else:
496             content = None
497         return content
498
    def follow_trace(self, trace_id, trace):
        """Register a trace under trace_id; shutdown() will close it."""
        self._traces[trace_id] = trace
501     
502     def _make_generic(self, parameters, kind):
503         app = kind(self.plapi)
504
505         # Note: there is 1-to-1 correspondence between attribute names
506         #   If that changes, this has to change as well
507         for attr,val in parameters.iteritems():
508             setattr(app, attr, val)
509
510         return app
511
512     def _make_node(self, parameters):
513         node = self._make_generic(parameters, self._node.Node)
514
515         # If emulation is enabled, we automatically need
516         # some vsys interfaces and packages
517         if node.emulation:
518             node.required_vsys.add('ipfw-be')
519             node.required_packages.add('ipfwslice')
520
521         return node
522
523     def _make_node_iface(self, parameters):
524         return self._make_generic(parameters, self._interfaces.NodeIface)
525
526     def _make_tun_iface(self, parameters):
527         return self._make_generic(parameters, self._interfaces.TunIface)
528
529     def _make_tap_iface(self, parameters):
530         return self._make_generic(parameters, self._interfaces.TapIface)
531
532     def _make_netpipe(self, parameters):
533         return self._make_generic(parameters, self._interfaces.NetPipe)
534
535     def _make_internet(self, parameters):
536         return self._make_generic(parameters, self._interfaces.Internet)
537
538     def _make_application(self, parameters):
539         return self._make_generic(parameters, self._app.Application)
540
541     def _make_dependency(self, parameters):
542         return self._make_generic(parameters, self._app.Dependency)
543
544     def _make_nepi_dependency(self, parameters):
545         return self._make_generic(parameters, self._app.NepiDependency)
546
547     def _make_ns3_dependency(self, parameters):
548         return self._make_generic(parameters, self._app.NS3Dependency)
549