Updated ccnx allowed versions to 0.6.0 and 0.7.1 for PlanetLab CCNxDaemon
[nepi.git] / src / nepi / testbeds / planetlab / application.py
1 # -*- coding: utf-8 -*-
2
3 from constants import TESTBED_ID
4 import plcapi
5 import operator
6 import os
7 import os.path
8 import sys
9 import nepi.util.server as server
10 import cStringIO
11 import subprocess
12 import rspawn
13 import random
14 import time
15 import socket
16 import threading
17 import logging
18 import re
19
20 from nepi.util.constants import ApplicationStatus as AS
21
# Matches "<udp|tcp> <dotted-quad IPv4 address>" with optional surrounding
# whitespace; every octet is limited to 0-255.
# NOTE(review): presumably validates CCNx route specifications -- no user of
# this pattern is visible in this part of the file.
# Raw string: the pattern contains backslash escapes meant for the regex engine.
_ccnre = re.compile(r"\s*(udp|tcp)\s+(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\s*$")
23
class Dependency(object):
    """
    A Dependency is in every respect like an application.
    
    It depends on some packages, it may require building binaries, it must deploy
    them...
    
    But it has no command. Dependencies aren't ever started, or stopped, and have
    no status.
    """

    # Names of the traces this object exposes. This has to be a real tuple:
    # a bare parenthesised string would turn "name in TRACES" into a
    # substring test and accept names such as 'build' or 'log'.
    TRACES = ('buildlog',)
36
37     def __init__(self, api=None):
38         if not api:
39             api = plcapi.PLCAPI()
40         self._api = api
41         
42         # Attributes
43         self.command = None
44         self.sudo = False
45         
46         self.build = None
47         self.install = None
48         self.depends = None
49         self.buildDepends = None
50         self.sources = None
51         self.rpmFusion = False
52         self.env = {}
53         
54         self.stdin = None
55         self.stdout = None
56         self.stderr = None
57         self.buildlog = None
58         
59         self.add_to_path = True
60         
61         # Those are filled when the app is configured
62         self.home_path = None
63         
64         # Those are filled when an actual node is connected
65         self.node = None
66         
67         # Those are filled when the app is started
68         #   Having both pid and ppid makes it harder
69         #   for pid rollover to induce tracking mistakes
70         self._started = False
71         self._setup = False
72         self._setuper = None
73         self._pid = None
74         self._ppid = None
75
76         # Spanning tree deployment
77         self._master = None
78         self._master_passphrase = None
79         self._master_prk = None
80         self._master_puk = None
81         self._master_token = os.urandom(8).encode("hex")
82         self._build_pid = None
83         self._build_ppid = None
84         
85         # Logging
86         self._logger = logging.getLogger('nepi.testbeds.planetlab')
87         
88     
89     def __str__(self):
90         return "%s<%s>" % (
91             self.__class__.__name__,
92             ' '.join(filter(bool,(self.depends, self.sources)))
93         )
94    
    def deployed(self):
        """
        Return the setup flag, which setup() and recover() of this class
        set to True once they complete.
        """
        return self._setup
97
98     def validate(self):
99         if self.home_path is None:
100             raise AssertionError, "Misconfigured application: missing home path"
101         if self.node.ident_path is None or not os.access(self.node.ident_path, os.R_OK):
102             raise AssertionError, "Misconfigured application: missing slice SSH key"
103         if self.node is None:
104             raise AssertionError, "Misconfigured application: unconnected node"
105         if self.node.hostname is None:
106             raise AssertionError, "Misconfigured application: misconfigured node"
107         if self.node.slicename is None:
108             raise AssertionError, "Misconfigured application: unspecified slice"
109     
    def check_bad_host(self, out, err):
        """
        Called whenever an operation fails, it's given the output to be checked for
        telltale signs of unhealthy hosts.

        out, err -- output captured from the failed operation.

        This implementation inspects nothing and always returns False. Callers
        in this class blacklist the node when the result is true.
        """
        return False
116     
117     def remote_trace_path(self, whichtrace):
118         if whichtrace in self.TRACES:
119             tracefile = os.path.join(self.home_path, whichtrace)
120         else:
121             tracefile = None
122         
123         return tracefile
124
125     def remote_trace_name(self, whichtrace):
126         if whichtrace in self.TRACES:
127             return whichtrace
128         return None
129
130     def sync_trace(self, local_dir, whichtrace):
131         tracefile = self.remote_trace_path(whichtrace)
132         if not tracefile:
133             return None
134         
135         local_path = os.path.join(local_dir, tracefile)
136         
137         # create parent local folders
138         proc = subprocess.Popen(
139             ["mkdir", "-p", os.path.dirname(local_path)],
140             stdout = open("/dev/null","w"),
141             stdin = open("/dev/null","r"))
142
143         if proc.wait():
144             raise RuntimeError, "Failed to synchronize trace"
145         
146         # sync files
147         try:
148             self._popen_scp(
149                 '%s@%s:%s' % (self.node.slicename, self.node.hostname,
150                     tracefile),
151                 local_path
152                 )
153         except RuntimeError, e:
154             raise RuntimeError, "Failed to synchronize trace: %s %s" \
155                     % (e.args[0], e.args[1],)
156         
157         return local_path
158     
    def recover(self):
        """
        Re-attach to an existing deployment without touching the node.
        """
        # We assume a correct deployment, so recovery only
        # means we mark this dependency as deployed
        self._setup = True
163
    def setup(self):
        """
        Deploy this dependency synchronously: prepare the remote home, launch
        the build, then wait for it and install.

        The setup flag is raised only after every step returned without
        raising.
        """
        self._logger.info("Setting up %s", self)
        self._make_home()
        self._launch_build()
        self._finish_build()
        self._setup = True
170     
171     def async_setup(self):
172         if not self._setuper:
173             def setuper():
174                 try:
175                     self.setup()
176                 except:
177                     self._setuper._exc.append(sys.exc_info())
178             self._setuper = threading.Thread(
179                 target = setuper)
180             self._setuper._exc = []
181             self._setuper.start()
182     
183     def async_setup_wait(self):
184         if not self._setup:
185             self._logger.info("Waiting for %s to be setup", self)
186             if self._setuper:
187                 self._setuper.join()
188                 if not self._setup:
189                     if self._setuper._exc:
190                         exctyp,exval,exctrace = self._setuper._exc[0]
191                         raise exctyp,exval,exctrace
192                     else:
193                         raise RuntimeError, "Failed to setup application"
194                 else:
195                     self._logger.info("Setup ready: %s at %s", self, self.node.hostname)
196             else:
197                 self.setup()
198         
199     def _make_home(self):
200         # Make sure all the paths are created where 
201         # they have to be created for deployment
202         # sync files
203         try:
204             self._popen_ssh_command(
205                 "mkdir -p %(home)s && ( rm -f %(home)s/{pid,build-pid,nepi-build.sh} >/dev/null 2>&1 || /bin/true )" \
206                     % { 'home' : server.shell_escape(self.home_path) },
207                 timeout = 120,
208                 retry = 3
209                 )
210         except RuntimeError, e:
211             raise RuntimeError, "Failed to set up application %s: %s %s" % (self.home_path, e.args[0], e.args[1],)
212         
213         if self.stdin:
214             stdin = self.stdin
215             if not os.path.isfile(stdin):
216                 stdin = cStringIO.StringIO(self.stdin)
217
218             # Write program input
219             try:
220                 self._popen_scp(stdin,
221                     '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
222                         os.path.join(self.home_path, 'stdin') ),
223                     )
224             except RuntimeError, e:
225                 raise RuntimeError, "Failed to set up application %s: %s %s" \
226                         % (self.home_path, e.args[0], e.args[1],)
227
228     def _replace_paths(self, command):
229         """
230         Replace all special path tags with shell-escaped actual paths.
231         """
232         # need to append ${HOME} if paths aren't absolute, to MAKE them absolute.
233         root = '' if self.home_path.startswith('/') else "${HOME}/"
234         return ( command
235             .replace("${SOURCES}", root+server.shell_escape(self.home_path))
236             .replace("${BUILD}", root+server.shell_escape(os.path.join(self.home_path,'build'))) )
237
238     def _launch_build(self, trial=0):
239         if self._master is not None:
240             if not trial or self._master_prk is not None:
241                 self._do_install_keys()
242             buildscript = self._do_build_slave()
243         else:
244             buildscript = self._do_build_master()
245             
246         if buildscript is not None:
247             self._logger.info("Building %s at %s", self, self.node.hostname)
248             
249             # upload build script
250             try:
251                 self._popen_scp(
252                     buildscript,
253                     '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
254                         os.path.join(self.home_path, 'nepi-build.sh') )
255                     )
256             except RuntimeError, e:
257                 raise RuntimeError, "Failed to set up application %s: %s %s" \
258                         % (self.home_path, e.args[0], e.args[1],)
259             
260             # launch build
261             self._do_launch_build()
262     
    def _finish_build(self):
        """
        Wait for the launched build to complete, then run the install step.
        """
        self._do_wait_build()
        self._do_install()
266
    def _do_build_slave(self):
        """
        Assemble the shell script a build slave runs to fetch sources and
        build products from its master node instead of building them itself.

        Returns a cStringIO buffer holding the script, or None when there are
        neither sources nor a build command.
        """
        if not self.sources and not self.build:
            return None
            
        # Create build script
        # Remote (master-side) locations of everything that has to be copied
        files = set()
        
        if self.sources:
            sources = self.sources.split(' ')
            files.update(
                "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostip, 
                    os.path.join(self._master.home_path, os.path.basename(source)),)
                for source in sources
            )
        
        if self.build:
            files.add(
                "%s@%s:%s" % (self._master.node.slicename, self._master.node.hostip, 
                    os.path.join(self._master.home_path, 'build.tar.gz'),)
            )
        
        sshopts = "-o ConnectTimeout=30 -o ConnectionAttempts=3 -o ServerAliveInterval=30 -o TCPKeepAlive=yes"
        
        # Starts an ssh-agent, loads the private key into it (the passphrase
        # is read from $NEPI_MASTER_PASSPHRASE through a 'cat' askpass helper)
        # and then deletes the key files from disk.
        launch_agent = "{ ( echo -e '#!/bin/sh\\ncat' > .ssh-askpass ) && chmod u+x .ssh-askpass"\
                        " && export SSH_ASKPASS=$(pwd)/.ssh-askpass "\
                        " && ssh-agent > .ssh-agent.sh ; } && . ./.ssh-agent.sh && ( echo $NEPI_MASTER_PASSPHRASE | ssh-add %(prk)s ) && rm -rf %(prk)s %(puk)s" %  \
        {
            'prk' : server.shell_escape(self._master_prk_name),
            'puk' : server.shell_escape(self._master_puk_name),
        }
        
        kill_agent = "kill $SSH_AGENT_PID"
        
        # Checks that the master answers ping and ssh, polls until the
        # master's build.token.retcode holds the master's token, and then
        # requires the master's build.token to hold it as well.
        waitmaster = (
            "{ "
            "echo 'Checking master reachability' ; "
            "if ping -c 3 %(master_host)s && (. ./.ssh-agent.sh > /dev/null ; ssh -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s echo MASTER SAYS HI ) ; then "
            "echo 'Master node reachable' ; "
            "else "
            "echo 'MASTER NODE UNREACHABLE' && "
            "exit 1 ; "
            "fi ; "
            ". ./.ssh-agent.sh ; "
            "while [[ $(. ./.ssh-agent.sh > /dev/null ; ssh -q -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s cat %(token_path)s.retcode || /bin/true) != %(token)s ]] ; do sleep 5 ; done ; "
            "if [[ $(. ./.ssh-agent.sh > /dev/null ; ssh -q -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(master)s cat %(token_path)s || /bin/true) != %(token)s ]] ; then echo BAD TOKEN ; exit 1 ; fi ; "
            "}" 
        ) % {
            'hostkey' : 'master_known_hosts',
            'master' : "%s@%s" % (self._master.node.slicename, self._master.node.hostip),
            'master_host' : self._master.node.hostip,
            'token_path' : os.path.join(self._master.home_path, 'build.token'),
            'token' : server.shell_escape(self._master._master_token),
            'sshopts' : sshopts,
        }
        
        # Copies the files from the master, unpacks the build archive when
        # there is one, and writes this slave's own token on success.
        syncfiles = ". ./.ssh-agent.sh && scp -p -o UserKnownHostsFile=%(hostkey)s %(sshopts)s %(files)s ." % {
            'hostkey' : 'master_known_hosts',
            'files' : ' '.join(files),
            'sshopts' : sshopts,
        }
        if self.build:
            syncfiles += " && tar xzf build.tar.gz"
        syncfiles += " && ( echo %s > build.token )" % (server.shell_escape(self._master_token),)
        syncfiles += " && ( echo %s > build.token.retcode )" % (server.shell_escape(self._master_token),)
        syncfiles = "{ . ./.ssh-agent.sh ; %s ; }" % (syncfiles,)
        
        # Stops the agent and removes keys, known-hosts file and askpass helper
        cleanup = "{ . ./.ssh-agent.sh ; kill $SSH_AGENT_PID ; rm -rf %(prk)s %(puk)s master_known_hosts .ssh-askpass ; }" % {
            'prk' : server.shell_escape(self._master_prk_name),
            'puk' : server.shell_escape(self._master_puk_name),
        }
        
        # The cleanup runs whether or not the main chain succeeded, and
        # build.token.retcode is written in either case.
        slavescript = "( ( %(launch_agent)s && %(waitmaster)s && %(syncfiles)s && %(kill_agent)s && %(cleanup)s ) || %(cleanup)s ) ; echo %(token)s > build.token.retcode" % {
            'waitmaster' : waitmaster,
            'syncfiles' : syncfiles,
            'cleanup' : cleanup,
            'kill_agent' : kill_agent,
            'launch_agent' : launch_agent,
            'home' : server.shell_escape(self.home_path),
            'token' : server.shell_escape(self._master_token),
        }
       
        return cStringIO.StringIO(slavescript)
349          
350     def _do_launch_build(self):
351         script = "bash ./nepi-build.sh"
352         if self._master_passphrase:
353             script = "NEPI_MASTER_PASSPHRASE=%s %s" % (
354                 server.shell_escape(self._master_passphrase),
355                 script
356             )
357         (out,err),proc = rspawn.remote_spawn(
358             script,
359             pidfile = 'build-pid',
360             home = self.home_path,
361             stdin = '/dev/null',
362             stdout = 'buildlog',
363             stderr = rspawn.STDOUT,
364             
365             host = self.node.hostname,
366             port = None,
367             user = self.node.slicename,
368             agent = None,
369             ident_key = self.node.ident_path,
370             server_key = self.node.server_key,
371             hostip = self.node.hostip,
372             )
373         
374         if proc.wait():
375             if self.check_bad_host(out, err):
376                 self.node.blacklist()
377             raise RuntimeError, "Failed to set up build slave %s: %s %s" % (self.home_path, out,err,)
378         
379         
380         pid = ppid = None
381         delay = 1.0
382         for i in xrange(5):
383             pidtuple = rspawn.remote_check_pid(
384                 os.path.join(self.home_path,'build-pid'),
385                 host = self.node.hostip,
386                 port = None,
387                 user = self.node.slicename,
388                 agent = None,
389                 ident_key = self.node.ident_path,
390                 server_key = self.node.server_key,
391                 hostip = self.node.hostip
392                 )
393             
394             if pidtuple:
395                 pid, ppid = pidtuple
396                 self._build_pid, self._build_ppid = pidtuple
397                 break
398             else:
399                 time.sleep(delay)
400                 delay = min(30,delay*1.2)
401         else:
402             raise RuntimeError, "Failed to set up build slave %s: cannot get pid" % (self.home_path,)
403
404         self._logger.info("Deploying %s at %s", self, self.node.hostname)
405         
406     def _do_wait_build(self, trial=0):
407         pid = self._build_pid
408         ppid = self._build_ppid
409         
410         if pid and ppid:
411             delay = 1.0
412             first = True
413             bustspin = 0
414             while True:
415                 status = rspawn.remote_status(
416                     pid, ppid,
417                     host = self.node.hostname,
418                     port = None,
419                     user = self.node.slicename,
420                     agent = None,
421                     ident_key = self.node.ident_path,
422                     server_key = self.node.server_key,
423                     hostip = self.node.hostip
424                     )
425                 
426                 if status is rspawn.FINISHED:
427                     self._build_pid = self._build_ppid = None
428                     break
429                 elif status is not rspawn.RUNNING:
430                     self._logger.warn("Busted waiting for %s to finish building at %s %s", self, self.node.hostname,
431                             "(build slave)" if self._master is not None else "(build master)")
432                     bustspin += 1
433                     time.sleep(delay*(5.5+random.random()))
434                     if bustspin > 12:
435                         self._build_pid = self._build_ppid = None
436                         break
437                 else:
438                     if first:
439                         self._logger.info("Waiting for %s to finish building at %s %s", self, self.node.hostname,
440                             "(build slave)" if self._master is not None else "(build master)")
441                         
442                         first = False
443                     time.sleep(delay*(0.5+random.random()))
444                     delay = min(30,delay*1.2)
445                     bustspin = 0
446         
447             # check build token
448             slave_token = ""
449             for i in xrange(3):
450                 (out, err), proc = self._popen_ssh_command(
451                     "cat %(token_path)s" % {
452                         'token_path' : os.path.join(self.home_path, 'build.token'),
453                     },
454                     timeout = 120,
455                     noerrors = True)
456                 if not proc.wait() and out:
457                     slave_token = out.strip()
458                 
459                 if slave_token:
460                     break
461                 else:
462                     time.sleep(2)
463             
464             if slave_token != self._master_token:
465                 # Get buildlog for the error message
466
467                 (buildlog, err), proc = self._popen_ssh_command(
468                     "cat %(buildlog)s" % {
469                         'buildlog' : os.path.join(self.home_path, 'buildlog'),
470                         'buildscript' : os.path.join(self.home_path, 'nepi-build.sh'),
471                     },
472                     timeout = 120,
473                     noerrors = True)
474                 
475                 proc.wait()
476                 
477                 if self.check_bad_host(buildlog, err):
478                     self.node.blacklist()
479                 elif self._master and trial < 3 and 'BAD TOKEN' in buildlog or 'BAD TOKEN' in err:
480                     # bad sync with master, may try again
481                     # but first wait for master
482                     self._master.async_setup_wait()
483                     self._launch_build(trial+1)
484                     return self._do_wait_build(trial+1)
485                 elif trial < 3:
486                     return self._do_wait_build(trial+1)
487                 else:
488                     # No longer need'em
489                     self._master_prk = None
490                     self._master_puk = None
491         
492                     raise RuntimeError, "Failed to set up application %s: "\
493                             "build failed, got wrong token from pid %s/%s "\
494                             "(expected %r, got %r), see buildlog at %s:\n%s" % (
495                         self.home_path, pid, ppid, self._master_token, slave_token, self.node.hostname, buildlog)
496
497             # No longer need'em
498             self._master_prk = None
499             self._master_puk = None
500         
501             self._logger.info("Built %s at %s", self, self.node.hostname)
502
503     def _do_kill_build(self):
504         pid = self._build_pid
505         ppid = self._build_ppid
506         
507         if pid and ppid:
508             self._logger.info("Killing build of %s", self)
509             rspawn.remote_kill(
510                 pid, ppid,
511                 host = self.node.hostname,
512                 port = None,
513                 user = self.node.slicename,
514                 agent = None,
515                 ident_key = self.node.ident_path,
516                 hostip = self.node.hostip
517                 )
518         
519         
520     def _do_build_master(self):
521         if not self.sources and not self.build and not self.buildDepends:
522             return None
523             
524         if self.sources:
525             sources = self.sources.split(' ')
526
527             http_sources = list()
528             for source in list(sources):
529                 if source.startswith("http") or source.startswith("https"):
530                     http_sources.append(source)
531                     sources.remove(source)
532
533             # Download http sources
534             try:
535                 for source in http_sources:
536                     path = os.path.join(self.home_path, source.split("/")[-1])
537                     command = "wget -o %s %s" % (path, source)
538                     self._popen_ssh(command)
539             except RuntimeError, e:
540                 raise RuntimeError, "Failed wget source file %r: %s %s" \
541                         % (sources, e.args[0], e.args[1],)
542
543             # Copy all other sources
544             try:
545                 self._popen_scp(
546                     sources,
547                     "%s@%s:%s" % (self.node.slicename, self.node.hostname, 
548                         os.path.join(self.home_path,'.'),)
549                     )
550             except RuntimeError, e:
551                 raise RuntimeError, "Failed upload source file %r: %s %s" \
552                         % (sources, e.args[0], e.args[1],)
553             
554         buildscript = cStringIO.StringIO()
555         
556         buildscript.write("(\n")
557         
558         if self.buildDepends:
559             # Install build dependencies
560             buildscript.write(
561                 "sudo -S yum -y install %(packages)s\n" % {
562                     'packages' : self.buildDepends
563                 }
564             )
565         
566             
567         if self.build:
568             # Build sources
569             buildscript.write(
570                 "mkdir -p build && ( cd build && ( %(command)s ) )\n" % {
571                     'command' : self._replace_paths(self.build),
572                     'home' : server.shell_escape(self.home_path),
573                 }
574             )
575         
576             # Make archive
577             buildscript.write("tar czf build.tar.gz build\n")
578         
579         # Write token
580         buildscript.write("echo %(master_token)s > build.token ) ; echo %(master_token)s > build.token.retcode" % {
581             'master_token' : server.shell_escape(self._master_token)
582         })
583         
584         buildscript.seek(0)
585
586         return buildscript
587
588     def _do_install(self):
589         if self.install:
590             self._logger.info("Installing %s at %s", self, self.node.hostname)
591  
592             # Install application
593             try:
594                 command = "cd %(home)s && cd build && ( %(command)s ) > ${HOME}/%(home)s/installlog 2>&1 || ( tail ${HOME}/%(home)s/{install,build}log >&2 && false )" % \
595                     {
596                     'command' : self._replace_paths(self.install),
597                     'home' : server.shell_escape(self.home_path),
598                     }
599                 self._popen_ssh_command(command)
600             except RuntimeError, e:
601                 if self.check_bad_host(e.args[0], e.args[1]):
602                     self.node.blacklist()
603                 raise RuntimeError, "Failed install build sources on node %s: %s %s. COMMAND %s" % (
604                         self.node.hostname, e.args[0], e.args[1], command)
605
    def set_master(self, master):
        """
        Set the master of this dependency. When a master is set, the build
        step uses the slave script, which copies files from the master's node.
        """
        self._master = master
608         
    def install_keys(self, prk, puk, passphrase):
        """
        Remember the key pair and passphrase used to reach the master node.

        prk, puk   -- private and public key objects; their .name attribute is
                      read as a file path (presumably open key files -- TODO
                      confirm against the caller).
        passphrase -- passphrase of the private key.

        Nothing is copied to the node here; only the values are stored.
        """
        # Install keys
        self._master_passphrase = passphrase
        self._master_prk = prk
        self._master_puk = puk
        # Only the base names are kept: the slave script refers to the key
        # files by these names
        self._master_prk_name = os.path.basename(prk.name)
        self._master_puk_name = os.path.basename(puk.name)
616         
617     def _do_install_keys(self):
618         prk = self._master_prk
619         puk = self._master_puk
620        
621         try:
622             self._popen_scp(
623                 [ prk.name, puk.name ],
624                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, self.home_path )
625                 )
626         except RuntimeError, e:
627             raise RuntimeError, "Failed to set up application deployment keys: %s %s" \
628                     % (e.args[0], e.args[1],)
629
630         try:
631             self._popen_scp(
632                 cStringIO.StringIO('%s,%s %s\n' % (
633                     self._master.node.hostname, self._master.node.hostip, 
634                     self._master.node.server_key)),
635                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
636                     os.path.join(self.home_path,"master_known_hosts") )
637                 )
638         except RuntimeError, e:
639             raise RuntimeError, "Failed to set up application deployment keys: %s %s" \
640                     % (e.args[0], e.args[1],)
641         
642     
643     def cleanup(self):
644         # make sure there's no leftover build processes
645         self._do_kill_build()
646         
647         # No longer need'em
648         self._master_prk = None
649         self._master_puk = None
650
651     @server.eintr_retry
652     def _popen_scp(self, src, dst, retry = 3):
653         while 1:
654             try:
655                 (out,err),proc = server.popen_scp(
656                     src,
657                     dst, 
658                     port = None,
659                     agent = None,
660                     ident_key = self.node.ident_path,
661                     server_key = self.node.server_key
662                     )
663
664                 if server.eintr_retry(proc.wait)():
665                     raise RuntimeError, (out, err)
666                 return (out, err), proc
667             except:
668                 if retry <= 0:
669                     raise
670                 else:
671                     retry -= 1
672   
673
674     @server.eintr_retry
675     def _popen_ssh_command(self, command, retry = 0, noerrors=False, timeout=None):
676         (out,err),proc = server.popen_ssh_command(
677             command,
678             host = self.node.hostname,
679             port = None,
680             user = self.node.slicename,
681             agent = None,
682             ident_key = self.node.ident_path,
683             server_key = self.node.server_key,
684             timeout = timeout,
685             retry = retry
686             )
687
688         if server.eintr_retry(proc.wait)():
689             if not noerrors:
690                 raise RuntimeError, (out, err)
691         return (out, err), proc
692
693 class Application(Dependency):
694     """
695     An application also has dependencies, but also a command to be ran and monitored.
696     
697     It adds the output of that command as traces.
698     """
699     
700     TRACES = ('stdout','stderr','buildlog', 'output')
701     
702     def __init__(self, api=None):
703         super(Application,self).__init__(api)
704         
705         # Attributes
706         self.command = None
707         self.sudo = False
708         
709         self.stdin = None
710         self.stdout = None
711         self.stderr = None
712         self.output = None
713         
714         # Those are filled when the app is started
715         #   Having both pid and ppid makes it harder
716         #   for pid rollover to induce tracking mistakes
717         self._started = False
718         self._pid = None
719         self._ppid = None
720
721         # Do not add to the python path of nodes
722         self.add_to_path = False
723     
724     def __str__(self):
725         return "%s<command:%s%s>" % (
726             self.__class__.__name__,
727             "sudo " if self.sudo else "",
728             self.command,
729         )
730     
731     def start(self):
732         self._logger.info("Starting %s", self)
733         
734         # Create shell script with the command
735         # This way, complex commands and scripts can be ran seamlessly
736         # sync files
737         command = cStringIO.StringIO()
738         command.write('export PYTHONPATH=$PYTHONPATH:%s\n' % (
739             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
740         ))
741         command.write('export PATH=$PATH:%s\n' % (
742             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.node.pythonpath])
743         ))
744         if self.node.env:
745             for envkey, envvals in self.node.env.iteritems():
746                 for envval in envvals:
747                     command.write('export %s=%s\n' % (envkey, envval))
748         command.write(self.command)
749         command.seek(0)
750
751         try:
752             self._popen_scp(
753                 command,
754                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
755                     os.path.join(self.home_path, "app.sh"))
756                 )
757         except RuntimeError, e:
758             raise RuntimeError, "Failed to set up application: %s %s" \
759                     % (e.args[0], e.args[1],)
760         
761         # Start process in a "daemonized" way, using nohup and heavy
762         # stdin/out redirection to avoid connection issues
763         (out,err),proc = rspawn.remote_spawn(
764             self._replace_paths("bash ./app.sh"),
765             
766             pidfile = './pid',
767             home = self.home_path,
768             stdin = 'stdin' if self.stdin is not None else '/dev/null',
769             stdout = 'stdout' if self.stdout else '/dev/null',
770             stderr = 'stderr' if self.stderr else '/dev/null',
771             sudo = self.sudo,
772             host = self.node.hostname,
773             port = None,
774             user = self.node.slicename,
775             agent = None,
776             ident_key = self.node.ident_path,
777             server_key = self.node.server_key
778             )
779         
780         if proc.wait():
781             if self.check_bad_host(out, err):
782                 self.node.blacklist()
783             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
784
785         self._started = True
786     
787     def recover(self):
788         # Assuming the application is running on PlanetLab,
789         # proper pidfiles should be present at the app's home path.
790         # So we mark this application as started, and check the pidfiles
791         self._started = True
792         self.checkpid()
793
794     def checkpid(self):            
795         # Get PID/PPID
796         # NOTE: wait a bit for the pidfile to be created
797         if self._started and not self._pid or not self._ppid:
798             pidtuple = rspawn.remote_check_pid(
799                 os.path.join(self.home_path,'pid'),
800                 host = self.node.hostname,
801                 port = None,
802                 user = self.node.slicename,
803                 agent = None,
804                 ident_key = self.node.ident_path,
805                 server_key = self.node.server_key
806                 )
807             
808             if pidtuple:
809                 self._pid, self._ppid = pidtuple
810     
811     def status(self):
812         self.checkpid()
813         if not self._started:
814             return AS.STATUS_NOT_STARTED
815         elif not self._pid or not self._ppid:
816             return AS.STATUS_NOT_STARTED
817         else:
818             status = rspawn.remote_status(
819                 self._pid, self._ppid,
820                 host = self.node.hostname,
821                 port = None,
822                 user = self.node.slicename,
823                 agent = None,
824                 ident_key = self.node.ident_path,
825                 server_key = self.node.server_key
826                 )
827             
828             if status is rspawn.NOT_STARTED:
829                 return AS.STATUS_NOT_STARTED
830             elif status is rspawn.RUNNING:
831                 return AS.STATUS_RUNNING
832             elif status is rspawn.FINISHED:
833                 return AS.STATUS_FINISHED
834             else:
835                 # WTF?
836                 return AS.STATUS_NOT_STARTED
837     
838     def kill(self):
839         status = self.status()
840         if status == AS.STATUS_RUNNING:
841             # kill by ppid+pid - SIGTERM first, then try SIGKILL
842             rspawn.remote_kill(
843                 self._pid, self._ppid,
844                 host = self.node.hostname,
845                 port = None,
846                 user = self.node.slicename,
847                 agent = None,
848                 ident_key = self.node.ident_path,
849                 server_key = self.node.server_key,
850                 sudo = self.sudo
851                 )
852             self._logger.info("Killed %s", self)
853
854
class NepiDependency(Dependency):
    """
    This dependency adds nepi itself to the python path,
    so that you may run testbeds within PL nodes.

    The local nepi package is packed into a temporary tarball which becomes
    the dependency's sources; the build step moves the tarball into place and
    the install step unpacks it.
    """
    
    # Class attribute holding a *weak* reference to the shared NEPI tar file
    # so that instances may share it. Don't operate on the file itself, it
    # would be a mess, just use its path.
    _shared_nepi_tar = None
    
    def __init__(self, api = None):
        super(NepiDependency, self).__init__(api)
        
        self._tarball = None
        
        self.depends = 'python python-ipaddr python-setuptools'
        
        # our sources are in our ad-hoc tarball
        self.sources = self.tarball.name
        
        tarname = os.path.basename(self.tarball.name)
        
        # it's already built - just move the tarball into place
        self.build = "mv -f ${SOURCES}/%s ." % (tarname,)
        
        # unpack it into sources, and we're done
        self.install = "tar xzf ${BUILD}/%s -C .." % (tarname,)
    
    @property
    def tarball(self):
        """
        Temporary file object holding the nepi source tarball.

        The tarball is created on first use and shared among instances
        through a class-level weak reference: each instance keeps its own
        strong reference, so the temporary file lives for as long as at
        least one dependency still uses it.

        Raises RuntimeError if the tarball cannot be created.
        """
        if self._tarball is None:
            import weakref
            
            shared_ref = NepiDependency._shared_nepi_tar
            shared_tar = shared_ref() if shared_ref is not None else None
            
            if shared_tar is None:
                # Nobody holds a live tarball: build an ad-hoc one and
                # publish it on the class (not the instance) so that other
                # instances can actually find it
                shared_tar = self._build_tarball()
                NepiDependency._shared_nepi_tar = weakref.ref(shared_tar)
            
            self._tarball = shared_tar
        
        return self._tarball
    
    @staticmethod
    def _build_tarball():
        """
        Pack the installed nepi package into a new temporary .tar.gz file.
        """
        import nepi
        import tempfile
        
        tar_file = tempfile.NamedTemporaryFile(prefix='nepi-src-', suffix='.tar.gz')
        
        # tar changes into the directory containing the nepi package
        nepi_parent = os.path.join(os.path.dirname(os.path.dirname(nepi.__file__)),'.')
        
        devnull_in = open("/dev/null","r")
        devnull_out = open("/dev/null","w")
        try:
            proc = subprocess.Popen(
                ["tar", "czf", tar_file.name, 
                    '-C', nepi_parent, 
                    'nepi'],
                stdout = devnull_out,
                stdin = devnull_in)
            failed = proc.wait()
        finally:
            # don't leak the /dev/null handles once tar is done
            devnull_in.close()
            devnull_out.close()
        
        if failed:
            raise RuntimeError("Failed to create nepi tarball")
        
        return tar_file
911
class NS3Dependency(Dependency):
    """
    This dependency adds NS3 libraries to the library paths,
    so that you may run the NS3 testbed within PL nodes.
    
    You'll also need the NepiDependency.

    The build command first checks whether a working installation is
    already importable; otherwise it fetches pybindgen, pygccxml, passfd and
    ns3, and builds them into ${BUILD}/target. The install command moves the
    contents of ${BUILD}/target into ${SOURCES}.
    """
    
    def __init__(self, api = None):
        """
        Set up build dependencies, the build/install shell commands and the
        NEPI_NS3BINDINGS / NEPI_NS3LIBRARY environment entries.
        """
        super(NS3Dependency, self).__init__(api)
        
        self.buildDepends = 'make waf gcc gcc-c++ gccxml unzip bzr'
        
        # We have to download the sources, untar, build...
        #pygccxml_source_url = "http://leaseweb.dl.sourceforge.net/project/pygccxml/pygccxml/pygccxml-1.0/pygccxml-1.0.0.zip"
        pygccxml_source_url = "http://yans.pl.sophia.inria.fr/libs/pygccxml-1.0.0.zip"
        ns3_source_url = "http://nepi.inria.fr/code/nepi-ns3.13/archive/tip.tar.gz"
        passfd_source_url = "http://nepi.inria.fr/code/python-passfd/archive/tip.tar.gz"
        
        # bzr revision of pybindgen to check out
        pybindgen_version = "797"

        # The adjacent string literals below are concatenated into a single
        # shell command before the % substitution at the end is applied.
        # First group: succeeds when the libraries are already usable,
        # which short-circuits the whole rebuild group after the "||".
        self.build =(
            " ( "
            "  cd .. && "
            "  python -c 'import pygccxml, pybindgen, passfd' && "
            "  test -f lib/ns/_core.so && "
            "  test -f lib/ns/__init__.py && "
            "  test -f lib/ns/core.py && "
            "  test -f lib/libns3-core.so && "
            "  LD_LIBRARY_PATH=lib PYTHONPATH=lib python -c 'import ns.core' "
            " ) || ( "
                # Not working, rebuild
                     # Archive SHA1 sums to check
                     "echo '7158877faff2254e6c094bf18e6b4283cac19137  pygccxml-1.0.0.zip' > archive_sums.txt && "
                     " ( " # check existing files
                     " sha1sum -c archive_sums.txt && "
                     " test -f passfd-src.tar.gz && "
                     " test -f ns3-src.tar.gz "
                     " ) || ( " # nope? re-download
                     " rm -rf pybindgen pygccxml-1.0.0.zip passfd-src.tar.gz ns3-src.tar.gz && "
                     " bzr checkout lp:pybindgen -r %(pybindgen_version)s && " # continue, to exploit the case when it has already been dl'ed
                     " wget -q -c -O pygccxml-1.0.0.zip %(pygccxml_source_url)s && " 
                     " wget -q -c -O passfd-src.tar.gz %(passfd_source_url)s && "
                     " wget -q -c -O ns3-src.tar.gz %(ns3_source_url)s && "  
                     " sha1sum -c archive_sums.txt " # Check SHA1 sums when applicable
                     " ) && "
                     # Unpack everything and start from an empty target dir
                     "unzip -n pygccxml-1.0.0.zip && "
                     "mkdir -p ns3-src && "
                     "mkdir -p passfd-src && "
                     "tar xzf ns3-src.tar.gz --strip-components=1 -C ns3-src && "
                     "tar xzf passfd-src.tar.gz --strip-components=1 -C passfd-src && "
                     "rm -rf target && "    # mv doesn't like unclean targets
                     "mkdir -p target && "
                     # pygccxml
                     "cd pygccxml-1.0.0 && "
                     "rm -rf unittests docs && " # pygccxml has ~100M of unit tests - excessive - docs aren't needed either
                     "python setup.py build && "
                     "python setup.py install --install-lib ${BUILD}/target && "
                     "python setup.py clean && "
                     # pybindgen (needs the freshly installed pygccxml on PYTHONPATH)
                     "cd ../pybindgen && "
                     "export PYTHONPATH=$PYTHONPATH:${BUILD}/target && "
                     "./waf configure --prefix=${BUILD}/target -d release && "
                     "./waf && "
                     "./waf install && "
                     "./waf clean && "
                     "mv -f ${BUILD}/target/lib/python*/site-packages/pybindgen ${BUILD}/target/. && "
                     "rm -rf ${BUILD}/target/lib && "
                     # passfd
                     "cd ../passfd-src && "
                     "python setup.py build && "
                     "python setup.py install --install-lib ${BUILD}/target && "
                     "python setup.py clean && "
                     # ns3
                     "cd ../ns3-src && "
                     # NOTE(review): pybindgen is checked out into ./pybindgen
                     # above, yet --with-pybindgen points at ../pybindgen-src;
                     # confirm which directory is intended
                     "./waf configure --prefix=${BUILD}/target --with-pybindgen=../pybindgen-src -d release --disable-examples --disable-tests && "
                     "./waf &&"
                     "./waf install && "
                     "rm -f ${BUILD}/target/lib/*.so && "
                     "cp -a ${BUILD}/ns3-src/build/libns3*.so ${BUILD}/target/lib && "
                     "cp -a ${BUILD}/ns3-src/build/bindings/python/ns ${BUILD}/target/lib &&"
                     "./waf clean "
             " )"
                     % dict(
                        pybindgen_version = server.shell_escape(pybindgen_version),
                        pygccxml_source_url = server.shell_escape(pygccxml_source_url),
                        ns3_source_url = server.shell_escape(ns3_source_url),
                        passfd_source_url = server.shell_escape(passfd_source_url),
                     ))
        
        # Just move ${BUILD}/target
        # (skipped when the same "already working" check as in build passes;
        # otherwise requires a non-empty ${BUILD}/target)
        self.install = (
            " ( "
            "  cd .. && "
            "  python -c 'import pygccxml, pybindgen, passfd' && "
            "  test -f lib/ns/_core.so && "
            "  test -f lib/ns/__init__.py && "
            "  test -f lib/ns/core.py && "
            "  test -f lib/libns3-core.so && "
            "  LD_LIBRARY_PATH=lib PYTHONPATH=lib python -c 'import ns.core' "
            " ) || ( "
                # Not working, reinstall
                    "test -d ${BUILD}/target && "
                    "[[ \"x\" != \"x$(find ${BUILD}/target -mindepth 1 -print -quit)\" ]] &&"
                    "( for i in ${BUILD}/target/* ; do rm -rf ${SOURCES}/${i##*/} ; done ) && " # mv doesn't like unclean targets
                    "mv -f ${BUILD}/target/* ${SOURCES}"
            " )"
        )
        
        # Set extra environment paths
        self.env['NEPI_NS3BINDINGS'] = "${SOURCES}/lib"
        self.env['NEPI_NS3LIBRARY'] = "${SOURCES}/lib"
    
    # NOTE(review): this property looks copied from NepiDependency. Nothing in
    # NS3Dependency sets self._tarball or self._shared_nepi_tar, so unless the
    # base class provides them, accessing it would fail - confirm whether it
    # is used at all.
    @property
    def tarball(self):
        if self._tarball is None:
            shared_tar = self._shared_nepi_tar and self._shared_nepi_tar()
            if shared_tar is not None:
                self._tarball = shared_tar
            else:
                # Build an ad-hoc tarball
                # Prebuilt
                import nepi
                import tempfile
                
                shared_tar = tempfile.NamedTemporaryFile(prefix='nepi-src-', suffix='.tar.gz')
                
                # Pack the nepi package, changing into its parent directory
                proc = subprocess.Popen(
                    ["tar", "czf", shared_tar.name, 
                        '-C', os.path.join(os.path.dirname(os.path.dirname(nepi.__file__)),'.'), 
                        'nepi'],
                    stdout = open("/dev/null","w"),
                    stdin = open("/dev/null","r"))

                # Non-zero exit status from tar means the archive is unusable
                if proc.wait():
                    raise RuntimeError, "Failed to create nepi tarball"
                
                self._tarball = self._shared_nepi_tar = shared_tar
                
        return self._tarball
1048
class YumDependency(Dependency):
    """
    This dependency is an internal helper class used to
    efficiently distribute yum-downloaded rpms.
    
    It temporarily sets the yum cache as persistent in the
    build master, and installs all the required packages.
    
    The rpm packages left in the yum cache are gathered and
    distributed by the underlying Dependency in an efficient
    manner. Build slaves will then install those rpms back in
    the cache before issuing the install command.
    
    When packages have been installed already, nothing but an
    empty tar is distributed.

    The build and install commands are computed from the current
    value of self.depends every time they are read; assignments to
    them are silently ignored.
    """
    
    # Class attribute holding a *weak* reference to the shared NEPI tar file
    # so that they may share it. Don't operate on the file itself, it would
    # be a mess, just use its path.
    # (Not referenced anywhere within this class.)
    _shared_nepi_tar = None
    
    # Output that reveals a node on which yum cannot be trusted.
    # The parentheses and dot around repomd.xml are escaped so that they match
    # the literal text of yum's message instead of forming a regex group.
    _BAD_HOST_RE = re.compile(r'(?:'
                       r'The GPG keys listed for the ".*" repository are already installed but they are not correct for this package'
                       r'|Error: Cannot retrieve repository metadata \(repomd\.xml\) for repository: .*[.] Please verify its path and try again'
                       r'|Error: disk I/O error'
                       r'|MASTER NODE UNREACHABLE'
                       r')', 
                       re.I)
    
    def _canonical_depends(self):
        """
        Return the package list of self.depends sorted and space-joined.
        """
        return ' '.join( sorted( (self.depends or "").split(' ') ) )
    
    def _build_get(self):
        """
        Shell command that downloads the rpms and packs them into a tar.

        Refreshes the yum metadata, switches keepcache on in /etc/yum.conf,
        installs the packages, archives every rpm found under /var/cache/yum
        into ${BUILD}/packages.tar, then switches keepcache off again and
        cleans the cached packages.
        """
        # canonical representation of dependencies
        depends = self._canonical_depends()
        
        # download rpms and pack into a tar archive
        return (
            "sudo -S nice yum -y makecache && "
            "sudo -S sed -i -r 's/keepcache *= *0/keepcache=1/' /etc/yum.conf && "
            " ( ( "
                "sudo -S nice yum -y install %s ; "
                "rm -f ${BUILD}/packages.tar ; "
                "tar -C /var/cache/yum -rf ${BUILD}/packages.tar $(cd /var/cache/yum ; find -iname '*.rpm')"
            " ) || /bin/true ) && "
            "sudo -S sed -i -r 's/keepcache *= *1/keepcache=0/' /etc/yum.conf && "
            "( sudo -S nice yum -y clean packages || /bin/true ) "
        ) % ( depends, )
    def _build_set(self, value):
        # ignore (the base class assigns to build in its constructor)
        return
    build = property(_build_get, _build_set)
    
    def _install_get(self):
        """
        Shell command that unpacks packages.tar into the yum cache,
        installs the packages and cleans the cached packages afterwards.
        """
        # canonical representation of dependencies
        depends = self._canonical_depends()
        
        # unpack cached rpms into yum cache, install, and cleanup
        return (
            "sudo -S tar -k --keep-newer-files -C /var/cache/yum -xf packages.tar && "
            "sudo -S nice yum -y install %s && "
            "( sudo -S nice yum -y clean packages || /bin/true ) "
        ) % ( depends, )
    def _install_set(self, value):
        # ignore (the base class assigns to install in its constructor)
        return
    install = property(_install_get, _install_set)
        
    def check_bad_host(self, out, err):
        """
        Tell whether the given command output reveals an unusable host.

        Returns a true value when either stream matches one of the known
        yum failure messages, or when the node's own check_bad_host()
        flags the output.
        """
        badre = self._BAD_HOST_RE
        return badre.search(out) or badre.search(err) or self.node.check_bad_host(out,err)
1116
1117
1118 class CCNxDaemon(Application):
1119     """
1120     An application also has dependencies, but also a command to be ran and monitored.
1121     
1122     It adds the output of that command as traces.
1123     """
1124     
1125     def __init__(self, api=None):
1126         super(CCNxDaemon,self).__init__(api)
1127         
1128         # Attributes
1129         self.ccnLocalPort = None
1130         self.ccnRoutes = None
1131         self.ccnxVersion = "0.7.1"
1132         
1133         #self.ccnx_0_6_0_sources = "http://yans.pl.sophia.inria.fr/libs/ccnx-0.6.0.tar.gz"
1134         self.ccnx_sources = "http://www.ccnx.org/releases/ccnx-%s.tar.gz"
1135         self.buildDepends = 'make gcc openssl-devel expat-devel libpcap-devel libxml2-devel'
1136
1137         self.ccnx_build = (
1138             " ( "
1139             "  cd .. && "
1140             "  test -d ccnx-src/build/bin "
1141             " ) || ( "
1142                 # Not working, rebuild
1143                 "("
1144                      " mkdir -p ccnx-src && "
1145                      " wget -q -c -O ccnx-src.tar.gz %(ccnx_source_url)s &&"
1146                      " tar xf ccnx-src.tar.gz --strip-components=1 -C ccnx-src "
1147                 ") && "
1148                      "cd ccnx-src && "
1149                      "./configure && make"
1150              " )") % dict(
1151                      ccnx_source_url = server.shell_escape(self.ccnx_sources % self.ccnxVersion),
1152                 )
1153
1154         self.ccnx_install = (
1155             " ( "
1156             "  test -d ${BUILD}/ccnx-src/bin && "
1157             "  cp -r ${BUILD}/ccnx-src/bin ${SOURCES}"
1158             " )"
1159         )
1160
1161         self.env['PATH'] = "$PATH:${SOURCES}/bin"
1162
1163     def setup(self):
1164         # setting ccn sources
1165         if not self.build:
1166             self.build = self.ccnx_build
1167
1168         if not self.install:
1169                 self.install = self.ccnx_install
1170
1171         super(CCNxDaemon, self).setup()
1172
1173     def start(self):
1174         self.command = ""
1175         if self.ccnLocalPort:
1176             self.command = "export CCN_LOCAL_PORT=%s ; " % self.ccnLocalPort
1177         self.command += " ccndstart "
1178
1179         # configure ccn routes
1180         if self.ccnRoutes:
1181             routes = self.ccnRoutes.split("|")
1182             
1183             if self.ccnLocalPort:
1184                 routes = map(lambda route: "%s %s" %(route, 
1185                     self.ccnLocalPort) if _ccnre.match(route) else route, 
1186                         routes)
1187
1188             routes = map(lambda route: "ccndc add ccnx:/ %s" % route, 
1189                 routes)
1190
1191             routescmd = " ; ".join(routes)
1192             self.command += " ; "
1193             self.command += routescmd
1194
1195         # Start will be invoked in prestart step
1196         super(CCNxDaemon, self).start()
1197             
1198     def kill(self):
1199         self._logger.info("Killing %s", self)
1200
1201         command = "${SOURCES}/bin/ccndstop"
1202
1203         if self.ccnLocalPort:
1204             self.command = "export CCN_LOCAL_PORT=%s; %s" % (self.ccnLocalPort, command)
1205
1206         cmd = self._replace_paths(command)
1207         command = cStringIO.StringIO()
1208         command.write(cmd)
1209         command.seek(0)
1210
1211         try:
1212             self._popen_scp(
1213                 command,
1214                 '%s@%s:%s' % (self.node.slicename, self.node.hostname, 
1215                     os.path.join(self.home_path, "kill.sh"))
1216                 )
1217         except RuntimeError, e:
1218             raise RuntimeError, "Failed to kill ccndxdaemon: %s %s" \
1219                     % (e.args[0], e.args[1],)
1220         
1221
1222         script = "bash ./kill.sh"
1223         (out,err),proc = rspawn.remote_spawn(
1224             script,
1225             pidfile = 'kill-pid',
1226             home = self.home_path,
1227             stdin = '/dev/null',
1228             stdout = 'killlog',
1229             stderr = rspawn.STDOUT,
1230             
1231             host = self.node.hostname,
1232             port = None,
1233             user = self.node.slicename,
1234             agent = None,
1235             ident_key = self.node.ident_path,
1236             server_key = self.node.server_key,
1237             hostip = self.node.hostip,
1238             )
1239         
1240         if proc.wait():
1241             raise RuntimeError, "Failed to kill cnnxdaemon: %s %s" % (out,err,)
1242         
1243         super(CCNxDaemon, self).kill()
1244