NECo: A tool to design and run experiments on arbitrary platforms.
[nepi.git] / src / neco / resources / base / linux_node.py
1 from neco.execution.resource import Resource
2 from neco.util.sshfuncs import eintr_retry, shell_escape, rexec, rcopy, \
3         rspawn, rcheck_pid, rstatus, rkill, RUNNING 
4
5 import cStringIO
6 import logging
7 import os.path
8
class LinuxNode(Resource):
    """Resource that operates a remote Linux machine through the SSH helpers
    of ``neco.util.sshfuncs``.

    The connection settings (``ip``, ``host``, ``user``, ``port`` and
    ``identity_file``) are initialized to None and are expected to be
    assigned by the caller before any remote operation is invoked.  When
    both ``host`` and ``ip`` are set, ``host`` takes precedence.
    """

    def __init__(self, box, ec):
        super(LinuxNode, self).__init__(box, ec)
        self.ip = None
        self.host = None
        self.user = None
        self.port = None
        self.identity_file = None
        # Package management command prefix - either yum or apt for now.
        # Resolved lazily by the 'pm' property.
        self._pm = None

        # Logging: one logger per node, identified by the guid of its box
        loglevel = "debug"
        self._logger = logging.getLogger(
            "neco.resources.base.LinuxNode.%s" % self.box.guid)
        self._logger.setLevel(getattr(logging, loglevel.upper()))

    @property
    def pm(self):
        """Package manager command prefix: "yum -y " or "apt-get -y ".

        The prefix is resolved on first access by reading /etc/issue on
        the node, and is cached for subsequent accesses.

        Raises RuntimeError if neither host nor ip is set, if user is not
        set, or if /etc/issue does not start with "Fedora", "Debian" or
        "Ubuntu".
        """
        if self._pm:
            return self._pm

        if not (self.host or self.ip) or not self.user:
            msg = "Can't resolve package management system. Insufficient data."
            self._logger.error(msg)
            raise RuntimeError(msg)

        issue = self.execute("cat /etc/issue")

        if issue.startswith("Fedora"):
            self._pm = "yum -y "
        elif issue.startswith(("Debian", "Ubuntu")):
            self._pm = "apt-get -y "
        else:
            msg = "Can't resolve package management system. Unknown OS."
            self._logger.error(msg)
            raise RuntimeError(msg)

        return self._pm

    def execute(self, command,
            agent = True,
            sudo = False,
            stdin = "",
            tty = False,
            timeout = None,
            retry = 0,
            err_on_timeout = True,
            connect_timeout = 30,
            persistent = True):
        """ Runs 'command' on the node and returns its standard output.

        Notice that this invocation will block until the
        execution finishes. If this is not the desired behavior,
        use 'run' instead.

        All keyword arguments are forwarded unchanged to rexec.

        Raises RuntimeError if the process finishes with a non-zero
        exit status."""
        target = self.host or self.ip

        (out, err), proc = eintr_retry(rexec)(
                command,
                target,
                self.user,
                port = self.port,
                agent = agent,
                sudo = sudo,
                stdin = stdin,
                identity_file = self.identity_file,
                tty = tty,
                timeout = timeout,
                retry = retry,
                err_on_timeout = err_on_timeout,
                connect_timeout = connect_timeout,
                persistent = persistent)

        if proc.wait():
            msg = "Failed to execute command %s at node %s: %s %s" % \
                    (command, target, out, err,)
            self._logger.warning(msg)
            raise RuntimeError(msg)

        return out

    def package_install(self, dependencies):
        """ Installs packages on the node using the resolved package manager.

        'dependencies' is either a single package name or a list/tuple
        of package names. One install command is issued per package,
        with sudo and a tty.

        Raises RuntimeError if the package manager can't be resolved or
        if an install command fails."""
        if not isinstance(dependencies, (list, tuple)):
            dependencies = [dependencies]

        for package in dependencies:
            # 'tty' is the name of the keyword accepted by execute
            self.execute("%s install %s" % (self.pm, package), sudo = True,
                    tty = True)

    def upload(self, src, dst):
        """ Copies 'src' to the location 'dst' on the node.

        If 'src' is not the path of an existing local file, it is taken
        to be the literal content to upload and is wrapped in a
        file-like object before being handed to rcopy.

        Raises RuntimeError if the copy process finishes with a non-zero
        exit status."""
        if not os.path.isfile(src):
            src = cStringIO.StringIO(src)

        target = self.host or self.ip

        (out, err), proc = eintr_retry(rcopy)(
            src, dst,
            target,
            self.user,
            port = self.port,
            identity_file = self.identity_file)

        if proc.wait():
            msg = "Error uploading to %s got:\n%s%s" % (target, out, err)
            self._logger.error(msg)
            raise RuntimeError(msg)

    def is_alive(self, verbose = False):
        """ Returns True if the node answers a trivial echo command.

        A non-zero exit status, or an output that does not start with
        'ALIVE', is logged as a warning and reported as False.

        'verbose' is accepted for interface compatibility; it is
        currently not used."""
        target = self.host or self.ip

        (out, err), proc = eintr_retry(rexec)(
                "echo 'ALIVE'",
                target,
                self.user,
                port = self.port,
                identity_file = self.identity_file,
                timeout = 60,
                err_on_timeout = False,
                persistent = False)

        if not proc.wait() and out.strip().startswith('ALIVE'):
            return True

        self._logger.warning("Unresponsive node %s got:\n%s%s", target,
                out, err)
        return False

    def mkdir(self, path, clean = True):
        """ Creates the directory 'path' (and missing parents) on the node.

        If 'clean' is True, whatever exists at 'path' is removed first.

        Raises RuntimeError if any of the remote commands fails."""
        if clean:
            # -r is required: plain 'rm -f' fails on an existing directory
            self.execute(
                "rm -rf %s" % shell_escape(path),
                timeout = 120,
                retry = 3
                )

        self.execute(
            "mkdir -p %s" % shell_escape(path),
            timeout = 120,
            retry = 3
            )

    def run(self, command, home,
            stdin = 'stdin',
            stdout = 'stdout',
            stderr = 'stderr',
            sudo = False):
        """ Launches 'command' on the node through rspawn.

        Unlike 'execute', the intent is to not wait for the command to
        finish: only the exit status of the spawning process is checked.

        'stdin' set to None, and 'stdout' / 'stderr' set to any false
        value, are replaced by '/dev/null'. The pidfile is requested as
        './pid' (presumably relative to 'home' - to be confirmed against
        rspawn).

        Raises RuntimeError if the spawning process finishes with a
        non-zero exit status."""
        self._logger.info("Running %s", command)

        # Start process in a "daemonized" way, using nohup and heavy
        # stdin/out redirection to avoid connection issues
        (out, err), proc = rspawn(
            command,
            pidfile = './pid',
            home = home,
            stdin = stdin if stdin is not None else '/dev/null',
            stdout = stdout if stdout else '/dev/null',
            stderr = stderr if stderr else '/dev/null',
            sudo = sudo,
            host = self.host or self.ip,
            user = self.user,
            port = self.port,
            identity_file = self.identity_file
            )

        if proc.wait():
            raise RuntimeError("Failed to set up application: %s %s" % \
                    (out, err,))

    def checkpid(self, path):
        """ Returns the result of rcheck_pid for the file 'pid' located
        under 'path' on the node (presumably a (pid, ppid) tuple - to be
        confirmed against rcheck_pid)."""
        # Get PID/PPID
        # NOTE: the pidfile may take a moment to be created after 'run';
        # no waiting is done here
        return rcheck_pid(
            os.path.join(path, 'pid'),
            host = self.host or self.ip,
            user = self.user,
            port = self.port,
            identity_file = self.identity_file
            )

    def status(self, pid, ppid):
        """ Returns the status reported by rstatus for the process
        identified by 'pid' and 'ppid' on the node."""
        return rstatus(
                pid, ppid,
                host = self.host or self.ip,
                user = self.user,
                port = self.port,
                identity_file = self.identity_file
                )

    def kill(self, pid, ppid, sudo = False):
        """ Kills the process identified by 'pid' and 'ppid' on the node,
        but only if its status is RUNNING. Returns nothing."""
        if self.status(pid, ppid) == RUNNING:
            # kill by ppid+pid - SIGTERM first, then try SIGKILL
            rkill(
                pid, ppid,
                host = self.host or self.ip,
                user = self.user,
                port = self.port,
                sudo = sudo,
                identity_file = self.identity_file
                )
207