Remember blacklisting of nodes, and accelerate detection of unresponsive nodes if...
[nepi.git] / src / nepi / testbeds / planetlab / execute.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 from constants import TESTBED_ID
5 from nepi.core import testbed_impl
6 from nepi.util.constants import TIME_NOW
7 from nepi.util.graphtools import mst
8 from nepi.util import ipaddr2
9 from nepi.util import environ
10 import sys
11 import os
12 import os.path
13 import time
14 import resourcealloc
15 import collections
16 import operator
17 import functools
18 import socket
19 import struct
20 import tempfile
21 import subprocess
22 import random
23 import shutil
24
25 from nepi.util.constants import TESTBED_STATUS_CONFIGURED
26
class TempKeyError(Exception):
    """Signals that a temporary deployment key could not be prepared.

    Raised by TestbedController._make_temp_private_key when no passphrase
    for the slice SSH key is available.
    """
29
30 class TestbedController(testbed_impl.TestbedController):
31     def __init__(self, testbed_version):
32         super(TestbedController, self).__init__(TESTBED_ID, testbed_version)
33         self._home_directory = None
34         self.slicename = None
35         self._traces = dict()
36
37         import node, interfaces, application
38         self._node = node
39         self._interfaces = interfaces
40         self._app = application
41         
42         self._blacklist = set()
43         self._just_provisioned = set()
44         
45         self._load_blacklist()
46
47     @property
48     def home_directory(self):
49         return self._home_directory
50
51     @property
52     def plapi(self):
53         if not hasattr(self, '_plapi'):
54             import plcapi
55
56             if self.authUser:
57                 self._plapi = plcapi.PLCAPI(
58                     username = self.authUser,
59                     password = self.authString,
60                     hostname = self.plcHost,
61                     urlpattern = self.plcUrl
62                     )
63             else:
64                 # anonymous access - may not be enough for much
65                 self._plapi = plcapi.PLCAPI()
66         return self._plapi
67
68     @property
69     def slice_id(self):
70         if not hasattr(self, '_slice_id'):
71             slices = self.plapi.GetSlices(self.slicename, fields=('slice_id',))
72             if slices:
73                 self._slice_id = slices[0]['slice_id']
74             else:
75                 # If it wasn't found, don't remember this failure, keep trying
76                 return None
77         return self._slice_id
78     
79     def _load_blacklist(self):
80         blpath = environ.homepath('plblacklist')
81         
82         try:
83             bl = open(blpath, "r")
84         except:
85             self._blacklist = set()
86             return
87             
88         try:
89             self._blacklist = set(
90                 map(int,
91                     map(str.strip, bl.readlines())
92                 )
93             )
94         finally:
95             bl.close()
96     
97     def _save_blacklist(self):
98         blpath = environ.homepath('plblacklist')
99         bl = open(blpath, "w")
100         try:
101             bl.writelines(
102                 map('%s\n'.__mod__, self._blacklist))
103         finally:
104             bl.close()
105     
106     def do_setup(self):
107         self._home_directory = self._attributes.\
108             get_attribute_value("homeDirectory")
109         self.slicename = self._attributes.\
110             get_attribute_value("slice")
111         self.authUser = self._attributes.\
112             get_attribute_value("authUser")
113         self.authString = self._attributes.\
114             get_attribute_value("authPass")
115         self.sliceSSHKey = self._attributes.\
116             get_attribute_value("sliceSSHKey")
117         self.sliceSSHKeyPass = None
118         self.plcHost = self._attributes.\
119             get_attribute_value("plcHost")
120         self.plcUrl = self._attributes.\
121             get_attribute_value("plcUrl")
122         super(TestbedController, self).do_setup()
123
124     def do_post_asynclaunch(self, guid):
125         # Dependencies were launched asynchronously,
126         # so wait for them
127         dep = self._elements[guid]
128         if isinstance(dep, self._app.Dependency):
129             dep.async_setup_wait()
130     
131     # Two-phase configuration for asynchronous launch
132     do_poststep_preconfigure = staticmethod(do_post_asynclaunch)
133     do_poststep_configure = staticmethod(do_post_asynclaunch)
134
135     def do_preconfigure(self):
136         while True:
137             # Perform resource discovery if we don't have
138             # specific resources assigned yet
139             self.do_resource_discovery()
140
141             # Create PlanetLab slivers
142             self.do_provisioning()
143             
144             try:
145                 # Wait for provisioning
146                 self.do_wait_nodes()
147                 
148                 # Okkey...
149                 break
150             except self._node.UnresponsiveNodeError:
151                 # Oh... retry...
152                 pass
153         
154         # Plan application deployment
155         self.do_spanning_deployment_plan()
156
157         # Configure elements per XML data
158         super(TestbedController, self).do_preconfigure()
159
160     def do_resource_discovery(self):
161         to_provision = self._to_provision = set()
162         
163         reserved = set(self._blacklist)
164         for guid, node in self._elements.iteritems():
165             if isinstance(node, self._node.Node) and node._node_id is not None:
166                 reserved.add(node._node_id)
167         
168         # Initial algo:
169         #   look for perfectly defined nodes
170         #   (ie: those with only one candidate)
171         for guid, node in self._elements.iteritems():
172             if isinstance(node, self._node.Node) and node._node_id is None:
173                 # Try existing nodes first
174                 # If we have only one candidate, simply use it
175                 candidates = node.find_candidates(
176                     filter_slice_id = self.slice_id)
177                 candidates -= reserved
178                 if len(candidates) == 1:
179                     node_id = iter(candidates).next()
180                     node.assign_node_id(node_id)
181                     reserved.add(node_id)
182                 elif not candidates:
183                     # Try again including unassigned nodes
184                     candidates = node.find_candidates()
185                     candidates -= reserved
186                     if len(candidates) > 1:
187                         continue
188                     if len(candidates) == 1:
189                         node_id = iter(candidates).next()
190                         node.assign_node_id(node_id)
191                         to_provision.add(node_id)
192                         reserved.add(node_id)
193                     elif not candidates:
194                         raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
195                             node.make_filter_description())
196         
197         # Now do the backtracking search for a suitable solution
198         # First with existing slice nodes
199         reqs = []
200         nodes = []
201         for guid, node in self._elements.iteritems():
202             if isinstance(node, self._node.Node) and node._node_id is None:
203                 # Try existing nodes first
204                 # If we have only one candidate, simply use it
205                 candidates = node.find_candidates(
206                     filter_slice_id = self.slice_id)
207                 candidates -= reserved
208                 reqs.append(candidates)
209                 nodes.append(node)
210         
211         if nodes and reqs:
212             try:
213                 solution = resourcealloc.alloc(reqs)
214             except resourcealloc.ResourceAllocationError:
215                 # Failed, try again with all nodes
216                 reqs = []
217                 for node in nodes:
218                     candidates = node.find_candidates()
219                     reqs.append(candidates)
220                 
221                 solution = resourcealloc.alloc(reqs)
222                 to_provision.update(solution)
223             
224             # Do assign nodes
225             for node, node_id in zip(nodes, solution):
226                 node.assign_node_id(node_id)
227
228     def do_provisioning(self):
229         if self._to_provision:
230             # Add new nodes to the slice
231             cur_nodes = self.plapi.GetSlices(self.slicename, ['node_ids'])[0]['node_ids']
232             new_nodes = list(set(cur_nodes) | self._to_provision)
233             self.plapi.UpdateSlice(self.slicename, nodes=new_nodes)
234
235         # cleanup
236         self._just_provisioned = self._to_provision
237         del self._to_provision
238     
239     def do_wait_nodes(self):
240         for guid, node in self._elements.iteritems():
241             if isinstance(node, self._node.Node):
242                 # Just inject configuration stuff
243                 node.home_path = "nepi-node-%s" % (guid,)
244                 node.ident_path = self.sliceSSHKey
245                 node.slicename = self.slicename
246             
247                 # Show the magic
248                 print "PlanetLab Node", guid, "configured at", node.hostname
249             
250         try:
251             for guid, node in self._elements.iteritems():
252                 if isinstance(node, self._node.Node):
253                     print "Waiting for Node", guid, "configured at", node.hostname,
254                     sys.stdout.flush()
255                     
256                     node.wait_provisioning(
257                         (20*60 if node._node_id in self._just_provisioned else 60)
258                     )
259                     
260                     print "READY"
261         except self._node.UnresponsiveNodeError:
262             # Uh... 
263             print "UNRESPONSIVE"
264             
265             # Mark all dead nodes (which are unresponsive) on the blacklist
266             # and re-raise
267             for guid, node in self._elements.iteritems():
268                 if isinstance(node, self._node.Node):
269                     if not node.is_alive():
270                         print "Blacklisting", node.hostname, "for unresponsiveness"
271                         self._blacklist.add(node._node_id)
272                         node.unassign_node()
273             
274             try:
275                 self._save_blacklist()
276             except:
277                 # not important...
278                 import traceback
279                 traceback.print_exc()
280             
281             raise
282     
283     def do_spanning_deployment_plan(self):
284         # Create application groups by collecting all applications
285         # based on their hash - the hash should contain everything that
286         # defines them and the platform they're built
287         
288         def dephash(app):
289             return (
290                 frozenset((app.depends or "").split(' ')),
291                 frozenset((app.sources or "").split(' ')),
292                 app.build,
293                 app.install,
294                 app.node.architecture,
295                 app.node.operatingSystem,
296                 app.node.pl_distro,
297             )
298         
299         depgroups = collections.defaultdict(list)
300         
301         for element in self._elements.itervalues():
302             if isinstance(element, self._app.Dependency):
303                 depgroups[dephash(element)].append(element)
304         
305         # Set up spanning deployment for those applications that
306         # have been deployed in several nodes.
307         for dh, group in depgroups.iteritems():
308             if len(group) > 1:
309                 # Pick root (deterministically)
310                 root = min(group, key=lambda app:app.node.hostname)
311                 
312                 # Obtain all IPs in numeric format
313                 # (which means faster distance computations)
314                 for dep in group:
315                     dep._ip = socket.gethostbyname(dep.node.hostname)
316                     dep._ip_n = struct.unpack('!L', socket.inet_aton(dep._ip))[0]
317                 
318                 # Compute plan
319                 # NOTE: the plan is an iterator
320                 plan = mst.mst(
321                     group,
322                     lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
323                     root = root,
324                     maxbranching = 2)
325                 
326                 # Re-sign private key
327                 try:
328                     tempprk, temppuk, tmppass = self._make_temp_private_key()
329                 except TempKeyError:
330                     continue
331                 
332                 # Set up slaves
333                 plan = list(plan)
334                 for slave, master in plan:
335                     slave.set_master(master)
336                     slave.install_keys(tempprk, temppuk, tmppass)
337                     
338         # We don't need the user's passphrase anymore
339         self.sliceSSHKeyPass = None
340     
341     def _make_temp_private_key(self):
342         # Get the user's key's passphrase
343         if not self.sliceSSHKeyPass:
344             if 'SSH_ASKPASS' in os.environ:
345                 proc = subprocess.Popen(
346                     [ os.environ['SSH_ASKPASS'],
347                       "Please type the passphrase for the %s SSH identity file. "
348                       "The passphrase will be used to re-cipher the identity file with "
349                       "a random 256-bit key for automated chain deployment on the "
350                       "%s PlanetLab slice" % ( 
351                         os.path.basename(self.sliceSSHKey), 
352                         self.slicename
353                     ) ],
354                     stdin = open("/dev/null"),
355                     stdout = subprocess.PIPE,
356                     stderr = subprocess.PIPE)
357                 out,err = proc.communicate()
358                 self.sliceSSHKeyPass = out.strip()
359         
360         if not self.sliceSSHKeyPass:
361             raise TempKeyError
362         
363         # Create temporary key files
364         prk = tempfile.NamedTemporaryFile(
365             dir = self.root_directory,
366             prefix = "pl_deploy_tmpk_",
367             suffix = "")
368
369         puk = tempfile.NamedTemporaryFile(
370             dir = self.root_directory,
371             prefix = "pl_deploy_tmpk_",
372             suffix = ".pub")
373             
374         # Create secure 256-bits temporary passphrase
375         passphrase = ''.join(map(chr,[rng.randint(0,255) 
376                                       for rng in (random.SystemRandom(),)
377                                       for i in xrange(32)] )).encode("hex")
378                 
379         # Copy keys
380         oprk = open(self.sliceSSHKey, "rb")
381         opuk = open(self.sliceSSHKey+".pub", "rb")
382         shutil.copymode(oprk.name, prk.name)
383         shutil.copymode(opuk.name, puk.name)
384         shutil.copyfileobj(oprk, prk)
385         shutil.copyfileobj(opuk, puk)
386         prk.flush()
387         puk.flush()
388         oprk.close()
389         opuk.close()
390         
391         # A descriptive comment
392         comment = "%s#NEPI_INTERNAL@%s" % (self.authUser, self.slicename)
393         
394         # Recipher keys
395         proc = subprocess.Popen(
396             ["ssh-keygen", "-p",
397              "-f", prk.name,
398              "-P", self.sliceSSHKeyPass,
399              "-N", passphrase,
400              "-C", comment ],
401             stdout = subprocess.PIPE,
402             stderr = subprocess.PIPE,
403             stdin = subprocess.PIPE
404         )
405         out, err = proc.communicate()
406         
407         if err:
408             raise RuntimeError, "Problem generating keys: \n%s\n%r" % (
409                 out, err)
410         
411         prk.seek(0)
412         puk.seek(0)
413         
414         # Change comment on public key
415         puklines = puk.readlines()
416         puklines[0] = puklines[0].split(' ')
417         puklines[0][-1] = comment+'\n'
418         puklines[0] = ' '.join(puklines[0])
419         puk.seek(0)
420         puk.truncate()
421         puk.writelines(puklines)
422         del puklines
423         puk.flush()
424         
425         return prk, puk, passphrase
426     
427     def set(self, guid, name, value, time = TIME_NOW):
428         super(TestbedController, self).set(guid, name, value, time)
429         # TODO: take on account schedule time for the task
430         element = self._elements[guid]
431         if element:
432             setattr(element, name, value)
433
434             if hasattr(element, 'refresh'):
435                 # invoke attribute refresh hook
436                 element.refresh()
437
438     def get(self, guid, name, time = TIME_NOW):
439         value = super(TestbedController, self).get(guid, name, time)
440         # TODO: take on account schedule time for the task
441         factory_id = self._create[guid]
442         factory = self._factories[factory_id]
443         if factory.box_attributes.is_attribute_design_only(name):
444             return value
445         element = self._elements.get(guid)
446         try:
447             return getattr(element, name)
448         except KeyError, AttributeError:
449             return value
450
451     def get_address(self, guid, index, attribute='Address'):
452         index = int(index)
453
454         # try the real stuff
455         iface = self._elements.get(guid)
456         if iface and index == 0:
457             if attribute == 'Address':
458                 return iface.address
459             elif attribute == 'NetPrefix':
460                 return iface.netprefix
461             elif attribute == 'Broadcast':
462                 return iface.broadcast
463
464         # if all else fails, query box
465         return super(TestbedController, self).get_address(guid, index, attribute)
466
467     def action(self, time, guid, action):
468         raise NotImplementedError
469
470     def shutdown(self):
471         for trace in self._traces.itervalues():
472             trace.close()
473         for element in self._elements.itervalues():
474             # invoke cleanup hooks
475             if hasattr(element, 'cleanup'):
476                 element.cleanup()
477         for element in self._elements.itervalues():
478             # invoke destroy hooks
479             if hasattr(element, 'destroy'):
480                 element.destroy()
481         self._elements.clear()
482         self._traces.clear()
483
484     def trace(self, guid, trace_id, attribute='value'):
485         app = self._elements[guid]
486
487         if attribute == 'value':
488             path = app.sync_trace(self.home_directory, trace_id)
489             if path:
490                 fd = open(path, "r")
491                 content = fd.read()
492                 fd.close()
493             else:
494                 content = None
495         elif attribute == 'path':
496             content = app.remote_trace_path(trace_id)
497         elif attribute == 'size':
498             # TODO
499             raise NotImplementedError
500         else:
501             content = None
502         return content
503
504     def follow_trace(self, trace_id, trace):
505         self._traces[trace_id] = trace
506     
507     def _make_generic(self, parameters, kind):
508         app = kind(self.plapi)
509
510         # Note: there is 1-to-1 correspondence between attribute names
511         #   If that changes, this has to change as well
512         for attr,val in parameters.iteritems():
513             setattr(app, attr, val)
514
515         return app
516
517     def _make_node(self, parameters):
518         node = self._make_generic(parameters, self._node.Node)
519
520         # If emulation is enabled, we automatically need
521         # some vsys interfaces and packages
522         if node.emulation:
523             node.required_vsys.add('ipfw-be')
524             node.required_packages.add('ipfwslice')
525
526         return node
527
528     def _make_node_iface(self, parameters):
529         return self._make_generic(parameters, self._interfaces.NodeIface)
530
531     def _make_tun_iface(self, parameters):
532         return self._make_generic(parameters, self._interfaces.TunIface)
533
534     def _make_tap_iface(self, parameters):
535         return self._make_generic(parameters, self._interfaces.TapIface)
536
537     def _make_netpipe(self, parameters):
538         return self._make_generic(parameters, self._interfaces.NetPipe)
539
540     def _make_internet(self, parameters):
541         return self._make_generic(parameters, self._interfaces.Internet)
542
543     def _make_application(self, parameters):
544         return self._make_generic(parameters, self._app.Application)
545
546     def _make_dependency(self, parameters):
547         return self._make_generic(parameters, self._app.Dependency)
548
549     def _make_nepi_dependency(self, parameters):
550         return self._make_generic(parameters, self._app.NepiDependency)
551
552     def _make_ns3_dependency(self, parameters):
553         return self._make_generic(parameters, self._app.NS3Dependency)
554