2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 from nepi.resources.omf.node import OMFNode
23 from nepi.util.sfaapi import SFAAPIFactory
24 from nepi.util.execfuncs import lexec
25 from nepi.util import sshfuncs
27 from random import randint
36 class WilabtSfaNode(OMFNode):
37 _rtype = "wilabt::sfa::Node"
38 _help = "Controls a Wilabt host accessible using a SSH key " \
39 "and provisioned using SFA"
43 def _register_attributes(cls):
45 username = Attribute("username", "Local account username",
46 flags = Flags.Credential)
48 identity = Attribute("identity", "SSH identity file",
49 flags = Flags.Credential)
51 server_key = Attribute("serverKey", "Server public key",
54 sfa_user = Attribute("sfauser", "SFA user",
55 flags = Flags.Credential)
57 sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
58 used to generate the user credential",
59 flags = Flags.Credential)
61 slicename = Attribute("slicename", "SFA slice for the experiment",
62 flags = Flags.Credential)
64 gateway_user = Attribute("gatewayUser", "Gateway account username",
67 gateway = Attribute("gateway", "Hostname of the gateway machine",
70 host = Attribute("host", "Name of the physical machine",
73 disk_image = Attribute("disk_image", "Specify a specific disk image for a node",
76 cls._register_attribute(username)
77 cls._register_attribute(identity)
78 cls._register_attribute(server_key)
79 cls._register_attribute(sfa_user)
80 cls._register_attribute(sfa_private_key)
81 cls._register_attribute(slicename)
82 cls._register_attribute(gateway_user)
83 cls._register_attribute(gateway)
84 cls._register_attribute(host)
85 cls._register_attribute(disk_image)
87 def __init__(self, ec, guid):
88 super(WilabtSfaNode, self).__init__(ec, guid)
90 self._ecobj = weakref.ref(ec)
92 self._node_to_provision = None
93 self._slicenode = False
97 def _skip_provision(self):
98 sfa_user = self.get("sfauser")
106 Property to instanciate the SFA API based in sfi client.
107 For each SFA method called this instance is used.
110 sfa_user = self.get("sfauser")
111 sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
112 sfa_auth = '.'.join(sfa_user.split('.')[:2])
113 sfa_registry = "http://sfa3.planet-lab.eu:12345/"
114 sfa_private_key = self.get("sfaPrivateKey")
117 _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
118 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
123 self._sfaapi = weakref.ref(_sfaapi)
125 return self._sfaapi()
127 def do_discover(self):
129 Based on the attributes defined by the user, discover the suitable
132 nodes = self.sfaapi.get_resources_hrn()
134 host = self._get_host()
136 # the user specified one particular node to be provisioned
138 host_hrn = nodes[host]
140 # check that the node is not blacklisted or being provisioned
142 if not self._blacklisted(host_hrn):
143 if not self._reserved(host_hrn):
144 if self._check_if_in_slice([host_hrn]):
145 self.debug("Node already in slice %s" % host_hrn)
146 self._slicenode = True
147 host = host + '.wilab2.ilabt.iminds.be'
148 self.set('host', host)
149 self._node_to_provision = host_hrn
150 super(WilabtSfaNode, self).do_discover()
152 def do_provision(self):
154 Add node to user's slice and verifing that the node is functioning
155 correctly. Check ssh, omf rc running, hostname, file system.
162 while not provision_ok:
163 node = self._node_to_provision
165 # self._delete_from_slice()
166 # self.debug("Waiting 480 sec for re-adding to slice")
167 # time.sleep(480) # Timout for the testbed to allow a new reservation
168 self._add_node_to_slice(node)
170 while not self._check_if_in_slice([node]) and t < timeout \
171 and not self._ecobj().abort:
174 self.debug("Waiting 5 sec for resources to be added")
177 if not self._check_if_in_slice([node]):
178 self.debug("Couldn't add node %s to slice" % node)
179 self.fail_node_not_available(node)
182 ssh_ok = self._check_ssh_loop()
185 # the timeout was reach without establishing ssh connection
186 # the node is blacklisted, and a new
187 # node to provision is discovered
188 self._blacklist_node(node)
192 # check /proc directory is mounted (ssh_ok = True)
193 # file system is not read only, hostname is correct
194 # and omf_rc process is up
196 if not self._check_fs():
199 if not self._check_omfrc():
202 if not self._check_hostname():
208 if not self.get('host'):
209 self._set_host_attr(node)
210 self.info(" Node provisioned ")
212 super(WilabtSfaNode, self).do_provision()
215 if self.state == ResourceState.NEW:
216 self.info("Deploying w-iLab.t node")
219 super(WilabtSfaNode, self).do_deploy()
def do_release(self):
    """
    Release this resource manager and, when applicable, the SFA API.

    The parent class release always runs first. The SFA API is released
    afterwards only if the resulting state is ResourceState.RELEASED and
    _skip_provision() does not report that provisioning is skipped.
    """
    super(WilabtSfaNode, self).do_release()

    # The parent release did not reach RELEASED: leave the SFA API alone.
    if not (self.state == ResourceState.RELEASED):
        return

    # Provisioning is skipped for this node, so there is nothing to release.
    if self._skip_provision():
        return

    self.debug(" Releasing SFA API ")
    self.sfaapi.release()
227 def _blacklisted(self, host_hrn):
229 Check in the SFA API that the node is not in the blacklist.
231 if self.sfaapi.blacklisted(host_hrn):
232 self.fail_node_not_available(host_hrn)
235 def _reserved(self, host_hrn):
237 Check in the SFA API that the node is not in the reserved
240 if self.sfaapi.reserved(host_hrn):
241 self.fail_node_not_available(host_hrn)
def _get_username(self):
    """
    Resolve the login username for the node and cache it.

    The w-iLab.t username follows no naming convention, so it is read
    from the slice resources returned by the SFA API (first resource,
    first service, first login entry). The value is stored in the
    'username' attribute and in self._username; once self._username
    is set, the SFA API is not queried again.
    """
    slice_name = self.get("slicename")
    if self._username is None:
        resources = self.sfaapi.get_slice_resources(slice_name)['resource']
        # Only the first login entry of the first service is considered.
        first_login = resources[0]['services'][0]['login'][0]
        login_name = first_login['username']
        self.set('username', login_name)
        self.debug("Retriving username information from RSpec %s" % login_name)
        self._username = login_name
258 def _check_ssh_loop(self):
260 Check that the ssh login is possible. In wilabt is done
261 through the gateway because is private testbed.
266 while t < timeout and not ssh_ok:
267 cmd = 'echo \'GOOD NODE\''
268 ((out, err), proc) = self.execute(cmd)
269 if out.find("GOOD NODE") < 0:
270 self.debug( "No SSH connection, waiting 20s" )
275 self.debug( "SSH OK" )
282 Check file system, /proc well mounted.
284 cmd = 'mount |grep proc'
285 ((out, err), proc) = self.execute(cmd)
286 if out.find("/proc type proc") < 0:
287 self.warning(" Corrupted file system ")
288 self._blacklist_node(node)
292 def _check_omfrc(self):
294 Check that OMF 6 resource controller is running.
296 cmd = 'ps aux|grep omf'
297 ((out, err), proc) = self.execute(cmd)
298 if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
302 def _check_hostname(self):
304 Check that the hostname in the image is not set to localhost.
307 ((out, err), proc) = self.execute(cmd)
308 if 'localhost' in out.lower():
311 self.set('hostname', out.strip())
314 def execute(self, command,
320 connect_timeout = 30,
321 strict_host_checking = False,
325 """ Notice that this invocation will block until the
326 execution finishes. If this is not the desired behavior,
327 use 'run' instead."""
328 (out, err), proc = sshfuncs.rexec(
330 host = self.get("host"),
331 user = self.get("username"),
333 gwuser = self.get("gatewayUser"),
334 gw = self.get("gateway"),
337 identity = self.get("identity"),
338 server_key = self.get("serverKey"),
341 forward_x11 = forward_x11,
343 connect_timeout = connect_timeout,
344 persistent = persistent,
346 strict_host_checking = strict_host_checking
349 return (out, err), proc
def _add_node_to_slice(self, host_hrn):
    """
    Request the addition of a node to the slice through the SFA API.

    The w-iLab.t testbed does not actually allow adding nodes one at a
    time: the API groups all the nodes instantiated as WilabtSfaNodes,
    and the Allocate and Provision are done with the last call to
    sfaapi.add_resource_to_slice_batch.

    host_hrn -- hrn of the node to add.
    """
    self.info(" Adding node to slice ")
    slice_name = self.get("slicename")
    image = self.get("disk_image")
    # A properties dict is forwarded only when a disk image was requested.
    node_properties = None if image is None else {'disk_image': image}
    self.sfaapi.add_resource_to_slice_batch(slice_name, host_hrn,
            properties=node_properties)
def _delete_from_slice(self):
    """
    Remove every sliver from the slice through the SFA API.

    w-iLab.t does not allow removing a single sliver, so this method
    empties the whole slice named by the 'slicename' attribute.
    """
    self.warning(" Deleting all slivers from slice ")
    target_slice = self.get("slicename")
    api = self.sfaapi
    api.remove_all_from_slice(target_slice)
382 Get the attribute hostname.
384 host = self.get("host")
390 def _set_host_attr(self, node):
392 Query SFAAPI for the hostname of a certain host hrn and sets the
393 attribute hostname, it will over write the previous value.
395 hosts_hrn = self.sfaapi.get_resources_hrn()
396 for host, hrn in hosts_hrn.iteritems():
398 host = host + '.wilab2.ilabt.iminds.be'
399 self.set("host", host)
401 def _check_if_in_slice(self, hosts_hrn):
403 Check using SFA API if any host hrn from hosts_hrn is in the user's
406 slicename = self.get("slicename")
407 slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
409 if len(slice_nodes[0]['services']) != 0:
410 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
411 else: slice_nodes_hrn = []
412 nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
415 def _blacklist_node(self, host_hrn):
417 Add mal functioning node to blacklist (in SFA API).
419 self.warning(" Blacklisting malfunctioning node ")
420 self.sfaapi.blacklist_resource(host_hrn)
422 self.set('host', None)
424 self.set('host', host_hrn.split('.').pop())
def _put_node_in_provision(self, host_hrn):
    """
    Register the node in the SFA API as being provisioned, so that
    other RMs do not try to provision the same node again.

    host_hrn -- hrn of the node to reserve.
    """
    api = self.sfaapi
    api.reserve_resource(host_hrn)
433 def _get_ip(self, host):
435 Query cache for the IP of a node with certain hostname
438 ip = sshfuncs.gethostbyname(host)
440 # Fail while trying to find the IP
444 def fail_discovery(self):
445 msg = "Discovery failed. No candidates found for node"
447 raise RuntimeError(msg)
def fail_node_not_alive(self, host=None):
    """
    Abort with a RuntimeError reporting that the node is not alive.

    host -- identifier of the node, interpolated into the error message
            (defaults to None).
    """
    raise RuntimeError("Node %s not alive" % host)
def fail_node_not_available(self, host):
    """
    Abort with a RuntimeError reporting unavailable nodes.

    host -- accepted for the caller's convenience; it is not included
            in the error message.
    """
    raise RuntimeError("Some nodes not available for provisioning")
def fail_not_enough_nodes(self):
    """
    Abort with a RuntimeError reporting that there are not enough
    nodes available for provisioning.
    """
    raise RuntimeError("Not enough nodes available for provisioning")
def fail_sfaapi(self):
    """
    Abort with a RuntimeError reporting that the SFA API could not be
    instantiated.
    """
    raise RuntimeError("Failing while trying to instanciate the SFA API.")
465 def valid_connection(self, guid):