Adding WilabtSfaNode
author Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Thu, 22 May 2014 13:46:54 +0000 (15:46 +0200)
committer Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
Thu, 22 May 2014 13:46:54 +0000 (15:46 +0200)
src/nepi/resources/omf/wilabt_node.py [new file with mode: 0644]

diff --git a/src/nepi/resources/omf/wilabt_node.py b/src/nepi/resources/omf/wilabt_node.py
new file mode 100644 (file)
index 0000000..c8807b4
--- /dev/null
@@ -0,0 +1,366 @@
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+        ResourceState, reschedule_delay 
+from nepi.resources.linux.node import LinuxNode
+from nepi.util.sfaapi import SFAAPIFactory 
+from nepi.util.execfuncs import lexec
+from nepi.util import sshfuncs
+
+from random import randint
+import time
+import re
+import weakref
+import socket
+import threading
+import datetime
+
+@clsinit_copy
+class WilabtSfaNode(LinuxNode):
+    _rtype = "WilabtSfaNode"
+    _help = "Controls a Wilabt host accessible using a SSH key " \
+            "and provisioned using SFA"
+    _backend = "omf"
+
+    @classmethod
+    def _register_attributes(cls):
+
+        sfa_user = Attribute("sfauser", "SFA user",
+                    flags = Flags.Credential)
+
+        sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
+                            used to generate the user credential",
+                            flags = Flags.Credential)
+
+        slicename = Attribute("slicename", "SFA slice for the experiment",
+                    flags = Flags.Credential)
+
+        gateway_user = Attribute("gatewayUser", "Gateway account username",
+                flags = Flags.Design)
+
+        gateway = Attribute("gateway", "Hostname of the gateway machine",
+                flags = Flags.Design)
+
+        cls._register_attribute(sfa_user)
+        cls._register_attribute(sfa_private_key)
+        cls._register_attribute(slicename)
+        cls._register_attribute(gateway_user)
+        cls._register_attribute(gateway)
+
+    def __init__(self, ec, guid):
+        super(WilabtSfaNode, self).__init__(ec, guid)
+
+        self._ecobj = weakref.ref(ec)
+        self._sfaapi = None
+        self._node_to_provision = None
+        self._slicenode = False
+        self._hostname = False
+        self._username = None
+
+    def _skip_provision(self):
+        sfa_user = self.get("sfauser")
+        if not sfa_user:
+            return True
+        else: return False
+    
+    @property
+    def sfaapi(self):
+        if not self._sfaapi:
+            sfa_user = self.get("sfauser")
+            sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
+            sfa_auth = '.'.join(sfa_user.split('.')[:2])
+            sfa_registry = "http://sfa3.planet-lab.eu:12345/"
+            sfa_private_key = self.get("sfaPrivateKey")
+            batch = True
+
+            _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
+                sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
+            
+            if not _sfaapi:
+                self.fail_sfaapi()
+
+            self._sfaapi = weakref.ref(_sfaapi)
+
+        return self._sfaapi()
+
+    def do_discover(self):
+        """
+        Based on the attributes defined by the user, discover the suitable 
+        nodes for provision.
+        """
+        if self._skip_provision():
+            super(WilabtSfaNode, self).do_discover()
+            return
+
+        nodes = self.sfaapi.get_resources_hrn()
+
+        hostname = self._get_hostname()
+        if hostname:
+            # the user specified one particular node to be provisioned
+            self._hostname = True
+            host_hrn = nodes[hostname]
+
+            # check that the node is not blacklisted or being provisioned
+            # by other RM
+            if not self._blacklisted(host_hrn):
+                if not self._reserved(host_hrn):
+                    if self._check_if_in_slice([host_hrn]):
+                        self.debug("Node already in slice %s" % host_hrn)
+                        self._slicenode = True
+                    hostname = hostname + '.wilab2.ilabt.iminds.be'
+                    self.set('hostname', hostname)
+                    self._node_to_provision = host_hrn
+                    super(WilabtSfaNode, self).do_discover()
+
+    def do_provision(self):
+        """
+        Add node to user's slice after verifing that the node is functioning
+        correctly.
+        """
+        if self._skip_provision():
+            super(WilabtSfaNode, self).do_provision()
+            return
+
+        provision_ok = False
+        ssh_ok = False
+        proc_ok = False
+        timeout = 300
+
+        while not provision_ok:
+            node = self._node_to_provision
+            if self._slicenode:
+                self._delete_from_slice()
+                self.debug("Waiting 300 seg for re-adding to slice")
+                time.sleep(300) # Timout for the testbed to allow a new reservation
+            self._add_node_to_slice(node)
+            t = 0
+            while not self._check_if_in_slice([node]) and t < timeout:
+                t = t + 5
+                time.sleep(t)
+                self.debug("Waiting 5 seg for resources to be added")
+                continue
+            self._get_username()
+            ssh_ok = self._check_ssh_loop()          
+
+            if not ssh_ok:
+                # the timeout was reach without establishing ssh connection
+                # the node is blacklisted, and a new
+                # node to provision is discovered
+                self._blacklist_node(node)
+                self.do_discover()
+                continue
+            
+            # check /proc directory is mounted (ssh_ok = True)
+            # file system is not read only, hostname is correct
+            # and omf_rc process is up
+            else:
+                if not self._check_fs():
+                    self.do_discover()
+                    continue
+                if not self._check_omf():
+                    self.do_discover()
+                    continue
+                if not self._check_hostname():
+                    self.do_discover()
+                    continue
+            
+                else:
+                    provision_ok = True
+                    if not self.get('hostname'):
+                        self._set_hostname_attr(node)            
+                    self.info(" Node provisioned ")            
+            
+        super(WilabtSfaNode, self).do_provision()
+
+    def _blacklisted(self, host_hrn):
+        if self.sfaapi.blacklisted(host_hrn):
+           self.fail_node_not_available(host_hrn)
+        return False
+
+    def _reserved(self, host_hrn):
+        if self.sfaapi.reserved(host_hrn):
+            self.fail_node_not_available(host_hrn)
+        return False
+
+    def _get_username(self):
+        slicename = self.get("slicename")
+        if self._username is None:
+            slice_info = self.sfaapi.get_slice_resources(slicename)
+            username = slice_info['resource'][0]['services'][0]['login'][0]['username']
+            self.set('username', username)
+            self.debug("Retriving username information from RSpec %s" % username)
+            self._username = username
+            
+    def _check_ssh_loop(self):
+        t = 0
+        timeout = 10
+        ssh_ok = False
+        while t < timeout and not ssh_ok:
+            cmd = 'echo \'GOOD NODE\''
+            ((out, err), proc) = self.execute(cmd)
+            if out.find("GOOD NODE") < 0:
+                self.debug( "No SSH connection, waiting 60s" )
+                t = t + 5
+                time.sleep(5)
+                continue
+            else:
+                self.debug( "SSH OK" )
+                ssh_ok = True
+                continue
+        return ssh_ok
+
+    def _check_fs(self):
+        cmd = 'mount |grep proc'
+        ((out, err), proc) = self.execute(cmd)
+        if out.find("/proc type proc") < 0:
+            self.warning(" Corrupted file system ")
+            self._blacklist_node(node)
+            return False
+        return True
+
+    def _check_omfrc(self):
+        cmd = 'ps aux|grep omf'
+        ((out, err), proc) = self.execute(cmd)
+        if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
+            return False
+        return True
+
+    def _check_hostname(self):
+        cmd = 'hostname'
+        ((out, err), proc) = self.execute(cmd)
+        if 'localhost' in out.lower():
+            return False
+        return True 
+
+    def _add_node_to_slice(self, host_hrn):
+        self.info(" Adding node to slice ")
+        slicename = self.get("slicename")
+        self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn)
+
+    def _delete_from_slice(self):
+        self.warning(" Deleting all slivers from slice ")
+        slicename = self.get("slicename")
+        self.sfaapi.remove_all_from_slice(slicename)
+
+    def _get_hostname(self):
+        hostname = self.get("hostname")
+        if hostname:
+            return hostname
+        else:
+            return None
+
+    def _set_hostname_attr(self, node):
+        """
+        Query SFAAPI for the hostname of a certain host hrn and sets the
+        attribute hostname, it will over write the previous value
+        """
+        hosts_hrn = self.sfaapi.get_resources_hrn()
+        for hostname, hrn  in hosts_hrn.iteritems():
+            if hrn == node:
+                hostname = hostname + '.wilab2.ilabt.iminds.be'
+                self.set("hostname", hostname)
+
+    def _check_if_in_slice(self, hosts_hrn):
+        """
+        Check using SFA API if any host hrn from hosts_hrn is in the user's
+        slice
+        """
+        slicename = self.get("slicename")
+        slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
+        if slice_nodes:
+            if len(slice_nodes[0]['services']) != 0:
+                slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
+        else: slice_nodes_hrn = []
+        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
+        return nodes_inslice
+
+    def _do_ping(self, hostname):
+        """
+        Perform ping command on node's IP matching hostname
+        """
+        ping_ok = False
+        guser = self.get("gatewayUser")
+        gw = self.get("gateway")
+        host = hostname + ".wilab2.ilabt.iminds.be"
+        command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
+        (out, err) = lexec(command)
+        m = re.search("(\d+)% packet loss", str(out))
+        if m and int(m.groups()[0]) < 50:
+            ping_ok = True
+
+        return ping_ok
+
+    def _blacklist_node(self, host_hrn):
+        """
+        Add node mal functioning node to blacklist
+        """
+        self.warning(" Blacklisting malfunctioning node ")
+        self.sfaapi.blacklist_resource(host_hrn)
+        if not self._hostname:
+            self.set('hostname', None)
+        else:
+            self.set('hostname', host_hrn.split('.').pop())
+
+    def _put_node_in_provision(self, host_hrn):
+        """
+        Add node to the list of nodes being provisioned, in order for other RMs
+        to not try to provision the same one again
+        """
+        self.sfaapi.reserve_resource(host_hrn)
+
+    def _get_ip(self, hostname):
+        """
+        Query PLCAPI for the IP of a node with certain node id
+        """
+        try:
+            ip = sshfuncs.gethostbyname(hostname)
+        except:
+            # Fail while trying to find the IP
+            return None
+        return ip
+
+    def fail_discovery(self):
+        msg = "Discovery failed. No candidates found for node"
+        self.error(msg)
+        raise RuntimeError, msg
+
+    def fail_node_not_alive(self, hostname=None):
+        msg = "Node %s not alive" % hostname
+        raise RuntimeError, msg
+    
+    def fail_node_not_available(self, hostname):
+        msg = "Node %s not available for provisioning" % hostname
+        raise RuntimeError, msg
+
+    def fail_not_enough_nodes(self):
+        msg = "Not enough nodes available for provisioning"
+        raise RuntimeError, msg
+
+    def fail_plapi(self):
+        msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
+            " attributes pluser and plpassword."
+        raise RuntimeError, msg
+
+    def valid_connection(self, guid):
+        # TODO: Validate!
+        return True
+
+