c29e8538155a576d753d45e4ba4ed105364975aa
[nepi.git] / src / nepi / resources / omf / wilabt_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState 
23 from nepi.resources.omf.node import OMFNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import time
30 import re
31 import weakref
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class WilabtSfaNode(OMFNode):
38     _rtype = "wilabt::sfa::Node"
39     _help = "Controls a Wilabt host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _backend = "omf"
42
43     @classmethod
44     def _register_attributes(cls):
45         
46         username = Attribute("username", "Local account username",
47                 flags = Flags.Credential)
48
49         identity = Attribute("identity", "SSH identity file",
50                 flags = Flags.Credential)
51
52         server_key = Attribute("serverKey", "Server public key",
53                 flags = Flags.Design)
54
55         sfa_user = Attribute("sfauser", "SFA user",
56                     flags = Flags.Credential)
57
58         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
59                             used to generate the user credential",
60                             flags = Flags.Credential)
61
62         slicename = Attribute("slicename", "SFA slice for the experiment",
63                     flags = Flags.Credential)
64
65         gateway_user = Attribute("gatewayUser", "Gateway account username",
66                 flags = Flags.Design)
67
68         gateway = Attribute("gateway", "Hostname of the gateway machine",
69                 flags = Flags.Design)
70
71         host = Attribute("host", "Name of the physical machine",
72                 flags = Flags.Design)
73
74         disk_image = Attribute("disk_image", "Specify a specific disk image for a node",
75                 flags = Flags.Design)
76         
77         cls._register_attribute(username)
78         cls._register_attribute(identity)
79         cls._register_attribute(server_key)
80         cls._register_attribute(sfa_user)
81         cls._register_attribute(sfa_private_key)
82         cls._register_attribute(slicename)
83         cls._register_attribute(gateway_user)
84         cls._register_attribute(gateway)
85         cls._register_attribute(host)
86         cls._register_attribute(disk_image)
87
88     def __init__(self, ec, guid):
89         super(WilabtSfaNode, self).__init__(ec, guid)
90
91         self._ecobj = weakref.ref(ec)
92         self._sfaapi = None
93         self._node_to_provision = None
94         self._slicenode = False
95         self._host = False
96         self._username = None
97
98     def _skip_provision(self):
99         sfa_user = self.get("sfauser")
100         if not sfa_user:
101             return True
102         else: return False
103     
104     @property
105     def sfaapi(self):
106         """
107         Property to instanciate the SFA API based in sfi client.
108         For each SFA method called this instance is used.
109         """
110         if not self._sfaapi:
111             sfa_user = self.get("sfauser")
112             sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
113             sfa_auth = '.'.join(sfa_user.split('.')[:2])
114             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
115             sfa_private_key = self.get("sfaPrivateKey")
116             batch = True
117
118             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
119                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
120             
121             if not _sfaapi:
122                 self.fail_sfaapi()
123
124             self._sfaapi = weakref.ref(_sfaapi)
125
126         return self._sfaapi()
127
128     def do_discover(self):
129         """
130         Based on the attributes defined by the user, discover the suitable 
131         node for provision.
132         """
133         nodes = self.sfaapi.get_resources_hrn()
134
135         host = self._get_host()
136         if host:
137             # the user specified one particular node to be provisioned
138             self._host = True
139             host_hrn = nodes[host]
140
141             # check that the node is not blacklisted or being provisioned
142             # by other RM
143             if not self._blacklisted(host_hrn):
144                 if not self._reserved(host_hrn):
145                     if self._check_if_in_slice([host_hrn]):
146                         self.debug("Node already in slice %s" % host_hrn)
147                         self._slicenode = True
148                     host = host + '.wilab2.ilabt.iminds.be'
149                     self.set('host', host)
150                     self._node_to_provision = host_hrn
151                     super(WilabtSfaNode, self).do_discover()
152
153     def do_provision(self):
154         """
155         Add node to user's slice and verifing that the node is functioning
156         correctly. Check ssh, omf rc running, hostname, file system.
157         """
158         provision_ok = False
159         ssh_ok = False
160         proc_ok = False
161         timeout = 300
162
163         while not provision_ok:
164             node = self._node_to_provision
165             #if self._slicenode:
166             #    self._delete_from_slice()
167             #    self.debug("Waiting 480 sec for re-adding to slice")
168             #    time.sleep(480) # Timout for the testbed to allow a new reservation
169             self._add_node_to_slice(node)
170             t = 0
171             while not self._check_if_in_slice([node]) and t < timeout \
172                 and not self._ecobj().abort:
173                 t = t + 5
174                 time.sleep(t)
175                 self.debug("Waiting 5 sec for resources to be added")
176                 continue
177
178             if not self._check_if_in_slice([node]):
179                 self.debug("Couldn't add node %s to slice" % node)
180                 self.fail_node_not_available(node)
181
182             self._get_username()
183             ssh_ok = self._check_ssh_loop()          
184
185             if not ssh_ok:
186                 # the timeout was reach without establishing ssh connection
187                 # the node is blacklisted, and a new
188                 # node to provision is discovered
189                 self._blacklist_node(node)
190                 self.do_discover()
191                 continue
192             
193             # check /proc directory is mounted (ssh_ok = True)
194             # file system is not read only, hostname is correct
195             # and omf_rc process is up
196             else:
197                 if not self._check_fs():
198                     self.do_discover()
199                     continue
200                 if not self._check_omfrc():
201                     self.do_discover()
202                     continue
203                 if not self._check_hostname():
204                     self.do_discover()
205                     continue
206             
207                 else:
208                     provision_ok = True
209                     if not self.get('host'):
210                         self._set_host_attr(node)            
211                     self.info(" Node provisioned ")            
212             
213         super(WilabtSfaNode, self).do_provision()
214
215     def do_deploy(self):
216         if self.state == ResourceState.NEW:
217             self.info("Deploying w-iLab.t node")
218             self.do_discover()
219             self.do_provision()
220         super(WilabtSfaNode, self).do_deploy()
221
222     def do_release(self):
223         super(WilabtSfaNode, self).do_release()
224         if self.state == ResourceState.RELEASED and not self._skip_provision():
225             self.debug(" Releasing SFA API ")
226             self.sfaapi.release()
227
228     def _blacklisted(self, host_hrn):
229         """
230         Check in the SFA API that the node is not in the blacklist.
231         """
232         if self.sfaapi.blacklisted(host_hrn):
233            self.fail_node_not_available(host_hrn)
234         return False
235
236     def _reserved(self, host_hrn):
237         """
238         Check in the SFA API that the node is not in the reserved
239         list.
240         """
241         if self.sfaapi.reserved(host_hrn):
242             self.fail_node_not_available(host_hrn)
243         return False
244
245     def _get_username(self):
246         """
247         Get the username for login in to the nodes from RSpec.
248         Wilabt username is not made out of any convention, it
249         has to be retrived from the manifest RSpec.
250         """
251         slicename = self.get("slicename")
252         if self._username is None:
253             slice_info = self.sfaapi.get_slice_resources(slicename)
254             username = slice_info['resource'][0]['services'][0]['login'][0]['username']
255             self.set('username', username)
256             self.debug("Retriving username information from RSpec %s" % username)
257             self._username = username
258             
259     def _check_ssh_loop(self):
260         """
261         Check that the ssh login is possible. In wilabt is done
262         through the gateway because is private testbed.
263         """
264         t = 0
265         timeout = 1200
266         ssh_ok = False
267         while t < timeout and not ssh_ok:
268             cmd = 'echo \'GOOD NODE\''
269             ((out, err), proc) = self.execute(cmd)
270             if out.find("GOOD NODE") < 0:
271                 self.debug( "No SSH connection, waiting 20s" )
272                 t = t + 20
273                 time.sleep(20)
274                 continue
275             else:
276                 self.debug( "SSH OK" )
277                 ssh_ok = True
278                 continue
279         return ssh_ok
280
281     def _check_fs(self):
282         """
283         Check file system, /proc well mounted.
284         """
285         cmd = 'mount |grep proc'
286         ((out, err), proc) = self.execute(cmd)
287         if out.find("/proc type proc") < 0:
288             self.warning(" Corrupted file system ")
289             self._blacklist_node(node)
290             return False
291         return True
292
293     def _check_omfrc(self):
294         """
295         Check that OMF 6 resource controller is running.
296         """
297         cmd = 'ps aux|grep omf'
298         ((out, err), proc) = self.execute(cmd)
299         if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
300             return False
301         return True
302
303     def _check_hostname(self):
304         """
305         Check that the hostname in the image is not set to localhost.
306         """
307         cmd = 'hostname'
308         ((out, err), proc) = self.execute(cmd)
309         if 'localhost' in out.lower():
310             return False
311         else:
312             self.set('hostname', out.strip()) 
313         return True 
314
315     def execute(self, command,
316         sudo = False,
317         env = None,
318         tty = False,
319         forward_x11 = False,
320         retry = 3,
321         connect_timeout = 30,
322         strict_host_checking = False,
323         persistent = True,
324         blocking = True,
325         ):
326         """ Notice that this invocation will block until the
327         execution finishes. If this is not the desired behavior,
328         use 'run' instead."""
329         (out, err), proc = sshfuncs.rexec(
330             command,
331             host = self.get("host"),
332             user = self.get("username"),
333             port = 22,
334             gwuser = self.get("gatewayUser"),
335             gw = self.get("gateway"),
336             agent = True,
337             sudo = sudo,
338             identity = self.get("identity"),
339             server_key = self.get("serverKey"),
340             env = env,
341             tty = tty,
342             forward_x11 = forward_x11,
343             retry = retry,
344             connect_timeout = connect_timeout,
345             persistent = persistent,
346             blocking = blocking,
347             strict_host_checking = strict_host_checking
348             )
349
350         return (out, err), proc
351
352
353     def _add_node_to_slice(self, host_hrn):
354         """
355         Add node to slice, using SFA API. Actually Wilabt testbed
356         doesn't allow adding nodes, in fact in the API there is method
357         to group all the nodes instanciated as WilabtSfaNodes and the
358         Allocate and Provision is done with the last call at 
359         sfaapi.add_resource_to_slice_batch.
360         """
361         self.info(" Adding node to slice ")
362         slicename = self.get("slicename")
363         disk_image = self.get("disk_image")
364         if disk_image is not None:
365             properties = {'disk_image': disk_image}
366         else: properties = None
367         #properties = None
368         self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn, properties=properties)
369
370     def _delete_from_slice(self):
371         """
372         Delete every node from slice, using SFA API.
373         Wilabt doesn't allow to remove one sliver so this method 
374         remove every slice from the slice.
375         """
376
377         self.warning(" Deleting all slivers from slice ")
378         slicename = self.get("slicename")
379         self.sfaapi.remove_all_from_slice(slicename)
380
381     def _get_host(self):
382         """
383         Get the attribute hostname.
384         """
385         host = self.get("host")
386         if host:
387             return host
388         else:
389             return None
390
391     def _set_host_attr(self, node):
392         """
393         Query SFAAPI for the hostname of a certain host hrn and sets the
394         attribute hostname, it will over write the previous value.
395         """
396         hosts_hrn = self.sfaapi.get_resources_hrn()
397         for host, hrn  in hosts_hrn.iteritems():
398             if hrn == node:
399                 host = host + '.wilab2.ilabt.iminds.be'
400                 self.set("host", host)
401
402     def _check_if_in_slice(self, hosts_hrn):
403         """
404         Check using SFA API if any host hrn from hosts_hrn is in the user's
405         slice.
406         """
407         slicename = self.get("slicename")
408         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
409         if slice_nodes:
410             if len(slice_nodes[0]['services']) != 0:
411                 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
412         else: slice_nodes_hrn = []
413         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
414         return nodes_inslice
415
416     def _blacklist_node(self, host_hrn):
417         """
418         Add mal functioning node to blacklist (in SFA API).
419         """
420         self.warning(" Blacklisting malfunctioning node ")
421         self.sfaapi.blacklist_resource(host_hrn)
422         if not self._host:
423             self.set('host', None)
424         else:
425             self.set('host', host_hrn.split('.').pop())
426
427     def _put_node_in_provision(self, host_hrn):
428         """
429         Add node to the list of nodes being provisioned, in order for other RMs
430         to not try to provision the same one again.
431         """
432         self.sfaapi.reserve_resource(host_hrn)
433
434     def _get_ip(self, host):
435         """
436         Query cache for the IP of a node with certain hostname
437         """
438         try:
439             ip = sshfuncs.gethostbyname(host)
440         except:
441             # Fail while trying to find the IP
442             return None
443         return ip
444
445     def fail_discovery(self):
446         msg = "Discovery failed. No candidates found for node"
447         self.error(msg)
448         raise RuntimeError, msg
449
450     def fail_node_not_alive(self, host=None):
451         msg = "Node %s not alive" % host
452         raise RuntimeError, msg
453     
454     def fail_node_not_available(self, host):
455         msg = "Some nodes not available for provisioning"
456         raise RuntimeError, msg
457
458     def fail_not_enough_nodes(self):
459         msg = "Not enough nodes available for provisioning"
460         raise RuntimeError, msg
461
462     def fail_sfaapi(self):
463         msg = "Failing while trying to instanciate the SFA API."
464         raise RuntimeError, msg
465
466     def valid_connection(self, guid):
467         # TODO: Validate!
468         return True
469
470