2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
28 from random import randint
37 class WilabtSfaNode(LinuxNode):
38 _rtype = "WilabtSfaNode"
39 _help = "Controls a Wilabt host accessible using a SSH key " \
40 "and provisioned using SFA"
44 def _register_attributes(cls):
46 sfa_user = Attribute("sfauser", "SFA user",
47 flags = Flags.Credential)
49 sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50 used to generate the user credential",
51 flags = Flags.Credential)
53 slicename = Attribute("slicename", "SFA slice for the experiment",
54 flags = Flags.Credential)
56 gateway_user = Attribute("gatewayUser", "Gateway account username",
59 gateway = Attribute("gateway", "Hostname of the gateway machine",
62 cls._register_attribute(sfa_user)
63 cls._register_attribute(sfa_private_key)
64 cls._register_attribute(slicename)
65 cls._register_attribute(gateway_user)
66 cls._register_attribute(gateway)
def __init__(self, ec, guid):
    """
    Create the resource manager and initialise its provisioning state.

    :param ec: experiment controller; forwarded to the parent constructor
        and additionally kept here as a weak reference
    :param guid: identifier forwarded to the parent constructor
    """
    super(WilabtSfaNode, self).__init__(ec, guid)

    # Weak reference to the experiment controller; dereferenced elsewhere
    # in the class as self._ecobj().
    self._ecobj = weakref.ref(ec)
    # Later replaced by a weak reference to the SFA API object.
    self._sfaapi = None
    # hrn chosen by do_discover() and consumed by do_provision().
    self._node_to_provision = None
    # Set to True by do_discover() when the node is already in the slice.
    self._slicenode = False
    # Set to True by do_discover() when the user asked for a specific node.
    self._hostname = False
    # _get_username() tests this against None before assigning it, so it
    # has to exist from construction time.
    self._username = None
78 def _skip_provision(self):
79 sfa_user = self.get("sfauser")
87 Property that instantiates the SFA API, based on the sfi client.
88 This instance is used for every SFA method that is called.
91 sfa_user = self.get("sfauser")
92 sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
93 sfa_auth = '.'.join(sfa_user.split('.')[:2])
94 sfa_registry = "http://sfa3.planet-lab.eu:12345/"
95 sfa_private_key = self.get("sfaPrivateKey")
98 _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
99 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
104 self._sfaapi = weakref.ref(_sfaapi)
106 return self._sfaapi()
108 def do_discover(self):
110 Based on the attributes defined by the user, discover the suitable
113 if self._skip_provision():
114 super(WilabtSfaNode, self).do_discover()
117 nodes = self.sfaapi.get_resources_hrn()
119 hostname = self._get_hostname()
121 # the user specified one particular node to be provisioned
122 self._hostname = True
123 host_hrn = nodes[hostname]
125 # check that the node is not blacklisted or being provisioned
127 if not self._blacklisted(host_hrn):
128 if not self._reserved(host_hrn):
129 if self._check_if_in_slice([host_hrn]):
130 self.debug("Node already in slice %s" % host_hrn)
131 self._slicenode = True
132 hostname = hostname + '.wilab2.ilabt.iminds.be'
133 self.set('hostname', hostname)
134 self._node_to_provision = host_hrn
135 super(WilabtSfaNode, self).do_discover()
137 def do_provision(self):
139 Add the node to the user's slice and verify that the node is functioning
140 correctly. Check ssh, omf rc running, hostname, file system.
142 if self._skip_provision():
143 super(WilabtSfaNode, self).do_provision()
151 while not provision_ok:
152 node = self._node_to_provision
154 self._delete_from_slice()
155 self.debug("Waiting 300 seg for re-adding to slice")
156 time.sleep(300) # Timout for the testbed to allow a new reservation
157 self._add_node_to_slice(node)
159 while not self._check_if_in_slice([node]) and t < timeout \
160 and not self._ecobj().abort:
163 self.debug("Waiting 5 seg for resources to be added")
166 if not self._check_if_in_slice([node]):
167 self.debug("Couldn't add node %s to slice" % node)
168 self.fail_node_not_available(node)
171 ssh_ok = self._check_ssh_loop()
174 # the timeout was reached without establishing an ssh connection,
175 # so the node is blacklisted and a new
176 # node to provision is discovered
177 self._blacklist_node(node)
181 # check /proc directory is mounted (ssh_ok = True)
182 # file system is not read only, hostname is correct
183 # and omf_rc process is up
185 if not self._check_fs():
188 if not self._check_omf():
191 if not self._check_hostname():
197 if not self.get('hostname'):
198 self._set_hostname_attr(node)
199 self.info(" Node provisioned ")
201 super(WilabtSfaNode, self).do_provision()
203 def _blacklisted(self, host_hrn):
205 Check in the SFA API that the node is not in the blacklist.
207 if self.sfaapi.blacklisted(host_hrn):
208 self.fail_node_not_available(host_hrn)
211 def _reserved(self, host_hrn):
213 Check in the SFA API that the node is not in the reserved
216 if self.sfaapi.reserved(host_hrn):
217 self.fail_node_not_available(host_hrn)
220 def _get_username(self):
222 Get the username for login in to the nodes from RSpec.
223 Wilabt username is not made out of any convention, it
224 has to be retrived from the manifest RSpec.
226 slicename = self.get("slicename")
227 if self._username is None:
228 slice_info = self.sfaapi.get_slice_resources(slicename)
229 username = slice_info['resource'][0]['services'][0]['login'][0]['username']
230 self.set('username', username)
231 self.debug("Retriving username information from RSpec %s" % username)
232 self._username = username
234 def _check_ssh_loop(self):
236 Check that the ssh login is possible. In wilabt this is done
237 through the gateway because it is a private testbed.
242 while t < timeout and not ssh_ok:
243 cmd = 'echo \'GOOD NODE\''
244 ((out, err), proc) = self.execute(cmd)
245 if out.find("GOOD NODE") < 0:
246 self.debug( "No SSH connection, waiting 60s" )
251 self.debug( "SSH OK" )
258 Check file system, /proc well mounted.
260 cmd = 'mount |grep proc'
261 ((out, err), proc) = self.execute(cmd)
262 if out.find("/proc type proc") < 0:
263 self.warning(" Corrupted file system ")
264 self._blacklist_node(node)
268 def _check_omfrc(self):
270 Check that OMF 6 resource controller is running.
272 cmd = 'ps aux|grep omf'
273 ((out, err), proc) = self.execute(cmd)
274 if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
278 def _check_hostname(self):
280 Check that the hostname in the image is not set to localhost.
283 ((out, err), proc) = self.execute(cmd)
284 if 'localhost' in out.lower():
288 def _add_node_to_slice(self, host_hrn):
290 Add node to slice, using SFA API. Actually Wilabt testbed
291 doesn't allow adding nodes, in fact in the API there is method
292 to group all the nodes instanciated as WilabtSfaNodes and the
293 Allocate and Provision is done with the last call at
294 sfaapi.add_resource_to_slice_batch.
296 self.info(" Adding node to slice ")
297 slicename = self.get("slicename")
298 self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn)
300 def _delete_from_slice(self):
302 Delete every node from slice, using SFA API.
303 Wilabt doesn't allow to remove one sliver so this method
304 remove every slice from the slice.
307 self.warning(" Deleting all slivers from slice ")
308 slicename = self.get("slicename")
309 self.sfaapi.remove_all_from_slice(slicename)
311 def _get_hostname(self):
313 Get the attribute hostname.
315 hostname = self.get("hostname")
321 def _set_hostname_attr(self, node):
323 Query SFAAPI for the hostname of a certain host hrn and sets the
324 attribute hostname, it will over write the previous value.
326 hosts_hrn = self.sfaapi.get_resources_hrn()
327 for hostname, hrn in hosts_hrn.iteritems():
329 hostname = hostname + '.wilab2.ilabt.iminds.be'
330 self.set("hostname", hostname)
332 def _check_if_in_slice(self, hosts_hrn):
334 Check using SFA API if any host hrn from hosts_hrn is in the user's
337 slicename = self.get("slicename")
338 slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
340 if len(slice_nodes[0]['services']) != 0:
341 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
342 else: slice_nodes_hrn = []
343 nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
346 def _do_ping(self, hostname):
348 Perform ping command on node's IP matching hostname.
351 guser = self.get("gatewayUser")
352 gw = self.get("gateway")
353 host = hostname + ".wilab2.ilabt.iminds.be"
354 command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
355 (out, err) = lexec(command)
356 m = re.search("(\d+)% packet loss", str(out))
357 if m and int(m.groups()[0]) < 50:
362 def _blacklist_node(self, host_hrn):
364 Add mal functioning node to blacklist (in SFA API).
366 self.warning(" Blacklisting malfunctioning node ")
367 self.sfaapi.blacklist_resource(host_hrn)
368 if not self._hostname:
369 self.set('hostname', None)
371 self.set('hostname', host_hrn.split('.').pop())
373 def _put_node_in_provision(self, host_hrn):
375 Add node to the list of nodes being provisioned, in order for other RMs
376 to not try to provision the same one again.
378 self.sfaapi.reserve_resource(host_hrn)
380 def _get_ip(self, hostname):
382 Query cache for the IP of a node with certain hostname
385 ip = sshfuncs.gethostbyname(hostname)
387 # Fail while trying to find the IP
def fail_discovery(self):
    """
    Abort because discovery found no candidate node.

    :raises RuntimeError: always
    """
    msg = "Discovery failed. No candidates found for node"
    # Call form of raise: valid on Python 2 and Python 3, whereas
    # `raise RuntimeError, msg` is a syntax error on Python 3.
    raise RuntimeError(msg)
def fail_node_not_alive(self, hostname=None):
    """
    Abort because a node is not alive.

    :param hostname: name of the node, interpolated into the error message
    :raises RuntimeError: always
    """
    msg = "Node %s not alive" % hostname
    # Call form of raise: valid on Python 2 and Python 3, whereas
    # `raise RuntimeError, msg` is a syntax error on Python 3.
    raise RuntimeError(msg)
def fail_node_not_available(self, hostname):
    """
    Abort because a node cannot be provisioned.

    :param hostname: name or hrn of the node, interpolated into the error
        message
    :raises RuntimeError: always
    """
    msg = "Node %s not available for provisioning" % hostname
    # Call form of raise: valid on Python 2 and Python 3, whereas
    # `raise RuntimeError, msg` is a syntax error on Python 3.
    raise RuntimeError(msg)
def fail_not_enough_nodes(self):
    """
    Abort because there are not enough nodes available for provisioning.

    :raises RuntimeError: always
    """
    msg = "Not enough nodes available for provisioning"
    # Call form of raise: valid on Python 2 and Python 3, whereas
    # `raise RuntimeError, msg` is a syntax error on Python 3.
    raise RuntimeError(msg)
def fail_plapi(self):
    """
    Abort because the API could not be instantiated.

    :raises RuntimeError: always
    """
    # NOTE(review): the message names the PLC API and the attributes
    # pluser/plpassword, while this class registers sfauser and
    # sfaPrivateKey -- confirm whether the wording should be updated.
    msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
        " attributes pluser and plpassword."
    # Call form of raise: valid on Python 2 and Python 3, whereas
    # `raise RuntimeError, msg` is a syntax error on Python 3.
    raise RuntimeError(msg)
413 def valid_connection(self, guid):