Ticket #70: Logging instead of prints for PlanetLab
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID, TESTBED_VERSION
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 from nepi.util.parallel import ParallelRun
11 import sys
12 import os
13 import os.path
14 import time
15 import resourcealloc
16 import collections
17 import operator
18 import functools
19 import socket
20 import struct
21 import tempfile
22 import subprocess
23 import random
24 import shutil
25 import logging
26
class TempKeyError(Exception):
    """Raised when a temporary re-ciphered SSH key pair cannot be created
    (e.g. the key passphrase could not be obtained via SSH_ASKPASS)."""
    pass
29
class TestbedController(testbed_impl.TestbedController):
    """Testbed controller for PlanetLab: discovers and provisions slice
    nodes, deploys applications on them, and proxies get/set/trace access
    to the deployed elements."""

    def __init__(self):
        super(TestbedController, self).__init__(TESTBED_ID, TESTBED_VERSION)
        self._home_directory = None
        self.slicename = None
        # Open trace objects, closed on shutdown()
        self._traces = dict()

        # Deferred sibling-module imports, kept as attributes so other
        # methods can reference their classes (Node, Dependency, ...)
        import node, interfaces, application
        self._node = node
        self._interfaces = interfaces
        self._app = application
        
        # node_ids known to be unresponsive; persisted via _save_blacklist
        self._blacklist = set()
        # node_ids added to the slice this run (they get a longer
        # provisioning timeout in do_wait_nodes)
        self._just_provisioned = set()
        
        self._load_blacklist()
        
        self._logger = logging.getLogger('nepi.testbeds.planetlab')
48
    @property
    def home_directory(self):
        # Base directory for synced traces etc.; set from the
        # "homeDirectory" attribute in do_setup()
        return self._home_directory
52
53     @property
54     def plapi(self):
55         if not hasattr(self, '_plapi'):
56             import plcapi
57
58             if self.authUser:
59                 self._plapi = plcapi.PLCAPI(
60                     username = self.authUser,
61                     password = self.authString,
62                     hostname = self.plcHost,
63                     urlpattern = self.plcUrl
64                     )
65             else:
66                 # anonymous access - may not be enough for much
67                 self._plapi = plcapi.PLCAPI()
68         return self._plapi
69
70     @property
71     def slice_id(self):
72         if not hasattr(self, '_slice_id'):
73             slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
74             if slices:
75                 self._slice_id = slices[0]['slice_id']
76             else:
77                 # If it wasn't found, don't remember this failure, keep trying
78                 return None
79         return self._slice_id
80     
81     def _load_blacklist(self):
82         blpath = environ.homepath('plblacklist')
83         
84         try:
85             bl = open(blpath, "r")
86         except:
87             self._blacklist = set()
88             return
89             
90         try:
91             self._blacklist = set(
92                 map(int,
93                     map(str.strip, bl.readlines())
94                 )
95             )
96         finally:
97             bl.close()
98     
99     def _save_blacklist(self):
100         blpath = environ.homepath('plblacklist')
101         bl = open(blpath, "w")
102         try:
103             bl.writelines(
104                 map('%s\n'.__mod__, self._blacklist))
105         finally:
106             bl.close()
107     
108     def do_setup(self):
109         self._home_directory = self._attributes.\
110             get_attribute_value("homeDirectory")
111         self.slicename = self._attributes.\
112             get_attribute_value("slice")
113         self.authUser = self._attributes.\
114             get_attribute_value("authUser")
115         self.authString = self._attributes.\
116             get_attribute_value("authPass")
117         self.sliceSSHKey = self._attributes.\
118             get_attribute_value("sliceSSHKey")
119         self.sliceSSHKeyPass = None
120         self.plcHost = self._attributes.\
121             get_attribute_value("plcHost")
122         self.plcUrl = self._attributes.\
123             get_attribute_value("plcUrl")
124         self.logLevel = self._attributes.\
125             get_attribute_value("plLogLevel")
126         
127         self._logger.setLevel(getattr(logging,self.logLevel))
128         
129         super(TestbedController, self).do_setup()
130
131     def do_post_asynclaunch(self, guid):
132         # Dependencies were launched asynchronously,
133         # so wait for them
134         dep = self._elements[guid]
135         if isinstance(dep, self._app.Dependency):
136             dep.async_setup_wait()
137     
138     # Two-phase configuration for asynchronous launch
139     do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
140     do_poststep_configure = staticmethod(do_post_asynclaunch)
141
142     def do_preconfigure(self):
143         while True:
144             # Perform resource discovery if we don't have
145             # specific resources assigned yet
146             self.do_resource_discovery()
147
148             # Create PlanetLab slivers
149             self.do_provisioning()
150             
151             try:
152                 # Wait for provisioning
153                 self.do_wait_nodes()
154                 
155                 # Okkey...
156                 break
157             except self._node.UnresponsiveNodeError:
158                 # Oh... retry...
159                 pass
160         
161         # Plan application deployment
162         self.do_spanning_deployment_plan()
163
164         # Configure elements per XML data
165         super(TestbedController, self).do_preconfigure()
166
167     def do_resource_discovery(self):
168         to_provision = self._to_provision = set()
169         
170         reserved = set(self._blacklist)
171         for guid, node in self._elements.iteritems():
172             if isinstance(node, self._node.Node) and node._node_id is not None:
173                 reserved.add(node._node_id)
174         
175         # Initial algo:
176         #   look for perfectly defined nodes
177         #   (ie: those with only one candidate)
178         for guid, node in self._elements.iteritems():
179             if isinstance(node, self._node.Node) and node._node_id is None:
180                 # Try existing nodes first
181                 # If we have only one candidate, simply use it
182                 candidates = node.find_candidates(
183                     filter_slice_id = self.slice_id)
184                 candidates -= reserved
185                 if len(candidates) == 1:
186                     node_id = iter(candidates).next()
187                     node.assign_node_id(node_id)
188                     reserved.add(node_id)
189                 elif not candidates:
190                     # Try again including unassigned nodes
191                     candidates = node.find_candidates()
192                     candidates -= reserved
193                     if len(candidates) > 1:
194                         continue
195                     if len(candidates) == 1:
196                         node_id = iter(candidates).next()
197                         node.assign_node_id(node_id)
198                         to_provision.add(node_id)
199                         reserved.add(node_id)
200                     elif not candidates:
201                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
202                             node.make_filter_description())
203         
204         # Now do the backtracking search for a suitable solution
205         # First with existing slice nodes
206         reqs = []
207         nodes = []
208         for guid, node in self._elements.iteritems():
209             if isinstance(node, self._node.Node) and node._node_id is None:
210                 # Try existing nodes first
211                 # If we have only one candidate, simply use it
212                 candidates = node.find_candidates(
213                     filter_slice_id = self.slice_id)
214                 candidates -= reserved
215                 reqs.append(candidates)
216                 nodes.append(node)
217         
218         if nodes and reqs:
219             try:
220                 solution = resourcealloc.alloc(reqs)
221             except resourcealloc.ResourceAllocationError:
222                 # Failed, try again with all nodes
223                 reqs = []
224                 for node in nodes:
225                     candidates = node.find_candidates()
226                     candidates -= reserved
227                     reqs.append(candidates)
228                 
229                 solution = resourcealloc.alloc(reqs)
230                 to_provision.update(solution)
231             
232             # Do assign nodes
233             for node, node_id in zip(nodes, solution):
234                 node.assign_node_id(node_id)
235
236     def do_provisioning(self):
237         if self._to_provision:
238             # Add new nodes to the slice
239             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
240             new_nodes = list(set(cur_nodes) | self._to_provision)
241             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
242
243         # cleanup
244         self._just_provisioned = self._to_provision
245         del self._to_provision
246     
247     def do_wait_nodes(self):
248         for guid, node in self._elements.iteritems():
249             if isinstance(node, self._node.Node):
250                 # Just inject configuration stuff
251                 node.home_path = "nepi-node-%s" % (guid,)
252                 node.ident_path = self.sliceSSHKey
253                 node.slicename = self.slicename
254             
255                 # Show the magic
256                 self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
257             
258         try:
259             for guid, node in self._elements.iteritems():
260                 if isinstance(node, self._node.Node):
261                     self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
262                     
263                     node.wait_provisioning(
264                         (20*60 if node._node_id in self._just_provisioned else 60)
265                     )
266                     
267                     self._logger.info("READY Node %s at %s", guid, node.hostname)
268         except self._node.UnresponsiveNodeError:
269             # Uh... 
270             self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
271             
272             # Mark all dead nodes (which are unresponsive) on the blacklist
273             # and re-raise
274             for guid, node in self._elements.iteritems():
275                 if isinstance(node, self._node.Node):
276                     if not node.is_alive():
277                         self._logger.warn("Blacklisting %s for unresponsiveness", node.hostname)
278                         self._blacklist.add(node._node_id)
279                         node.unassign_node()
280             
281             try:
282                 self._save_blacklist()
283             except:
284                 # not important...
285                 import traceback
286                 traceback.print_exc()
287             
288             raise
289     
    def do_spanning_deployment_plan(self):
        """Plan peer-to-peer distribution of application builds.

        Dependencies that are identical (same depends/sources/build/install
        and same node platform) are grouped; for each group deployed on
        more than one node, a minimum-spanning-tree over IP distance is
        computed and each slave is told to fetch from its master, using a
        temporary re-ciphered SSH key pair for node-to-node access.
        """
        # Create application groups by collecting all applications
        # based on their hash - the hash should contain everything that
        # defines them and the platform they're built
        
        def dephash(app):
            # Key identifying "the same build": dependencies, sources,
            # build/install commands, and the target platform triple
            return (
                frozenset((app.depends or "").split(' ')),
                frozenset((app.sources or "").split(' ')),
                app.build,
                app.install,
                app.node.architecture,
                app.node.operatingSystem,
                app.node.pl_distro,
            )
        
        depgroups = collections.defaultdict(list)
        
        for element in self._elements.itervalues():
            if isinstance(element, self._app.Dependency):
                depgroups[dephash(element)].append(element)
        
        # Set up spanning deployment for those applications that
        # have been deployed in several nodes.
        for dh, group in depgroups.iteritems():
            if len(group) > 1:
                # Pick root (deterministically)
                root = min(group, key=lambda app:app.node.hostname)
                
                # Obtain all IPs in numeric format
                # (which means faster distance computations)
                for dep in group:
                    dep._ip = socket.gethostbyname(dep.node.hostname)
                    dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
                
                # Compute plan
                # NOTE: the plan is an iterator
                plan = mst.mst(
                    group,
                    lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
                    root = root,
                    maxbranching = 2)
                
                # Re-sign private key
                # If the key can't be re-ciphered (no passphrase available),
                # skip spanning deployment for this group entirely
                try:
                    tempprk, temppuk, tmppass = self._make_temp_private_key()
                except TempKeyError:
                    continue
                
                # Set up slaves
                plan = list(plan)
                for slave, master in plan:
                    slave.set_master(master)
                    slave.install_keys(tempprk, temppuk, tmppass)
                    
        # We don't need the user's passphrase anymore
        self.sliceSSHKeyPass = None
347     
    def _make_temp_private_key(self):
        """Create a temporary copy of the slice SSH key pair re-ciphered
        with a random passphrase, suitable for node-to-node deployment.

        Returns (private_keyfile, public_keyfile, passphrase) where the key
        files are open NamedTemporaryFile objects (deleted when closed).
        Raises TempKeyError when the original key's passphrase cannot be
        obtained, and RuntimeError when ssh-keygen fails.
        """
        # Get the user's key's passphrase, via the SSH_ASKPASS helper
        # if one is configured in the environment
        if not self.sliceSSHKeyPass:
            if 'SSH_ASKPASS' in os.environ:
                proc = subprocess.Popen(
                    [ os.environ['SSH_ASKPASS'],
                      "Please type the passphrase for the %s SSH identity file. "
                      "The passphrase will be used to re-cipher the identity file with "
                      "a random 256-bit key for automated chain deployment on the "
                      "%s PlanetLab slice" % ( 
                        os.path.basename(self.sliceSSHKey), 
                        self.slicename
                    ) ],
                    stdin = open("/dev/null"),
                    stdout = subprocess.PIPE,
                    stderr = subprocess.PIPE)
                out,err = proc.communicate()
                self.sliceSSHKeyPass = out.strip()
        
        if not self.sliceSSHKeyPass:
            raise TempKeyError
        
        # Create temporary key files
        prk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = "")

        puk = tempfile.NamedTemporaryFile(
            dir = self.root_directory,
            prefix = "pl_deploy_tmpk_",
            suffix = ".pub")
            
        # Create secure 256-bits temporary passphrase
        # (32 random bytes from SystemRandom, hex-encoded)
        passphrase = ''.join(map(chr,[rng.randint(0,255) 
                                      for rng in (random.SystemRandom(),)
                                      for i in xrange(32)] )).encode("hex")
                
        # Copy keys (preserving the originals' permission bits)
        oprk = open(self.sliceSSHKey, "rb")
        opuk = open(self.sliceSSHKey+".pub", "rb")
        shutil.copymode(oprk.name, prk.name)
        shutil.copymode(opuk.name, puk.name)
        shutil.copyfileobj(oprk, prk)
        shutil.copyfileobj(opuk, puk)
        prk.flush()
        puk.flush()
        oprk.close()
        opuk.close()
        
        # A descriptive comment
        comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
        
        # Recipher keys: change the private key's passphrase in place
        # NOTE(review): any stderr output is treated as failure — assumes
        # ssh-keygen is silent on stderr when successful
        proc = subprocess.Popen(
            ["ssh-keygen", "-p",
             "-f", prk.name,
             "-P", self.sliceSSHKeyPass,
             "-N", passphrase,
             "-C", comment ],
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
            stdin = subprocess.PIPE
        )
        out, err = proc.communicate()
        
        if err:
            raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
                out, err)
        
        prk.seek(0)
        puk.seek(0)
        
        # Change comment on public key (ssh-keygen -C only touches the
        # private key's embedded comment): rewrite the last field of the
        # public key's first line
        puklines = puk.readlines()
        puklines[0] = puklines[0].split(' ')
        puklines[0][-1] = comment+'\n'
        puklines[0] = ' '.join(puklines[0])
        puk.seek(0)
        puk.truncate()
        puk.writelines(puklines)
        del puklines
        puk.flush()
        
        return prk, puk, passphrase
433     
434     def set(self, guid, name, value, time = TIME_NOW):
435         super(TestbedController, self).set(guid, name, value, time)
436         # TODO: take on account schedule time for the task
437         element = self._elements[guid]
438         if element:
439             setattr(element, name, value)
440
441             if hasattr(element, 'refresh'):
442                 # invoke attribute refresh hook
443                 element.refresh()
444
445     def get(self, guid, name, time = TIME_NOW):
446         value = super(TestbedController, self).get(guid, name, time)
447         # TODO: take on account schedule time for the task
448         factory_id = self._create[guid]
449         factory = self._factories[factory_id]
450         element = self._elements.get(guid)
451         try:
452             return getattr(element, name)
453         except (KeyError, AttributeError):
454             return value
455
456     def get_address(self, guid, index, attribute='Address'):
457         index = int(index)
458
459         # try the real stuff
460         iface = self._elements.get(guid)
461         if iface and index == 0:
462             if attribute == 'Address':
463                 return iface.address
464             elif attribute == 'NetPrefix':
465                 return iface.netprefix
466             elif attribute == 'Broadcast':
467                 return iface.broadcast
468
469         # if all else fails, query box
470         return super(TestbedController, self).get_address(guid, index, attribute)
471
    def action(self, time, guid, action):
        # Scheduled actions are not supported by this testbed
        raise NotImplementedError
474
475     def shutdown(self):
476         for trace in self._traces.itervalues():
477             trace.close()
478         
479         runner = ParallelRun(16)
480         runner.start()
481         for element in self._elements.itervalues():
482             # invoke cleanup hooks
483             if hasattr(element, 'cleanup'):
484                 runner.put(element.cleanup)
485         runner.join()
486         
487         runner = ParallelRun(16)
488         runner.start()
489         for element in self._elements.itervalues():
490             # invoke destroy hooks
491             if hasattr(element, 'destroy'):
492                 runner.put(element.destroy)
493         runner.join()
494         
495         self._elements.clear()
496         self._traces.clear()
497
498     def trace(self, guid, trace_id, attribute='value'):
499         app = self._elements[guid]
500
501         if attribute == 'value':
502             path = app.sync_trace(self.home_directory, trace_id)
503             if path:
504                 fd = open(path, "r")
505                 content = fd.read()
506                 fd.close()
507             else:
508                 content = None
509         elif attribute == 'path':
510             content = app.remote_trace_path(trace_id)
511         else:
512             content = None
513         return content
514
    def follow_trace(self, trace_id, trace):
        # Register an open trace object so shutdown() will close it
        self._traces[trace_id] = trace
517     
518     def _make_generic(self, parameters, kind):
519         app = kind(self.plapi)
520
521         # Note: there is 1-to-1 correspondence between attribute names
522         #   If that changes, this has to change as well
523         for attr,val in parameters.iteritems():
524             setattr(app, attr, val)
525
526         return app
527
    # Element factories: one per supported element type, all delegating
    # to _make_generic with the appropriate class.

    def _make_node(self, parameters):
        return self._make_generic(parameters, self._node.Node)

    def _make_node_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.NodeIface)

    def _make_tun_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TunIface)

    def _make_tap_iface(self, parameters):
        return self._make_generic(parameters, self._interfaces.TapIface)

    def _make_netpipe(self, parameters):
        return self._make_generic(parameters, self._interfaces.NetPipe)

    def _make_internet(self, parameters):
        return self._make_generic(parameters, self._interfaces.Internet)

    def _make_application(self, parameters):
        return self._make_generic(parameters, self._app.Application)

    def _make_dependency(self, parameters):
        return self._make_generic(parameters, self._app.Dependency)

    def _make_nepi_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NepiDependency)

    def _make_ns3_dependency(self, parameters):
        return self._make_generic(parameters, self._app.NS3Dependency)
557