55883f4894c3778bad77bafebf01997c3638350f
[nepi.git] / src / nepi / testbeds / planetlab / application.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import plcapi
5 import operator
6 import os
7 import os.path
8 import sys
9 import nepi.util.server as server
10 import cStringIO
11 import subprocess
12 import rspawn
13 import random
14 import time
15 import socket
16 import threading
17 import logging
18 import re
19
20 from nepi.util.constants import ApplicationStatus as AS
21
22 _ccnre = re.compile("\s*(udp|tcp)\s+(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\s*$")
23
24 class Dependency(object):
25     """
26     A Dependency is in every respect like an application.
27     
28     It depends on some packages, it may require building binaries, it must deploy
29     them...
30     
31     But it has no command. Dependencies aren't ever started, or stopped, and have
32     no status.
33     """
34
35     TRACES = ('buildlog')
36
37     def __init__(self, api=None):
38         if not api:
39             api = plcapi.PLCAPI()
40         self._api = api
41         
42         # Attributes
43         self.command = None
44         self.sudo = False
45         
46         self.build = None
47         self.install = None
48         self.depends = None
49         self.buildDepends = None
50         self.sources = None
51         self.rpmFusion = False
52         self.env = {}
53         
54         self.stdin = None
55         self.stdout = None
56         self.stderr = None
57         self.buildlog = None
58         
59         self.add_to_path = True
60         
61         # Those are filled when the app is configured
62         self.home_path = None
63         
64         # Those are filled when an actual node is connected
65         self.node = None
66         
67         # Those are filled when the app is started
68         #   Having both pid and ppid makes it harder
69         #   for pid rollover to induce tracking mistakes
70         self._started = False
71         self._setup = False
72         self._setuper = None
73         self._pid = None
74         self._ppid = None
75
76         # Spanning tree deployment
77         self._master = None
78         self._master_passphrase = None
79         self._master_prk = None
80         self._master_puk = None
81         self._master_token = os.urandom(8).encode("hex")
82         self._build_pid = None
83         self._build_ppid = None
84         
85         # Logging
86         self._logger = logging.getLogger('nepi.testbeds.planetlab')
87         
88     
89     def __str__(self):
90         return "%s<%s>" % (
91             self.__class__.__name__,
92             ' '.join(filter(bool,(self.depends, self.sources)))
93         )
94    
95     def deployed(self):
96         return self._setup
97
98     def validate(self):
99         if self.home_path is None:
100             raise AssertionError, "Misconfigured application: missing home path"
101         if self.node.ident_path is None or not os.access(self.node.ident_path, os.R_OK):
102             raise AssertionError, "Misconfigured application: missing slice SSH key"
103         if self.node is None:
104             raise AssertionError, "Misconfigured application: unconnected node"
105         if self.node.hostname is None:
106             raise AssertionError, "Misconfigured application: misconfigured node"
107         if self.node.slicename is None:
108             raise AssertionError, "Misconfigured application: unspecified slice"
109     
110     def check_bad_host(self, out, err):
111         """
112         Called whenever an operation fails, it's given the output to be checked for
113         telltale signs of unhealthy hosts.
114         """
115         return False
116     
117     def remote_trace_path(self, whichtrace):
118         if whichtrace in self.TRACES:
119             tracefile = os.path.join(self.home_path, whichtrace)
120         else:
121             tracefile = None
122         
123         return tracefile
124
125     def remote_trace_name(self, whichtrace):
126         if whichtrace in self.TRACES:
127             return whichtrace
128         return None
129
130     def sync_trace(self, local_dir, whichtrace):
131         tracefile = self.remote_trace_path(whichtrace)
132         if not tracefile:
133             return None
134         
135         local_path = os.path.join(local_dir, tracefile)
136         
137         # create parent local folders
138         proc = subprocess.Popen(
139             ["mkdir", "-p", os.path.dirname(local_path)],
140             stdout = open("/dev/null","w"),
141             stdin = open("/dev/null","r"))
142
143         if proc.wait():
144             raise RuntimeError, "Failed to synchronize trace"
145         
146         # sync files
147         try:
148             self._popen_scp(
149                 '%s@%s:%s' % (self.node.slicename, self.node.hostname,
150                     tracefile),
151                 local_path
152                 )
153         except RuntimeError, e:
154             raise RuntimeError, "Failed to synchronize trace: %s %s" \
155                     % (e.args[0], e.args[1],)
156         
157         return local_path
158     
159     def recover(self):
160         # We assume a correct deployment, so recovery only
161         # means we mark this dependency as deployed
162         self._setup = True
163
164     def setup(self):
165         self._logger.info("Setting up %s", self)
166         self._make_home()
167         self._launch_build()
168         self._finish_build()
169         self._setup = True
170     
171     def async_setup(self):
172         if not self._setuper:
173             def setuper():
174                 try:
175                     self.setup()
176                 except:
177                     self._setuper._exc.append(sys.exc_info())
178             self._setuper = threading.Thread(
179                 target = setuper)
180             self._setuper._exc = []
181             self._setuper.start()
182     
183     def async_setup_wait(self):
184         if not self._setup:
185             self._logger.info("Waiting for %s to be setup", self)
186             if self._setuper:
187                 self._setuper.join()
188                 if not self._setup:
189                     if self._setuper._exc:
190                         exctyp,exval,exctrace = self._setuper._exc[0]
191                         raise exctyp,exval,exctrace
192                     else:
193                         raise RuntimeError, "Failed to setup application"
194                 else:
195                     self._logger.info("Setup ready: %s at %s", self, self.node.hostname)
196             else:
197                 self.setup()
198         
199     def _make_home(self):
200         # Make sure all the paths are created where 
201         # they have to be created for deployment
202         # sync files
203         try:
204             self._popen_ssh_command(
205                 "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" \
206                     % { 'home' : server.shell_escape(self.home_path) },
207                 timeout = 120,
208                 retry = 3
209                 )
210         except RuntimeError, e:
211             raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, e.args[0], e.args[1],)
212         
213         if self.stdin:
214             stdin = self.stdin
215             if not os.path.isfile(stdin):
216                 stdin = cStringIO.StringIO(self.stdin)
217
218             # Write program input
219             try:
220                 self._popen_scp(stdin,
221                     '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
222                         os.path.join(self.home_path, 'stdin') ),
223                     )
224             except RuntimeError, e:
225                 raise RuntimeError, "Failed to set up application %s: %s %s" \
226                         % (self.home_path, e.args[0], e.args[1],)
227
228     def _replace_paths(self, command):
229         """
230         Replace all special path tags with shell-escaped actual paths.
231         """
232         # need to append ${HOME} if paths aren't absolute, to MAKE them absolute.
233         root = '' if self.home_path.startswith('/') else "${HOME}/"
234         return ( command
235             .replace("${SOURCES}", root+server.shell_escape(self.home_path))
236             .replace("${BUILD}", root+server.shell_escape(os.path.join(self.home_path,'build'))) )
237
238     def _launch_build(self, trial=0):
239         if self._master is not None:
240             if not trial or self._master_prk is not None:
241                 self._do_install_keys()
242             buildscript = self._do_build_slave()
243         else:
244             buildscript = self._do_build_master()
245             
246         if buildscript is not None:
247             self._logger.info("Building %s at %s", self, self.node.hostname)
248             
249             # upload build script
250             try:
251                 self._popen_scp(
252                     buildscript,
253                     '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
254                         os.path.join(self.home_path, 'nepi-build.sh') )
255                     )
256             except RuntimeError, e:
257                 raise RuntimeError, "Failed to set up application %s: %s %s" \
258                         % (self.home_path, e.args[0], e.args[1],)
259             
260             # launch build
261             self._do_launch_build()
262     
263     def _finish_build(self):
264         self._do_wait_build()
265         self._do_install()
266
    def _do_build_slave(self):
        """
        Generate the shell script a build slave runs to fetch its files from
        the master node instead of building them itself.

        The script starts an ssh-agent loaded with the deployment key, waits
        until the master has published its build token, copies the sources
        and/or build.tar.gz from the master's home folder, and writes this
        dependency's own token to build.token and build.token.retcode.

        Returns a cStringIO object with the script, or None when there are
        neither sources nor a build command.
        """
        if not self.sources and not self.build:
            return None
            
        # Create build script
        files = set()
        
        # Remote (master-side) locations of the source files; only the
        # basename of each source is used
        if self.sources:
            sources = self.sources.split(' ')
            files.update(
                "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostip, 
                    os.path.join(self._master.home_path, os.path.basename(source)),)
                for source in sources
            )
        
        # The master archives its build folder as build.tar.gz
        if self.build:
            files.add(
                "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostip, 
                    os.path.join(self._master.home_path, 'build.tar.gz'),)
            )
        
        sshopts = "-o ConnectTimeout=30 -o ConnectionAttempts=3 -o ServerAliveInterval=30 -o TCPKeepAlive=yes"
        
        # Start an ssh-agent, feed it the private key (the passphrase comes
        # from $NEPI_MASTER_PASSPHRASE through a "cat" askpass helper), then
        # delete the key files from disk
        launch_agent = "{ ( echo -e '#!/bin/sh\\ncat' > .ssh-askpass ) && chmod u+x .ssh-askpass"\
                        " && export SSH_ASKPASS=$(pwd)/.ssh-askpass "\
                        " && ssh-agent > .ssh-agent.sh ; } && . ./.ssh-agent.sh && ( echo $NEPI_MASTER_PASSPHRASE | ssh-add %(prk)s ) && rm -rf %(prk)s %(puk)s" %  \
        {
            'prk' : server.shell_escape(self._master_prk_name),
            'puk' : server.shell_escape(self._master_puk_name),
        }
        
        kill_agent = "kill $SSH_AGENT_PID"
        
        # Check the master is reachable (ping + ssh), poll every 5 seconds
        # until its build.token.retcode holds the master's token, then make
        # sure build.token holds it too (otherwise print BAD TOKEN and exit)
        waitmaster = (
            "{ "
            "echo 'Checking master reachability' ; "
            "if ping -c 3 %(master_host)s && (. ./.ssh-agent.sh > /dev/null ; ssh -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s echo MASTER SAYS HI ) ; then "
            "echo 'Master node reachable' ; "
            "else "
            "echo 'MASTER NODE UNREACHABLE' && "
            "exit 1 ; "
            "fi ; "
            ". ./.ssh-agent.sh ; "
            "while [[ $(. ./.ssh-agent.sh > /dev/null ; ssh -q -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s cat %(token_path)s.retcode || /bin/true) != %(token)s ]] ; do sleep 5 ; done ; "
            "if [[ $(. ./.ssh-agent.sh > /dev/null ; ssh -q -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s cat %(token_path)s || /bin/true) != %(token)s ]] ; then echo BAD TOKEN ; exit 1 ; fi ; "
            "}" 
        ) % {
            'hostkey' : 'master_known_hosts',
            'master' : "%s@%s" % (self._master.node.slicename, self._master.node.hostip),
            'master_host' : self._master.node.hostip,
            'token_path' : os.path.join(self._master.home_path, 'build.token'),
            'token' : server.shell_escape(self._master._master_token),
            'sshopts' : sshopts,
        }
        
        # Copy the files from the master, unpack the build archive if any,
        # and write this dependency's own token files
        syncfiles = ". ./.ssh-agent.sh && scp -p -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(files)s ." % {
            'hostkey' : 'master_known_hosts',
            'files' : ' '.join(files),
            'sshopts' : sshopts,
        }
        if self.build:
            syncfiles += " && tar xzf build.tar.gz"
        syncfiles += " && ( echo %s > build.token )" % (server.shell_escape(self._master_token),)
        syncfiles += " && ( echo %s > build.token.retcode )" % (server.shell_escape(self._master_token),)
        syncfiles = "{ . ./.ssh-agent.sh ; %s ; }" % (syncfiles,)
        
        # Kill the agent and remove keys, known hosts file and askpass helper
        cleanup = "{ . ./.ssh-agent.sh ; kill $SSH_AGENT_PID ; rm -rf %(prk)s %(puk)s master_known_hosts .ssh-askpass ; }" % {
            'prk' : server.shell_escape(self._master_prk_name),
            'puk' : server.shell_escape(self._master_puk_name),
        }
        
        # Cleanup runs whether the main chain succeeds or fails, and
        # build.token.retcode is written in either case.
        # NOTE: the 'home' entry below is not referenced by the template.
        slavescript = "( ( %(launch_agent)s && %(waitmaster)s && %(syncfiles)s && %(kill_agent)s && %(cleanup)s ) || %(cleanup)s ) ; echo %(token)s > build.token.retcode" % {
            'waitmaster' : waitmaster,
            'syncfiles' : syncfiles,
            'cleanup' : cleanup,
            'kill_agent' : kill_agent,
            'launch_agent' : launch_agent,
            'home' : server.shell_escape(self.home_path),
            'token' : server.shell_escape(self._master_token),
        }
        
        return cStringIO.StringIO(slavescript)
349          
350     def _do_launch_build(self):
351         script = "bash ./nepi-build.sh"
352         if self._master_passphrase:
353             script = "NEPI_MASTER_PASSPHRASE=%s %s" % (
354                 server.shell_escape(self._master_passphrase),
355                 script
356             )
357         (out,err),proc = rspawn.remote_spawn(
358             script,
359             pidfile = 'build-pid',
360             home = self.home_path,
361             stdin = '/dev/null',
362             stdout = 'buildlog',
363             stderr = rspawn.STDOUT,
364             
365             host = self.node.hostname,
366             port = None,
367             user = self.node.slicename,
368             agent = None,
369             ident_key = self.node.ident_path,
370             server_key = self.node.server_key,
371             hostip = self.node.hostip,
372             )
373         
374         if proc.wait():
375             if self.check_bad_host(out, err):
376                 self.node.blacklist()
377             raise RuntimeError, "Failed to set up build slave %s: %s %s" % (self.home_path, out,err,)
378         
379         
380         pid = ppid = None
381         delay = 1.0
382         for i in xrange(5):
383             pidtuple = rspawn.remote_check_pid(
384                 os.path.join(self.home_path,'build-pid'),
385                 host = self.node.hostname,
386                 port = None,
387                 user = self.node.slicename,
388                 agent = None,
389                 ident_key = self.node.ident_path,
390                 server_key = self.node.server_key,
391                 hostip = self.node.hostip
392                 )
393             
394             if pidtuple:
395                 pid, ppid = pidtuple
396                 self._build_pid, self._build_ppid = pidtuple
397                 break
398             else:
399                 time.sleep(delay)
400                 delay = min(30,delay*1.2)
401         else:
402             raise RuntimeError, "Failed to set up build slave %s: cannot get pid" % (self.home_path,)
403
404         self._logger.info("Deploying %s at %s", self, self.node.hostname)
405         
406     def _do_wait_build(self, trial=0):
407         pid = self._build_pid
408         ppid = self._build_ppid
409         
410         if pid and ppid:
411             delay = 1.0
412             first = True
413             bustspin = 0
414             while True:
415                 status = rspawn.remote_status(
416                     pid, ppid,
417                     host = self.node.hostname,
418                     port = None,
419                     user = self.node.slicename,
420                     agent = None,
421                     ident_key = self.node.ident_path,
422                     server_key = self.node.server_key,
423                     hostip = self.node.hostip
424                     )
425                 
426                 if status is rspawn.FINISHED:
427                     self._build_pid = self._build_ppid = None
428                     break
429                 elif status is not rspawn.RUNNING:
430                     self._logger.warn("Busted waiting for %s to finish building at %s %s", self, self.node.hostname,
431                             "(build slave)" if self._master is not None else "(build master)")
432                     bustspin += 1
433                     time.sleep(delay*(5.5+random.random()))
434                     if bustspin > 12:
435                         self._build_pid = self._build_ppid = None
436                         break
437                 else:
438                     if first:
439                         self._logger.info("Waiting for %s to finish building at %s %s", self, self.node.hostname,
440                             "(build slave)" if self._master is not None else "(build master)")
441                         
442                         first = False
443                     time.sleep(delay*(0.5+random.random()))
444                     delay = min(30,delay*1.2)
445                     bustspin = 0
446             
447             # check build token
448             slave_token = ""
449             for i in xrange(3):
450                 (out, err), proc = self._popen_ssh_command(
451                     "cat %(token_path)s" % {
452                         'token_path' : os.path.join(self.home_path, 'build.token'),
453                     },
454                     timeout = 120,
455                     noerrors = True)
456                 if not proc.wait() and out:
457                     slave_token = out.strip()
458                 
459                 if slave_token:
460                     break
461                 else:
462                     time.sleep(2)
463             
464             if slave_token != self._master_token:
465                 # Get buildlog for the error message
466
467                 (buildlog, err), proc = self._popen_ssh_command(
468                     "cat %(buildlog)s" % {
469                         'buildlog' : os.path.join(self.home_path, 'buildlog'),
470                         'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'),
471                     },
472                     timeout = 120,
473                     noerrors = True)
474                 
475                 proc.wait()
476                 
477                 if self.check_bad_host(buildlog, err):
478                     self.node.blacklist()
479                 elif self._master and trial < 3 and 'BAD TOKEN' in buildlog or 'BAD TOKEN' in err:
480                     # bad sync with master, may try again
481                     # but first wait for master
482                     self._master.async_setup_wait()
483                     self._launch_build(trial+1)
484                     return self._do_wait_build(trial+1)
485                 elif trial < 3:
486                     return self._do_wait_build(trial+1)
487                 else:
488                     # No longer need'em
489                     self._master_prk = None
490                     self._master_puk = None
491         
492                     raise RuntimeError, "Failed to set up application %s: "\
493                             "build failed, got wrong token from pid %s/%s "\
494                             "(expected %r, got %r), see buildlog at %s:\n%s" % (
495                         self.home_path, pid, ppid, self._master_token, slave_token, self.node.hostname, buildlog)
496
497             # No longer need'em
498             self._master_prk = None
499             self._master_puk = None
500         
501             self._logger.info("Built %s at %s", self, self.node.hostname)
502
503     def _do_kill_build(self):
504         pid = self._build_pid
505         ppid = self._build_ppid
506         
507         if pid and ppid:
508             self._logger.info("Killing build of %s", self)
509             rspawn.remote_kill(
510                 pid, ppid,
511                 host = self.node.hostname,
512                 port = None,
513                 user = self.node.slicename,
514                 agent = None,
515                 ident_key = self.node.ident_path,
516                 hostip = self.node.hostip
517                 )
518         
519         
520     def _do_build_master(self):
521         if not self.sources and not self.build and not self.buildDepends:
522             return None
523             
524         if self.sources:
525             sources = self.sources.split(' ')
526             
527             # Copy all sources
528             try:
529                 self._popen_scp(
530                     sources,
531                     "%s@%s:%s" % (self.node.slicename, self.node.hostname, 
532                         os.path.join(self.home_path,'.'),)
533                     )
534             except RuntimeError, e:
535                 raise RuntimeError, "Failed upload source file %r: %s %s" \
536                         % (sources, e.args[0], e.args[1],)
537             
538         buildscript = cStringIO.StringIO()
539         
540         buildscript.write("(\n")
541         
542         if self.buildDepends:
543             # Install build dependencies
544             buildscript.write(
545                 "sudo -S yum -y install %(packages)s\n" % {
546                     'packages' : self.buildDepends
547                 }
548             )
549         
550             
551         if self.build:
552             # Build sources
553             buildscript.write(
554                 "mkdir -p build && ( cd build && ( %(command)s ) )\n" % {
555                     'command' : self._replace_paths(self.build),
556                     'home' : server.shell_escape(self.home_path),
557                 }
558             )
559         
560             # Make archive
561             buildscript.write("tar czf build.tar.gz build\n")
562         
563         # Write token
564         buildscript.write("echo %(master_token)s > build.token ) ; echo %(master_token)s > build.token.retcode" % {
565             'master_token' : server.shell_escape(self._master_token)
566         })
567         
568         buildscript.seek(0)
569
570         return buildscript
571
572     def _do_install(self):
573         if self.install:
574             self._logger.info("Installing %s at %s", self, self.node.hostname)
575            
576             # Install application
577             try:
578                 self._popen_ssh_command(
579                     "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/{install,build}log >&2 && false )" % \
580                         {
581                         'command' : self._replace_paths(self.install),
582                         'home' : server.shell_escape(self.home_path),
583                         },
584                     )
585             except RuntimeError, e:
586                 if self.check_bad_host(e.args[0], e.args[1]):
587                     self.node.blacklist()
588                 raise RuntimeError, "Failed install build sources: %s %s" % (e.args[0], e.args[1],)
589
590     def set_master(self, master):
591         self._master = master
592         
593     def install_keys(self, prk, puk, passphrase):
594         # Install keys
595         self._master_passphrase = passphrase
596         self._master_prk = prk
597         self._master_puk = puk
598         self._master_prk_name = os.path.basename(prk.name)
599         self._master_puk_name = os.path.basename(puk.name)
600         
601     def _do_install_keys(self):
602         prk = self._master_prk
603         puk = self._master_puk
604        
605         try:
606             self._popen_scp(
607                 [ prk.name, puk.name ],
608                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path )
609                 )
610         except RuntimeError, e:
611             raise RuntimeError, "Failed to set up application deployment keys: %s %s" \
612                     % (e.args[0], e.args[1],)
613
614         try:
615             self._popen_scp(
616                 cStringIO.StringIO('%s,%s %s\n' % (
617                     self._master.node.hostname, self._master.node.hostip, 
618                     self._master.node.server_key)),
619                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
620                     os.path.join(self.home_path,"master_known_hosts") )
621                 )
622         except RuntimeError, e:
623             raise RuntimeError, "Failed to set up application deployment keys: %s %s" \
624                     % (e.args[0], e.args[1],)
625         
626     
627     def cleanup(self):
628         # make sure there's no leftover build processes
629         self._do_kill_build()
630         
631         # No longer need'em
632         self._master_prk = None
633         self._master_puk = None
634
635     @server.eintr_retry
636     def _popen_scp(self, src, dst, retry = 3):
637         while 1:
638             try:
639                 (out,err),proc = server.popen_scp(
640                     src,
641                     dst, 
642                     port = None,
643                     agent = None,
644                     ident_key = self.node.ident_path,
645                     server_key = self.node.server_key
646                     )
647
648                 if server.eintr_retry(proc.wait)():
649                     raise RuntimeError, (out, err)
650                 return (out, err), proc
651             except:
652                 if retry <= 0:
653                     raise
654                 else:
655                     retry -= 1
656   
657
658     @server.eintr_retry
659     def _popen_ssh_command(self, command, retry = 0, noerrors=False, timeout=None):
660         (out,err),proc = server.popen_ssh_command(
661             command,
662             host = self.node.hostname,
663             port = None,
664             user = self.node.slicename,
665             agent = None,
666             ident_key = self.node.ident_path,
667             server_key = self.node.server_key,
668             timeout = timeout,
669             retry = retry
670             )
671
672         if server.eintr_retry(proc.wait)():
673             if not noerrors:
674                 raise RuntimeError, (out, err)
675         return (out, err), proc
676
677 class Application(Dependency):
678     """
679     An application also has dependencies, but also a command to be ran and monitored.
680     
681     It adds the output of that command as traces.
682     """
683     
684     TRACES = ('stdout','stderr','buildlog', 'output')
685     
686     def __init__(self, api=None):
687         super(Application,self).__init__(api)
688         
689         # Attributes
690         self.command = None
691         self.sudo = False
692         
693         self.stdin = None
694         self.stdout = None
695         self.stderr = None
696         self.output = None
697         
698         # Those are filled when the app is started
699         #   Having both pid and ppid makes it harder
700         #   for pid rollover to induce tracking mistakes
701         self._started = False
702         self._pid = None
703         self._ppid = None
704
705         # Do not add to the python path of nodes
706         self.add_to_path = False
707     
708     def __str__(self):
709         return "%s<command:%s%s>" % (
710             self.__class__.__name__,
711             "sudo " if self.sudo else "",
712             self.command,
713         )
714     
715     def start(self):
716         self._logger.info("Starting %s", self)
717         
718         # Create shell script with the command
719         # This way, complex commands and scripts can be ran seamlessly
720         # sync files
721         command = cStringIO.StringIO()
722         command.write('export PYTHONPATH=$PYTHONPATH:%s\n' % (
723             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
724         ))
725         command.write('export PATH=$PATH:%s\n' % (
726             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
727         ))
728         if self.node.env:
729             for envkey, envvals in self.node.env.iteritems():
730                 for envval in envvals:
731                     command.write('export %s=%s\n' % (envkey, envval))
732         command.write(self.command)
733         command.seek(0)
734
735         try:
736             self._popen_scp(
737                 command,
738                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
739                     os.path.join(self.home_path, "app.sh"))
740                 )
741         except RuntimeError, e:
742             raise RuntimeError, "Failed to set up application: %s %s" \
743                     % (e.args[0], e.args[1],)
744         
745         # Start process in a "daemonized" way, using nohup and heavy
746         # stdin/out redirection to avoid connection issues
747         (out,err),proc = rspawn.remote_spawn(
748             self._replace_paths("bash ./app.sh"),
749             
750             pidfile = './pid',
751             home = self.home_path,
752             stdin = 'stdin' if self.stdin is not None else '/dev/null',
753             stdout = 'stdout' if self.stdout else '/dev/null',
754             stderr = 'stderr' if self.stderr else '/dev/null',
755             sudo = self.sudo,
756             
757             host = self.node.hostname,
758             port = None,
759             user = self.node.slicename,
760             agent = None,
761             ident_key = self.node.ident_path,
762             server_key = self.node.server_key
763             )
764         
765         if proc.wait():
766             if self.check_bad_host(out, err):
767                 self.node.blacklist()
768             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
769
770         self._started = True
771     
772     def recover(self):
773         # Assuming the application is running on PlanetLab,
774         # proper pidfiles should be present at the app's home path.
775         # So we mark this application as started, and check the pidfiles
776         self._started = True
777         self.checkpid()
778
779     def checkpid(self):            
780         # Get PID/PPID
781         # NOTE: wait a bit for the pidfile to be created
782         if self._started and not self._pid or not self._ppid:
783             pidtuple = rspawn.remote_check_pid(
784                 os.path.join(self.home_path,'pid'),
785                 host = self.node.hostname,
786                 port = None,
787                 user = self.node.slicename,
788                 agent = None,
789                 ident_key = self.node.ident_path,
790                 server_key = self.node.server_key
791                 )
792             
793             if pidtuple:
794                 self._pid, self._ppid = pidtuple
795     
796     def status(self):
797         self.checkpid()
798         if not self._started:
799             return AS.STATUS_NOT_STARTED
800         elif not self._pid or not self._ppid:
801             return AS.STATUS_NOT_STARTED
802         else:
803             status = rspawn.remote_status(
804                 self._pid, self._ppid,
805                 host = self.node.hostname,
806                 port = None,
807                 user = self.node.slicename,
808                 agent = None,
809                 ident_key = self.node.ident_path,
810                 server_key = self.node.server_key
811                 )
812             
813             if status is rspawn.NOT_STARTED:
814                 return AS.STATUS_NOT_STARTED
815             elif status is rspawn.RUNNING:
816                 return AS.STATUS_RUNNING
817             elif status is rspawn.FINISHED:
818                 return AS.STATUS_FINISHED
819             else:
820                 # WTF?
821                 return AS.STATUS_NOT_STARTED
822     
823     def kill(self):
824         status = self.status()
825         if status == AS.STATUS_RUNNING:
826             # kill by ppid+pid - SIGTERM first, then try SIGKILL
827             rspawn.remote_kill(
828                 self._pid, self._ppid,
829                 host = self.node.hostname,
830                 port = None,
831                 user = self.node.slicename,
832                 agent = None,
833                 ident_key = self.node.ident_path,
834                 server_key = self.node.server_key,
835                 sudo = self.sudo
836                 )
837             self._logger.info("Killed %s", self)
838
839
class NepiDependency(Dependency):
    """
    This dependency adds nepi itself to the python path,
    so that you may run testbeds within PL nodes.

    The nepi package is packed into a temporary tarball that is used as
    the dependency's sources. The tarball is shared among all live
    instances: each instance keeps a strong reference to it, while the
    class only keeps a weak one.
    """

    # Class attribute holding a *weak* reference to the shared NEPI tar file
    # so that they may share it. Don't operate on the file itself, it would
    # be a mess, just use its path.
    _shared_nepi_tar = None

    def __init__(self, api = None):
        super(NepiDependency, self).__init__(api)

        self._tarball = None

        self.depends = 'python python-ipaddr python-setuptools'

        # our sources are in our ad-hoc tarball
        # (accessing self.tarball creates it on first use)
        self.sources = self.tarball.name

        tarname = os.path.basename(self.tarball.name)

        # it's already built - just move the tarball into place
        self.build = "mv -f ${SOURCES}/%s ." % (tarname,)

        # unpack it into sources, and we're done
        self.install = "tar xzf ${BUILD}/%s -C .." % (tarname,)

    @property
    def tarball(self):
        """
        Temporary file object holding the packed nepi sources.

        Reuses the tarball shared through the class-level weak reference
        when it is still alive, and builds a new one otherwise.

        Raises RuntimeError if the tarball cannot be created.
        """
        if self._tarball is None:
            import weakref

            # The reference lives on the class (not on the instance) and is
            # a real weakref, so that other instances can find the tarball.
            # Previously the file object itself was stored on the instance,
            # so nothing was ever shared.
            shared_ref = NepiDependency._shared_nepi_tar
            shared_tar = shared_ref() if shared_ref is not None else None

            if shared_tar is None:
                shared_tar = self._build_tarball()
                NepiDependency._shared_nepi_tar = weakref.ref(shared_tar)

            self._tarball = shared_tar

        return self._tarball

    @staticmethod
    def _build_tarball():
        """Pack the installed nepi package into a new temporary .tar.gz file."""
        # Build an ad-hoc tarball
        # Prebuilt
        import nepi
        import tempfile

        tar_file = tempfile.NamedTemporaryFile(prefix='nepi-src-', suffix='.tar.gz')
        nepi_parent = os.path.join(
            os.path.dirname(os.path.dirname(nepi.__file__)), '.')

        # Close the null-device handles once tar is done instead of
        # leaking them
        with open(os.devnull, "w") as null_out:
            with open(os.devnull, "r") as null_in:
                proc = subprocess.Popen(
                    ["tar", "czf", tar_file.name,
                        '-C', nepi_parent,
                        'nepi'],
                    stdout = null_out,
                    stdin = null_in)
                failed = proc.wait()

        if failed:
            raise RuntimeError("Failed to create nepi tarball")

        return tar_file
896
class NS3Dependency(Dependency):
    """
    This dependency adds NS3 libraries to the library paths,
    so that you may run the NS3 testbed within PL nodes.

    You'll also need the NepiDependency.

    The build command fetches and compiles pygccxml, pybindgen, passfd and
    ns-3 into ${BUILD}/target; the install command moves the contents of
    that directory into ${SOURCES}. Both commands start with a sanity check
    and do nothing when a working installation is already in place.
    """

    def __init__(self, api = None):
        super(NS3Dependency, self).__init__(api)

        self.buildDepends = 'make waf gcc gcc-c++ gccxml unzip bzr'

        # We have to download the sources, untar, build...
        #pygccxml_source_url = "http://leaseweb.dl.sourceforge.net/project/pygccxml/pygccxml/pygccxml-1.0/pygccxml-1.0.0.zip"
        pygccxml_source_url = "http://yans.pl.sophia.inria.fr/libs/pygccxml-1.0.0.zip"
        ns3_source_url = "http://nepi.inria.fr/code/nepi-ns3.13/archive/tip.tar.gz"
        passfd_source_url = "http://nepi.inria.fr/code/python-passfd/archive/tip.tar.gz"

        # Revision passed to "bzr checkout -r" for pybindgen
        pybindgen_version = "797"

        # Shell command of the form "( sanity check ) || ( rebuild )".
        # The adjacent string literals are concatenated into one command
        # before the %-substitution at the end is applied.
        self.build =(
            " ( "
            "  cd .. && "
            "  python -c 'import pygccxml, pybindgen, passfd' && "
            "  test -f lib/ns/_core.so && "
            "  test -f lib/ns/__init__.py && "
            "  test -f lib/ns/core.py && "
            "  test -f lib/libns3-core.so && "
            "  LD_LIBRARY_PATH=lib PYTHONPATH=lib python -c 'import ns.core' "
            " ) || ( "
                # Not working, rebuild
                     # Archive SHA1 sums to check
                     "echo '7158877faff2254e6c094bf18e6b4283cac19137  pygccxml-1.0.0.zip' > archive_sums.txt && "
                     " ( " # check existing files
                     " sha1sum -c archive_sums.txt && "
                     " test -f passfd-src.tar.gz && "
                     " test -f ns3-src.tar.gz "
                     " ) || ( " # nope? re-download
                     " rm -rf pybindgen pygccxml-1.0.0.zip passfd-src.tar.gz ns3-src.tar.gz && "
                     " bzr checkout lp:pybindgen -r %(pybindgen_version)s && " # continue, to exploit the case when it has already been dl'ed
                     " wget -q -c -O pygccxml-1.0.0.zip %(pygccxml_source_url)s && "
                     " wget -q -c -O passfd-src.tar.gz %(passfd_source_url)s && "
                     " wget -q -c -O ns3-src.tar.gz %(ns3_source_url)s && "
                     " sha1sum -c archive_sums.txt " # Check SHA1 sums when applicable
                     " ) && "
                     "unzip -n pygccxml-1.0.0.zip && "
                     "mkdir -p ns3-src && "
                     "mkdir -p passfd-src && "
                     "tar xzf ns3-src.tar.gz --strip-components=1 -C ns3-src && "
                     "tar xzf passfd-src.tar.gz --strip-components=1 -C passfd-src && "
                     "rm -rf target && "    # mv doesn't like unclean targets
                     "mkdir -p target && "
                     "cd pygccxml-1.0.0 && "
                     "rm -rf unittests docs && " # pygccxml has ~100M of unit tests - excessive - docs aren't needed either
                     "python setup.py build && "
                     "python setup.py install --install-lib ${BUILD}/target && "
                     "python setup.py clean && "
                     "cd ../pybindgen && "
                     "export PYTHONPATH=$PYTHONPATH:${BUILD}/target && "
                     "./waf configure --prefix=${BUILD}/target -d release && "
                     "./waf && "
                     "./waf install && "
                     "./waf clean && "
                     "mv -f ${BUILD}/target/lib/python*/site-packages/pybindgen ${BUILD}/target/. && "
                     "rm -rf ${BUILD}/target/lib && "
                     "cd ../passfd-src && "
                     "python setup.py build && "
                     "python setup.py install --install-lib ${BUILD}/target && "
                     "python setup.py clean && "
                     "cd ../ns3-src && "
                     # NOTE(review): pybindgen is checked out into "pybindgen"
                     # above, yet this points at "../pybindgen-src" - confirm
                     # whether that path is intended.
                     "./waf configure --prefix=${BUILD}/target --with-pybindgen=../pybindgen-src -d release --disable-examples --disable-tests && "
                     "./waf &&"
                     "./waf install && "
                     "rm -f ${BUILD}/target/lib/*.so && "
                     "cp -a ${BUILD}/ns3-src/build/libns3*.so ${BUILD}/target/lib && "
                     "cp -a ${BUILD}/ns3-src/build/bindings/python/ns ${BUILD}/target/lib &&"
                     "./waf clean "
             " )"
                     # Substituted values are shell-escaped first
                     % dict(
                        pybindgen_version = server.shell_escape(pybindgen_version),
                        pygccxml_source_url = server.shell_escape(pygccxml_source_url),
                        ns3_source_url = server.shell_escape(ns3_source_url),
                        passfd_source_url = server.shell_escape(passfd_source_url),
                     ))

        # Just move ${BUILD}/target
        # Same "( sanity check ) || ( reinstall )" shape as the build command
        self.install = (
            " ( "
            "  cd .. && "
            "  python -c 'import pygccxml, pybindgen, passfd' && "
            "  test -f lib/ns/_core.so && "
            "  test -f lib/ns/__init__.py && "
            "  test -f lib/ns/core.py && "
            "  test -f lib/libns3-core.so && "
            "  LD_LIBRARY_PATH=lib PYTHONPATH=lib python -c 'import ns.core' "
            " ) || ( "
                # Not working, reinstall
                    "test -d ${BUILD}/target && "
                    # Only proceed when ${BUILD}/target is not empty
                    "[[ \"x\" != \"x$(find ${BUILD}/target -mindepth 1 -print -quit)\" ]] &&"
                    "( for i in ${BUILD}/target/* ; do rm -rf ${SOURCES}/${i##*/} ; done ) && " # mv doesn't like unclean targets
                    "mv -f ${BUILD}/target/* ${SOURCES}"
            " )"
        )

        # Set extra environment paths
        self.env['NEPI_NS3BINDINGS'] = "${SOURCES}/lib"
        self.env['NEPI_NS3LIBRARY'] = "${SOURCES}/lib"

    # NOTE(review): this property looks copied verbatim from NepiDependency.
    # Nothing visible in this class assigns self._tarball or defines
    # _shared_nepi_tar - confirm whether the base class provides them, or
    # whether this property is unreachable/dead code.
    @property
    def tarball(self):
        if self._tarball is None:
            shared_tar = self._shared_nepi_tar and self._shared_nepi_tar()
            if shared_tar is not None:
                self._tarball = shared_tar
            else:
                # Build an ad-hoc tarball
                # Prebuilt
                import nepi
                import tempfile

                shared_tar = tempfile.NamedTemporaryFile(prefix='nepi-src-', suffix='.tar.gz')

                # Pack the "nepi" package directory, relative to its parent
                proc = subprocess.Popen(
                    ["tar", "czf", shared_tar.name,
                        '-C', os.path.join(os.path.dirname(os.path.dirname(nepi.__file__)),'.'),
                        'nepi'],
                    stdout = open("/dev/null","w"),
                    stdin = open("/dev/null","r"))

                # A non-zero exit status from tar is an error
                if proc.wait():
                    raise RuntimeError, "Failed to create nepi tarball"

                self._tarball = self._shared_nepi_tar = shared_tar

        return self._tarball
1033
class YumDependency(Dependency):
    """
    This dependency is an internal helper class used to
    efficiently distribute yum-downloaded rpms.

    It temporarily sets the yum cache as persistent in the
    build master, and installs all the required packages.

    The rpm packages left in the yum cache are gathered and
    distributed by the underlying Dependency in an efficient
    manner. Build slaves will then install those rpms back in
    the cache before issuing the install command.

    When packages have been installed already, nothing but an
    empty tar is distributed.

    The build and install commands are computed on demand from
    self.depends; values assigned to them are ignored.
    """

    # Class attribute holding a *weak* reference to the shared NEPI tar file
    # so that they may share it. Don't operate on the file itself, it would
    # be a mess, just use its path.
    # NOTE(review): nothing in this class reads it - kept for compatibility.
    _shared_nepi_tar = None

    # Output patterns revealing a node that cannot be used for yum work.
    # Compiled once for the class. The parentheses around "repomd.xml" are
    # escaped: unescaped they formed a regex group, and the literal "(" and
    # ")" printed by yum could never match.
    _BAD_HOST_RE = re.compile(r'(?:'
                       r'The GPG keys listed for the ".*" repository are already installed but they are not correct for this package'
                       r'|Error: Cannot retrieve repository metadata \(repomd\.xml\) for repository: .*[.] Please verify its path and try again'
                       r'|Error: disk I/O error'
                       r'|MASTER NODE UNREACHABLE'
                       r')',
                       re.I)

    def _canonical_depends(self):
        """
        Return the package list in canonical form: sorted and separated by
        single spaces. Splitting on any whitespace avoids the empty entries
        that repeated spaces used to produce.
        """
        return ' '.join(sorted((self.depends or "").split()))

    def _build_get(self):
        """Command that installs the packages and packs the cached rpms."""
        # download rpms and pack into a tar archive
        return (
            "sudo -S nice yum -y makecache && "
            "sudo -S sed -i -r 's/keepcache *= *0/keepcache=1/' /etc/yum.conf && "
            " ( ( "
                "sudo -S nice yum -y install %s ; "
                "rm -f ${BUILD}/packages.tar ; "
                "tar -C /var/cache/yum -rf ${BUILD}/packages.tar $(cd /var/cache/yum ; find -iname '*.rpm')"
            " ) || /bin/true ) && "
            "sudo -S sed -i -r 's/keepcache *= *1/keepcache=0/' /etc/yum.conf && "
            "( sudo -S nice yum -y clean packages || /bin/true ) "
        ) % ( self._canonical_depends(), )

    def _build_set(self, value):
        # ignore - the base class assigns self.build in its constructor
        return

    build = property(_build_get, _build_set)

    def _install_get(self):
        """Command that restores the cached rpms and installs the packages."""
        # unpack cached rpms into yum cache, install, and cleanup
        return (
            "sudo -S tar -k --keep-newer-files -C /var/cache/yum -xf packages.tar && "
            "sudo -S nice yum -y install %s && "
            "( sudo -S nice yum -y clean packages || /bin/true ) "
        ) % ( self._canonical_depends(), )

    def _install_set(self, value):
        # ignore - the base class assigns self.install in its constructor
        return

    install = property(_install_get, _install_set)

    def check_bad_host(self, out, err):
        """
        Tell whether the given command output reveals an unusable node.

        Returns a true value when either stream matches one of the known
        yum/master failure messages, and otherwise whatever the node's own
        check_bad_host returns.
        """
        badre = self._BAD_HOST_RE
        return badre.search(out) or badre.search(err) or self.node.check_bad_host(out,err)
1101
1102
class CCNxDaemon(Application):
    """
    Application that builds, installs and runs the CCNx daemon.

    The build and install commands are selected in setup() according to
    ccnxVersion ('ccnx-0.6.0' or 'ccnx-0.5.1'), unless they were set
    explicitly. start() launches "ccndstart" and registers the configured
    routes with "ccndc"; kill() runs "ccndstop" on the node before
    delegating to the regular application kill.

    Attributes:
        ccnLocalPort: exported as CCN_LOCAL_PORT for the start and stop
            commands when set.
        ccnRoutes: "|"-separated list of routes handed to "ccndc add".
        ccnxVersion: which of the bundled build/install recipes to use.
    """

    def __init__(self, api=None):
        super(CCNxDaemon,self).__init__(api)

        # Attributes
        self.ccnLocalPort = None
        self.ccnRoutes = None
        self.ccnxVersion = "ccnx-0.6.0"

        #self.ccnx_0_5_1_sources = "http://www.ccnx.org/releases/ccnx-0.5.1.tar.gz"
        self.ccnx_0_5_1_sources = "http://yans.pl.sophia.inria.fr/libs/ccnx-0.5.1.tar.gz"
        #self.ccnx_0_6_0_sources = "http://www.ccnx.org/releases/ccnx-0.6.0.tar.gz"
        self.ccnx_0_6_0_sources = "http://yans.pl.sophia.inria.fr/libs/ccnx-0.6.0.tar.gz"
        self.buildDepends = 'make gcc development-tools openssl-devel expat-devel libpcap-devel libxml2-devel'

        # Each build command has the form "( already built? ) || ( rebuild )"
        self.ccnx_0_5_1_build = (
            " ( "
            "  cd .. && "
            "  test -d ccnx-0.5.1-src/build/bin "
            " ) || ( "
                # Not working, rebuild
                "("
                     " mkdir -p ccnx-0.5.1-src && "
                     " wget -q -c -O ccnx-0.5.1-src.tar.gz %(ccnx_source_url)s &&"
                     " tar xf ccnx-0.5.1-src.tar.gz --strip-components=1 -C ccnx-0.5.1-src "
                ") && "
                     "cd ccnx-0.5.1-src && "
                     "mkdir -p build/include &&"
                     "mkdir -p build/lib &&"
                     "mkdir -p build/bin &&"
                     "I=$PWD/build && "
                     "INSTALL_BASE=$I ./configure &&"
                     "make && make install"
             " )") % dict(
                     ccnx_source_url = server.shell_escape(self.ccnx_0_5_1_sources),
                )

        self.ccnx_0_5_1_install = (
            " ( "
            "  test -d ${BUILD}/ccnx-0.5.1-src/build/bin && "
            "  cp -r ${BUILD}/ccnx-0.5.1-src/build/bin ${SOURCES}"
            " )"
        )

        # NOTE(review): the check below looks for ccnx-0.6.0-src/build/bin
        # while the install command copies ccnx-0.6.0-src/bin - confirm
        # which directory the 0.6.0 build actually produces.
        self.ccnx_0_6_0_build = (
            " ( "
            "  cd .. && "
            "  test -d ccnx-0.6.0-src/build/bin "
            " ) || ( "
                # Not working, rebuild
                "("
                     " mkdir -p ccnx-0.6.0-src && "
                     " wget -q -c -O ccnx-0.6.0-src.tar.gz %(ccnx_source_url)s &&"
                     " tar xf ccnx-0.6.0-src.tar.gz --strip-components=1 -C ccnx-0.6.0-src "
                ") && "
                     "cd ccnx-0.6.0-src && "
                     "./configure && make"
             " )") % dict(
                     ccnx_source_url = server.shell_escape(self.ccnx_0_6_0_sources),
                )

        self.ccnx_0_6_0_install = (
            " ( "
            "  test -d ${BUILD}/ccnx-0.6.0-src/bin && "
            "  cp -r ${BUILD}/ccnx-0.6.0-src/bin ${SOURCES}"
            " )"
        )

        self.env['PATH'] = "$PATH:${SOURCES}/bin"

    def setup(self):
        """
        Pick the build/install recipes matching ccnxVersion (when none were
        set explicitly), then run the regular application setup.
        """
        # setting ccn sources
        if not self.build:
            if self.ccnxVersion == 'ccnx-0.6.0':
                self.build = self.ccnx_0_6_0_build
            elif self.ccnxVersion == 'ccnx-0.5.1':
                self.build = self.ccnx_0_5_1_build

        if not self.install:
            if self.ccnxVersion == 'ccnx-0.6.0':
                self.install = self.ccnx_0_6_0_install
            elif self.ccnxVersion == 'ccnx-0.5.1':
                self.install = self.ccnx_0_5_1_install

        super(CCNxDaemon, self).setup()

    def start(self):
        """
        Compose the daemon's command line and start it.

        The command exports CCN_LOCAL_PORT (when configured), runs
        "ccndstart" and then adds one "ccndc add ccnx:/ <route>" per
        configured route. Routes made of just a protocol and an IPv4
        address get the local port appended when one is configured.
        """
        command = ""
        if self.ccnLocalPort:
            command = "export CCN_LOCAL_PORT=%s ; " % self.ccnLocalPort
        command += " ccndstart "

        # configure ccn routes
        if self.ccnRoutes:
            routes = self.ccnRoutes.split("|")

            if self.ccnLocalPort:
                routes = [
                    "%s %s" % (route, self.ccnLocalPort)
                        if _ccnre.match(route) else route
                    for route in routes ]

            command += " ; "
            command += " ; ".join(
                "ccndc add ccnx:/ %s" % route for route in routes )

        self.command = command

        # Start will be invoked in prestart step
        super(CCNxDaemon, self).start()

    def kill(self):
        """
        Stop the daemon with "ccndstop", then kill the application.

        A small kill.sh script is copied to the application's home path
        and executed there; its output goes to the "killlog" file.

        Raises RuntimeError if the script cannot be uploaded or if
        spawning it fails.
        """
        self._logger.info("Killing %s", self)

        command = "${SOURCES}/bin/ccndstop"

        if self.ccnLocalPort:
            # Keep this in the local variable: it used to be assigned to
            # self.command, which dropped the export from the kill script
            # and overwrote the daemon's start command.
            command = "export CCN_LOCAL_PORT=%s; %s" % (self.ccnLocalPort, command)

        script_body = cStringIO.StringIO()
        script_body.write(self._replace_paths(command))
        script_body.seek(0)

        try:
            self._popen_scp(
                script_body,
                '%s@%s:%s' % (self.node.slicename, self.node.hostname,
                    os.path.join(self.home_path, "kill.sh"))
                )
        except RuntimeError as e:
            # Don't assume the exception carries exactly two arguments
            raise RuntimeError("Failed to kill ccndxdaemon: %s"
                    % (" ".join(str(arg) for arg in e.args),))

        script = "bash ./kill.sh"
        (out,err),proc = rspawn.remote_spawn(
            script,
            pidfile = 'kill-pid',
            home = self.home_path,
            stdin = '/dev/null',
            stdout = 'killlog',
            stderr = rspawn.STDOUT,

            host = self.node.hostname,
            port = None,
            user = self.node.slicename,
            agent = None,
            ident_key = self.node.ident_path,
            server_key = self.node.server_key,
            hostip = self.node.hostip,
            )

        if proc.wait():
            raise RuntimeError("Failed to kill cnnxdaemon: %s %s" % (out,err,))

        super(CCNxDaemon, self).kill()
1265         super(CCNxDaemon, self).kill()
1266