Adding WilabtSfaNode
[nepi.git] / src / nepi / resources / omf / wilabt_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay 
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import time
30 import re
31 import weakref
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class WilabtSfaNode(LinuxNode):
38     _rtype = "WilabtSfaNode"
39     _help = "Controls a Wilabt host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _backend = "omf"
42
43     @classmethod
44     def _register_attributes(cls):
45
46         sfa_user = Attribute("sfauser", "SFA user",
47                     flags = Flags.Credential)
48
49         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50                             used to generate the user credential",
51                             flags = Flags.Credential)
52
53         slicename = Attribute("slicename", "SFA slice for the experiment",
54                     flags = Flags.Credential)
55
56         gateway_user = Attribute("gatewayUser", "Gateway account username",
57                 flags = Flags.Design)
58
59         gateway = Attribute("gateway", "Hostname of the gateway machine",
60                 flags = Flags.Design)
61
62         cls._register_attribute(sfa_user)
63         cls._register_attribute(sfa_private_key)
64         cls._register_attribute(slicename)
65         cls._register_attribute(gateway_user)
66         cls._register_attribute(gateway)
67
68     def __init__(self, ec, guid):
69         super(WilabtSfaNode, self).__init__(ec, guid)
70
71         self._ecobj = weakref.ref(ec)
72         self._sfaapi = None
73         self._node_to_provision = None
74         self._slicenode = False
75         self._hostname = False
76         self._username = None
77
78     def _skip_provision(self):
79         sfa_user = self.get("sfauser")
80         if not sfa_user:
81             return True
82         else: return False
83     
84     @property
85     def sfaapi(self):
86         if not self._sfaapi:
87             sfa_user = self.get("sfauser")
88             sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
89             sfa_auth = '.'.join(sfa_user.split('.')[:2])
90             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
91             sfa_private_key = self.get("sfaPrivateKey")
92             batch = True
93
94             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
95                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
96             
97             if not _sfaapi:
98                 self.fail_sfaapi()
99
100             self._sfaapi = weakref.ref(_sfaapi)
101
102         return self._sfaapi()
103
104     def do_discover(self):
105         """
106         Based on the attributes defined by the user, discover the suitable 
107         nodes for provision.
108         """
109         if self._skip_provision():
110             super(WilabtSfaNode, self).do_discover()
111             return
112
113         nodes = self.sfaapi.get_resources_hrn()
114
115         hostname = self._get_hostname()
116         if hostname:
117             # the user specified one particular node to be provisioned
118             self._hostname = True
119             host_hrn = nodes[hostname]
120
121             # check that the node is not blacklisted or being provisioned
122             # by other RM
123             if not self._blacklisted(host_hrn):
124                 if not self._reserved(host_hrn):
125                     if self._check_if_in_slice([host_hrn]):
126                         self.debug("Node already in slice %s" % host_hrn)
127                         self._slicenode = True
128                     hostname = hostname + '.wilab2.ilabt.iminds.be'
129                     self.set('hostname', hostname)
130                     self._node_to_provision = host_hrn
131                     super(WilabtSfaNode, self).do_discover()
132
133     def do_provision(self):
134         """
135         Add node to user's slice after verifing that the node is functioning
136         correctly.
137         """
138         if self._skip_provision():
139             super(WilabtSfaNode, self).do_provision()
140             return
141
142         provision_ok = False
143         ssh_ok = False
144         proc_ok = False
145         timeout = 300
146
147         while not provision_ok:
148             node = self._node_to_provision
149             if self._slicenode:
150                 self._delete_from_slice()
151                 self.debug("Waiting 300 seg for re-adding to slice")
152                 time.sleep(300) # Timout for the testbed to allow a new reservation
153             self._add_node_to_slice(node)
154             t = 0
155             while not self._check_if_in_slice([node]) and t < timeout:
156                 t = t + 5
157                 time.sleep(t)
158                 self.debug("Waiting 5 seg for resources to be added")
159                 continue
160             self._get_username()
161             ssh_ok = self._check_ssh_loop()          
162
163             if not ssh_ok:
164                 # the timeout was reach without establishing ssh connection
165                 # the node is blacklisted, and a new
166                 # node to provision is discovered
167                 self._blacklist_node(node)
168                 self.do_discover()
169                 continue
170             
171             # check /proc directory is mounted (ssh_ok = True)
172             # file system is not read only, hostname is correct
173             # and omf_rc process is up
174             else:
175                 if not self._check_fs():
176                     self.do_discover()
177                     continue
178                 if not self._check_omf():
179                     self.do_discover()
180                     continue
181                 if not self._check_hostname():
182                     self.do_discover()
183                     continue
184             
185                 else:
186                     provision_ok = True
187                     if not self.get('hostname'):
188                         self._set_hostname_attr(node)            
189                     self.info(" Node provisioned ")            
190             
191         super(WilabtSfaNode, self).do_provision()
192
193     def _blacklisted(self, host_hrn):
194         if self.sfaapi.blacklisted(host_hrn):
195            self.fail_node_not_available(host_hrn)
196         return False
197
198     def _reserved(self, host_hrn):
199         if self.sfaapi.reserved(host_hrn):
200             self.fail_node_not_available(host_hrn)
201         return False
202
203     def _get_username(self):
204         slicename = self.get("slicename")
205         if self._username is None:
206             slice_info = self.sfaapi.get_slice_resources(slicename)
207             username = slice_info['resource'][0]['services'][0]['login'][0]['username']
208             self.set('username', username)
209             self.debug("Retriving username information from RSpec %s" % username)
210             self._username = username
211             
212     def _check_ssh_loop(self):
213         t = 0
214         timeout = 10
215         ssh_ok = False
216         while t < timeout and not ssh_ok:
217             cmd = 'echo \'GOOD NODE\''
218             ((out, err), proc) = self.execute(cmd)
219             if out.find("GOOD NODE") < 0:
220                 self.debug( "No SSH connection, waiting 60s" )
221                 t = t + 5
222                 time.sleep(5)
223                 continue
224             else:
225                 self.debug( "SSH OK" )
226                 ssh_ok = True
227                 continue
228         return ssh_ok
229
230     def _check_fs(self):
231         cmd = 'mount |grep proc'
232         ((out, err), proc) = self.execute(cmd)
233         if out.find("/proc type proc") < 0:
234             self.warning(" Corrupted file system ")
235             self._blacklist_node(node)
236             return False
237         return True
238
239     def _check_omfrc(self):
240         cmd = 'ps aux|grep omf'
241         ((out, err), proc) = self.execute(cmd)
242         if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
243             return False
244         return True
245
246     def _check_hostname(self):
247         cmd = 'hostname'
248         ((out, err), proc) = self.execute(cmd)
249         if 'localhost' in out.lower():
250             return False
251         return True 
252
253     def _add_node_to_slice(self, host_hrn):
254         self.info(" Adding node to slice ")
255         slicename = self.get("slicename")
256         self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn)
257
258     def _delete_from_slice(self):
259         self.warning(" Deleting all slivers from slice ")
260         slicename = self.get("slicename")
261         self.sfaapi.remove_all_from_slice(slicename)
262
263     def _get_hostname(self):
264         hostname = self.get("hostname")
265         if hostname:
266             return hostname
267         else:
268             return None
269
270     def _set_hostname_attr(self, node):
271         """
272         Query SFAAPI for the hostname of a certain host hrn and sets the
273         attribute hostname, it will over write the previous value
274         """
275         hosts_hrn = self.sfaapi.get_resources_hrn()
276         for hostname, hrn  in hosts_hrn.iteritems():
277             if hrn == node:
278                 hostname = hostname + '.wilab2.ilabt.iminds.be'
279                 self.set("hostname", hostname)
280
281     def _check_if_in_slice(self, hosts_hrn):
282         """
283         Check using SFA API if any host hrn from hosts_hrn is in the user's
284         slice
285         """
286         slicename = self.get("slicename")
287         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
288         if slice_nodes:
289             if len(slice_nodes[0]['services']) != 0:
290                 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
291         else: slice_nodes_hrn = []
292         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
293         return nodes_inslice
294
295     def _do_ping(self, hostname):
296         """
297         Perform ping command on node's IP matching hostname
298         """
299         ping_ok = False
300         guser = self.get("gatewayUser")
301         gw = self.get("gateway")
302         host = hostname + ".wilab2.ilabt.iminds.be"
303         command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
304         (out, err) = lexec(command)
305         m = re.search("(\d+)% packet loss", str(out))
306         if m and int(m.groups()[0]) < 50:
307             ping_ok = True
308
309         return ping_ok
310
311     def _blacklist_node(self, host_hrn):
312         """
313         Add node mal functioning node to blacklist
314         """
315         self.warning(" Blacklisting malfunctioning node ")
316         self.sfaapi.blacklist_resource(host_hrn)
317         if not self._hostname:
318             self.set('hostname', None)
319         else:
320             self.set('hostname', host_hrn.split('.').pop())
321
322     def _put_node_in_provision(self, host_hrn):
323         """
324         Add node to the list of nodes being provisioned, in order for other RMs
325         to not try to provision the same one again
326         """
327         self.sfaapi.reserve_resource(host_hrn)
328
329     def _get_ip(self, hostname):
330         """
331         Query PLCAPI for the IP of a node with certain node id
332         """
333         try:
334             ip = sshfuncs.gethostbyname(hostname)
335         except:
336             # Fail while trying to find the IP
337             return None
338         return ip
339
340     def fail_discovery(self):
341         msg = "Discovery failed. No candidates found for node"
342         self.error(msg)
343         raise RuntimeError, msg
344
345     def fail_node_not_alive(self, hostname=None):
346         msg = "Node %s not alive" % hostname
347         raise RuntimeError, msg
348     
349     def fail_node_not_available(self, hostname):
350         msg = "Node %s not available for provisioning" % hostname
351         raise RuntimeError, msg
352
353     def fail_not_enough_nodes(self):
354         msg = "Not enough nodes available for provisioning"
355         raise RuntimeError, msg
356
357     def fail_plapi(self):
358         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
359             " attributes pluser and plpassword."
360         raise RuntimeError, msg
361
362     def valid_connection(self, guid):
363         # TODO: Validate!
364         return True
365
366