#!/usr/bin/env python
# -*- coding: utf-8 -*-

from constants import TESTBED_ID, TESTBED_VERSION
from nepi.core import testbed_impl
from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
import sys
import os
import os.path
import time
import resourcealloc
import collections
import operator
import functools
import socket
import struct
import tempfile
import subprocess
import random
import shutil
import logging

class TempKeyError(Exception):
    pass

class TestbedController(testbed_impl.TestbedController):
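    """Testbed controller for PlanetLab experiments.

    Handles resource discovery and slice provisioning, deploys
    application dependencies (optionally through a peer-to-peer
    spanning tree), and proxies get/set/trace operations to the
    individual testbed elements.
    """
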
    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        self._traces = dict()

        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application

        self._blacklist = set()
        self._just_provisioned = set()

        self._load_blacklist()

        self._logger = logging.getLogger('nepi.testbeds.planetlab')

    @property
    def home_directory(self):
        return self._home_directory

    @property
    def plapi(self):
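        """Lazily constructed PLCAPI client.

        Uses authUser/authString credentials when available; otherwise
        falls back to anonymous access, which may not be sufficient
        for provisioning operations.
        """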
        if not hasattr(self, '_plapi'):
            import plcapi

            if self.authUser:
                self._plapi = plcapi.PLCAPI(
                    username = self.authUser,
                    password = self.authString,
                    hostname = self.plcHost,
                    urlpattern = self.plcUrl
                    )
            else:
                # anonymous access - may not be enough for much
                self._plapi = plcapi.PLCAPI()
        return self._plapi

    @property
    def slice_id(self):
        if not hasattr(self, '_slice_id'):
            slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
            if slices:
                self._slice_id = slices[0]['slice_id']
            else:
                # If it wasn't found, don't remember this failure, keep trying
                return None
        return self._slice_id

    def _load_blacklist(self):
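        """Load blacklisted node_ids from the 'plblacklist' file in the
        NEPI home path, one integer node_id per line, e.g.:

            1234
            5678

        A missing or unreadable file simply yields an empty blacklist.
        """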
        blpath = environ.homepath('plblacklist')

        try:
            bl = open(blpath, "r")
        except:
            self._blacklist = set()
            return

        try:
            self._blacklist = set(
                map(int,
                    map(str.strip, bl.readlines())
                )
            )
        finally:
            bl.close()

    def _save_blacklist(self):
        blpath = environ.homepath('plblacklist')
        bl = open(blpath, "w")
        try:
            bl.writelines(
                map('%s\n'.__mod__, self._blacklist))
        finally:
            bl.close()

    def do_setup(self):
        self._home_directory = self._attributes.\
            get_attribute_value("homeDirectory")
        self.slicename = self._attributes.\
            get_attribute_value("slice")
        self.authUser = self._attributes.\
            get_attribute_value("authUser")
        self.authString = self._attributes.\
            get_attribute_value("authPass")
        self.sliceSSHKey = self._attributes.\
            get_attribute_value("sliceSSHKey")
        self.sliceSSHKeyPass = None
        self.plcHost = self._attributes.\
            get_attribute_value("plcHost")
        self.plcUrl = self._attributes.\
            get_attribute_value("plcUrl")
        self.logLevel = self._attributes.\
            get_attribute_value("plLogLevel")
        self.tapPortBase = self._attributes.\
            get_attribute_value("tapPortBase")
        self.p2pDeployment = self._attributes.\
            get_attribute_value("p2pDeployment")

        self._logger.setLevel(getattr(logging, self.logLevel))

        super(TestbedController, self).do_setup()

    def do_post_asynclaunch(self, guid):
        # Dependencies were launched asynchronously,
        # so wait for them
        dep = self._elements[guid]
        if isinstance(dep, self._app.Dependency):
            dep.async_setup_wait()

    # Two-phase configuration for asynchronous launch
    do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
    do_poststep_configure = staticmethod(do_post_asynclaunch)

    def do_preconfigure(self):
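        """Iterate discovery -> provisioning -> waiting until all nodes
        come up. Unresponsive nodes are blacklisted by do_wait_nodes,
        so each retry draws candidates from the remaining healthy
        nodes. Afterwards, optionally plan p2p deployment, then
        configure elements.
        """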
        while True:
            # Perform resource discovery if we don't have
            # specific resources assigned yet
            self.do_resource_discovery()

            # Create PlanetLab slivers
            self.do_provisioning()

            try:
                # Wait for provisioning
                self.do_wait_nodes()

                # All nodes are up and running
                break
            except self._node.UnresponsiveNodeError:
                # Some nodes were blacklisted; retry with new candidates
                pass

        if self.p2pDeployment:
            # Plan application deployment
            self.do_spanning_deployment_plan()

        # Configure elements per XML data
        super(TestbedController, self).do_preconfigure()

    def do_resource_discovery(self):
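        """Assign concrete PlanetLab nodes to abstract node elements.

        Two passes:
          1. Pin down nodes with exactly one matching candidate,
             preferring candidates already in the slice.
          2. Solve the remaining assignments with a backtracking
             search (resourcealloc.alloc), again trying slice nodes
             first and falling back to the full node set on failure.

        Node ids that are not yet in the slice are collected in
        self._to_provision for do_provisioning to add.
        """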
        to_provision = self._to_provision = set()

        reserved = set(self._blacklist)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is not None:
                reserved.add(node._node_id)

        # Initial algorithm:
        #   look for perfectly defined nodes
        #   (ie: those with only one candidate)
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                if len(candidates) == 1:
                    node_id = iter(candidates).next()
                    node.assign_node_id(node_id)
                    reserved.add(node_id)
                elif not candidates:
                    # Try again including unassigned nodes
                    candidates = node.find_candidates()
                    candidates -= reserved
                    if len(candidates) > 1:
                        continue
                    if len(candidates) == 1:
                        node_id = iter(candidates).next()
                        node.assign_node_id(node_id)
                        to_provision.add(node_id)
                        reserved.add(node_id)
                    elif not candidates:
                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                            node.make_filter_description())

        # Now do the backtracking search for a suitable solution
        # First with existing slice nodes
        reqs = []
        nodes = []
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node) and node._node_id is None:
                # Try existing nodes first
                # If we have only one candidate, simply use it
                candidates = node.find_candidates(
                    filter_slice_id = self.slice_id)
                candidates -= reserved
                reqs.append(candidates)
                nodes.append(node)

        if nodes and reqs:
            try:
                solution = resourcealloc.alloc(reqs)
            except resourcealloc.ResourceAllocationError:
                # Failed, try again with all nodes
                reqs = []
                for node in nodes:
                    candidates = node.find_candidates()
                    candidates -= reserved
                    reqs.append(candidates)

                solution = resourcealloc.alloc(reqs)
                to_provision.update(solution)

            # Assign the nodes from the computed solution
            for node, node_id in zip(nodes, solution):
                node.assign_node_id(node_id)

    def do_provisioning(self):
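        """Add the node_ids queued by resource discovery to the slice
        (via UpdateSlice), and remember them as just provisioned so
        do_wait_nodes grants them a longer boot timeout.
        """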
        if self._to_provision:
            # Add new nodes to the slice
            cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
            new_nodes = list(set(cur_nodes) | self._to_provision)
            self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)

        # cleanup
        self._just_provisioned = self._to_provision
        del self._to_provision

    def do_wait_nodes(self):
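        """Block until every node's sliver is up, or blacklist the dead.

        Freshly provisioned nodes get a 20-minute grace period, nodes
        already in the slice only 60 seconds. On timeout, unresponsive
        nodes are blacklisted and unassigned, and UnresponsiveNodeError
        is re-raised so do_preconfigure can retry discovery.
        """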
        for guid, node in self._elements.iteritems():
            if isinstance(node, self._node.Node):
                # Just inject configuration stuff
                node.home_path = "nepi-node-%s" % (guid,)
                node.ident_path = self.sliceSSHKey
                node.slicename = self.slicename

                # Show the magic
                self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)

        try:
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)

                    node.wait_provisioning(
                        (20*60 if node._node_id in self._just_provisioned else 60)
                    )

                    self._logger.info("READY Node %s at %s", guid, node.hostname)

                    # Prepare dependency installer now
                    node.prepare_dependencies()
        except self._node.UnresponsiveNodeError:
            self._logger.warn("UNRESPONSIVE Node %s", node.hostname)

            # Mark all dead nodes (which are unresponsive) on the blacklist
            # and re-raise
            for guid, node in self._elements.iteritems():
                if isinstance(node, self._node.Node):
                    if not node.is_alive():
                        self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
                        self._blacklist.add(node._node_id)
                        node.unassign_node()

            try:
                self._save_blacklist()
            except:
                # Not critical - the blacklist is only an optimization
                import traceback
                traceback.print_exc()

            raise

    def do_spanning_deployment_plan(self):
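        """Plan peer-to-peer deployment of application dependencies.

        Dependencies are grouped by a hash of everything that defines
        their build (depends, sources, build/install commands, target
        platform). For each group present on more than one node, a
        bounded-branching minimum spanning tree over IP distance is
        computed, so builds can propagate node-to-node (chain
        deployment) instead of being uploaded to every node from the
        experiment controller.
        """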
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built for

        def dephash(app):
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )

        depgroups = collections.defaultdict(list)

        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
            elif isinstance(element, self._node.Node):
                deps = element._yum_dependencies
                if deps:
                    depgroups[dephash(deps)].append(deps)

        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)

                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
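                    # e.g. '192.0.2.1' unpacks to 0xC0000201 (3221225985),
                    # letting ipaddr2.ipdistn compare addresses as plain ints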

                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)

                # Re-cipher the private key with a temporary passphrase
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue

                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)

        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None

    def _make_temp_private_key(self):
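        """Create a temporary copy of the slice SSH identity, re-ciphered
        with a random one-time passphrase for automated chain deployment.

        The user's passphrase is requested through SSH_ASKPASS; if it
        cannot be obtained, TempKeyError is raised. Returns a tuple
        (private_key_file, public_key_file, passphrase).
        """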
        # Get the user's key's passphrase
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % (
                        os.path.basename(self.sliceSSHKey),
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()

        if not self.sliceSSHKeyPass:
            raise TempKeyError

        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")

        # Create a secure 256-bit temporary passphrase
        passphrase = ''.join(map(chr,[rng.randint(0,255)
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")
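        # (32 SystemRandom bytes hex-encode to 64 characters,
        # i.e. 256 bits of entropy)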

        # Copy keys
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()

        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)

        # Re-cipher keys
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()

        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)

        prk.seek(0)
        puk.seek(0)

        # Change comment on public key
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()

        return prk, puk, passphrase

    def set(self, guid, name, value, time = TIME_NOW):
        super(TestbedController, self).set(guid, name, value, time)
        # TODO: take into account the schedule time for the task
        element = self._elements[guid]
        if element:
            setattr(element, name, value)

            if hasattr(element, 'refresh'):
                # invoke attribute refresh hook
                element.refresh()

    def get(self, guid, name, time = TIME_NOW):
        value = super(TestbedController, self).get(guid, name, time)
        # TODO: take into account the schedule time for the task
        factory_id = self._create[guid]
        factory = self._factories[factory_id]
        element = self._elements.get(guid)
        try:
            return getattr(element, name)
        except (KeyError, AttributeError):
            return value

    def get_address(self, guid, index, attribute='Address'):
        index = int(index)

        # try the real stuff
        iface = self._elements.get(guid)
        if iface and index == 0:
            if attribute == 'Address':
                return iface.address
            elif attribute == 'NetPrefix':
                return iface.netprefix
            elif attribute == 'Broadcast':
                return iface.broadcast

        # if all else fails, query box
        return super(TestbedController, self).get_address(guid, index, attribute)

    def action(self, time, guid, action):
        raise NotImplementedError

    def shutdown(self):
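        """Close all traces and tear down every element: cleanup hooks
        first, destroy hooks second, each pass parallelized across 16
        workers.
        """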
        for trace in self._traces.itervalues():
            trace.close()

        runner = ParallelRun(16)
        runner.start()
        for element in self._elements.itervalues():
            # invoke cleanup hooks
            if hasattr(element, 'cleanup'):
                runner.put(element.cleanup)
        runner.join()

        runner = ParallelRun(16)
        runner.start()
        for element in self._elements.itervalues():
            # invoke destroy hooks
            if hasattr(element, 'destroy'):
                runner.put(element.destroy)
        runner.join()

        self._elements.clear()
        self._traces.clear()

    def trace(self, guid, trace_id, attribute='value'):
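        """Fetch a trace of element `guid`.

        With attribute 'value' the trace is synced locally and its
        contents returned; 'path' returns the remote trace path; any
        other attribute yields None.
        """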
        app = self._elements[guid]

        if attribute == 'value':
            path = app.sync_trace(self.home_directory, trace_id)
            if path:
                fd = open(path, "r")
                content = fd.read()
                fd.close()
            else:
                content = None
        elif attribute == 'path':
            content = app.remote_trace_path(trace_id)
        else:
            content = None
        return content

    def follow_trace(self, trace_id, trace):
        self._traces[trace_id] = trace

    def _make_generic(self, parameters, kind):
        app = kind(self.plapi)

        # Note: there is a 1-to-1 correspondence between attribute
        #   names and element properties.
        #   If that changes, this has to change as well
        for attr,val in parameters.iteritems():
            setattr(app, attr, val)

        return app

    def _make_node(self, parameters):
        return self._make_generic(parameters, self._node.Node)

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)