2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.omf.node import OMFNode
24 from nepi.util.sfaapi import SFAAPIFactory
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
28 from random import randint
class WilabtSfaNode(OMFNode):
    """OMF node on the Wilabt testbed, reached over SSH through a gateway
    and added to the user's slice through the SFA API."""
    _rtype = "WilabtSfaNode"
    _help = ("Controls a Wilabt host accessible using a SSH key "
             "and provisioned using SFA")
44 def _register_attributes(cls):
46 username = Attribute("username", "Local account username",
47 flags = Flags.Credential)
49 identity = Attribute("identity", "SSH identity file",
50 flags = Flags.Credential)
52 server_key = Attribute("serverKey", "Server public key",
55 sfa_user = Attribute("sfauser", "SFA user",
56 flags = Flags.Credential)
58 sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
59 used to generate the user credential",
60 flags = Flags.Credential)
62 slicename = Attribute("slicename", "SFA slice for the experiment",
63 flags = Flags.Credential)
65 gateway_user = Attribute("gatewayUser", "Gateway account username",
68 gateway = Attribute("gateway", "Hostname of the gateway machine",
71 hostxmpp = Attribute("hostxmpp", "Hostname from RSpec to use in xmpp messages",
74 disk_image = Attribute("disk_image", "Specify a specific disk image for a node",
77 cls._register_attribute(username)
78 cls._register_attribute(identity)
79 cls._register_attribute(server_key)
80 cls._register_attribute(sfa_user)
81 cls._register_attribute(sfa_private_key)
82 cls._register_attribute(slicename)
83 cls._register_attribute(gateway_user)
84 cls._register_attribute(gateway)
85 cls._register_attribute(hostxmpp)
86 cls._register_attribute(disk_image)
88 def __init__(self, ec, guid):
89 super(WilabtSfaNode, self).__init__(ec, guid)
91 self._ecobj = weakref.ref(ec)
93 self._node_to_provision = None
94 self._slicenode = False
95 self._hostname = False
98 def _skip_provision(self):
99 sfa_user = self.get("sfauser")
107 Property to instanciate the SFA API based in sfi client.
108 For each SFA method called this instance is used.
111 sfa_user = self.get("sfauser")
112 sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
113 sfa_auth = '.'.join(sfa_user.split('.')[:2])
114 sfa_registry = "http://sfa3.planet-lab.eu:12345/"
115 sfa_private_key = self.get("sfaPrivateKey")
118 _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
119 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
124 self._sfaapi = weakref.ref(_sfaapi)
126 return self._sfaapi()
128 def do_discover(self):
130 Based on the attributes defined by the user, discover the suitable
133 if self._skip_provision():
134 super(WilabtSfaNode, self).do_discover()
137 nodes = self.sfaapi.get_resources_hrn()
139 hostname = self._get_hostname()
141 # the user specified one particular node to be provisioned
142 self._hostname = True
143 host_hrn = nodes[hostname]
145 # check that the node is not blacklisted or being provisioned
147 if not self._blacklisted(host_hrn):
148 if not self._reserved(host_hrn):
149 if self._check_if_in_slice([host_hrn]):
150 self.debug("Node already in slice %s" % host_hrn)
151 self._slicenode = True
152 hostname = hostname + '.wilab2.ilabt.iminds.be'
153 self.set('hostname', hostname)
154 self._node_to_provision = host_hrn
155 super(WilabtSfaNode, self).do_discover()
157 def do_provision(self):
159 Add node to user's slice and verifing that the node is functioning
160 correctly. Check ssh, omf rc running, hostname, file system.
162 if self._skip_provision():
163 super(WilabtSfaNode, self).do_provision()
171 while not provision_ok:
172 node = self._node_to_provision
174 self._delete_from_slice()
175 self.debug("Waiting 300 sec for re-adding to slice")
176 time.sleep(300) # Timout for the testbed to allow a new reservation
177 self._add_node_to_slice(node)
179 while not self._check_if_in_slice([node]) and t < timeout \
180 and not self._ecobj().abort:
183 self.debug("Waiting 5 sec for resources to be added")
186 if not self._check_if_in_slice([node]):
187 self.debug("Couldn't add node %s to slice" % node)
188 self.fail_node_not_available(node)
191 ssh_ok = self._check_ssh_loop()
194 # the timeout was reach without establishing ssh connection
195 # the node is blacklisted, and a new
196 # node to provision is discovered
197 self._blacklist_node(node)
201 # check /proc directory is mounted (ssh_ok = True)
202 # file system is not read only, hostname is correct
203 # and omf_rc process is up
205 if not self._check_fs():
208 if not self._check_omfrc():
211 if not self._check_hostname():
217 if not self.get('hostname'):
218 self._set_hostname_attr(node)
219 self.info(" Node provisioned ")
221 super(WilabtSfaNode, self).do_provision()
223 def do_release(self):
224 super(WilabtSfaNode, self).do_release()
225 if self.state == ResourceState.RELEASED and not self._skip_provision():
226 self.debug(" Releasing SFA API ")
227 self.sfaapi.release()
229 def _blacklisted(self, host_hrn):
231 Check in the SFA API that the node is not in the blacklist.
233 if self.sfaapi.blacklisted(host_hrn):
234 self.fail_node_not_available(host_hrn)
237 def _reserved(self, host_hrn):
239 Check in the SFA API that the node is not in the reserved
242 if self.sfaapi.reserved(host_hrn):
243 self.fail_node_not_available(host_hrn)
246 def _get_username(self):
248 Get the username for login in to the nodes from RSpec.
249 Wilabt username is not made out of any convention, it
250 has to be retrived from the manifest RSpec.
252 slicename = self.get("slicename")
253 if self._username is None:
254 slice_info = self.sfaapi.get_slice_resources(slicename)
255 username = slice_info['resource'][0]['services'][0]['login'][0]['username']
256 self.set('username', username)
257 self.debug("Retriving username information from RSpec %s" % username)
258 self._username = username
260 def _check_ssh_loop(self):
262 Check that the ssh login is possible. In wilabt is done
263 through the gateway because is private testbed.
268 while t < timeout and not ssh_ok:
269 cmd = 'echo \'GOOD NODE\''
270 ((out, err), proc) = self.execute(cmd)
271 if out.find("GOOD NODE") < 0:
272 self.debug( "No SSH connection, waiting 20s" )
277 self.debug( "SSH OK" )
284 Check file system, /proc well mounted.
286 cmd = 'mount |grep proc'
287 ((out, err), proc) = self.execute(cmd)
288 if out.find("/proc type proc") < 0:
289 self.warning(" Corrupted file system ")
290 self._blacklist_node(node)
294 def _check_omfrc(self):
296 Check that OMF 6 resource controller is running.
298 cmd = 'ps aux|grep omf'
299 ((out, err), proc) = self.execute(cmd)
300 if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
304 def _check_hostname(self):
306 Check that the hostname in the image is not set to localhost.
309 ((out, err), proc) = self.execute(cmd)
310 if 'localhost' in out.lower():
313 self.set('hostxmpp', out.strip())
316 def execute(self, command,
322 connect_timeout = 30,
323 strict_host_checking = False,
327 """ Notice that this invocation will block until the
328 execution finishes. If this is not the desired behavior,
329 use 'run' instead."""
330 (out, err), proc = sshfuncs.rexec(
332 host = self.get("hostname"),
333 user = self.get("username"),
335 gwuser = self.get("gatewayUser"),
336 gw = self.get("gateway"),
339 identity = self.get("identity"),
340 server_key = self.get("serverKey"),
343 forward_x11 = forward_x11,
345 connect_timeout = connect_timeout,
346 persistent = persistent,
348 strict_host_checking = strict_host_checking
351 return (out, err), proc
354 def _add_node_to_slice(self, host_hrn):
356 Add node to slice, using SFA API. Actually Wilabt testbed
357 doesn't allow adding nodes, in fact in the API there is method
358 to group all the nodes instanciated as WilabtSfaNodes and the
359 Allocate and Provision is done with the last call at
360 sfaapi.add_resource_to_slice_batch.
362 self.info(" Adding node to slice ")
363 slicename = self.get("slicename")
364 disk_image = self.get("disk_image")
365 if disk_image is not None:
366 properties = {'disk_image': disk_image}
367 else: properties = None
368 self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn, properties=properties)
370 def _delete_from_slice(self):
372 Delete every node from slice, using SFA API.
373 Wilabt doesn't allow to remove one sliver so this method
374 remove every slice from the slice.
377 self.warning(" Deleting all slivers from slice ")
378 slicename = self.get("slicename")
379 self.sfaapi.remove_all_from_slice(slicename)
381 def _get_hostname(self):
383 Get the attribute hostname.
385 hostname = self.get("hostname")
391 def _set_hostname_attr(self, node):
393 Query SFAAPI for the hostname of a certain host hrn and sets the
394 attribute hostname, it will over write the previous value.
396 hosts_hrn = self.sfaapi.get_resources_hrn()
397 for hostname, hrn in hosts_hrn.iteritems():
399 hostname = hostname + '.wilab2.ilabt.iminds.be'
400 self.set("hostname", hostname)
402 def _check_if_in_slice(self, hosts_hrn):
404 Check using SFA API if any host hrn from hosts_hrn is in the user's
407 slicename = self.get("slicename")
408 slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
410 if len(slice_nodes[0]['services']) != 0:
411 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
412 else: slice_nodes_hrn = []
413 nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
416 def _do_ping(self, hostname):
418 Perform ping command on node's IP matching hostname.
421 guser = self.get("gatewayUser")
422 gw = self.get("gateway")
423 host = hostname + ".wilab2.ilabt.iminds.be"
424 command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
425 (out, err) = lexec(command)
426 m = re.search("(\d+)% packet loss", str(out))
427 if m and int(m.groups()[0]) < 50:
432 def _blacklist_node(self, host_hrn):
434 Add mal functioning node to blacklist (in SFA API).
436 self.warning(" Blacklisting malfunctioning node ")
437 self.sfaapi.blacklist_resource(host_hrn)
438 if not self._hostname:
439 self.set('hostname', None)
441 self.set('hostname', host_hrn.split('.').pop())
443 def _put_node_in_provision(self, host_hrn):
445 Add node to the list of nodes being provisioned, in order for other RMs
446 to not try to provision the same one again.
448 self.sfaapi.reserve_resource(host_hrn)
450 def _get_ip(self, hostname):
452 Query cache for the IP of a node with certain hostname
455 ip = sshfuncs.gethostbyname(hostname)
457 # Fail while trying to find the IP
461 def fail_discovery(self):
462 msg = "Discovery failed. No candidates found for node"
464 raise RuntimeError, msg
466 def fail_node_not_alive(self, hostname=None):
467 msg = "Node %s not alive" % hostname
468 raise RuntimeError, msg
470 def fail_node_not_available(self, hostname):
471 msg = "Some nodes not available for provisioning"
472 raise RuntimeError, msg
474 def fail_not_enough_nodes(self):
475 msg = "Not enough nodes available for provisioning"
476 raise RuntimeError, msg
478 def fail_sfaapi(self):
479 msg = "Failing while trying to instanciate the SFA API."
480 raise RuntimeError, msg
482 def valid_connection(self, guid):