--- /dev/null
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
+
+from nepi.execution.attribute import Attribute, Flags, Types
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState, reschedule_delay
+from nepi.resources.linux.node import LinuxNode
+from nepi.util.sfaapi import SFAAPIFactory
+from nepi.util.execfuncs import lexec
+from nepi.util import sshfuncs
+
+from random import randint
+import time
+import re
+import weakref
+import socket
+import threading
+import datetime
+
+@clsinit_copy
+class WilabtSfaNode(LinuxNode):
+ _rtype = "WilabtSfaNode"
+ _help = "Controls a Wilabt host accessible using a SSH key " \
+ "and provisioned using SFA"
+ _backend = "omf"
+
+ @classmethod
+ def _register_attributes(cls):
+
+ sfa_user = Attribute("sfauser", "SFA user",
+ flags = Flags.Credential)
+
+ sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
+ used to generate the user credential",
+ flags = Flags.Credential)
+
+ slicename = Attribute("slicename", "SFA slice for the experiment",
+ flags = Flags.Credential)
+
+ gateway_user = Attribute("gatewayUser", "Gateway account username",
+ flags = Flags.Design)
+
+ gateway = Attribute("gateway", "Hostname of the gateway machine",
+ flags = Flags.Design)
+
+ cls._register_attribute(sfa_user)
+ cls._register_attribute(sfa_private_key)
+ cls._register_attribute(slicename)
+ cls._register_attribute(gateway_user)
+ cls._register_attribute(gateway)
+
+ def __init__(self, ec, guid):
+ super(WilabtSfaNode, self).__init__(ec, guid)
+
+ self._ecobj = weakref.ref(ec)
+ self._sfaapi = None
+ self._node_to_provision = None
+ self._slicenode = False
+ self._hostname = False
+ self._username = None
+
+ def _skip_provision(self):
+ sfa_user = self.get("sfauser")
+ if not sfa_user:
+ return True
+ else: return False
+
+ @property
+ def sfaapi(self):
+ if not self._sfaapi:
+ sfa_user = self.get("sfauser")
+ sfa_sm = "http://www.wilab2.ilabt.iminds.be:12369/protogeni/xmlrpc/am/3.0"
+ sfa_auth = '.'.join(sfa_user.split('.')[:2])
+ sfa_registry = "http://sfa3.planet-lab.eu:12345/"
+ sfa_private_key = self.get("sfaPrivateKey")
+ batch = True
+
+ _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
+ sfa_registry, sfa_sm, sfa_private_key, self._ecobj(), batch, WilabtSfaNode._rtype)
+
+ if not _sfaapi:
+ self.fail_sfaapi()
+
+ self._sfaapi = weakref.ref(_sfaapi)
+
+ return self._sfaapi()
+
+ def do_discover(self):
+ """
+ Based on the attributes defined by the user, discover the suitable
+ nodes for provision.
+ """
+ if self._skip_provision():
+ super(WilabtSfaNode, self).do_discover()
+ return
+
+ nodes = self.sfaapi.get_resources_hrn()
+
+ hostname = self._get_hostname()
+ if hostname:
+ # the user specified one particular node to be provisioned
+ self._hostname = True
+ host_hrn = nodes[hostname]
+
+ # check that the node is not blacklisted or being provisioned
+ # by other RM
+ if not self._blacklisted(host_hrn):
+ if not self._reserved(host_hrn):
+ if self._check_if_in_slice([host_hrn]):
+ self.debug("Node already in slice %s" % host_hrn)
+ self._slicenode = True
+ hostname = hostname + '.wilab2.ilabt.iminds.be'
+ self.set('hostname', hostname)
+ self._node_to_provision = host_hrn
+ super(WilabtSfaNode, self).do_discover()
+
+ def do_provision(self):
+ """
+ Add node to user's slice after verifing that the node is functioning
+ correctly.
+ """
+ if self._skip_provision():
+ super(WilabtSfaNode, self).do_provision()
+ return
+
+ provision_ok = False
+ ssh_ok = False
+ proc_ok = False
+ timeout = 300
+
+ while not provision_ok:
+ node = self._node_to_provision
+ if self._slicenode:
+ self._delete_from_slice()
+ self.debug("Waiting 300 seg for re-adding to slice")
+ time.sleep(300) # Timout for the testbed to allow a new reservation
+ self._add_node_to_slice(node)
+ t = 0
+ while not self._check_if_in_slice([node]) and t < timeout:
+ t = t + 5
+ time.sleep(t)
+ self.debug("Waiting 5 seg for resources to be added")
+ continue
+ self._get_username()
+ ssh_ok = self._check_ssh_loop()
+
+ if not ssh_ok:
+ # the timeout was reach without establishing ssh connection
+ # the node is blacklisted, and a new
+ # node to provision is discovered
+ self._blacklist_node(node)
+ self.do_discover()
+ continue
+
+ # check /proc directory is mounted (ssh_ok = True)
+ # file system is not read only, hostname is correct
+ # and omf_rc process is up
+ else:
+ if not self._check_fs():
+ self.do_discover()
+ continue
+ if not self._check_omf():
+ self.do_discover()
+ continue
+ if not self._check_hostname():
+ self.do_discover()
+ continue
+
+ else:
+ provision_ok = True
+ if not self.get('hostname'):
+ self._set_hostname_attr(node)
+ self.info(" Node provisioned ")
+
+ super(WilabtSfaNode, self).do_provision()
+
+ def _blacklisted(self, host_hrn):
+ if self.sfaapi.blacklisted(host_hrn):
+ self.fail_node_not_available(host_hrn)
+ return False
+
+ def _reserved(self, host_hrn):
+ if self.sfaapi.reserved(host_hrn):
+ self.fail_node_not_available(host_hrn)
+ return False
+
+ def _get_username(self):
+ slicename = self.get("slicename")
+ if self._username is None:
+ slice_info = self.sfaapi.get_slice_resources(slicename)
+ username = slice_info['resource'][0]['services'][0]['login'][0]['username']
+ self.set('username', username)
+ self.debug("Retriving username information from RSpec %s" % username)
+ self._username = username
+
+ def _check_ssh_loop(self):
+ t = 0
+ timeout = 10
+ ssh_ok = False
+ while t < timeout and not ssh_ok:
+ cmd = 'echo \'GOOD NODE\''
+ ((out, err), proc) = self.execute(cmd)
+ if out.find("GOOD NODE") < 0:
+ self.debug( "No SSH connection, waiting 60s" )
+ t = t + 5
+ time.sleep(5)
+ continue
+ else:
+ self.debug( "SSH OK" )
+ ssh_ok = True
+ continue
+ return ssh_ok
+
+ def _check_fs(self):
+ cmd = 'mount |grep proc'
+ ((out, err), proc) = self.execute(cmd)
+ if out.find("/proc type proc") < 0:
+ self.warning(" Corrupted file system ")
+ self._blacklist_node(node)
+ return False
+ return True
+
+ def _check_omfrc(self):
+ cmd = 'ps aux|grep omf'
+ ((out, err), proc) = self.execute(cmd)
+ if out.find("/usr/local/rvm/gems/ruby-1.9.3-p286@omf/bin/omf_rc") < 0:
+ return False
+ return True
+
+ def _check_hostname(self):
+ cmd = 'hostname'
+ ((out, err), proc) = self.execute(cmd)
+ if 'localhost' in out.lower():
+ return False
+ return True
+
+ def _add_node_to_slice(self, host_hrn):
+ self.info(" Adding node to slice ")
+ slicename = self.get("slicename")
+ self.sfaapi.add_resource_to_slice_batch(slicename, host_hrn)
+
+ def _delete_from_slice(self):
+ self.warning(" Deleting all slivers from slice ")
+ slicename = self.get("slicename")
+ self.sfaapi.remove_all_from_slice(slicename)
+
+ def _get_hostname(self):
+ hostname = self.get("hostname")
+ if hostname:
+ return hostname
+ else:
+ return None
+
+ def _set_hostname_attr(self, node):
+ """
+ Query SFAAPI for the hostname of a certain host hrn and sets the
+ attribute hostname, it will over write the previous value
+ """
+ hosts_hrn = self.sfaapi.get_resources_hrn()
+ for hostname, hrn in hosts_hrn.iteritems():
+ if hrn == node:
+ hostname = hostname + '.wilab2.ilabt.iminds.be'
+ self.set("hostname", hostname)
+
+ def _check_if_in_slice(self, hosts_hrn):
+ """
+ Check using SFA API if any host hrn from hosts_hrn is in the user's
+ slice
+ """
+ slicename = self.get("slicename")
+ slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
+ if slice_nodes:
+ if len(slice_nodes[0]['services']) != 0:
+ slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
+ else: slice_nodes_hrn = []
+ nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
+ return nodes_inslice
+
+ def _do_ping(self, hostname):
+ """
+ Perform ping command on node's IP matching hostname
+ """
+ ping_ok = False
+ guser = self.get("gatewayUser")
+ gw = self.get("gateway")
+ host = hostname + ".wilab2.ilabt.iminds.be"
+ command = "ssh %s@%s 'ping -c4 %s'" % (guser, gw, host)
+ (out, err) = lexec(command)
+ m = re.search("(\d+)% packet loss", str(out))
+ if m and int(m.groups()[0]) < 50:
+ ping_ok = True
+
+ return ping_ok
+
+ def _blacklist_node(self, host_hrn):
+ """
+ Add node mal functioning node to blacklist
+ """
+ self.warning(" Blacklisting malfunctioning node ")
+ self.sfaapi.blacklist_resource(host_hrn)
+ if not self._hostname:
+ self.set('hostname', None)
+ else:
+ self.set('hostname', host_hrn.split('.').pop())
+
+ def _put_node_in_provision(self, host_hrn):
+ """
+ Add node to the list of nodes being provisioned, in order for other RMs
+ to not try to provision the same one again
+ """
+ self.sfaapi.reserve_resource(host_hrn)
+
+ def _get_ip(self, hostname):
+ """
+ Query PLCAPI for the IP of a node with certain node id
+ """
+ try:
+ ip = sshfuncs.gethostbyname(hostname)
+ except:
+ # Fail while trying to find the IP
+ return None
+ return ip
+
+ def fail_discovery(self):
+ msg = "Discovery failed. No candidates found for node"
+ self.error(msg)
+ raise RuntimeError, msg
+
+ def fail_node_not_alive(self, hostname=None):
+ msg = "Node %s not alive" % hostname
+ raise RuntimeError, msg
+
+ def fail_node_not_available(self, hostname):
+ msg = "Node %s not available for provisioning" % hostname
+ raise RuntimeError, msg
+
+ def fail_not_enough_nodes(self):
+ msg = "Not enough nodes available for provisioning"
+ raise RuntimeError, msg
+
+ def fail_plapi(self):
+ msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
+ " attributes pluser and plpassword."
+ raise RuntimeError, msg
+
+ def valid_connection(self, guid):
+ # TODO: Validate!
+ return True
+
+