#
#    NEPI, a framework to manage network experiments
#    Copyright (C) 2013 INRIA
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Author: Lucia Guevgeozian

from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
        ResourceState, reschedule_delay
from nepi.resources.linux.node import LinuxNode
from nepi.util.sfaapi import SFAAPIFactory
from nepi.util.execfuncs import lexec
from nepi.util import sshfuncs

from random import randint
import time
import re
import weakref
import socket
import threading
import datetime


@clsinit_copy
class WilabtSfaNode(LinuxNode):
    """Linux node on the w-iLab.t testbed, optionally provisioned via SFA.

    When the ``sfauser`` attribute is not set, discovery and provisioning
    are delegated unchanged to ``LinuxNode``. Otherwise the node is added
    to the user's slice through the SFA API and sanity-checked over SSH
    before the ``LinuxNode`` provisioning runs.
    """

    _rtype = "WilabtSfaNode"
    _help = "Controls a Wilabt host accessible using a SSH key " \
            "and provisioned using SFA"
    _backend = "omf"

    @classmethod
    def _register_attributes(cls):
        """Register the SFA credential and gateway attributes on the class."""
        sfa_user = Attribute("sfauser", "SFA user",
                flags = Flags.Credential)

        # NOTE(review): the help text keeps the in-string line continuation
        # of the original source, so it contains the continuation line's
        # leading whitespace.
        sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
            used to generate the user credential",
            flags = Flags.Credential)

        slicename = Attribute("slicename", "SFA slice for the experiment",
                flags = Flags.Credential)

        gateway_user = Attribute("gatewayUser", "Gateway account username",
                flags = Flags.Design)

        gateway = Attribute("gateway", "Hostname of the gateway machine",
                flags = Flags.Design)

        cls._register_attribute(sfa_user)
        cls._register_attribute(sfa_private_key)
        cls._register_attribute(slicename)
        cls._register_attribute(gateway_user)
        cls._register_attribute(gateway)

    def __init__(self, ec, guid):
        """Initialise provisioning state; *ec* is kept as a weak reference."""
        super(WilabtSfaNode, self).__init__(ec, guid)

        self._ecobj = weakref.ref(ec)
        # Weak reference to the SFA API object, created lazily by `sfaapi`.
        self._sfaapi = None
        # HRN of the node selected by do_discover() for provisioning.
        self._node_to_provision = None
        # True when the selected node was already part of the slice.
        self._slicenode = False
        # True when the user explicitly asked for a given hostname.
        self._hostname = False
        # Login name read from the slice description (see _get_username).
        self._username = None

    def _skip_provision(self):
        """Return True when no ``sfauser`` is set, i.e. SFA is not used."""
        return not self.get("sfauser")

    @property
    def sfaapi(self):
        """SFA API object shared through ``SFAAPIFactory``.

        The object is requested on first access and only a weak reference
        to it is stored on the instance.

        Raises:
            RuntimeError: if the factory does not return an API object.
        """
        if not self._sfaapi:
            sfa_user = self.get("sfauser")
            sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
            # The authority is the first two dot-separated labels of the user.
            sfa_auth = '.'.join(sfa_user.split('.')[:2])
            sfa_registry = "http://sfa3.planet-lab.eu:12345/"
            sfa_private_key = self.get("sfaPrivateKey")
            batch = True

            _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
                sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch,
                WilabtSfaNode._rtype)

            if not _sfaapi:
                self.fail_sfaapi()

            self._sfaapi = weakref.ref(_sfaapi)

        # NOTE(review): returns None if the referent has been collected;
        # presumably the factory keeps it alive -- confirm.
        return self._sfaapi()

    def do_discover(self):
        """
        Based on the attributes defined by the user, discover the suitable
        node for provision.

        Only the case where the user set the ``hostname`` attribute is
        handled: the matching HRN is checked against the blacklist and the
        reserved list, and recorded as the node to provision.
        """
        if self._skip_provision():
            super(WilabtSfaNode, self).do_discover()
            return

        nodes = self.sfaapi.get_resources_hrn()

        hostname = self._get_hostname()
        if not hostname:
            # NOTE(review): without a hostname nothing is selected and the
            # parent do_discover() is not called -- confirm this is intended.
            return

        # the user specified one particular node to be provisioned
        self._hostname = True
        host_hrn = nodes[hostname]

        # check that the node is not blacklisted or being provisioned
        # by other RM (both helpers raise when the node is unavailable)
        if self._blacklisted(host_hrn) or self._reserved(host_hrn):
            return

        if self._check_if_in_slice([host_hrn]):
            self.debug("Node already in slice %s" % host_hrn)
            self._slicenode = True

        hostname = hostname + '.wilab2.ilabt.iminds.be'
        self.set('hostname', hostname)
        self._node_to_provision = host_hrn
        super(WilabtSfaNode, self).do_discover()

    def do_provision(self):
        """
        Add node to user's slice after verifying that the node is functioning
        correctly.

        Loops until a node passes the SSH, file system, omf_rc and hostname
        checks; after each failed check do_discover() is run again.
        """
        if self._skip_provision():
            super(WilabtSfaNode, self).do_provision()
            return

        provision_ok = False
        timeout = 300
        poll_interval = 5

        while not provision_ok:
            node = self._node_to_provision
            if self._slicenode:
                self._delete_from_slice()
                self.debug("Waiting 300 seg for re-adding to slice")
                # Timeout for the testbed to allow a new reservation
                time.sleep(300)
            self._add_node_to_slice(node)

            # Poll the slice until the node shows up or `timeout` seconds
            # have been spent waiting.
            t = 0
            while not self._check_if_in_slice([node]) and t < timeout:
                t = t + poll_interval
                time.sleep(poll_interval)
                self.debug("Waiting 5 seg for resources to be added")

            self._get_username()

            if not self._check_ssh_loop():
                # the timeout was reached without establishing ssh
                # connection: the node is blacklisted, and a new node to
                # provision is discovered
                self._blacklist_node(node)
                self.do_discover()
                continue

            # ssh works: check /proc directory is mounted, omf_rc process
            # is up and hostname is correct
            if not self._check_fs():
                self.do_discover()
                continue
            if not self._check_omfrc():
                self.do_discover()
                continue
            if not self._check_hostname():
                self.do_discover()
                continue

            provision_ok = True
            if not self.get('hostname'):
                self._set_hostname_attr(node)
            self.info(" Node provisioned ")

        super(WilabtSfaNode, self).do_provision()

    def _blacklisted(self, host_hrn):
        """Return False if *host_hrn* is not blacklisted, raise otherwise."""
        if self.sfaapi.blacklisted(host_hrn):
            self.fail_node_not_available(host_hrn)
        return False

    def _reserved(self, host_hrn):
        """Return False if *host_hrn* is not reserved, raise otherwise."""
        if self.sfaapi.reserved(host_hrn):
            self.fail_node_not_available(host_hrn)
        return False

    def _get_username(self):
        """Read the login name from the slice description, once.

        The value is stored in the ``username`` attribute and cached in
        ``self._username`` so the slice is only queried the first time.
        """
        slicename = self.get("slicename")
        if self._username is None:
            slice_info = self.sfaapi.get_slice_resources(slicename)
            username = slice_info['resource'][0]['services'][0]['login'][0]['username']
            self.set('username', username)
            self.debug("Retriving username information from RSpec %s" % username)
            self._username = username

    def _check_ssh_loop(self):
        """Return True once a trivial remote command succeeds over SSH.

        Retries every 5 seconds until the accumulated wait reaches the
        10 second timeout, then returns False.
        """
        t = 0
        timeout = 10
        retry_interval = 5
        cmd = 'echo \'GOOD NODE\''

        while t < timeout:
            ((out, err), proc) = self.execute(cmd)
            if out.find("GOOD NODE") >= 0:
                self.debug( "SSH OK" )
                return True

            self.debug( "No SSH connection, waiting %ds" % retry_interval )
            t = t + retry_interval
            time.sleep(retry_interval)

        return False

    def _check_fs(self):
        """Return True if /proc is mounted; otherwise blacklist the node."""
        cmd = 'mount |grep proc'
        ((out, err), proc) = self.execute(cmd)
        if out.find("/proc type proc") < 0:
            self.warning(" Corrupted file system ")
            # The node being checked is the one selected for provisioning.
            self._blacklist_node(self._node_to_provision)
            return False
        return True

    def _check_omfrc(self):
        """Return True if the omf_rc executable appears in the process list."""
        cmd = 'ps aux|grep omf'
        ((out, err), proc) = self.execute(cmd)
        if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
            return False
        return True

    def _check_hostname(self):
        """Return False if the remote hostname contains 'localhost'."""
        cmd = 'hostname'
        ((out, err), proc) = self.execute(cmd)
        if 'localhost' in out.lower():
            return False
        return True

    def _add_node_to_slice(self, host_hrn):
        """Ask the SFA API to add *host_hrn* to the configured slice."""
        self.info(" Adding node to slice ")
        slicename = self.get("slicename")
        self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn)

    def _delete_from_slice(self):
        """Ask the SFA API to remove everything from the configured slice."""
        self.warning(" Deleting all slivers from slice ")
        slicename = self.get("slicename")
        self.sfaapi.remove_all_from_slice(slicename)

    def _get_hostname(self):
        """Return the ``hostname`` attribute, or None when it is empty."""
        hostname = self.get("hostname")
        if hostname:
            return hostname
        return None

    def _set_hostname_attr(self, node):
        """
        Query SFAAPI for the hostname of a certain host hrn and sets the
        attribute hostname, it will over write the previous value
        """
        hosts_hrn = self.sfaapi.get_resources_hrn()
        for hostname, hrn in hosts_hrn.items():
            if hrn == node:
                hostname = hostname + '.wilab2.ilabt.iminds.be'
                self.set("hostname", hostname)

    def _check_if_in_slice(self, hosts_hrn):
        """
        Check using SFA API if any host hrn from hosts_hrn is in the user's
        slice

        Returns the list of HRNs from *hosts_hrn* found in the slice; the
        list is empty when the slice has no resources or its first resource
        has no services.
        """
        slicename = self.get("slicename")
        slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']

        # Default to "nothing in the slice" so every path has a value.
        slice_nodes_hrn = []
        if slice_nodes and len(slice_nodes[0]['services']) != 0:
            slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()

        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
        return nodes_inslice

    def _do_ping(self, hostname):
        """
        Ping the node through the gateway and return True when the
        reported packet loss is below 50%.
        """
        ping_ok = False
        guser = self.get("gatewayUser")
        gw = self.get("gateway")
        host = hostname + ".wilab2.ilabt.iminds.be"
        # NOTE(review): the command is built by string interpolation; the
        # values come from experiment attributes -- confirm they are trusted.
        command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
        (out, err) = lexec(command)
        m = re.search(r"(\d+)% packet loss", str(out))
        if m and int(m.groups()[0]) < 50:
            ping_ok = True

        return ping_ok

    def _blacklist_node(self, host_hrn):
        """
        Add malfunctioning node to the blacklist and reset the hostname
        attribute: cleared when the user did not choose the host, otherwise
        set to the last label of the HRN.
        """
        self.warning(" Blacklisting malfunctioning node ")
        self.sfaapi.blacklist_resource(host_hrn)
        if not self._hostname:
            self.set('hostname', None)
        else:
            self.set('hostname', host_hrn.split('.').pop())

    def _put_node_in_provision(self, host_hrn):
        """
        Add node to the list of nodes being provisioned, in order for other RMs
        to not try to provision the same one again
        """
        self.sfaapi.reserve_resource(host_hrn)

    def _get_ip(self, hostname):
        """
        Resolve *hostname* with sshfuncs.gethostbyname; return None when the
        lookup fails
        """
        try:
            ip = sshfuncs.gethostbyname(hostname)
        except Exception:
            # Fail while trying to find the IP
            return None
        return ip

    def fail_discovery(self):
        """Log and raise RuntimeError: no candidate node was found."""
        msg = "Discovery failed. No candidates found for node"
        self.error(msg)
        raise RuntimeError(msg)

    def fail_node_not_alive(self, hostname=None):
        """Raise RuntimeError: node *hostname* is not alive."""
        msg = "Node %s not alive" % hostname
        raise RuntimeError(msg)

    def fail_node_not_available(self, hostname):
        """Raise RuntimeError: node *hostname* cannot be provisioned."""
        msg = "Node %s not available for provisioning" % hostname
        raise RuntimeError(msg)

    def fail_not_enough_nodes(self):
        """Raise RuntimeError: not enough nodes for provisioning."""
        msg = "Not enough nodes available for provisioning"
        raise RuntimeError(msg)

    def fail_plapi(self):
        """Raise RuntimeError: the PLC API could not be instantiated."""
        msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
            " attributes pluser and plpassword."
        raise RuntimeError(msg)

    def fail_sfaapi(self):
        """Raise RuntimeError: the SFA API could not be instantiated."""
        msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
            " attributes sfauser and sfaPrivateKey."
        raise RuntimeError(msg)

    def valid_connection(self, guid):
        """Accept every connection (validation is not implemented yet)."""
        # TODO: Validate!
        return True