d18769acb2fdaaffbfa8ebf6975dd79e246c61b2
[nepi.git] / src / nepi / resources / omf / wilabt_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay 
23 from nepi.resources.omf.node import OMFNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import time
30 import re
31 import weakref
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class WilabtSfaNode(OMFNode):
38     _rtype = "WilabtSfaNode"
39     _help = "Controls a Wilabt host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _backend = "omf"
42
43     @classmethod
44     def _register_attributes(cls):
45         
46         username = Attribute("username", "Local account username",
47                 flags = Flags.Credential)
48
49         identity = Attribute("identity", "SSH identity file",
50                 flags = Flags.Credential)
51
52         server_key = Attribute("serverKey", "Server public key",
53                 flags = Flags.Design)
54
55         sfa_user = Attribute("sfauser", "SFA user",
56                     flags = Flags.Credential)
57
58         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
59                             used to generate the user credential",
60                             flags = Flags.Credential)
61
62         slicename = Attribute("slicename", "SFA slice for the experiment",
63                     flags = Flags.Credential)
64
65         gateway_user = Attribute("gatewayUser", "Gateway account username",
66                 flags = Flags.Design)
67
68         gateway = Attribute("gateway", "Hostname of the gateway machine",
69                 flags = Flags.Design)
70
71         hostxmpp = Attribute("hostxmpp", "Hostname from RSpec to use in xmpp messages",
72                 flags = Flags.Design)
73
74         disk_image = Attribute("disk_image", "Specify a specific disk image for a node",
75                 flags = Flags.Design)
76         
77         cls._register_attribute(username)
78         cls._register_attribute(identity)
79         cls._register_attribute(server_key)
80         cls._register_attribute(sfa_user)
81         cls._register_attribute(sfa_private_key)
82         cls._register_attribute(slicename)
83         cls._register_attribute(gateway_user)
84         cls._register_attribute(gateway)
85         cls._register_attribute(hostxmpp)
86         cls._register_attribute(disk_image)
87
88     def __init__(self, ec, guid):
89         super(WilabtSfaNode, self).__init__(ec, guid)
90
91         self._ecobj = weakref.ref(ec)
92         self._sfaapi = None
93         self._node_to_provision = None
94         self._slicenode = False
95         self._hostname = False
96         self._username = None
97
98     def _skip_provision(self):
99         sfa_user = self.get("sfauser")
100         if not sfa_user:
101             return True
102         else: return False
103     
104     @property
105     def sfaapi(self):
106         """
107         Property to instanciate the SFA API based in sfi client.
108         For each SFA method called this instance is used.
109         """
110         if not self._sfaapi:
111             sfa_user = self.get("sfauser")
112             sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
113             sfa_auth = '.'.join(sfa_user.split('.')[:2])
114             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
115             sfa_private_key = self.get("sfaPrivateKey")
116             batch = True
117
118             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
119                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
120             
121             if not _sfaapi:
122                 self.fail_sfaapi()
123
124             self._sfaapi = weakref.ref(_sfaapi)
125
126         return self._sfaapi()
127
128     def do_discover(self):
129         """
130         Based on the attributes defined by the user, discover the suitable 
131         node for provision.
132         """
133         if self._skip_provision():
134             super(WilabtSfaNode, self).do_discover()
135             return
136
137         nodes = self.sfaapi.get_resources_hrn()
138
139         hostname = self._get_hostname()
140         if hostname:
141             # the user specified one particular node to be provisioned
142             self._hostname = True
143             host_hrn = nodes[hostname]
144
145             # check that the node is not blacklisted or being provisioned
146             # by other RM
147             if not self._blacklisted(host_hrn):
148                 if not self._reserved(host_hrn):
149                     if self._check_if_in_slice([host_hrn]):
150                         self.debug("Node already in slice %s" % host_hrn)
151                         self._slicenode = True
152                     hostname = hostname + '.wilab2.ilabt.iminds.be'
153                     self.set('hostname', hostname)
154                     self._node_to_provision = host_hrn
155                     super(WilabtSfaNode, self).do_discover()
156
157     def do_provision(self):
158         """
159         Add node to user's slice and verifing that the node is functioning
160         correctly. Check ssh, omf rc running, hostname, file system.
161         """
162         if self._skip_provision():
163             super(WilabtSfaNode, self).do_provision()
164             return
165
166         provision_ok = False
167         ssh_ok = False
168         proc_ok = False
169         timeout = 300
170
171         while not provision_ok:
172             node = self._node_to_provision
173             if self._slicenode:
174                 self._delete_from_slice()
175                 self.debug("Waiting 300 sec for re-adding to slice")
176                 time.sleep(300) # Timout for the testbed to allow a new reservation
177             self._add_node_to_slice(node)
178             t = 0
179             while not self._check_if_in_slice([node]) and t < timeout \
180                 and not self._ecobj().abort:
181                 t = t + 5
182                 time.sleep(t)
183                 self.debug("Waiting 5 sec for resources to be added")
184                 continue
185
186             if not self._check_if_in_slice([node]):
187                 self.debug("Couldn't add node %s to slice" % node)
188                 self.fail_node_not_available(node)
189
190             self._get_username()
191             ssh_ok = self._check_ssh_loop()          
192
193             if not ssh_ok:
194                 # the timeout was reach without establishing ssh connection
195                 # the node is blacklisted, and a new
196                 # node to provision is discovered
197                 self._blacklist_node(node)
198                 self.do_discover()
199                 continue
200             
201             # check /proc directory is mounted (ssh_ok = True)
202             # file system is not read only, hostname is correct
203             # and omf_rc process is up
204             else:
205                 if not self._check_fs():
206                     self.do_discover()
207                     continue
208                 if not self._check_omfrc():
209                     self.do_discover()
210                     continue
211                 if not self._check_hostname():
212                     self.do_discover()
213                     continue
214             
215                 else:
216                     provision_ok = True
217                     if not self.get('hostname'):
218                         self._set_hostname_attr(node)            
219                     self.info(" Node provisioned ")            
220             
221         super(WilabtSfaNode, self).do_provision()
222
223     def do_release(self):
224         super(WilabtSfaNode, self).do_release()
225         if self.state == ResourceState.RELEASED and not self._skip_provision():
226             self.debug(" Releasing SFA API ")
227             self.sfaapi.release()
228
229     def _blacklisted(self, host_hrn):
230         """
231         Check in the SFA API that the node is not in the blacklist.
232         """
233         if self.sfaapi.blacklisted(host_hrn):
234            self.fail_node_not_available(host_hrn)
235         return False
236
237     def _reserved(self, host_hrn):
238         """
239         Check in the SFA API that the node is not in the reserved
240         list.
241         """
242         if self.sfaapi.reserved(host_hrn):
243             self.fail_node_not_available(host_hrn)
244         return False
245
246     def _get_username(self):
247         """
248         Get the username for login in to the nodes from RSpec.
249         Wilabt username is not made out of any convention, it
250         has to be retrived from the manifest RSpec.
251         """
252         slicename = self.get("slicename")
253         if self._username is None:
254             slice_info = self.sfaapi.get_slice_resources(slicename)
255             username = slice_info['resource'][0]['services'][0]['login'][0]['username']
256             self.set('username', username)
257             self.debug("Retriving username information from RSpec %s" % username)
258             self._username = username
259             
260     def _check_ssh_loop(self):
261         """
262         Check that the ssh login is possible. In wilabt is done
263         through the gateway because is private testbed.
264         """
265         t = 0
266         timeout = 300
267         ssh_ok = False
268         while t < timeout and not ssh_ok:
269             cmd = 'echo \'GOOD NODE\''
270             ((out, err), proc) = self.execute(cmd)
271             if out.find("GOOD NODE") < 0:
272                 self.debug( "No SSH connection, waiting 20s" )
273                 t = t + 20
274                 time.sleep(20)
275                 continue
276             else:
277                 self.debug( "SSH OK" )
278                 ssh_ok = True
279                 continue
280         return ssh_ok
281
282     def _check_fs(self):
283         """
284         Check file system, /proc well mounted.
285         """
286         cmd = 'mount |grep proc'
287         ((out, err), proc) = self.execute(cmd)
288         if out.find("/proc type proc") < 0:
289             self.warning(" Corrupted file system ")
290             self._blacklist_node(node)
291             return False
292         return True
293
294     def _check_omfrc(self):
295         """
296         Check that OMF 6 resource controller is running.
297         """
298         cmd = 'ps aux|grep omf'
299         ((out, err), proc) = self.execute(cmd)
300         if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
301             return False
302         return True
303
304     def _check_hostname(self):
305         """
306         Check that the hostname in the image is not set to localhost.
307         """
308         cmd = 'hostname'
309         ((out, err), proc) = self.execute(cmd)
310         if 'localhost' in out.lower():
311             return False
312         else:
313             self.set('hostxmpp', out.strip()) 
314         return True 
315
316     def execute(self, command,
317         sudo = False,
318         env = None,
319         tty = False,
320         forward_x11 = False,
321         retry = 3,
322         connect_timeout = 30,
323         strict_host_checking = False,
324         persistent = True,
325         blocking = True,
326         ):
327         """ Notice that this invocation will block until the
328         execution finishes. If this is not the desired behavior,
329         use 'run' instead."""
330         (out, err), proc = sshfuncs.rexec(
331             command,
332             host = self.get("hostname"),
333             user = self.get("username"),
334             port = 22,
335             gwuser = self.get("gatewayUser"),
336             gw = self.get("gateway"),
337             agent = True,
338             sudo = sudo,
339             identity = self.get("identity"),
340             server_key = self.get("serverKey"),
341             env = env,
342             tty = tty,
343             forward_x11 = forward_x11,
344             retry = retry,
345             connect_timeout = connect_timeout,
346             persistent = persistent,
347             blocking = blocking,
348             strict_host_checking = strict_host_checking
349             )
350
351         return (out, err), proc
352
353
354     def _add_node_to_slice(self, host_hrn):
355         """
356         Add node to slice, using SFA API. Actually Wilabt testbed
357         doesn't allow adding nodes, in fact in the API there is method
358         to group all the nodes instanciated as WilabtSfaNodes and the
359         Allocate and Provision is done with the last call at 
360         sfaapi.add_resource_to_slice_batch.
361         """
362         self.info(" Adding node to slice ")
363         slicename = self.get("slicename")
364         disk_image = self.get("disk_image")
365         if disk_image is not None:
366             properties = {'disk_image': disk_image}
367         else: properties = None
368         self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn, properties=properties)
369
370     def _delete_from_slice(self):
371         """
372         Delete every node from slice, using SFA API.
373         Wilabt doesn't allow to remove one sliver so this method 
374         remove every slice from the slice.
375         """
376
377         self.warning(" Deleting all slivers from slice ")
378         slicename = self.get("slicename")
379         self.sfaapi.remove_all_from_slice(slicename)
380
381     def _get_hostname(self):
382         """
383         Get the attribute hostname.
384         """
385         hostname = self.get("hostname")
386         if hostname:
387             return hostname
388         else:
389             return None
390
391     def _set_hostname_attr(self, node):
392         """
393         Query SFAAPI for the hostname of a certain host hrn and sets the
394         attribute hostname, it will over write the previous value.
395         """
396         hosts_hrn = self.sfaapi.get_resources_hrn()
397         for hostname, hrn  in hosts_hrn.iteritems():
398             if hrn == node:
399                 hostname = hostname + '.wilab2.ilabt.iminds.be'
400                 self.set("hostname", hostname)
401
402     def _check_if_in_slice(self, hosts_hrn):
403         """
404         Check using SFA API if any host hrn from hosts_hrn is in the user's
405         slice.
406         """
407         slicename = self.get("slicename")
408         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
409         if slice_nodes:
410             if len(slice_nodes[0]['services']) != 0:
411                 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
412         else: slice_nodes_hrn = []
413         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
414         return nodes_inslice
415
416     def _do_ping(self, hostname):
417         """
418         Perform ping command on node's IP matching hostname.
419         """
420         ping_ok = False
421         guser = self.get("gatewayUser")
422         gw = self.get("gateway")
423         host = hostname + ".wilab2.ilabt.iminds.be"
424         command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
425         (out, err) = lexec(command)
426         m = re.search("(\d+)% packet loss", str(out))
427         if m and int(m.groups()[0]) < 50:
428             ping_ok = True
429
430         return ping_ok
431
432     def _blacklist_node(self, host_hrn):
433         """
434         Add mal functioning node to blacklist (in SFA API).
435         """
436         self.warning(" Blacklisting malfunctioning node ")
437         self.sfaapi.blacklist_resource(host_hrn)
438         if not self._hostname:
439             self.set('hostname', None)
440         else:
441             self.set('hostname', host_hrn.split('.').pop())
442
443     def _put_node_in_provision(self, host_hrn):
444         """
445         Add node to the list of nodes being provisioned, in order for other RMs
446         to not try to provision the same one again.
447         """
448         self.sfaapi.reserve_resource(host_hrn)
449
450     def _get_ip(self, hostname):
451         """
452         Query cache for the IP of a node with certain hostname
453         """
454         try:
455             ip = sshfuncs.gethostbyname(hostname)
456         except:
457             # Fail while trying to find the IP
458             return None
459         return ip
460
461     def fail_discovery(self):
462         msg = "Discovery failed. No candidates found for node"
463         self.error(msg)
464         raise RuntimeError, msg
465
466     def fail_node_not_alive(self, hostname=None):
467         msg = "Node %s not alive" % hostname
468         raise RuntimeError, msg
469     
470     def fail_node_not_available(self, hostname):
471         msg = "Some nodes not available for provisioning"
472         raise RuntimeError, msg
473
474     def fail_not_enough_nodes(self):
475         msg = "Not enough nodes available for provisioning"
476         raise RuntimeError, msg
477
478     def fail_sfaapi(self):
479         msg = "Failing while trying to instanciate the SFA API."
480         raise RuntimeError, msg
481
482     def valid_connection(self, guid):
483         # TODO: Validate!
484         return True
485
486