Changes for release
[nepi.git] / src / nepi / resources / omf / wilabt_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay 
23 from nepi.resources.omf.node import OMFNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import time
30 import re
31 import weakref
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class WilabtSfaNode(OMFNode):
38     _rtype = "WilabtSfaNode"
39     _help = "Controls a Wilabt host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _backend = "omf"
42
43     @classmethod
44     def _register_attributes(cls):
45         
46         username = Attribute("username", "Local account username",
47                 flags = Flags.Credential)
48
49         identity = Attribute("identity", "SSH identity file",
50                 flags = Flags.Credential)
51
52         server_key = Attribute("serverKey", "Server public key",
53                 flags = Flags.Design)
54
55         sfa_user = Attribute("sfauser", "SFA user",
56                     flags = Flags.Credential)
57
58         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
59                             used to generate the user credential",
60                             flags = Flags.Credential)
61
62         slicename = Attribute("slicename", "SFA slice for the experiment",
63                     flags = Flags.Credential)
64
65         gateway_user = Attribute("gatewayUser", "Gateway account username",
66                 flags = Flags.Design)
67
68         gateway = Attribute("gateway", "Hostname of the gateway machine",
69                 flags = Flags.Design)
70
71         host = Attribute("host", "Name of the physical machine",
72                 flags = Flags.Design)
73
74         #disk_image = Attribute("disk_image", "Specify a specific disk image for a node",
75         #        flags = Flags.Design)
76         
77         cls._register_attribute(username)
78         cls._register_attribute(identity)
79         cls._register_attribute(server_key)
80         cls._register_attribute(sfa_user)
81         cls._register_attribute(sfa_private_key)
82         cls._register_attribute(slicename)
83         cls._register_attribute(gateway_user)
84         cls._register_attribute(gateway)
85         cls._register_attribute(host)
86         #cls._register_attribute(disk_image)
87
88     def __init__(self, ec, guid):
89         super(WilabtSfaNode, self).__init__(ec, guid)
90
91         self._ecobj = weakref.ref(ec)
92         self._sfaapi = None
93         self._node_to_provision = None
94         self._slicenode = False
95         self._host = False
96         self._username = None
97
98     def _skip_provision(self):
99         sfa_user = self.get("sfauser")
100         if not sfa_user:
101             return True
102         else: return False
103     
104     @property
105     def sfaapi(self):
106         """
107         Property to instanciate the SFA API based in sfi client.
108         For each SFA method called this instance is used.
109         """
110         if not self._sfaapi:
111             sfa_user = self.get("sfauser")
112             sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
113             sfa_auth = '.'.join(sfa_user.split('.')[:2])
114             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
115             sfa_private_key = self.get("sfaPrivateKey")
116             batch = True
117
118             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
119                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
120             
121             if not _sfaapi:
122                 self.fail_sfaapi()
123
124             self._sfaapi = weakref.ref(_sfaapi)
125
126         return self._sfaapi()
127
128     def do_discover(self):
129         """
130         Based on the attributes defined by the user, discover the suitable 
131         node for provision.
132         """
133         if self._skip_provision():
134             super(WilabtSfaNode, self).do_discover()
135             return
136
137         nodes = self.sfaapi.get_resources_hrn()
138
139         host = self._get_host()
140         if host:
141             # the user specified one particular node to be provisioned
142             self._host = True
143             host_hrn = nodes[host]
144
145             # check that the node is not blacklisted or being provisioned
146             # by other RM
147             if not self._blacklisted(host_hrn):
148                 if not self._reserved(host_hrn):
149                     if self._check_if_in_slice([host_hrn]):
150                         self.debug("Node already in slice %s" % host_hrn)
151                         self._slicenode = True
152                     host = host + '.wilab2.ilabt.iminds.be'
153                     self.set('host', host)
154                     self._node_to_provision = host_hrn
155                     super(WilabtSfaNode, self).do_discover()
156
157     def do_provision(self):
158         """
159         Add node to user's slice and verifing that the node is functioning
160         correctly. Check ssh, omf rc running, hostname, file system.
161         """
162         if self._skip_provision():
163             super(WilabtSfaNode, self).do_provision()
164             return
165
166         provision_ok = False
167         ssh_ok = False
168         proc_ok = False
169         timeout = 300
170
171         while not provision_ok:
172             node = self._node_to_provision
173             if self._slicenode:
174                 self._delete_from_slice()
175                 self.debug("Waiting 300 sec for re-adding to slice")
176                 time.sleep(300) # Timout for the testbed to allow a new reservation
177             self._add_node_to_slice(node)
178             t = 0
179             while not self._check_if_in_slice([node]) and t < timeout \
180                 and not self._ecobj().abort:
181                 t = t + 5
182                 time.sleep(t)
183                 self.debug("Waiting 5 sec for resources to be added")
184                 continue
185
186             if not self._check_if_in_slice([node]):
187                 self.debug("Couldn't add node %s to slice" % node)
188                 self.fail_node_not_available(node)
189
190             self._get_username()
191             ssh_ok = self._check_ssh_loop()          
192
193             if not ssh_ok:
194                 # the timeout was reach without establishing ssh connection
195                 # the node is blacklisted, and a new
196                 # node to provision is discovered
197                 self._blacklist_node(node)
198                 self.do_discover()
199                 continue
200             
201             # check /proc directory is mounted (ssh_ok = True)
202             # file system is not read only, hostname is correct
203             # and omf_rc process is up
204             else:
205                 if not self._check_fs():
206                     self.do_discover()
207                     continue
208                 if not self._check_omfrc():
209                     self.do_discover()
210                     continue
211                 if not self._check_hostname():
212                     self.do_discover()
213                     continue
214             
215                 else:
216                     provision_ok = True
217                     if not self.get('host'):
218                         self._set_host_attr(node)            
219                     self.info(" Node provisioned ")            
220             
221         super(WilabtSfaNode, self).do_provision()
222
223     def do_deploy(self):
224         if self.state == ResourceState.NEW:
225             self.info("Deploying w-iLab.t node")
226             self.do_discover()
227             self.do_provision()
228         super(WilabtSfaNode, self).do_deploy()
229
230     def do_release(self):
231         super(WilabtSfaNode, self).do_release()
232         if self.state == ResourceState.RELEASED and not self._skip_provision():
233             self.debug(" Releasing SFA API ")
234             self.sfaapi.release()
235
236     def _blacklisted(self, host_hrn):
237         """
238         Check in the SFA API that the node is not in the blacklist.
239         """
240         if self.sfaapi.blacklisted(host_hrn):
241            self.fail_node_not_available(host_hrn)
242         return False
243
244     def _reserved(self, host_hrn):
245         """
246         Check in the SFA API that the node is not in the reserved
247         list.
248         """
249         if self.sfaapi.reserved(host_hrn):
250             self.fail_node_not_available(host_hrn)
251         return False
252
253     def _get_username(self):
254         """
255         Get the username for login in to the nodes from RSpec.
256         Wilabt username is not made out of any convention, it
257         has to be retrived from the manifest RSpec.
258         """
259         slicename = self.get("slicename")
260         if self._username is None:
261             slice_info = self.sfaapi.get_slice_resources(slicename)
262             username = slice_info['resource'][0]['services'][0]['login'][0]['username']
263             self.set('username', username)
264             self.debug("Retriving username information from RSpec %s" % username)
265             self._username = username
266             
267     def _check_ssh_loop(self):
268         """
269         Check that the ssh login is possible. In wilabt is done
270         through the gateway because is private testbed.
271         """
272         t = 0
273         timeout = 1200
274         ssh_ok = False
275         while t < timeout and not ssh_ok:
276             cmd = 'echo \'GOOD NODE\''
277             ((out, err), proc) = self.execute(cmd)
278             if out.find("GOOD NODE") < 0:
279                 self.debug( "No SSH connection, waiting 20s" )
280                 t = t + 20
281                 time.sleep(20)
282                 continue
283             else:
284                 self.debug( "SSH OK" )
285                 ssh_ok = True
286                 continue
287         return ssh_ok
288
289     def _check_fs(self):
290         """
291         Check file system, /proc well mounted.
292         """
293         cmd = 'mount |grep proc'
294         ((out, err), proc) = self.execute(cmd)
295         if out.find("/proc type proc") < 0:
296             self.warning(" Corrupted file system ")
297             self._blacklist_node(node)
298             return False
299         return True
300
301     def _check_omfrc(self):
302         """
303         Check that OMF 6 resource controller is running.
304         """
305         cmd = 'ps aux|grep omf'
306         ((out, err), proc) = self.execute(cmd)
307         if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
308             return False
309         return True
310
311     def _check_hostname(self):
312         """
313         Check that the hostname in the image is not set to localhost.
314         """
315         cmd = 'hostname'
316         ((out, err), proc) = self.execute(cmd)
317         if 'localhost' in out.lower():
318             return False
319         else:
320             self.set('hostname', out.strip()) 
321         return True 
322
323     def execute(self, command,
324         sudo = False,
325         env = None,
326         tty = False,
327         forward_x11 = False,
328         retry = 3,
329         connect_timeout = 30,
330         strict_host_checking = False,
331         persistent = True,
332         blocking = True,
333         ):
334         """ Notice that this invocation will block until the
335         execution finishes. If this is not the desired behavior,
336         use 'run' instead."""
337         (out, err), proc = sshfuncs.rexec(
338             command,
339             host = self.get("host"),
340             user = self.get("username"),
341             port = 22,
342             gwuser = self.get("gatewayUser"),
343             gw = self.get("gateway"),
344             agent = True,
345             sudo = sudo,
346             identity = self.get("identity"),
347             server_key = self.get("serverKey"),
348             env = env,
349             tty = tty,
350             forward_x11 = forward_x11,
351             retry = retry,
352             connect_timeout = connect_timeout,
353             persistent = persistent,
354             blocking = blocking,
355             strict_host_checking = strict_host_checking
356             )
357
358         return (out, err), proc
359
360
361     def _add_node_to_slice(self, host_hrn):
362         """
363         Add node to slice, using SFA API. Actually Wilabt testbed
364         doesn't allow adding nodes, in fact in the API there is method
365         to group all the nodes instanciated as WilabtSfaNodes and the
366         Allocate and Provision is done with the last call at 
367         sfaapi.add_resource_to_slice_batch.
368         """
369         self.info(" Adding node to slice ")
370         slicename = self.get("slicename")
371         #disk_image = self.get("disk_image")
372         #if disk_image is not None:
373         #    properties = {'disk_image': disk_image}
374         #else: properties = None
375         properties = None
376         self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn, properties=properties)
377
378     def _delete_from_slice(self):
379         """
380         Delete every node from slice, using SFA API.
381         Wilabt doesn't allow to remove one sliver so this method 
382         remove every slice from the slice.
383         """
384
385         self.warning(" Deleting all slivers from slice ")
386         slicename = self.get("slicename")
387         self.sfaapi.remove_all_from_slice(slicename)
388
389     def _get_host(self):
390         """
391         Get the attribute hostname.
392         """
393         host = self.get("host")
394         if host:
395             return host
396         else:
397             return None
398
399     def _set_host_attr(self, node):
400         """
401         Query SFAAPI for the hostname of a certain host hrn and sets the
402         attribute hostname, it will over write the previous value.
403         """
404         hosts_hrn = self.sfaapi.get_resources_hrn()
405         for host, hrn  in hosts_hrn.iteritems():
406             if hrn == node:
407                 host = host + '.wilab2.ilabt.iminds.be'
408                 self.set("host", host)
409
410     def _check_if_in_slice(self, hosts_hrn):
411         """
412         Check using SFA API if any host hrn from hosts_hrn is in the user's
413         slice.
414         """
415         slicename = self.get("slicename")
416         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
417         if slice_nodes:
418             if len(slice_nodes[0]['services']) != 0:
419                 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
420         else: slice_nodes_hrn = []
421         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
422         return nodes_inslice
423
424     def _blacklist_node(self, host_hrn):
425         """
426         Add mal functioning node to blacklist (in SFA API).
427         """
428         self.warning(" Blacklisting malfunctioning node ")
429         self.sfaapi.blacklist_resource(host_hrn)
430         if not self._host:
431             self.set('host', None)
432         else:
433             self.set('host', host_hrn.split('.').pop())
434
435     def _put_node_in_provision(self, host_hrn):
436         """
437         Add node to the list of nodes being provisioned, in order for other RMs
438         to not try to provision the same one again.
439         """
440         self.sfaapi.reserve_resource(host_hrn)
441
442     def _get_ip(self, host):
443         """
444         Query cache for the IP of a node with certain hostname
445         """
446         try:
447             ip = sshfuncs.gethostbyname(host)
448         except:
449             # Fail while trying to find the IP
450             return None
451         return ip
452
453     def fail_discovery(self):
454         msg = "Discovery failed. No candidates found for node"
455         self.error(msg)
456         raise RuntimeError, msg
457
458     def fail_node_not_alive(self, host=None):
459         msg = "Node %s not alive" % host
460         raise RuntimeError, msg
461     
462     def fail_node_not_available(self, host):
463         msg = "Some nodes not available for provisioning"
464         raise RuntimeError, msg
465
466     def fail_not_enough_nodes(self):
467         msg = "Not enough nodes available for provisioning"
468         raise RuntimeError, msg
469
470     def fail_sfaapi(self):
471         msg = "Failing while trying to instanciate the SFA API."
472         raise RuntimeError, msg
473
474     def valid_connection(self, guid):
475         # TODO: Validate!
476         return True
477
478