2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
28 from random import randint
# Resource manager class for a Wilabt host; it subclasses LinuxNode.
# _rtype is the type name that is also handed to SFAAPIFactory.get_api when
# the SFA API handle is created; _help is the human-readable description.
# NOTE(review): this extract has a line number fused to the start of every
# line, no indentation, and gaps in the numbering (e.g. any decorator or
# further class attributes that may sit around these lines) - confirm against
# the pristine file.
37 class WilabtSfaNode(LinuxNode):
38 _rtype = "WilabtSfaNode"
39 _help = "Controls a Wilabt host accessible using a SSH key " \
40 "and provisioned using SFA"
# Declare the attributes specific to this resource manager and register each
# of them on the class through cls._register_attribute:
#   sfauser, sfaPrivateKey, slicename  - created with Flags.Credential
#   gatewayUser, gateway               - gateway account name and gateway host
# NOTE(review): the closing arguments of the gatewayUser and gateway
# Attribute(...) calls are not visible in this extract, so their flags cannot
# be stated here - confirm against the pristine file.
# NOTE(review): the method takes `cls`; a @classmethod decorator is presumably
# on a line that is missing from the extract - confirm.
44 def _register_attributes(cls):
46 sfa_user = Attribute("sfauser", "SFA user",
47 flags = Flags.Credential)
49 sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50 used to generate the user credential",
51 flags = Flags.Credential)
53 slicename = Attribute("slicename", "SFA slice for the experiment",
54 flags = Flags.Credential)
56 gateway_user = Attribute("gatewayUser", "Gateway account username",
59 gateway = Attribute("gateway", "Hostname of the gateway machine",
62 cls._register_attribute(sfa_user)
63 cls._register_attribute(sfa_private_key)
64 cls._register_attribute(slicename)
65 cls._register_attribute(gateway_user)
66 cls._register_attribute(gateway)
# Initialise the LinuxNode base with (ec, guid), then set up the state used
# while discovering/provisioning:
#   _ecobj             - weak reference to ec (dereferenced when the SFA API
#                        handle is requested)
#   _node_to_provision - host hrn chosen in do_discover, None until then
#   _slicenode         - set to True in do_discover when the node is already
#                        in the slice
#   _hostname          - set to True in do_discover for a user-specified node;
#                        read by _blacklist_node
# NOTE(review): `weakref` is used here but its import is not visible in this
# extract; self._username and self._sfaapi are used by other methods but are
# not initialised in the visible lines (the numbering shows gaps) - confirm.
68 def __init__(self, ec, guid):
69 super(WilabtSfaNode, self).__init__(ec, guid)
71 self._ecobj = weakref.ref(ec)
73 self._node_to_provision = None
74 self._slicenode = False
75 self._hostname = False
# Reads the "sfauser" attribute to decide whether SFA provisioning applies.
# do_discover and do_provision delegate straight to LinuxNode when this
# returns a truthy value.
# NOTE(review): the rest of the body (the test on sfa_user and the return
# statements) is not visible in this extract - presumably truthy when no SFA
# user is configured; confirm.
78 def _skip_provision(self):
79 sfa_user = self.get("sfauser")
# NOTE(review): the `def` line (and any decorator or guard) for this body is
# not visible in the extract. Other methods read `self.sfaapi` without calling
# it, so this is presumably the `sfaapi` property - confirm.
#
# Obtains an SFA API object from SFAAPIFactory.get_api using the sfauser and
# sfaPrivateKey attributes, hard-coded slice-manager and registry URLs, the
# dereferenced execution-controller weak reference and the class _rtype.
# sfa_auth is the first two dot-separated components of sfauser.
# A weak reference to the API object is stored in self._sfaapi and the
# dereferenced object is returned.
# NOTE(review): `batch` is passed to get_api but its assignment is not visible
# here; lines are also missing between the get_api call and the weakref
# assignment - confirm.
87 sfa_user = self.get("sfauser")
88 sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
89 sfa_auth = '.'.join(sfa_user.split('.')[:2])
90 sfa_registry = "http://sfa3.planet-lab.eu:12345/"
91 sfa_private_key = self.get("sfaPrivateKey")
94 _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
95 sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
100 self._sfaapi = weakref.ref(_sfaapi)
102 return self._sfaapi()
# Discovery step. When _skip_provision() is truthy the work is delegated to
# LinuxNode.do_discover. Otherwise the visible lines:
#   - fetch the hostname -> hrn mapping from sfaapi.get_resources_hrn()
#   - look up the user-supplied hostname in it (a missing key would raise
#     KeyError at that lookup) and flag self._hostname = True
#   - consult _blacklisted, _reserved and _check_if_in_slice for that hrn
#   - set the "hostname" attribute to the name plus the
#     '.wilab2.ilabt.iminds.be' suffix, remember the hrn in
#     self._node_to_provision and call LinuxNode.do_discover
# NOTE(review): docstring delimiters, `else:` branches and other control-flow
# lines are missing from this extract (gaps in the numbering), so the exact
# nesting of the statements below cannot be established from here - confirm
# against the pristine file.
104 def do_discover(self):
106 Based on the attributes defined by the user, discover the suitable
109 if self._skip_provision():
110 super(WilabtSfaNode, self).do_discover()
113 nodes = self.sfaapi.get_resources_hrn()
115 hostname = self._get_hostname()
117 # the user specified one particular node to be provisioned
118 self._hostname = True
119 host_hrn = nodes[hostname]
121 # check that the node is not blacklisted or being provisioned
123 if not self._blacklisted(host_hrn):
124 if not self._reserved(host_hrn):
125 if self._check_if_in_slice([host_hrn]):
126 self.debug("Node already in slice %s" % host_hrn)
127 self._slicenode = True
128 hostname = hostname + '.wilab2.ilabt.iminds.be'
129 self.set('hostname', hostname)
130 self._node_to_provision = host_hrn
131 super(WilabtSfaNode, self).do_discover()
# Provisioning step. When _skip_provision() is truthy the work is delegated to
# LinuxNode.do_provision. Otherwise the visible lines loop until provision_ok:
#   - take the node chosen by do_discover (self._node_to_provision)
#   - remove all slivers from the slice, sleep 300 s, add the node to the slice
#   - poll _check_if_in_slice until the node shows up or `timeout` is reached
#   - wait for SSH via _check_ssh_loop; the existing comment says a node that
#     never answers is blacklisted
#   - run the file-system, omf and hostname checks
#   - set the "hostname" attribute if it is still unset, log, and finally call
#     LinuxNode.do_provision
# NOTE(review): many lines are missing from this extract (initialisation of
# provision_ok / t / timeout, the conditions guarding the delete-and-sleep
# step, the bodies of the failed-check branches, `time` import), so the exact
# control flow cannot be established from here - confirm.
# NOTE(review): this method calls self._check_omf(), while the only omf check
# defined in the visible part of the class is named _check_omfrc - confirm
# which name is intended.
133 def do_provision(self):
135 Add node to user's slice after verifing that the node is functioning
138 if self._skip_provision():
139 super(WilabtSfaNode, self).do_provision()
147 while not provision_ok:
148 node = self._node_to_provision
150 self._delete_from_slice()
151 self.debug("Waiting 300 seg for re-adding to slice")
152 time.sleep(300) # Timout for the testbed to allow a new reservation
153 self._add_node_to_slice(node)
155 while not self._check_if_in_slice([node]) and t < timeout:
158 self.debug("Waiting 5 seg for resources to be added")
161 ssh_ok = self._check_ssh_loop()
164 # the timeout was reach without establishing ssh connection
165 # the node is blacklisted, and a new
166 # node to provision is discovered
167 self._blacklist_node(node)
171 # check /proc directory is mounted (ssh_ok = True)
172 # file system is not read only, hostname is correct
173 # and omf_rc process is up
175 if not self._check_fs():
178 if not self._check_omf():
181 if not self._check_hostname():
187 if not self.get('hostname'):
188 self._set_hostname_attr(node)
189 self.info(" Node provisioned ")
191 super(WilabtSfaNode, self).do_provision()
# Ask the SFA API whether host_hrn is blacklisted; if it is, call
# fail_node_not_available(host_hrn), which raises RuntimeError.
# NOTE(review): the line that follows (presumably the return value for the
# non-blacklisted case, which do_discover tests with `not`) is missing from
# this extract - confirm.
193 def _blacklisted(self, host_hrn):
194 if self.sfaapi.blacklisted(host_hrn):
195 self.fail_node_not_available(host_hrn)
# Ask the SFA API whether host_hrn is already reserved; if it is, call
# fail_node_not_available(host_hrn), which raises RuntimeError.
# NOTE(review): the line that follows (presumably the return value for the
# not-reserved case, which do_discover tests with `not`) is missing from this
# extract - confirm.
198 def _reserved(self, host_hrn):
199 if self.sfaapi.reserved(host_hrn):
200 self.fail_node_not_available(host_hrn)
203 def _get_username(self):
204 slicename = self.get("slicename")
205 if self._username is None:
206 slice_info = self.sfaapi.get_slice_resources(slicename)
207 username = slice_info['resource'][0]['services'][0]['login'][0]['username']
208 self.set('username', username)
209 self.debug("Retriving username information from RSpec %s" % username)
210 self._username = username
# Probe the node over SSH: while t < timeout and ssh_ok is false, run
# echo 'GOOD NODE' through self.execute and look for the marker in the
# command output. A missing marker is logged as "No SSH connection", a
# success as "SSH OK".
# NOTE(review): the initialisation and update of t, timeout and ssh_ok, any
# sleep between attempts, the else branch and the return statement are
# missing from this extract; do_provision assigns the result to ssh_ok, so a
# boolean is presumably returned - confirm.
212 def _check_ssh_loop(self):
216 while t < timeout and not ssh_ok:
217 cmd = 'echo \'GOOD NODE\''
218 ((out, err), proc) = self.execute(cmd)
219 if out.find("GOOD NODE") < 0:
220 self.debug( "No SSH connection, waiting 60s" )
225 self.debug( "SSH OK" )
# NOTE(review): the `def` line for this body is not visible in the extract;
# do_provision calls self._check_fs(), so this is presumably that method -
# confirm.
#
# Runs `mount |grep proc` on the node through self.execute; when the output
# does not contain "/proc type proc" a warning is logged and the node is
# blacklisted.
# NOTE(review): `node` is not assigned anywhere in the visible lines. If the
# method really takes no argument besides self (as the call in do_provision
# suggests), the blacklist call would raise NameError - confirm and pass the
# node explicitly or use self._node_to_provision.
231 cmd = 'mount |grep proc'
232 ((out, err), proc) = self.execute(cmd)
233 if out.find("/proc type proc") < 0:
234 self.warning(" Corrupted file system ")
235 self._blacklist_node(node)
# Runs `ps aux|grep omf` on the node through self.execute and checks whether
# the output mentions the omf_rc executable under the rvm ruby-1.9.3-p286@omf
# gemset.
# NOTE(review): the body of the `if` and the return statements are missing
# from this extract - confirm what is returned.
# NOTE(review): do_provision calls self._check_omf(), not _check_omfrc; unless
# a parent class provides _check_omf that call would raise AttributeError -
# confirm which name is intended.
239 def _check_omfrc(self):
240 cmd = 'ps aux|grep omf'
241 ((out, err), proc) = self.execute(cmd)
242 if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
# Executes `cmd` on the node and tests whether 'localhost' appears
# (case-insensitively) in its output.
# NOTE(review): the assignment of `cmd`, the body of the `if` and the return
# statements are missing from this extract; do_provision treats a falsy
# result as a failed check - confirm.
246 def _check_hostname(self):
248 ((out, err), proc) = self.execute(cmd)
249 if 'localhost' in out.lower():
253 def _add_node_to_slice(self, host_hrn):
254 self.info(" Adding node to slice ")
255 slicename = self.get("slicename")
256 self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn)
258 def _delete_from_slice(self):
259 self.warning(" Deleting all slivers from slice ")
260 slicename = self.get("slicename")
261 self.sfaapi.remove_all_from_slice(slicename)
# Reads the user-supplied "hostname" attribute.
# NOTE(review): the remainder of the body (any validation and the return
# statement) is missing from this extract; do_discover uses the result as a
# key into the hostname -> hrn mapping - confirm.
263 def _get_hostname(self):
264 hostname = self.get("hostname")
# Looks through the hostname -> hrn mapping returned by
# sfaapi.get_resources_hrn() and sets the "hostname" attribute to a hostname
# with the '.wilab2.ilabt.iminds.be' suffix appended.
# NOTE(review): the line that selects which entry corresponds to `node`
# (presumably a comparison of hrn with node) and the docstring delimiters are
# missing from this extract - confirm.
# NOTE(review): dict.iteritems() exists only on Python 2.
270 def _set_hostname_attr(self, node):
272 Query SFAAPI for the hostname of a certain host hrn and sets the
273 attribute hostname, it will over write the previous value
275 hosts_hrn = self.sfaapi.get_resources_hrn()
276 for hostname, hrn in hosts_hrn.iteritems():
278 hostname = hostname + '.wilab2.ilabt.iminds.be'
279 self.set("hostname", hostname)
# Fetches the resources of the slice named by the "slicename" attribute.
# When the first resource carries at least one service entry, the slice
# resources are mapped to hrns through sfaapi.get_resources_hrn(...).values();
# otherwise the slice is treated as holding no nodes. The result computed
# here, nodes_inslice, is the intersection of hosts_hrn with those hrns (as a
# list, order unspecified because it goes through sets).
# NOTE(review): slice_nodes[0] raises IndexError if the slice has no
# resources at all - confirm whether that can happen.
# NOTE(review): the end of the docstring and the return statement are missing
# from this extract; callers use the result as a truth value, so
# nodes_inslice is presumably returned - confirm.
281 def _check_if_in_slice(self, hosts_hrn):
283 Check using SFA API if any host hrn from hosts_hrn is in the user's
286 slicename = self.get("slicename")
287 slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
289 if len(slice_nodes[0]['services']) != 0:
290 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
291 else: slice_nodes_hrn = []
292 nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
# Pings <hostname>.wilab2.ilabt.iminds.be four times from the gateway: the
# command `ssh <gatewayUser>@<gateway> 'ping -c4 <host>'` is run locally with
# lexec and the "N% packet loss" figure is parsed from its output. The
# visible condition is true when a figure was found and it is below 50.
# NOTE(review): what happens in each branch and what is returned is missing
# from this extract - confirm.
# NOTE(review): the command line is assembled by string interpolation from
# attribute values and the hostname; if any of these can come from untrusted
# input they should be quoted. The `re` import is not visible in this
# extract, and the pattern is not a raw string ("\d" relies on Python leaving
# unknown escapes untouched).
295 def _do_ping(self, hostname):
297 Perform ping command on node's IP matching hostname
300 guser = self.get("gatewayUser")
301 gw = self.get("gateway")
302 host = hostname + ".wilab2.ilabt.iminds.be"
303 command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
304 (out, err) = lexec(command)
305 m = re.search("(\d+)% packet loss", str(out))
306 if m and int(m.groups()[0]) < 50:
# Logs a warning and reports host_hrn to the SFA API as blacklisted, then
# updates the "hostname" attribute: it is cleared (None) when self._hostname
# is falsy, and the last statement sets it to the last dot-separated
# component of host_hrn.
# NOTE(review): the line between the two self.set calls (presumably `else:`)
# and the docstring delimiters are missing from this extract, so whether the
# final assignment is conditional cannot be established from here - confirm.
311 def _blacklist_node(self, host_hrn):
313 Add node mal functioning node to blacklist
315 self.warning(" Blacklisting malfunctioning node ")
316 self.sfaapi.blacklist_resource(host_hrn)
317 if not self._hostname:
318 self.set('hostname', None)
320 self.set('hostname', host_hrn.split('.').pop())
322 def _put_node_in_provision(self, host_hrn):
324 Add node to the list of nodes being provisioned, in order for other RMs
325 to not try to provision the same one again
327 self.sfaapi.reserve_resource(host_hrn)
# Resolves `hostname` to an IP address with sshfuncs.gethostbyname.
# NOTE(review): the docstring text below mentions PLCAPI and a node id, but
# the visible code only calls sshfuncs.gethostbyname on the hostname - the
# docstring looks stale; confirm and update it in the pristine file.
# NOTE(review): the try/except structure hinted at by the existing comment
# and the return statements are missing from this extract - confirm what is
# returned on failure.
329 def _get_ip(self, hostname):
331 Query PLCAPI for the IP of a node with certain node id
334 ip = sshfuncs.gethostbyname(hostname)
336 # Fail while trying to find the IP
# Raises RuntimeError with a fixed "Discovery failed" message.
# NOTE(review): one line between the message and the raise is missing from
# this extract (gap in the numbering) - confirm what it does.
# NOTE(review): `raise RuntimeError, msg` is Python-2-only syntax;
# `raise RuntimeError(msg)` is equivalent and also valid on Python 3.
340 def fail_discovery(self):
341 msg = "Discovery failed. No candidates found for node"
343 raise RuntimeError, msg
def fail_node_not_alive(self, hostname=None):
    """
    Signal that a node is not alive.

    :param hostname: node name interpolated into the error message
    :raises RuntimeError: always, with the message "Node <hostname> not alive"
    """
    msg = "Node %s not alive" % hostname
    # The call form is valid on both Python 2 and Python 3, whereas
    # ``raise RuntimeError, msg`` is a SyntaxError on Python 3.
    raise RuntimeError(msg)
def fail_node_not_available(self, hostname):
    """
    Signal that a node cannot be provisioned (used when the SFA API
    reports it as blacklisted or already reserved).

    :param hostname: node identifier interpolated into the error message
    :raises RuntimeError: always, with the message
        "Node <hostname> not available for provisioning"
    """
    msg = "Node %s not available for provisioning" % hostname
    # The call form is valid on both Python 2 and Python 3, whereas
    # ``raise RuntimeError, msg`` is a SyntaxError on Python 3.
    raise RuntimeError(msg)
def fail_not_enough_nodes(self):
    """
    Signal that there are not enough nodes available for provisioning.

    :raises RuntimeError: always
    """
    msg = "Not enough nodes available for provisioning"
    # The call form is valid on both Python 2 and Python 3, whereas
    # ``raise RuntimeError, msg`` is a SyntaxError on Python 3.
    raise RuntimeError(msg)
def fail_plapi(self):
    """
    Signal that the PLC API could not be instantiated.

    :raises RuntimeError: always, with a message asking for the pluser
        and plpassword attributes to be set
    """
    msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
        " attributes pluser and plpassword."
    # The call form is valid on both Python 2 and Python 3, whereas
    # ``raise RuntimeError, msg`` is a SyntaxError on Python 3.
    raise RuntimeError(msg)
362 def valid_connection(self, guid):