2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.omf.node import OMFNode
24 from nepi.util.sfaapi import SFAAPIFactory
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
28 from random import randint
37 class WilabtSfaNode(OMFNode):
38 _rtype = "WilabtSfaNode"
39 _help = "Controls a Wilabt host accessible using a SSH key " \
40 "and provisioned using SFA"
44 def _register_attributes(cls):
46 username = Attribute("username", "Local account username",
47 flags = Flags.Credential)
49 identity = Attribute("identity", "SSH identity file",
50 flags = Flags.Credential)
52 server_key = Attribute("serverKey", "Server public key",
55 sfa_user = Attribute("sfauser", "SFA user",
56 flags = Flags.Credential)
58 sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
59 used to generate the user credential",
60 flags = Flags.Credential)
62 slicename = Attribute("slicename", "SFA slice for the experiment",
63 flags = Flags.Credential)
65 gateway_user = Attribute("gatewayUser", "Gateway account username",
68 gateway = Attribute("gateway", "Hostname of the gateway machine",
71 host = Attribute("host", "Name of the physical machine",
74 #disk_image = Attribute("disk_image", "Specify a specific disk image for a node",
75 # flags = Flags.Design)
77 cls._register_attribute(username)
78 cls._register_attribute(identity)
79 cls._register_attribute(server_key)
80 cls._register_attribute(sfa_user)
81 cls._register_attribute(sfa_private_key)
82 cls._register_attribute(slicename)
83 cls._register_attribute(gateway_user)
84 cls._register_attribute(gateway)
85 cls._register_attribute(host)
86 #cls._register_attribute(disk_image)
88 def __init__(self, ec, guid):
89 super(WilabtSfaNode, self).__init__(ec, guid)
91 self._ecobj = weakref.ref(ec)
93 self._node_to_provision = None
94 self._slicenode = False
98 def _skip_provision(self):
99 sfa_user = self.get("sfauser")
107 Property to instanciate the SFA API based in sfi client.
108 For each SFA method called this instance is used.
111 sfa_user = self.get("sfauser")
112 sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
113 sfa_auth = '.'.join(sfa_user.split('.')[:2])
114 sfa_registry = "http://sfa3.planet-lab.eu:12345/"
115 sfa_private_key = self.get("sfaPrivateKey")
118 _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
119 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
124 self._sfaapi = weakref.ref(_sfaapi)
126 return self._sfaapi()
128 def do_discover(self):
130 Based on the attributes defined by the user, discover the suitable
133 if self._skip_provision():
134 super(WilabtSfaNode, self).do_discover()
137 nodes = self.sfaapi.get_resources_hrn()
139 host = self._get_host()
141 # the user specified one particular node to be provisioned
143 host_hrn = nodes[host]
145 # check that the node is not blacklisted or being provisioned
147 if not self._blacklisted(host_hrn):
148 if not self._reserved(host_hrn):
149 if self._check_if_in_slice([host_hrn]):
150 self.debug("Node already in slice %s" % host_hrn)
151 self._slicenode = True
152 host = host + '.wilab2.ilabt.iminds.be'
153 self.set('host', host)
154 self._node_to_provision = host_hrn
155 super(WilabtSfaNode, self).do_discover()
157 def do_provision(self):
159 Add node to user's slice and verifing that the node is functioning
160 correctly. Check ssh, omf rc running, hostname, file system.
162 if self._skip_provision():
163 super(WilabtSfaNode, self).do_provision()
171 while not provision_ok:
172 node = self._node_to_provision
174 self._delete_from_slice()
175 self.debug("Waiting 300 sec for re-adding to slice")
176 time.sleep(300) # Timout for the testbed to allow a new reservation
177 self._add_node_to_slice(node)
179 while not self._check_if_in_slice([node]) and t < timeout \
180 and not self._ecobj().abort:
183 self.debug("Waiting 5 sec for resources to be added")
186 if not self._check_if_in_slice([node]):
187 self.debug("Couldn't add node %s to slice" % node)
188 self.fail_node_not_available(node)
191 ssh_ok = self._check_ssh_loop()
194 # the timeout was reach without establishing ssh connection
195 # the node is blacklisted, and a new
196 # node to provision is discovered
197 self._blacklist_node(node)
201 # check /proc directory is mounted (ssh_ok = True)
202 # file system is not read only, hostname is correct
203 # and omf_rc process is up
205 if not self._check_fs():
208 if not self._check_omfrc():
211 if not self._check_hostname():
217 if not self.get('host'):
218 self._set_host_attr(node)
219 self.info(" Node provisioned ")
221 super(WilabtSfaNode, self).do_provision()
224 if self.state == ResourceState.NEW:
225 self.info("Deploying w-iLab.t node")
228 super(WilabtSfaNode, self).do_deploy()
def do_release(self):
    """Release the node and then, when applicable, the SFA API.

    The parent class release runs first. ``self.sfaapi.release()`` is
    called afterwards only when the resulting state is
    ``ResourceState.RELEASED`` and ``_skip_provision()`` returns a false
    value.
    """
    super(WilabtSfaNode, self).do_release()

    released = self.state == ResourceState.RELEASED
    # Nothing else to do unless the node was released AND provisioning
    # through SFA was in use.
    if not released or self._skip_provision():
        return

    self.debug(" Releasing SFA API ")
    self.sfaapi.release()
236 def _blacklisted(self, host_hrn):
238 Check in the SFA API that the node is not in the blacklist.
240 if self.sfaapi.blacklisted(host_hrn):
241 self.fail_node_not_available(host_hrn)
244 def _reserved(self, host_hrn):
246 Check in the SFA API that the node is not in the reserved
249 if self.sfaapi.reserved(host_hrn):
250 self.fail_node_not_available(host_hrn)
253 def _get_username(self):
255 Get the username for login in to the nodes from RSpec.
256 Wilabt username is not made out of any convention, it
257 has to be retrived from the manifest RSpec.
259 slicename = self.get("slicename")
260 if self._username is None:
261 slice_info = self.sfaapi.get_slice_resources(slicename)
262 username = slice_info['resource'][0]['services'][0]['login'][0]['username']
263 self.set('username', username)
264 self.debug("Retriving username information from RSpec %s" % username)
265 self._username = username
267 def _check_ssh_loop(self):
269 Check that the ssh login is possible. In wilabt is done
270 through the gateway because is private testbed.
275 while t < timeout and not ssh_ok:
276 cmd = 'echo \'GOOD NODE\''
277 ((out, err), proc) = self.execute(cmd)
278 if out.find("GOOD NODE") < 0:
279 self.debug( "No SSH connection, waiting 20s" )
284 self.debug( "SSH OK" )
291 Check file system, /proc well mounted.
293 cmd = 'mount |grep proc'
294 ((out, err), proc) = self.execute(cmd)
295 if out.find("/proc type proc") < 0:
296 self.warning(" Corrupted file system ")
297 self._blacklist_node(node)
301 def _check_omfrc(self):
303 Check that OMF 6 resource controller is running.
305 cmd = 'ps aux|grep omf'
306 ((out, err), proc) = self.execute(cmd)
307 if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
311 def _check_hostname(self):
313 Check that the hostname in the image is not set to localhost.
316 ((out, err), proc) = self.execute(cmd)
317 if 'localhost' in out.lower():
320 self.set('hostname', out.strip())
323 def execute(self, command,
329 connect_timeout = 30,
330 strict_host_checking = False,
334 """ Notice that this invocation will block until the
335 execution finishes. If this is not the desired behavior,
336 use 'run' instead."""
337 (out, err), proc = sshfuncs.rexec(
339 host = self.get("host"),
340 user = self.get("username"),
342 gwuser = self.get("gatewayUser"),
343 gw = self.get("gateway"),
346 identity = self.get("identity"),
347 server_key = self.get("serverKey"),
350 forward_x11 = forward_x11,
352 connect_timeout = connect_timeout,
353 persistent = persistent,
355 strict_host_checking = strict_host_checking
358 return (out, err), proc
361 def _add_node_to_slice(self, host_hrn):
363 Add node to slice, using SFA API. Actually Wilabt testbed
364 doesn't allow adding nodes, in fact in the API there is method
365 to group all the nodes instanciated as WilabtSfaNodes and the
366 Allocate and Provision is done with the last call at
367 sfaapi.add_resource_to_slice_batch.
369 self.info(" Adding node to slice ")
370 slicename = self.get("slicename")
371 #disk_image = self.get("disk_image")
372 #if disk_image is not None:
373 # properties = {'disk_image': disk_image}
374 #else: properties = None
376 self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn, properties=properties)
378 def _delete_from_slice(self):
380 Delete every node from slice, using SFA API.
381 Wilabt doesn't allow to remove one sliver so this method
382 remove every slice from the slice.
385 self.warning(" Deleting all slivers from slice ")
386 slicename = self.get("slicename")
387 self.sfaapi.remove_all_from_slice(slicename)
391 Get the attribute hostname.
393 host = self.get("host")
399 def _set_host_attr(self, node):
401 Query SFAAPI for the hostname of a certain host hrn and sets the
402 attribute hostname, it will over write the previous value.
404 hosts_hrn = self.sfaapi.get_resources_hrn()
405 for host, hrn in hosts_hrn.iteritems():
407 host = host + '.wilab2.ilabt.iminds.be'
408 self.set("host", host)
410 def _check_if_in_slice(self, hosts_hrn):
412 Check using SFA API if any host hrn from hosts_hrn is in the user's
415 slicename = self.get("slicename")
416 slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
418 if len(slice_nodes[0]['services']) != 0:
419 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
420 else: slice_nodes_hrn = []
421 nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
424 def _blacklist_node(self, host_hrn):
426 Add mal functioning node to blacklist (in SFA API).
428 self.warning(" Blacklisting malfunctioning node ")
429 self.sfaapi.blacklist_resource(host_hrn)
431 self.set('host', None)
433 self.set('host', host_hrn.split('.').pop())
435 def _put_node_in_provision(self, host_hrn):
437 Add node to the list of nodes being provisioned, in order for other RMs
438 to not try to provision the same one again.
440 self.sfaapi.reserve_resource(host_hrn)
442 def _get_ip(self, host):
444 Query cache for the IP of a node with certain hostname
447 ip = sshfuncs.gethostbyname(host)
449 # Fail while trying to find the IP
453 def fail_discovery(self):
454 msg = "Discovery failed. No candidates found for node"
456 raise RuntimeError, msg
def fail_node_not_alive(self, host=None):
    """Abort with an error reporting that ``host`` is not alive.

    :param host: identifier of the node, interpolated into the message.
    :raises RuntimeError: always.
    """
    # One-element tuple keeps the formatting correct even if host is
    # itself a tuple.
    msg = "Node %s not alive" % (host,)
    # Call form of raise: valid on Python 2 and 3, whereas the
    # ``raise RuntimeError, msg`` form is a SyntaxError on Python 3.
    raise RuntimeError(msg)
def fail_node_not_available(self, host):
    """Abort because a node cannot be provisioned.

    :param host: node that triggered the failure. It is accepted for the
        callers' convenience but is not included in the message.
    :raises RuntimeError: always.
    """
    msg = "Some nodes not available for provisioning"
    # Call form of raise: valid on Python 2 and 3, whereas the
    # ``raise RuntimeError, msg`` form is a SyntaxError on Python 3.
    raise RuntimeError(msg)
def fail_not_enough_nodes(self):
    """Abort because too few nodes are available for provisioning.

    :raises RuntimeError: always.
    """
    msg = "Not enough nodes available for provisioning"
    # Call form of raise: valid on Python 2 and 3, whereas the
    # ``raise RuntimeError, msg`` form is a SyntaxError on Python 3.
    raise RuntimeError(msg)
def fail_sfaapi(self):
    """Abort because the SFA API could not be instantiated.

    :raises RuntimeError: always.
    """
    msg = "Failing while trying to instanciate the SFA API."
    # Call form of raise: valid on Python 2 and 3, whereas the
    # ``raise RuntimeError, msg`` form is a SyntaxError on Python 3.
    raise RuntimeError(msg)
474 def valid_connection(self, guid):