Merge the OMF 6 branch
[nepi.git] / src / nepi / resources / omf / wilabt_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay 
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import time
30 import re
31 import weakref
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class WilabtSfaNode(LinuxNode):
38     _rtype = "WilabtSfaNode"
39     _help = "Controls a Wilabt host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _backend = "omf"
42
43     @classmethod
44     def _register_attributes(cls):
45
46         sfa_user = Attribute("sfauser", "SFA user",
47                     flags = Flags.Credential)
48
49         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50                             used to generate the user credential",
51                             flags = Flags.Credential)
52
53         slicename = Attribute("slicename", "SFA slice for the experiment",
54                     flags = Flags.Credential)
55
56         gateway_user = Attribute("gatewayUser", "Gateway account username",
57                 flags = Flags.Design)
58
59         gateway = Attribute("gateway", "Hostname of the gateway machine",
60                 flags = Flags.Design)
61
62         cls._register_attribute(sfa_user)
63         cls._register_attribute(sfa_private_key)
64         cls._register_attribute(slicename)
65         cls._register_attribute(gateway_user)
66         cls._register_attribute(gateway)
67
68     def __init__(self, ec, guid):
69         super(WilabtSfaNode, self).__init__(ec, guid)
70
71         self._ecobj = weakref.ref(ec)
72         self._sfaapi = None
73         self._node_to_provision = None
74         self._slicenode = False
75         self._hostname = False
76         self._username = None
77
78     def _skip_provision(self):
79         sfa_user = self.get("sfauser")
80         if not sfa_user:
81             return True
82         else: return False
83     
84     @property
85     def sfaapi(self):
86         """
87         Property to instanciate the SFA API based in sfi client.
88         For each SFA method called this instance is used.
89         """
90         if not self._sfaapi:
91             sfa_user = self.get("sfauser")
92             sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
93             sfa_auth = '.'.join(sfa_user.split('.')[:2])
94             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
95             sfa_private_key = self.get("sfaPrivateKey")
96             batch = True
97
98             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
99                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
100             
101             if not _sfaapi:
102                 self.fail_sfaapi()
103
104             self._sfaapi = weakref.ref(_sfaapi)
105
106         return self._sfaapi()
107
108     def do_discover(self):
109         """
110         Based on the attributes defined by the user, discover the suitable 
111         node for provision.
112         """
113         if self._skip_provision():
114             super(WilabtSfaNode, self).do_discover()
115             return
116
117         nodes = self.sfaapi.get_resources_hrn()
118
119         hostname = self._get_hostname()
120         if hostname:
121             # the user specified one particular node to be provisioned
122             self._hostname = True
123             host_hrn = nodes[hostname]
124
125             # check that the node is not blacklisted or being provisioned
126             # by other RM
127             if not self._blacklisted(host_hrn):
128                 if not self._reserved(host_hrn):
129                     if self._check_if_in_slice([host_hrn]):
130                         self.debug("Node already in slice %s" % host_hrn)
131                         self._slicenode = True
132                     hostname = hostname + '.wilab2.ilabt.iminds.be'
133                     self.set('hostname', hostname)
134                     self._node_to_provision = host_hrn
135                     super(WilabtSfaNode, self).do_discover()
136
137     def do_provision(self):
138         """
139         Add node to user's slice and verifing that the node is functioning
140         correctly. Check ssh, omf rc running, hostname, file system.
141         """
142         if self._skip_provision():
143             super(WilabtSfaNode, self).do_provision()
144             return
145
146         provision_ok = False
147         ssh_ok = False
148         proc_ok = False
149         timeout = 300
150
151         while not provision_ok:
152             node = self._node_to_provision
153             if self._slicenode:
154                 self._delete_from_slice()
155                 self.debug("Waiting 300 seg for re-adding to slice")
156                 time.sleep(300) # Timout for the testbed to allow a new reservation
157             self._add_node_to_slice(node)
158             t = 0
159             while not self._check_if_in_slice([node]) and t < timeout \
160                 and not self._ecobj().abort:
161                 t = t + 5
162                 time.sleep(t)
163                 self.debug("Waiting 5 seg for resources to be added")
164                 continue
165
166             if not self._check_if_in_slice([node]):
167                 self.debug("Couldn't add node %s to slice" % node)
168                 self.fail_node_not_available(node)
169
170             self._get_username()
171             ssh_ok = self._check_ssh_loop()          
172
173             if not ssh_ok:
174                 # the timeout was reach without establishing ssh connection
175                 # the node is blacklisted, and a new
176                 # node to provision is discovered
177                 self._blacklist_node(node)
178                 self.do_discover()
179                 continue
180             
181             # check /proc directory is mounted (ssh_ok = True)
182             # file system is not read only, hostname is correct
183             # and omf_rc process is up
184             else:
185                 if not self._check_fs():
186                     self.do_discover()
187                     continue
188                 if not self._check_omf():
189                     self.do_discover()
190                     continue
191                 if not self._check_hostname():
192                     self.do_discover()
193                     continue
194             
195                 else:
196                     provision_ok = True
197                     if not self.get('hostname'):
198                         self._set_hostname_attr(node)            
199                     self.info(" Node provisioned ")            
200             
201         super(WilabtSfaNode, self).do_provision()
202
203     def _blacklisted(self, host_hrn):
204         """
205         Check in the SFA API that the node is not in the blacklist.
206         """
207         if self.sfaapi.blacklisted(host_hrn):
208            self.fail_node_not_available(host_hrn)
209         return False
210
211     def _reserved(self, host_hrn):
212         """
213         Check in the SFA API that the node is not in the reserved
214         list.
215         """
216         if self.sfaapi.reserved(host_hrn):
217             self.fail_node_not_available(host_hrn)
218         return False
219
220     def _get_username(self):
221         """
222         Get the username for login in to the nodes from RSpec.
223         Wilabt username is not made out of any convention, it
224         has to be retrived from the manifest RSpec.
225         """
226         slicename = self.get("slicename")
227         if self._username is None:
228             slice_info = self.sfaapi.get_slice_resources(slicename)
229             username = slice_info['resource'][0]['services'][0]['login'][0]['username']
230             self.set('username', username)
231             self.debug("Retriving username information from RSpec %s" % username)
232             self._username = username
233             
234     def _check_ssh_loop(self):
235         """
236         Check that the ssh login is possible. In wilabt is done
237         through the gateway because is private testbed.
238         """
239         t = 0
240         timeout = 10
241         ssh_ok = False
242         while t < timeout and not ssh_ok:
243             cmd = 'echo \'GOOD NODE\''
244             ((out, err), proc) = self.execute(cmd)
245             if out.find("GOOD NODE") < 0:
246                 self.debug( "No SSH connection, waiting 60s" )
247                 t = t + 5
248                 time.sleep(5)
249                 continue
250             else:
251                 self.debug( "SSH OK" )
252                 ssh_ok = True
253                 continue
254         return ssh_ok
255
256     def _check_fs(self):
257         """
258         Check file system, /proc well mounted.
259         """
260         cmd = 'mount |grep proc'
261         ((out, err), proc) = self.execute(cmd)
262         if out.find("/proc type proc") < 0:
263             self.warning(" Corrupted file system ")
264             self._blacklist_node(node)
265             return False
266         return True
267
268     def _check_omfrc(self):
269         """
270         Check that OMF 6 resource controller is running.
271         """
272         cmd = 'ps aux|grep omf'
273         ((out, err), proc) = self.execute(cmd)
274         if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
275             return False
276         return True
277
278     def _check_hostname(self):
279         """
280         Check that the hostname in the image is not set to localhost.
281         """
282         cmd = 'hostname'
283         ((out, err), proc) = self.execute(cmd)
284         if 'localhost' in out.lower():
285             return False
286         return True 
287
288     def _add_node_to_slice(self, host_hrn):
289         """
290         Add node to slice, using SFA API. Actually Wilabt testbed
291         doesn't allow adding nodes, in fact in the API there is method
292         to group all the nodes instanciated as WilabtSfaNodes and the
293         Allocate and Provision is done with the last call at 
294         sfaapi.add_resource_to_slice_batch.
295         """
296         self.info(" Adding node to slice ")
297         slicename = self.get("slicename")
298         self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn)
299
300     def _delete_from_slice(self):
301         """
302         Delete every node from slice, using SFA API.
303         Wilabt doesn't allow to remove one sliver so this method 
304         remove every slice from the slice.
305         """
306
307         self.warning(" Deleting all slivers from slice ")
308         slicename = self.get("slicename")
309         self.sfaapi.remove_all_from_slice(slicename)
310
311     def _get_hostname(self):
312         """
313         Get the attribute hostname.
314         """
315         hostname = self.get("hostname")
316         if hostname:
317             return hostname
318         else:
319             return None
320
321     def _set_hostname_attr(self, node):
322         """
323         Query SFAAPI for the hostname of a certain host hrn and sets the
324         attribute hostname, it will over write the previous value.
325         """
326         hosts_hrn = self.sfaapi.get_resources_hrn()
327         for hostname, hrn  in hosts_hrn.iteritems():
328             if hrn == node:
329                 hostname = hostname + '.wilab2.ilabt.iminds.be'
330                 self.set("hostname", hostname)
331
332     def _check_if_in_slice(self, hosts_hrn):
333         """
334         Check using SFA API if any host hrn from hosts_hrn is in the user's
335         slice.
336         """
337         slicename = self.get("slicename")
338         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
339         if slice_nodes:
340             if len(slice_nodes[0]['services']) != 0:
341                 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
342         else: slice_nodes_hrn = []
343         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
344         return nodes_inslice
345
346     def _do_ping(self, hostname):
347         """
348         Perform ping command on node's IP matching hostname.
349         """
350         ping_ok = False
351         guser = self.get("gatewayUser")
352         gw = self.get("gateway")
353         host = hostname + ".wilab2.ilabt.iminds.be"
354         command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
355         (out, err) = lexec(command)
356         m = re.search("(\d+)% packet loss", str(out))
357         if m and int(m.groups()[0]) < 50:
358             ping_ok = True
359
360         return ping_ok
361
362     def _blacklist_node(self, host_hrn):
363         """
364         Add mal functioning node to blacklist (in SFA API).
365         """
366         self.warning(" Blacklisting malfunctioning node ")
367         self.sfaapi.blacklist_resource(host_hrn)
368         if not self._hostname:
369             self.set('hostname', None)
370         else:
371             self.set('hostname', host_hrn.split('.').pop())
372
373     def _put_node_in_provision(self, host_hrn):
374         """
375         Add node to the list of nodes being provisioned, in order for other RMs
376         to not try to provision the same one again.
377         """
378         self.sfaapi.reserve_resource(host_hrn)
379
380     def _get_ip(self, hostname):
381         """
382         Query cache for the IP of a node with certain hostname
383         """
384         try:
385             ip = sshfuncs.gethostbyname(hostname)
386         except:
387             # Fail while trying to find the IP
388             return None
389         return ip
390
391     def fail_discovery(self):
392         msg = "Discovery failed. No candidates found for node"
393         self.error(msg)
394         raise RuntimeError, msg
395
396     def fail_node_not_alive(self, hostname=None):
397         msg = "Node %s not alive" % hostname
398         raise RuntimeError, msg
399     
400     def fail_node_not_available(self, hostname):
401         msg = "Node %s not available for provisioning" % hostname
402         raise RuntimeError, msg
403
404     def fail_not_enough_nodes(self):
405         msg = "Not enough nodes available for provisioning"
406         raise RuntimeError, msg
407
408     def fail_plapi(self):
409         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
410             " attributes pluser and plpassword."
411         raise RuntimeError, msg
412
413     def valid_connection(self, guid):
414         # TODO: Validate!
415         return True
416
417