#!/usr/bin/env python
# -*- coding: utf-8 -*-

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging

class TempKeyError(Exception):
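    """Raised when a temporary re-ciphered copy of the slice SSH key
    cannot be created (e.g. no passphrase could be obtained)."""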
    pass

class TestbedController(testbed_impl.TestbedController):
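    """Controller for PlanetLab testbeds.

    Discovers and provisions slice nodes through the PLC API, waits for
    slivers to come up, plans application deployment along a spanning
    tree, and implements the generic set/get/trace/shutdown operations.
    """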
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
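        """PLC API wrapper, built lazily on first access.

        Uses the configured credentials when available, otherwise falls
        back to an anonymous session."""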
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be sufficient for most operations
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
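        """Numeric slice_id of the configured slice.

        Cached after the first successful lookup; failed lookups are
        deliberately not cached, so later accesses retry."""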
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't cache the failure - keep trying
                return None
        return self._slice_id

    def _load_blacklist(self):
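        """Load the node blacklist from the 'plblacklist' file in the
        NEPI home path (as resolved by environ.homepath).

        The file is expected to hold one numeric node_id per line, e.g.:

            1234
            5678
        """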
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except EnvironmentError:
            # No blacklist file yet - start with an empty set
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
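        """Read testbed attributes (home directory, slice name and
        credentials, SSH key, PLC endpoint, log level, tun/tap port
        base) and configure logging accordingly."""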
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
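        """Discover and provision nodes, retrying the whole cycle while
        provisioned nodes turn out to be unresponsive, then plan
        application deployment and run the generic preconfiguration."""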
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes provisioned and responsive - we're done
                break
            except self._node.UnresponsiveNodeError:
                # Some nodes turned out to be unresponsive - retry the cycle
                pass

        # Plan application deployment
        self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self):
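        """Assign concrete PlanetLab nodes to abstract node elements.

        A first pass assigns every node whose candidate set - restricted
        first to nodes already in the slice, then widened to all nodes -
        contains exactly one element. The remaining nodes are assigned
        through the backtracking search in resourcealloc.alloc(), again
        preferring nodes already in the slice. Nodes that are not yet in
        the slice are queued in self._to_provision."""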
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # First pass:
        #   look for fully-determined nodes
        #   (i.e. those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing slice nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        continue
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError("Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description()))

        # Now do the backtracking search for a suitable solution,
        # first restricted to existing slice nodes
        reqs = []
        nodes = []
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
                nodes.append(node)

        if nodes and reqs:
            try:
                solution = resourcealloc.alloc(reqs)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes
                reqs = []
                for node in nodes:
                    candidates = node.find_candidates()
                    candidates -= reserved
                    reqs.append(candidates)

                solution = resourcealloc.alloc(reqs)
                to_provision.update(solution)

            # Apply the solution
            for node, node_id in zip(nodes, solution):
                node.assign_node_id(node_id)

    def do_provisioning(self):
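        """Add any newly assigned nodes to the slice through the PLC
        API, keeping the nodes that are already there."""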
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # Remember what was just provisioned, then drop the queue
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
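        """Inject deployment configuration into each node and wait for
        the slivers to become reachable.

        Just-provisioned nodes get a 20-minute grace period; nodes that
        were already in the slice get 60 seconds. Unresponsive nodes are
        blacklisted and unassigned before the error is re-raised, which
        lets do_preconfigure retry with fresh candidates."""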
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Inject the node's deployment configuration
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Report the assignment
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)

                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)
        except self._node.UnresponsiveNodeError:
            # A node timed out
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

            # Mark all unresponsive nodes on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # Failing to persist the blacklist is not critical
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
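        """Plan chained deployment of dependencies.

        Dependencies are grouped by a hash of everything that defines
        their build and target platform; for each group present on more
        than one node, a minimum spanning tree over inter-node IP
        distance (branching factor 2) selects a master for every slave,
        so that build artifacts can propagate sliver-to-sliver instead
        of being uploaded to every node individually."""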
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app: app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a, b: ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

                # Re-cipher the private key with a temporary passphrase
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
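        """Create a temporary copy of the slice SSH identity, re-ciphered
        with a random 256-bit passphrase, for automated sliver-to-sliver
        deployment.

        The user's passphrase is requested through SSH_ASKPASS if it is
        not already known, and is discarded once deployment is planned.

        Returns a (private key file, public key file, passphrase) tuple;
        the key files are NamedTemporaryFile objects kept under the root
        directory."""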
        # Get the passphrase for the user's key
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out, err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure 256-bit temporary passphrase:
        # 32 random bytes from the system RNG, hex-encoded
        rng = random.SystemRandom()
        passphrase = ''.join(map(chr, [rng.randint(0, 255)
                                       for i in xrange(32)])).encode("hex")

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment to identify the temporary key
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher the keys
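        # ssh-keygen -p re-encrypts the private key in place: -f names
        # the key file, -P supplies the old passphrase and -N the new
        # one; -C sets the key comment (the public key's comment is
        # rewritten manually below).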
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError("Problem generating keys: \n%s\n%r" % (
                out, err))

        prk.seek(0)
        puk.seek(0)

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the scheduled time for the task
        element = self._elements[guid]
        if element:
            setattr(element, name, value)

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the scheduled time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # Query the live element first
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # If all else fails, fall back to the box configuration
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
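        """Close traces and tear down all elements.

        Cleanup hooks and destroy hooks each run in a 16-worker
        ParallelRun pool; cleanup finishes on every element before the
        first destroy hook starts."""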
        for trace in self._traces.itervalues():
            trace.close()

        runner = ParallelRun(16)
        runner.start()
        for element in self._elements.itervalues():
            # invoke cleanup hooks
            if hasattr(element, 'cleanup'):
                runner.put(element.cleanup)
        runner.join()

        runner = ParallelRun(16)
        runner.start()
        for element in self._elements.itervalues():
            # invoke destroy hooks
            if hasattr(element, 'destroy'):
                runner.put(element.destroy)
        runner.join()

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
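        """Return a trace's content ('value', synced from the node) or
        its remote path ('path')."""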
        app = self._elements[guid]

        if attribute == 'value':
            path = app.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = app.remote_trace_path(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def _make_generic(self, parameters, kind):
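        """Instantiate `kind` with the PLC API handle and copy every
        entry of `parameters` onto the new element as an attribute."""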
        app = kind(self.plapi)

        # Note: parameter names correspond 1-to-1 to element attribute names.
        #   If that ever changes, this has to change as well.
        for attr, val in parameters.iteritems():
            setattr(app, attr, val)

        return app

    def _make_node(self, parameters):
        return self._make_generic(parameters, self._node.Node)

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)
