Fixing CCNx on PlanetLab to accept a user defined bind port.
[nepi.git] / src / nepi / testbeds / planetlab / application.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import plcapi
5 import operator
6 import os
7 import os.path
8 import sys
9 import nepi.util.server as server
10 import cStringIO
11 import subprocess
12 import rspawn
13 import random
14 import time
15 import socket
16 import threading
17 import logging
18 import re
19
20 from nepi.util.constants import ApplicationStatus as AS
21
22 _ccnre = re.compile("\s*(udp|tcp)\s+(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\s*$")
23
24 class Dependency(object):
25     """
26     A Dependency is in every respect like an application.
27     
28     It depends on some packages, it may require building binaries, it must deploy
29     them...
30     
31     But it has no command. Dependencies aren't ever started, or stopped, and have
32     no status.
33     """
34
35     TRACES = ('buildlog')
36
37     def __init__(self, api=None):
38         if not api:
39             api = plcapi.PLCAPI()
40         self._api = api
41         
42         # Attributes
43         self.command = None
44         self.sudo = False
45         
46         self.build = None
47         self.install = None
48         self.depends = None
49         self.buildDepends = None
50         self.sources = None
51         self.rpmFusion = False
52         self.env = {}
53         
54         self.stdin = None
55         self.stdout = None
56         self.stderr = None
57         self.buildlog = None
58         
59         self.add_to_path = True
60         
61         # Those are filled when the app is configured
62         self.home_path = None
63         
64         # Those are filled when an actual node is connected
65         self.node = None
66         
67         # Those are filled when the app is started
68         #   Having both pid and ppid makes it harder
69         #   for pid rollover to induce tracking mistakes
70         self._started = False
71         self._setup = False
72         self._setuper = None
73         self._pid = None
74         self._ppid = None
75
76         # Spanning tree deployment
77         self._master = None
78         self._master_passphrase = None
79         self._master_prk = None
80         self._master_puk = None
81         self._master_token = os.urandom(8).encode("hex")
82         self._build_pid = None
83         self._build_ppid = None
84         
85         # Logging
86         self._logger = logging.getLogger('nepi.testbeds.planetlab')
87         
88     
89     def __str__(self):
90         return "%s<%s>" % (
91             self.__class__.__name__,
92             ' '.join(filter(bool,(self.depends, self.sources)))
93         )
94     
95     def validate(self):
96         if self.home_path is None:
97             raise AssertionError, "Misconfigured application: missing home path"
98         if self.node.ident_path is None or not os.access(self.node.ident_path, os.R_OK):
99             raise AssertionError, "Misconfigured application: missing slice SSH key"
100         if self.node is None:
101             raise AssertionError, "Misconfigured application: unconnected node"
102         if self.node.hostname is None:
103             raise AssertionError, "Misconfigured application: misconfigured node"
104         if self.node.slicename is None:
105             raise AssertionError, "Misconfigured application: unspecified slice"
106     
107     def check_bad_host(self, out, err):
108         """
109         Called whenever an operation fails, it's given the output to be checked for
110         telltale signs of unhealthy hosts.
111         """
112         return False
113     
114     def remote_trace_path(self, whichtrace):
115         if whichtrace in self.TRACES:
116             tracefile = os.path.join(self.home_path, whichtrace)
117         else:
118             tracefile = None
119         
120         return tracefile
121
122     def remote_trace_name(self, whichtrace):
123         if whichtrace in self.TRACES:
124             return whichtrace
125         return None
126
127     def sync_trace(self, local_dir, whichtrace):
128         tracefile = self.remote_trace_path(whichtrace)
129         if not tracefile:
130             return None
131         
132         local_path = os.path.join(local_dir, tracefile)
133         
134         # create parent local folders
135         proc = subprocess.Popen(
136             ["mkdir", "-p", os.path.dirname(local_path)],
137             stdout = open("/dev/null","w"),
138             stdin = open("/dev/null","r"))
139
140         if proc.wait():
141             raise RuntimeError, "Failed to synchronize trace"
142         
143         # sync files
144         try:
145             self._popen_scp(
146                 '%s@%s:%s' % (self.node.slicename, self.node.hostname,
147                     tracefile),
148                 local_path
149                 )
150         except RuntimeError, e:
151             raise RuntimeError, "Failed to synchronize trace: %s %s" \
152                     % (e.args[0], e.args[1],)
153         
154         return local_path
155     
156     def recover(self):
157         # We assume a correct deployment, so recovery only
158         # means we mark this dependency as deployed
159         self._setup = True
160
161     def setup(self):
162         self._logger.info("Setting up %s", self)
163         self._make_home()
164         self._launch_build()
165         self._finish_build()
166         self._setup = True
167     
168     def async_setup(self):
169         if not self._setuper:
170             def setuper():
171                 try:
172                     self.setup()
173                 except:
174                     self._setuper._exc.append(sys.exc_info())
175             self._setuper = threading.Thread(
176                 target = setuper)
177             self._setuper._exc = []
178             self._setuper.start()
179     
180     def async_setup_wait(self):
181         if not self._setup:
182             self._logger.info("Waiting for %s to be setup", self)
183             if self._setuper:
184                 self._setuper.join()
185                 if not self._setup:
186                     if self._setuper._exc:
187                         exctyp,exval,exctrace = self._setuper._exc[0]
188                         raise exctyp,exval,exctrace
189                     else:
190                         raise RuntimeError, "Failed to setup application"
191                 else:
192                     self._logger.info("Setup ready: %s at %s", self, self.node.hostname)
193             else:
194                 self.setup()
195         
196     def _make_home(self):
197         # Make sure all the paths are created where 
198         # they have to be created for deployment
199         # sync files
200         try:
201             self._popen_ssh_command(
202                 "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" \
203                     % { 'home' : server.shell_escape(self.home_path) },
204                 timeout = 120,
205                 retry = 3
206                 )
207         except RuntimeError, e:
208             raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, e.args[0], e.args[1],)
209         
210         if self.stdin:
211             stdin = self.stdin
212             if not os.path.isfile(stdin):
213                 stdin = cStringIO.StringIO(self.stdin)
214
215             # Write program input
216             try:
217                 self._popen_scp(stdin,
218                     '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
219                         os.path.join(self.home_path, 'stdin') ),
220                     )
221             except RuntimeError, e:
222                 raise RuntimeError, "Failed to set up application %s: %s %s" \
223                         % (self.home_path, e.args[0], e.args[1],)
224
225     def _replace_paths(self, command):
226         """
227         Replace all special path tags with shell-escaped actual paths.
228         """
229         # need to append ${HOME} if paths aren't absolute, to MAKE them absolute.
230         root = '' if self.home_path.startswith('/') else "${HOME}/"
231         return ( command
232             .replace("${SOURCES}", root+server.shell_escape(self.home_path))
233             .replace("${BUILD}", root+server.shell_escape(os.path.join(self.home_path,'build'))) )
234
235     def _launch_build(self, trial=0):
236         if self._master is not None:
237             if not trial or self._master_prk is not None:
238                 self._do_install_keys()
239             buildscript = self._do_build_slave()
240         else:
241             buildscript = self._do_build_master()
242             
243         if buildscript is not None:
244             self._logger.info("Building %s at %s", self, self.node.hostname)
245             
246             # upload build script
247             try:
248                 self._popen_scp(
249                     buildscript,
250                     '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
251                         os.path.join(self.home_path, 'nepi-build.sh') )
252                     )
253             except RuntimeError, e:
254                 raise RuntimeError, "Failed to set up application %s: %s %s" \
255                         % (self.home_path, e.args[0], e.args[1],)
256             
257             # launch build
258             self._do_launch_build()
259     
260     def _finish_build(self):
261         self._do_wait_build()
262         self._do_install()
263
264     def _do_build_slave(self):
265         if not self.sources and not self.build:
266             return None
267             
268         # Create build script
269         files = set()
270         
271         if self.sources:
272             sources = self.sources.split(' ')
273             files.update(
274                 "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostip, 
275                     os.path.join(self._master.home_path, os.path.basename(source)),)
276                 for source in sources
277             )
278         
279         if self.build:
280             files.add(
281                 "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostip, 
282                     os.path.join(self._master.home_path, 'build.tar.gz'),)
283             )
284         
285         sshopts = "-o ConnectTimeout=30 -o ConnectionAttempts=3 -o ServerAliveInterval=30 -o TCPKeepAlive=yes"
286         
287         launch_agent = "{ ( echo -e '#!/bin/sh\\ncat' > .ssh-askpass ) && chmod u+x .ssh-askpass"\
288                         " && export SSH_ASKPASS=$(pwd)/.ssh-askpass "\
289                         " && ssh-agent > .ssh-agent.sh ; } && . ./.ssh-agent.sh && ( echo $NEPI_MASTER_PASSPHRASE | ssh-add %(prk)s ) && rm -rf %(prk)s %(puk)s" %  \
290         {
291             'prk' : server.shell_escape(self._master_prk_name),
292             'puk' : server.shell_escape(self._master_puk_name),
293         }
294         
295         kill_agent = "kill $SSH_AGENT_PID"
296         
297         waitmaster = (
298             "{ "
299             "echo 'Checking master reachability' ; "
300             "if ping -c 3 %(master_host)s && (. ./.ssh-agent.sh > /dev/null ; ssh -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s echo MASTER SAYS HI ) ; then "
301             "echo 'Master node reachable' ; "
302             "else "
303             "echo 'MASTER NODE UNREACHABLE' && "
304             "exit 1 ; "
305             "fi ; "
306             ". ./.ssh-agent.sh ; "
307             "while [[ $(. ./.ssh-agent.sh > /dev/null ; ssh -q -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s cat %(token_path)s.retcode || /bin/true) != %(token)s ]] ; do sleep 5 ; done ; "
308             "if [[ $(. ./.ssh-agent.sh > /dev/null ; ssh -q -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s cat %(token_path)s || /bin/true) != %(token)s ]] ; then echo BAD TOKEN ; exit 1 ; fi ; "
309             "}" 
310         ) % {
311             'hostkey' : 'master_known_hosts',
312             'master' : "%s@%s" % (self._master.node.slicename, self._master.node.hostip),
313             'master_host' : self._master.node.hostip,
314             'token_path' : os.path.join(self._master.home_path, 'build.token'),
315             'token' : server.shell_escape(self._master._master_token),
316             'sshopts' : sshopts,
317         }
318         
319         syncfiles = ". ./.ssh-agent.sh && scp -p -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(files)s ." % {
320             'hostkey' : 'master_known_hosts',
321             'files' : ' '.join(files),
322             'sshopts' : sshopts,
323         }
324         if self.build:
325             syncfiles += " && tar xzf build.tar.gz"
326         syncfiles += " && ( echo %s > build.token )" % (server.shell_escape(self._master_token),)
327         syncfiles += " && ( echo %s > build.token.retcode )" % (server.shell_escape(self._master_token),)
328         syncfiles = "{ . ./.ssh-agent.sh ; %s ; }" % (syncfiles,)
329         
330         cleanup = "{ . ./.ssh-agent.sh ; kill $SSH_AGENT_PID ; rm -rf %(prk)s %(puk)s master_known_hosts .ssh-askpass ; }" % {
331             'prk' : server.shell_escape(self._master_prk_name),
332             'puk' : server.shell_escape(self._master_puk_name),
333         }
334         
335         slavescript = "( ( %(launch_agent)s && %(waitmaster)s && %(syncfiles)s && %(kill_agent)s && %(cleanup)s ) || %(cleanup)s ) ; echo %(token)s > build.token.retcode" % {
336             'waitmaster' : waitmaster,
337             'syncfiles' : syncfiles,
338             'cleanup' : cleanup,
339             'kill_agent' : kill_agent,
340             'launch_agent' : launch_agent,
341             'home' : server.shell_escape(self.home_path),
342             'token' : server.shell_escape(self._master_token),
343         }
344         
345         return cStringIO.StringIO(slavescript)
346          
347     def _do_launch_build(self):
348         script = "bash ./nepi-build.sh"
349         if self._master_passphrase:
350             script = "NEPI_MASTER_PASSPHRASE=%s %s" % (
351                 server.shell_escape(self._master_passphrase),
352                 script
353             )
354         (out,err),proc = rspawn.remote_spawn(
355             script,
356             pidfile = 'build-pid',
357             home = self.home_path,
358             stdin = '/dev/null',
359             stdout = 'buildlog',
360             stderr = rspawn.STDOUT,
361             
362             host = self.node.hostname,
363             port = None,
364             user = self.node.slicename,
365             agent = None,
366             ident_key = self.node.ident_path,
367             server_key = self.node.server_key,
368             hostip = self.node.hostip,
369             )
370         
371         if proc.wait():
372             if self.check_bad_host(out, err):
373                 self.node.blacklist()
374             raise RuntimeError, "Failed to set up build slave %s: %s %s" % (self.home_path, out,err,)
375         
376         
377         pid = ppid = None
378         delay = 1.0
379         for i in xrange(5):
380             pidtuple = rspawn.remote_check_pid(
381                 os.path.join(self.home_path,'build-pid'),
382                 host = self.node.hostname,
383                 port = None,
384                 user = self.node.slicename,
385                 agent = None,
386                 ident_key = self.node.ident_path,
387                 server_key = self.node.server_key,
388                 hostip = self.node.hostip
389                 )
390             
391             if pidtuple:
392                 pid, ppid = pidtuple
393                 self._build_pid, self._build_ppid = pidtuple
394                 break
395             else:
396                 time.sleep(delay)
397                 delay = min(30,delay*1.2)
398         else:
399             raise RuntimeError, "Failed to set up build slave %s: cannot get pid" % (self.home_path,)
400
401         self._logger.info("Deploying %s at %s", self, self.node.hostname)
402         
403     def _do_wait_build(self, trial=0):
404         pid = self._build_pid
405         ppid = self._build_ppid
406         
407         if pid and ppid:
408             delay = 1.0
409             first = True
410             bustspin = 0
411             while True:
412                 status = rspawn.remote_status(
413                     pid, ppid,
414                     host = self.node.hostname,
415                     port = None,
416                     user = self.node.slicename,
417                     agent = None,
418                     ident_key = self.node.ident_path,
419                     server_key = self.node.server_key,
420                     hostip = self.node.hostip
421                     )
422                 
423                 if status is rspawn.FINISHED:
424                     self._build_pid = self._build_ppid = None
425                     break
426                 elif status is not rspawn.RUNNING:
427                     self._logger.warn("Busted waiting for %s to finish building at %s %s", self, self.node.hostname,
428                             "(build slave)" if self._master is not None else "(build master)")
429                     bustspin += 1
430                     time.sleep(delay*(5.5+random.random()))
431                     if bustspin > 12:
432                         self._build_pid = self._build_ppid = None
433                         break
434                 else:
435                     if first:
436                         self._logger.info("Waiting for %s to finish building at %s %s", self, self.node.hostname,
437                             "(build slave)" if self._master is not None else "(build master)")
438                         
439                         first = False
440                     time.sleep(delay*(0.5+random.random()))
441                     delay = min(30,delay*1.2)
442                     bustspin = 0
443             
444             # check build token
445             slave_token = ""
446             for i in xrange(3):
447                 (out, err), proc = self._popen_ssh_command(
448                     "cat %(token_path)s" % {
449                         'token_path' : os.path.join(self.home_path, 'build.token'),
450                     },
451                     timeout = 120,
452                     noerrors = True)
453                 if not proc.wait() and out:
454                     slave_token = out.strip()
455                 
456                 if slave_token:
457                     break
458                 else:
459                     time.sleep(2)
460             
461             if slave_token != self._master_token:
462                 # Get buildlog for the error message
463
464                 (buildlog, err), proc = self._popen_ssh_command(
465                     "cat %(buildlog)s" % {
466                         'buildlog' : os.path.join(self.home_path, 'buildlog'),
467                         'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'),
468                     },
469                     timeout = 120,
470                     noerrors = True)
471                 
472                 proc.wait()
473                 
474                 if self.check_bad_host(buildlog, err):
475                     self.node.blacklist()
476                 elif self._master and trial < 3 and 'BAD TOKEN' in buildlog or 'BAD TOKEN' in err:
477                     # bad sync with master, may try again
478                     # but first wait for master
479                     self._master.async_setup_wait()
480                     self._launch_build(trial+1)
481                     return self._do_wait_build(trial+1)
482                 elif trial < 3:
483                     return self._do_wait_build(trial+1)
484                 else:
485                     # No longer need'em
486                     self._master_prk = None
487                     self._master_puk = None
488         
489                     raise RuntimeError, "Failed to set up application %s: "\
490                             "build failed, got wrong token from pid %s/%s "\
491                             "(expected %r, got %r), see buildlog at %s:\n%s" % (
492                         self.home_path, pid, ppid, self._master_token, slave_token, self.node.hostname, buildlog)
493
494             # No longer need'em
495             self._master_prk = None
496             self._master_puk = None
497         
498             self._logger.info("Built %s at %s", self, self.node.hostname)
499
500     def _do_kill_build(self):
501         pid = self._build_pid
502         ppid = self._build_ppid
503         
504         if pid and ppid:
505             self._logger.info("Killing build of %s", self)
506             rspawn.remote_kill(
507                 pid, ppid,
508                 host = self.node.hostname,
509                 port = None,
510                 user = self.node.slicename,
511                 agent = None,
512                 ident_key = self.node.ident_path,
513                 hostip = self.node.hostip
514                 )
515         
516         
517     def _do_build_master(self):
518         if not self.sources and not self.build and not self.buildDepends:
519             return None
520             
521         if self.sources:
522             sources = self.sources.split(' ')
523             
524             # Copy all sources
525             try:
526                 self._popen_scp(
527                     sources,
528                     "%s@%s:%s" % (self.node.slicename, self.node.hostname, 
529                         os.path.join(self.home_path,'.'),)
530                     )
531             except RuntimeError, e:
532                 raise RuntimeError, "Failed upload source file %r: %s %s" \
533                         % (sources, e.args[0], e.args[1],)
534             
535         buildscript = cStringIO.StringIO()
536         
537         buildscript.write("(\n")
538         
539         if self.buildDepends:
540             # Install build dependencies
541             buildscript.write(
542                 "sudo -S yum -y install %(packages)s\n" % {
543                     'packages' : self.buildDepends
544                 }
545             )
546         
547             
548         if self.build:
549             # Build sources
550             buildscript.write(
551                 "mkdir -p build && ( cd build && ( %(command)s ) )\n" % {
552                     'command' : self._replace_paths(self.build),
553                     'home' : server.shell_escape(self.home_path),
554                 }
555             )
556         
557             # Make archive
558             buildscript.write("tar czf build.tar.gz build\n")
559         
560         # Write token
561         buildscript.write("echo %(master_token)s > build.token ) ; echo %(master_token)s > build.token.retcode" % {
562             'master_token' : server.shell_escape(self._master_token)
563         })
564         
565         buildscript.seek(0)
566
567         return buildscript
568
569     def _do_install(self):
570         if self.install:
571             self._logger.info("Installing %s at %s", self, self.node.hostname)
572            
573             # Install application
574             try:
575                 self._popen_ssh_command(
576                     "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/{install,build}log >&2 && false )" % \
577                         {
578                         'command' : self._replace_paths(self.install),
579                         'home' : server.shell_escape(self.home_path),
580                         },
581                     )
582             except RuntimeError, e:
583                 if self.check_bad_host(e.args[0], e.args[1]):
584                     self.node.blacklist()
585                 raise RuntimeError, "Failed install build sources: %s %s" % (e.args[0], e.args[1],)
586
587     def set_master(self, master):
588         self._master = master
589         
590     def install_keys(self, prk, puk, passphrase):
591         # Install keys
592         self._master_passphrase = passphrase
593         self._master_prk = prk
594         self._master_puk = puk
595         self._master_prk_name = os.path.basename(prk.name)
596         self._master_puk_name = os.path.basename(puk.name)
597         
598     def _do_install_keys(self):
599         prk = self._master_prk
600         puk = self._master_puk
601        
602         try:
603             self._popen_scp(
604                 [ prk.name, puk.name ],
605                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path )
606                 )
607         except RuntimeError, e:
608             raise RuntimeError, "Failed to set up application deployment keys: %s %s" \
609                     % (e.args[0], e.args[1],)
610
611         try:
612             self._popen_scp(
613                 cStringIO.StringIO('%s,%s %s\n' % (
614                     self._master.node.hostname, self._master.node.hostip, 
615                     self._master.node.server_key)),
616                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
617                     os.path.join(self.home_path,"master_known_hosts") )
618                 )
619         except RuntimeError, e:
620             raise RuntimeError, "Failed to set up application deployment keys: %s %s" \
621                     % (e.args[0], e.args[1],)
622         
623     
624     def cleanup(self):
625         # make sure there's no leftover build processes
626         self._do_kill_build()
627         
628         # No longer need'em
629         self._master_prk = None
630         self._master_puk = None
631
632     @server.eintr_retry
633     def _popen_scp(self, src, dst, retry = 3):
634         while 1:
635             try:
636                 (out,err),proc = server.popen_scp(
637                     src,
638                     dst, 
639                     port = None,
640                     agent = None,
641                     ident_key = self.node.ident_path,
642                     server_key = self.node.server_key
643                     )
644
645                 if server.eintr_retry(proc.wait)():
646                     raise RuntimeError, (out, err)
647                 return (out, err), proc
648             except:
649                 if retry <= 0:
650                     raise
651                 else:
652                     retry -= 1
653   
654
655     @server.eintr_retry
656     def _popen_ssh_command(self, command, retry = 0, noerrors=False, timeout=None):
657         (out,err),proc = server.popen_ssh_command(
658             command,
659             host = self.node.hostname,
660             port = None,
661             user = self.node.slicename,
662             agent = None,
663             ident_key = self.node.ident_path,
664             server_key = self.node.server_key,
665             timeout = timeout,
666             retry = retry
667             )
668
669         if server.eintr_retry(proc.wait)():
670             if not noerrors:
671                 raise RuntimeError, (out, err)
672         return (out, err), proc
673
class Application(Dependency):
    """
    An application also has dependencies, but also a command to be ran and monitored.
    
    It adds the output of that command as traces.
    """
    
    TRACES = ('stdout','stderr','buildlog', 'output')
    
    def __init__(self, api=None):
        super(Application,self).__init__(api)
        
        # Attributes
        self.command = None
        self.sudo = False
        
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self.output = None
        
        # Those are filled when the app is started
        #   Having both pid and ppid makes it harder
        #   for pid rollover to induce tracking mistakes
        self._started = False
        self._pid = None
        self._ppid = None

        # Do not add to the python path of nodes
        self.add_to_path = False
    
    def __str__(self):
        return "%s<command:%s%s>" % (
            self.__class__.__name__,
            "sudo " if self.sudo else "",
            self.command,
        )
    
    def start(self):
        """Upload an app.sh wrapper with the environment and command, then
        spawn it remotely in a daemonized fashion."""
        self._logger.info("Starting %s", self)
        
        # Create shell script with the command
        # This way, complex commands and scripts can be ran seamlessly
        # sync files
        command = cStringIO.StringIO()
        command.write('export PYTHONPATH=$PYTHONPATH:%s\n' % (
            ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
        ))
        # NOTE(review): PATH is extended with the *python* path entries too;
        # presumably intentional (binaries deployed beside modules) - confirm.
        command.write('export PATH=$PATH:%s\n' % (
            ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
        ))
        if self.node.env:
            for envkey, envvals in self.node.env.iteritems():
                for envval in envvals:
                    command.write('export %s=%s\n' % (envkey, envval))
        command.write(self.command)
        command.seek(0)

        try:
            self._popen_scp(
                command,
                '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
                    os.path.join(self.home_path, "app.sh"))
                )
        except RuntimeError as e:
            raise RuntimeError("Failed to set up application: %s %s"
                    % (e.args[0], e.args[1],))
        
        # Start process in a "daemonized" way, using nohup and heavy
        # stdin/out redirection to avoid connection issues
        (out,err),proc = rspawn.remote_spawn(
            self._replace_paths("bash ./app.sh"),
            
            pidfile = './pid',
            home = self.home_path,
            stdin = 'stdin' if self.stdin is not None else '/dev/null',
            stdout = 'stdout' if self.stdout else '/dev/null',
            stderr = 'stderr' if self.stderr else '/dev/null',
            sudo = self.sudo,
            
            host = self.node.hostname,
            port = None,
            user = self.node.slicename,
            agent = None,
            ident_key = self.node.ident_path,
            server_key = self.node.server_key
            )
        
        if proc.wait():
            if self.check_bad_host(out, err):
                self.node.blacklist()
            raise RuntimeError("Failed to set up application: %s %s" % (out,err,))

        self._started = True
    
    def recover(self):
        # Assuming the application is running on PlanetLab,
        # proper pidfiles should be present at the app's home path.
        # So we mark this application as started, and check the pidfiles
        self._started = True
        self.checkpid()

    def checkpid(self):            
        # Get PID/PPID
        # NOTE: wait a bit for the pidfile to be created
        # Parenthesized explicitly: the unparenthesized original parsed as
        # "(started and not pid) or not ppid", which contacted the node even
        # for applications that were never started (whenever _ppid was unset).
        if self._started and (not self._pid or not self._ppid):
            pidtuple = rspawn.remote_check_pid(
                os.path.join(self.home_path,'pid'),
                host = self.node.hostname,
                port = None,
                user = self.node.slicename,
                agent = None,
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
            
            if pidtuple:
                self._pid, self._ppid = pidtuple
    
    def status(self):
        """Map the remote process state to an ApplicationStatus constant."""
        self.checkpid()
        if not self._started:
            return AS.STATUS_NOT_STARTED
        elif not self._pid or not self._ppid:
            return AS.STATUS_NOT_STARTED
        else:
            status = rspawn.remote_status(
                self._pid, self._ppid,
                host = self.node.hostname,
                port = None,
                user = self.node.slicename,
                agent = None,
                ident_key = self.node.ident_path,
                server_key = self.node.server_key
                )
            
            if status is rspawn.NOT_STARTED:
                return AS.STATUS_NOT_STARTED
            elif status is rspawn.RUNNING:
                return AS.STATUS_RUNNING
            elif status is rspawn.FINISHED:
                return AS.STATUS_FINISHED
            else:
                # Unknown status from rspawn; report conservatively.
                return AS.STATUS_NOT_STARTED
    
    def kill(self):
        """Terminate the remote process if it is still running."""
        status = self.status()
        if status == AS.STATUS_RUNNING:
            # kill by ppid+pid - SIGTERM first, then try SIGKILL
            rspawn.remote_kill(
                self._pid, self._ppid,
                host = self.node.hostname,
                port = None,
                user = self.node.slicename,
                agent = None,
                ident_key = self.node.ident_path,
                server_key = self.node.server_key,
                sudo = self.sudo
                )
            self._logger.info("Killed %s", self)
835
836
837 class NepiDependency(Dependency):
838     """
839     This dependency adds nepi itself to the python path,
840     so that you may run testbeds within PL nodes.
841     """
842     
843     # Class attribute holding a *weak* reference to the shared NEPI tar file
844     # so that they may share it. Don't operate on the file itself, it would
845     # be a mess, just use its path.
846     _shared_nepi_tar = None
847     
848     def __init__(self, api = None):
849         super(NepiDependency, self).__init__(api)
850         
851         self._tarball = None
852         
853         self.depends = 'python python-ipaddr python-setuptools'
854         
855         # our sources are in our ad-hoc tarball
856         self.sources = self.tarball.name
857         
858         tarname = os.path.basename(self.tarball.name)
859         
860         # it's already built - just move the tarball into place
861         self.build = "mv -f ${SOURCES}/%s ." % (tarname,)
862         
863         # unpack it into sources, and we're done
864         self.install = "tar xzf ${BUILD}/%s -C .." % (tarname,)
865     
866     @property
867     def tarball(self):
868         if self._tarball is None:
869             shared_tar = self._shared_nepi_tar and self._shared_nepi_tar()
870             if shared_tar is not None:
871                 self._tarball = shared_tar
872             else:
873                 # Build an ad-hoc tarball
874                 # Prebuilt
875                 import nepi
876                 import tempfile
877                 
878                 shared_tar = tempfile.NamedTemporaryFile(prefix='nepi-src-', suffix='.tar.gz')
879                 
880                 proc = subprocess.Popen(
881                     ["tar", "czf", shared_tar.name, 
882                         '-C', os.path.join(os.path.dirname(os.path.dirname(nepi.__file__)),'.'), 
883                         'nepi'],
884                     stdout = open("/dev/null","w"),
885                     stdin = open("/dev/null","r"))
886
887                 if proc.wait():
888                     raise RuntimeError, "Failed to create nepi tarball"
889                 
890                 self._tarball = self._shared_nepi_tar = shared_tar
891                 
892         return self._tarball
893
894 class NS3Dependency(Dependency):
895     """
896     This dependency adds NS3 libraries to the library paths,
897     so that you may run the NS3 testbed within PL nodes.
898     
899     You'll also need the NepiDependency.
900     """
901     
902     def __init__(self, api = None):
903         super(NS3Dependency, self).__init__(api)
904         
905         self.buildDepends = 'make waf gcc gcc-c++ gccxml unzip bzr'
906         
907         # We have to download the sources, untar, build...
908         pygccxml_source_url = "http://leaseweb.dl.sourceforge.net/project/pygccxml/pygccxml/pygccxml-1.0/pygccxml-1.0.0.zip"
909         ns3_source_url = "http://nepi.pl.sophia.inria.fr/code/nepi-ns3.13/archive/tip.tar.gz"
910         passfd_source_url = "http://nepi.pl.sophia.inria.fr/code/python-passfd/archive/tip.tar.gz"
911         
912         pybindgen_version = "797"
913
914         self.build =(
915             " ( "
916             "  cd .. && "
917             "  python -c 'import pygccxml, pybindgen, passfd' && "
918             "  test -f lib/ns/_core.so && "
919             "  test -f lib/ns/__init__.py && "
920             "  test -f lib/ns/core.py && "
921             "  test -f lib/libns3-core.so && "
922             "  LD_LIBRARY_PATH=lib PYTHONPATH=lib python -c 'import ns.core' "
923             " ) || ( "
924                 # Not working, rebuild
925                      # Archive SHA1 sums to check
926                      "echo '7158877faff2254e6c094bf18e6b4283cac19137  pygccxml-1.0.0.zip' > archive_sums.txt && "
927                      " ( " # check existing files
928                      " sha1sum -c archive_sums.txt && "
929                      " test -f passfd-src.tar.gz && "
930                      " test -f ns3-src.tar.gz "
931                      " ) || ( " # nope? re-download
932                      " rm -rf pybindgen pygccxml-1.0.0.zip passfd-src.tar.gz ns3-src.tar.gz && "
933                      " bzr checkout lp:pybindgen -r %(pybindgen_version)s && " # continue, to exploit the case when it has already been dl'ed
934                      " wget -q -c -O pygccxml-1.0.0.zip %(pygccxml_source_url)s && " 
935                      " wget -q -c -O passfd-src.tar.gz %(passfd_source_url)s && "
936                      " wget -q -c -O ns3-src.tar.gz %(ns3_source_url)s && "  
937                      " sha1sum -c archive_sums.txt " # Check SHA1 sums when applicable
938                      " ) && "
939                      "unzip -n pygccxml-1.0.0.zip && "
940                      "mkdir -p ns3-src && "
941                      "mkdir -p passfd-src && "
942                      "tar xzf ns3-src.tar.gz --strip-components=1 -C ns3-src && "
943                      "tar xzf passfd-src.tar.gz --strip-components=1 -C passfd-src && "
944                      "rm -rf target && "    # mv doesn't like unclean targets
945                      "mkdir -p target && "
946                      "cd pygccxml-1.0.0 && "
947                      "rm -rf unittests docs && " # pygccxml has ~100M of unit tests - excessive - docs aren't needed either
948                      "python setup.py build && "
949                      "python setup.py install --install-lib ${BUILD}/target && "
950                      "python setup.py clean && "
951                      "cd ../pybindgen && "
952                      "export PYTHONPATH=$PYTHONPATH:${BUILD}/target && "
953                      "./waf configure --prefix=${BUILD}/target -d release && "
954                      "./waf && "
955                      "./waf install && "
956                      "./waf clean && "
957                      "mv -f ${BUILD}/target/lib/python*/site-packages/pybindgen ${BUILD}/target/. && "
958                      "rm -rf ${BUILD}/target/lib && "
959                      "cd ../passfd-src && "
960                      "python setup.py build && "
961                      "python setup.py install --install-lib ${BUILD}/target && "
962                      "python setup.py clean && "
963                      "cd ../ns3-src && "
964                      "./waf configure --prefix=${BUILD}/target --with-pybindgen=../pybindgen-src -d release --disable-examples --disable-tests && "
965                      "./waf &&"
966                      "./waf install && "
967                      "rm -f ${BUILD}/target/lib/*.so && "
968                      "cp -a ${BUILD}/ns3-src/build/libns3*.so ${BUILD}/target/lib && "
969                      "cp -a ${BUILD}/ns3-src/build/bindings/python/ns ${BUILD}/target/lib &&"
970                      "./waf clean "
971              " )"
972                      % dict(
973                         pybindgen_version = server.shell_escape(pybindgen_version),
974                         pygccxml_source_url = server.shell_escape(pygccxml_source_url),
975                         ns3_source_url = server.shell_escape(ns3_source_url),
976                         passfd_source_url = server.shell_escape(passfd_source_url),
977                      ))
978         
979         # Just move ${BUILD}/target
980         self.install = (
981             " ( "
982             "  cd .. && "
983             "  python -c 'import pygccxml, pybindgen, passfd' && "
984             "  test -f lib/ns/_core.so && "
985             "  test -f lib/ns/__init__.py && "
986             "  test -f lib/ns/core.py && "
987             "  test -f lib/libns3-core.so && "
988             "  LD_LIBRARY_PATH=lib PYTHONPATH=lib python -c 'import ns.core' "
989             " ) || ( "
990                 # Not working, reinstall
991                     "test -d ${BUILD}/target && "
992                     "[[ \"x\" != \"x$(find ${BUILD}/target -mindepth 1 -print -quit)\" ]] &&"
993                     "( for i in ${BUILD}/target/* ; do rm -rf ${SOURCES}/${i##*/} ; done ) && " # mv doesn't like unclean targets
994                     "mv -f ${BUILD}/target/* ${SOURCES}"
995             " )"
996         )
997         
998         # Set extra environment paths
999         self.env['NEPI_NS3BINDINGS'] = "${SOURCES}/lib"
1000         self.env['NEPI_NS3LIBRARY'] = "${SOURCES}/lib"
1001     
1002     @property
1003     def tarball(self):
1004         if self._tarball is None:
1005             shared_tar = self._shared_nepi_tar and self._shared_nepi_tar()
1006             if shared_tar is not None:
1007                 self._tarball = shared_tar
1008             else:
1009                 # Build an ad-hoc tarball
1010                 # Prebuilt
1011                 import nepi
1012                 import tempfile
1013                 
1014                 shared_tar = tempfile.NamedTemporaryFile(prefix='nepi-src-', suffix='.tar.gz')
1015                 
1016                 proc = subprocess.Popen(
1017                     ["tar", "czf", shared_tar.name, 
1018                         '-C', os.path.join(os.path.dirname(os.path.dirname(nepi.__file__)),'.'), 
1019                         'nepi'],
1020                     stdout = open("/dev/null","w"),
1021                     stdin = open("/dev/null","r"))
1022
1023                 if proc.wait():
1024                     raise RuntimeError, "Failed to create nepi tarball"
1025                 
1026                 self._tarball = self._shared_nepi_tar = shared_tar
1027                 
1028         return self._tarball
1029
class YumDependency(Dependency):
    """
    This dependency is an internal helper class used to
    efficiently distribute yum-downloaded rpms.
    
    It temporarily sets the yum cache as persistent in the
    build master, and installs all the required packages.
    
    The rpm packages left in the yum cache are gathered and
    distributed by the underlying Dependency in an efficient
    manner. Build slaves will then install those rpms back in
    the cache before issuing the install command.
    
    When packages have been installed already, nothing but an
    empty tar is distributed.
    """
    
    # NOTE(review): copy-pasted from NepiDependency - nothing in this
    # class reads it; confirm before removing.
    _shared_nepi_tar = None
    
    def _build_get(self):
        """Build command: populate the yum cache and pack the cached rpms into ${BUILD}/packages.tar."""
        # canonical representation of dependencies
        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
        
        # download rpms and pack into a tar archive
        return (
            "sudo -S nice yum -y makecache && "
            "sudo -S sed -i -r 's/keepcache *= *0/keepcache=1/' /etc/yum.conf && "
            " ( ( "
                "sudo -S nice yum -y install %s ; "
                "rm -f ${BUILD}/packages.tar ; "
                "tar -C /var/cache/yum -rf ${BUILD}/packages.tar $(cd /var/cache/yum ; find -iname '*.rpm')"
            " ) || /bin/true ) && "
            "sudo -S sed -i -r 's/keepcache *= *1/keepcache=0/' /etc/yum.conf && "
            "( sudo -S nice yum -y clean packages || /bin/true ) "
        ) % ( depends, )
    def _build_set(self, value):
        # build is derived from self.depends; assignments coming from the
        # base class are deliberately ignored
        return
    build = property(_build_get, _build_set)
    
    def _install_get(self):
        """Install command: seed the yum cache from packages.tar, install the packages, clean up."""
        # canonical representation of dependencies
        depends = ' '.join( sorted( (self.depends or "").split(' ') ) )
        
        # unpack cached rpms into yum cache, install, and cleanup
        return (
            "sudo -S tar -k --keep-newer-files -C /var/cache/yum -xf packages.tar && "
            "sudo -S nice yum -y install %s && "
            "( sudo -S nice yum -y clean packages || /bin/true ) "
        ) % ( depends, )
    def _install_set(self, value):
        # install is derived from self.depends; assignments are ignored
        return
    install = property(_install_get, _install_set)
        
    def check_bad_host(self, out, err):
        """
        Detect fatal host-side yum failures in the command output.
        
        Returns a truthy value when the node should be considered bad
        (and likely blacklisted by the caller).
        """
        # The repomd.xml parentheses and dot must be escaped: the real yum
        # error reads "... metadata (repomd.xml) for repository: ...", and
        # an unescaped "(repomd.xml)" is a regex group that could never
        # match the literal parentheses in the message.
        badre = re.compile(r'(?:'
                           r'The GPG keys listed for the ".*" repository are already installed but they are not correct for this package'
                           r'|Error: Cannot retrieve repository metadata \(repomd\.xml\) for repository: .*[.] Please verify its path and try again'
                           r'|Error: disk I/O error'
                           r'|MASTER NODE UNREACHABLE'
                           r')', 
                           re.I)
        return badre.search(out) or badre.search(err) or self.node.check_bad_host(out,err)
1097
1098
1099 class CCNxDaemon(Application):
1100     """
1101     An application also has dependencies, but also a command to be ran and monitored.
1102     
1103     It adds the output of that command as traces.
1104     """
1105     
1106     def __init__(self, api=None):
1107         super(CCNxDaemon,self).__init__(api)
1108         
1109         # Attributes
1110         self.ccnLocalPort = None
1111         self.ccnRoutes = None
1112         self.ccnxVersion = "ccnx-0.6.0"
1113         
1114         self.ccnx_0_5_1_sources = "http://www.ccnx.org/releases/ccnx-0.5.1.tar.gz"
1115         self.ccnx_0_6_0_sources = "http://www.ccnx.org/releases/ccnx-0.6.0.tar.gz"
1116         self.buildDepends = 'make gcc development-tools openssl-devel expat-devel libpcap-devel libxml2-devel'
1117
1118         self.ccnx_0_5_1_build = (
1119             " ( "
1120             "  cd .. && "
1121             "  test -d ccnx-0.5.1-src/build/bin "
1122             " ) || ( "
1123                 # Not working, rebuild
1124                 "("
1125                      " mkdir -p ccnx-0.5.1-src && "
1126                      " wget -q -c -O ccnx-0.5.1-src.tar.gz %(ccnx_source_url)s &&"
1127                      " tar xf ccnx-0.5.1-src.tar.gz --strip-components=1 -C ccnx-0.5.1-src "
1128                 ") && "
1129                      "cd ccnx-0.5.1-src && "
1130                      "mkdir -p build/include &&"
1131                      "mkdir -p build/lib &&"
1132                      "mkdir -p build/bin &&"
1133                      "I=$PWD/build && "
1134                      "INSTALL_BASE=$I ./configure &&"
1135                      "make && make install"
1136              " )") % dict(
1137                      ccnx_source_url = server.shell_escape(self.ccnx_0_5_1_sources),
1138                 )
1139
1140         self.ccnx_0_5_1_install = (
1141             " ( "
1142             "  test -d ${BUILD}/ccnx-0.5.1-src/build/bin && "
1143             "  cp -r ${BUILD}/ccnx-0.5.1-src/build/bin ${SOURCES}"
1144             " )"
1145         )
1146
1147         self.ccnx_0_6_0_build = (
1148             " ( "
1149             "  cd .. && "
1150             "  test -d ccnx-0.6.0-src/build/bin "
1151             " ) || ( "
1152                 # Not working, rebuild
1153                 "("
1154                      " mkdir -p ccnx-0.6.0-src && "
1155                      " wget -q -c -O ccnx-0.6.0-src.tar.gz %(ccnx_source_url)s &&"
1156                      " tar xf ccnx-0.6.0-src.tar.gz --strip-components=1 -C ccnx-0.6.0-src "
1157                 ") && "
1158                      "cd ccnx-0.6.0-src && "
1159                      "./configure && make"
1160              " )") % dict(
1161                      ccnx_source_url = server.shell_escape(self.ccnx_0_6_0_sources),
1162                 )
1163
1164         self.ccnx_0_6_0_install = (
1165             " ( "
1166             "  test -d ${BUILD}/ccnx-0.6.0-src/bin && "
1167             "  cp -r ${BUILD}/ccnx-0.6.0-src/bin ${SOURCES}"
1168             " )"
1169         )
1170
1171         self.env['PATH'] = "$PATH:${SOURCES}/bin"
1172
1173     def setup(self):
1174         # setting ccn sources
1175         if not self.build:
1176             if self.ccnxVersion == 'ccnx-0.6.0':
1177                 self.build = self.ccnx_0_6_0_build
1178             elif self.ccnxVersion == 'ccnx-0.5.1':
1179                 self.build = self.ccnx_0_5_1_build
1180
1181         if not self.install:
1182             if self.ccnxVersion == 'ccnx-0.6.0':
1183                 self.install = self.ccnx_0_6_0_install
1184             elif self.ccnxVersion == 'ccnx-0.5.1':
1185                 self.install = self.ccnx_0_5_1_install
1186
1187         super(CCNxDaemon, self).setup()
1188
1189     def start(self):
1190         self.command = ""
1191         if self.ccnLocalPort:
1192             self.command = "export CCN_LOCAL_PORT=%s ; " % self.ccnLocalPort
1193         self.command += " ccndstart "
1194
1195         # configure ccn routes
1196         if self.ccnRoutes:
1197             routes = self.ccnRoutes.split("|")
1198             
1199             if self.ccnLocalPort:
1200                 routes = map(lambda route: "%s %s" %(route, 
1201                     self.ccnLocalPort) if _ccnre.match(route) else route, 
1202                         routes)
1203
1204             routes = map(lambda route: "ccndc add ccnx:/ %s" % route, 
1205                 routes)
1206
1207             routescmd = " ; ".join(routes)
1208             self.command += " ; "
1209             self.command += routescmd
1210
1211         # Start will be invoked in prestart step
1212         super(CCNxDaemon, self).start()
1213             
1214     def kill(self):
1215         self._logger.info("Killing %s", self)
1216
1217         command = "${SOURCES}/bin/ccndstop"
1218
1219         if self.ccnLocalPort:
1220             self.command = "export CCN_LOCAL_PORT=%s; %s" % (self.ccnLocalPort, command)
1221
1222         cmd = self._replace_paths(command)
1223         command = cStringIO.StringIO()
1224         command.write(cmd)
1225         command.seek(0)
1226
1227         try:
1228             self._popen_scp(
1229                 command,
1230                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
1231                     os.path.join(self.home_path, "kill.sh"))
1232                 )
1233         except RuntimeError, e:
1234             raise RuntimeError, "Failed to kill ccndxdaemon: %s %s" \
1235                     % (e.args[0], e.args[1],)
1236         
1237
1238         script = "bash ./kill.sh"
1239         (out,err),proc = rspawn.remote_spawn(
1240             script,
1241             pidfile = 'kill-pid',
1242             home = self.home_path,
1243             stdin = '/dev/null',
1244             stdout = 'killlog',
1245             stderr = rspawn.STDOUT,
1246             
1247             host = self.node.hostname,
1248             port = None,
1249             user = self.node.slicename,
1250             agent = None,
1251             ident_key = self.node.ident_path,
1252             server_key = self.node.server_key,
1253             hostip = self.node.hostip,
1254             )
1255         
1256         if proc.wait():
1257             raise RuntimeError, "Failed to kill cnnxdaemon: %s %s" % (out,err,)
1258         
1259         super(CCNxDaemon, self).kill()
1260