# From: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
# Date: Thu, 22 May 2014 13:46:54 +0000 (+0200)
# Subject: Adding WilabtSfaNode
# X-Git-Tag: nepi-3.1.0~76
# X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=6dc930b2d410f9b333d69995ef11f8917dfbd547;p=nepi.git
#
# Adding WilabtSfaNode
# ---
# diff --git a/src/nepi/resources/omf/wilabt_node.py b/src/nepi/resources/omf/wilabt_node.py
# new file mode 100644
# index 00000000..c8807b4a
# --- /dev/null
# +++ b/src/nepi/resources/omf/wilabt_node.py
# @@ -0,0 +1,366 @@
#
#    NEPI, a framework to manage network experiments
#    Copyright (C) 2013 INRIA
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>

from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
        ResourceState, reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.util.sfaapi import SFAAPIFactory
from nepi.util.execfuncs import lexec
from nepi.util import sshfuncs

from random import randint
import time
import re
import weakref
import socket
import threading
import datetime

# Domain appended to the short node names to build the hostname attribute.
_WILABT_DOMAIN = '.wilab2.ilabt.iminds.be'


@clsinit_copy
class WilabtSfaNode(LinuxNode):
    """
    Resource manager for a Wilabt node reached over SSH and provisioned
    through SFA.

    When the "sfauser" attribute is not set, discovery and provisioning are
    delegated unchanged to LinuxNode. Otherwise the node named by the
    "hostname" attribute is added to the slice given by "slicename" and
    checked (SSH, /proc mount, omf_rc process, hostname) before use.
    """
    _rtype = "WilabtSfaNode"
    _help = "Controls a Wilabt host accessible using a SSH key " \
            "and provisioned using SFA"
    _backend = "omf"

    @classmethod
    def _register_attributes(cls):
        """
        Register the SFA and gateway attributes of this resource type.
        """
        sfa_user = Attribute("sfauser", "SFA user",
                    flags = Flags.Credential)

        # Implicit concatenation: the original used a backslash-newline
        # inside the literal, which embeds source indentation in the text.
        sfa_private_key = Attribute("sfaPrivateKey",
                            "SFA path to the private key "
                            "used to generate the user credential",
                            flags = Flags.Credential)

        slicename = Attribute("slicename", "SFA slice for the experiment",
                    flags = Flags.Credential)

        gateway_user = Attribute("gatewayUser", "Gateway account username",
                    flags = Flags.Design)

        gateway = Attribute("gateway", "Hostname of the gateway machine",
                    flags = Flags.Design)

        cls._register_attribute(sfa_user)
        cls._register_attribute(sfa_private_key)
        cls._register_attribute(slicename)
        cls._register_attribute(gateway_user)
        cls._register_attribute(gateway)

    def __init__(self, ec, guid):
        super(WilabtSfaNode, self).__init__(ec, guid)

        self._ecobj = weakref.ref(ec)
        # weakref to the SFA API object, created lazily by the sfaapi property
        self._sfaapi = None
        # hrn of the node selected by do_discover
        self._node_to_provision = None
        # True when the selected node was already part of the slice
        self._slicenode = False
        # True when the user supplied the hostname attribute
        self._hostname = False
        # login name read from the slice RSpec, cached by _get_username
        self._username = None

    def _skip_provision(self):
        """
        Return True when no "sfauser" is set, i.e. SFA must not be used.
        """
        return not self.get("sfauser")

    @property
    def sfaapi(self):
        """
        SFA API object for this user, created on first access.

        Only a weak reference is stored on the instance; the returned value
        is the dereferenced object.
        """
        if not self._sfaapi:
            sfa_user = self.get("sfauser")
            sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
            # authority = first two dot-separated components of the user hrn
            sfa_auth = '.'.join(sfa_user.split('.')[:2])
            sfa_registry = "http://sfa3.planet-lab.eu:12345/"
            sfa_private_key = self.get("sfaPrivateKey")
            batch = True

            _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
                sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch,
                WilabtSfaNode._rtype)

            if not _sfaapi:
                self.fail_sfaapi()

            self._sfaapi = weakref.ref(_sfaapi)

        return self._sfaapi()

    def do_discover(self):
        """
        Based on the attributes defined by the user, discover the suitable
        nodes for provision.

        Raises RuntimeError when the requested hostname is unknown to SFA,
        blacklisted, or reserved.
        """
        if self._skip_provision():
            super(WilabtSfaNode, self).do_discover()
            return

        nodes = self.sfaapi.get_resources_hrn()

        hostname = self._get_hostname()
        if not hostname:
            # NOTE(review): without a hostname nothing is selected and the
            # parent do_discover is not called (unchanged from the original)
            # -- confirm this is intended.
            return

        # the user specified one particular node to be provisioned
        self._hostname = True

        # do_provision re-runs discovery after a failed check; by then the
        # attribute already carries the domain suffix, so strip it before
        # looking the node up by its short name.
        if hostname.endswith(_WILABT_DOMAIN):
            hostname = hostname[:-len(_WILABT_DOMAIN)]

        try:
            host_hrn = nodes[hostname]
        except KeyError:
            self.fail_discovery()

        # check that the node is not blacklisted or being provisioned
        # by other RM (both helpers raise when the node is unavailable)
        if self._blacklisted(host_hrn) or self._reserved(host_hrn):
            return

        if self._check_if_in_slice([host_hrn]):
            self.debug("Node already in slice %s" % host_hrn)
            self._slicenode = True

        self.set('hostname', hostname + _WILABT_DOMAIN)
        self._node_to_provision = host_hrn
        # NOTE(review): _put_node_in_provision is never called, so the node
        # is not marked as reserved for other RMs -- confirm.
        super(WilabtSfaNode, self).do_discover()

    def do_provision(self):
        """
        Add node to user's slice after verifying that the node is
        functioning correctly.

        Loops until a node passes every check; each failed check triggers
        a new do_discover, which raises once the node is blacklisted.
        """
        if self._skip_provision():
            super(WilabtSfaNode, self).do_provision()
            return

        provision_ok = False
        timeout = 300        # seconds to wait for the node to join the slice
        poll_interval = 5    # seconds between slice membership checks

        while not provision_ok:
            node = self._node_to_provision
            if self._slicenode:
                self._delete_from_slice()
                self.debug("Waiting 300 seg for re-adding to slice")
                # Timeout for the testbed to allow a new reservation
                time.sleep(300)
            self._add_node_to_slice(node)

            # Fixed-interval polling. The original slept t seconds with t
            # growing each round, so the total wait far exceeded `timeout`.
            t = 0
            while not self._check_if_in_slice([node]) and t < timeout:
                t = t + poll_interval
                time.sleep(poll_interval)
                self.debug("Waiting 5 seg for resources to be added")

            self._get_username()

            if not self._check_ssh_loop():
                # the timeout was reached without establishing ssh
                # connection: the node is blacklisted, and a new
                # node to provision is discovered
                self._blacklist_node(node)
                self.do_discover()
                continue

            # check /proc directory is mounted, the omf_rc process is up
            # and the hostname is correct; checks run in this order and
            # stop at the first failure
            if not (self._check_fs() and self._check_omfrc()
                    and self._check_hostname()):
                self.do_discover()
                continue

            provision_ok = True
            if not self.get('hostname'):
                self._set_hostname_attr(node)
            self.info(" Node provisioned ")

        super(WilabtSfaNode, self).do_provision()

    def _blacklisted(self, host_hrn):
        """
        Raise RuntimeError if host_hrn is blacklisted, else return False.
        """
        if self.sfaapi.blacklisted(host_hrn):
            self.fail_node_not_available(host_hrn)
        return False

    def _reserved(self, host_hrn):
        """
        Raise RuntimeError if host_hrn is reserved, else return False.
        """
        if self.sfaapi.reserved(host_hrn):
            self.fail_node_not_available(host_hrn)
        return False

    def _get_username(self):
        """
        Read the login username from the slice resources (first resource,
        first service, first login), set the "username" attribute and
        cache the value so the query is done only once.
        """
        slicename = self.get("slicename")
        if self._username is None:
            slice_info = self.sfaapi.get_slice_resources(slicename)
            username = slice_info['resource'][0]['services'][0]['login'][0]['username']
            self.set('username', username)
            self.debug("Retriving username information from RSpec %s" % username)
            self._username = username

    def _check_ssh_loop(self):
        """
        Run an echo command on the node until its output comes back or the
        time budget is spent. Returns True when the command succeeded.
        """
        t = 0
        timeout = 10
        retry_delay = 5
        ssh_ok = False
        while t < timeout and not ssh_ok:
            cmd = 'echo \'GOOD NODE\''
            ((out, err), proc) = self.execute(cmd)
            if out.find("GOOD NODE") < 0:
                # message reports the real delay (it used to claim 60s)
                self.debug("No SSH connection, waiting %ds" % retry_delay)
                t = t + retry_delay
                time.sleep(retry_delay)
            else:
                self.debug( "SSH OK" )
                ssh_ok = True
        return ssh_ok

    def _check_fs(self):
        """
        Return True when /proc is mounted on the node; otherwise blacklist
        the node being provisioned and return False.
        """
        cmd = 'mount |grep proc'
        ((out, err), proc) = self.execute(cmd)
        if out.find("/proc type proc") < 0:
            self.warning(" Corrupted file system ")
            # The original referenced an undefined local `node` here.
            self._blacklist_node(self._node_to_provision)
            return False
        return True

    def _check_omfrc(self):
        """
        Return True when the omf_rc executable path appears in `ps aux`.
        """
        cmd = 'ps aux|grep omf'
        ((out, err), proc) = self.execute(cmd)
        if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
            return False
        return True

    def _check_hostname(self):
        """
        Return False when the node reports a hostname containing
        'localhost', True otherwise.
        """
        cmd = 'hostname'
        ((out, err), proc) = self.execute(cmd)
        if 'localhost' in out.lower():
            return False
        return True

    def _add_node_to_slice(self, host_hrn):
        """
        Ask the SFA API to add host_hrn to the slice (batch call).
        """
        self.info(" Adding node to slice ")
        slicename = self.get("slicename")
        self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn)

    def _delete_from_slice(self):
        """
        Ask the SFA API to remove every resource from the slice.
        """
        self.warning(" Deleting all slivers from slice ")
        slicename = self.get("slicename")
        self.sfaapi.remove_all_from_slice(slicename)

    def _get_hostname(self):
        """
        Return the "hostname" attribute, or None when it is empty/unset.
        """
        return self.get("hostname") or None

    def _set_hostname_attr(self, node):
        """
        Query SFAAPI for the hostname of a certain host hrn and sets the
        attribute hostname, it will over write the previous value
        """
        hosts_hrn = self.sfaapi.get_resources_hrn()
        # .items() works on both Python 2 and 3 (iteritems is 2-only)
        for name, hrn in hosts_hrn.items():
            if hrn == node:
                self.set("hostname", name + _WILABT_DOMAIN)

    def _check_if_in_slice(self, hosts_hrn):
        """
        Check using SFA API if any host hrn from hosts_hrn is in the user's
        slice. Returns the list of hrns present in both (empty if none).
        """
        slicename = self.get("slicename")
        slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
        # Default up front: the original left this unbound on one branch.
        slice_nodes_hrn = []
        if slice_nodes and len(slice_nodes[0]['services']) != 0:
            slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
        return nodes_inslice

    def _do_ping(self, hostname):
        """
        Ping the node from the gateway over ssh; returns True when the
        reported packet loss is below 50%.
        """
        ping_ok = False
        guser = self.get("gatewayUser")
        gw = self.get("gateway")
        host = hostname + _WILABT_DOMAIN
        # NOTE(review): attribute values are interpolated into the command
        # string unescaped -- confirm they are always trusted.
        command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
        (out, err) = lexec(command)
        m = re.search(r"(\d+)% packet loss", str(out))
        if m and int(m.groups()[0]) < 50:
            ping_ok = True

        return ping_ok

    def _blacklist_node(self, host_hrn):
        """
        Add malfunctioning node to blacklist and reset the hostname
        attribute (to the last hrn component when the user chose the
        node, to None otherwise).
        """
        self.warning(" Blacklisting malfunctioning node ")
        self.sfaapi.blacklist_resource(host_hrn)
        if not self._hostname:
            self.set('hostname', None)
        else:
            self.set('hostname', host_hrn.split('.').pop())

    def _put_node_in_provision(self, host_hrn):
        """
        Add node to the list of nodes being provisioned, in order for other RMs
        to not try to provision the same one again
        """
        self.sfaapi.reserve_resource(host_hrn)

    def _get_ip(self, hostname):
        """
        Resolve hostname with sshfuncs.gethostbyname; returns None when
        the lookup fails.
        """
        try:
            ip = sshfuncs.gethostbyname(hostname)
        except Exception:
            # Fail while trying to find the IP
            return None
        return ip

    def fail_discovery(self):
        """Log and raise RuntimeError: no candidate node was found."""
        msg = "Discovery failed. No candidates found for node"
        self.error(msg)
        raise RuntimeError(msg)

    def fail_node_not_alive(self, hostname=None):
        """Raise RuntimeError: the node is not alive."""
        msg = "Node %s not alive" % hostname
        raise RuntimeError(msg)

    def fail_node_not_available(self, hostname):
        """Raise RuntimeError: the node cannot be provisioned."""
        msg = "Node %s not available for provisioning" % hostname
        raise RuntimeError(msg)

    def fail_not_enough_nodes(self):
        """Raise RuntimeError: not enough nodes to provision."""
        msg = "Not enough nodes available for provisioning"
        raise RuntimeError(msg)

    def fail_plapi(self):
        """Raise RuntimeError: the PLC API could not be instantiated."""
        msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
            " attributes pluser and plpassword."
        raise RuntimeError(msg)

    def fail_sfaapi(self):
        """Raise RuntimeError: the SFA API could not be instantiated."""
        msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
            " attributes sfauser and sfaPrivateKey."
        raise RuntimeError(msg)

    def valid_connection(self, guid):
        # TODO: Validate!
        return True