# Copyright (C) 2013 INRIA
#
# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation;
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
from nepi.execution.attribute import Attribute, Flags, Types
-from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
- reschedule_delay
+from nepi.execution.resource import ResourceManager, clsinit_copy, \
+ ResourceState
from nepi.resources.linux.node import LinuxNode
from nepi.resources.planetlab.plcapi import PLCAPIFactory
from nepi.util.execfuncs import lexec
+from nepi.util import sshfuncs
from random import randint
+import re
+import os
import time
+import socket
import threading
+import datetime
+import weakref
@clsinit_copy
class PlanetlabNode(LinuxNode):
- _rtype = "PlanetlabNode"
+ _rtype = "planetlab::Node"
_help = "Controls a PlanetLab host accessible using a SSH key " \
"associated to a PlanetLab user account"
- _backend = "planetlab"
+ _platform = "planetlab"
lock = threading.Lock()
@classmethod
def _register_attributes(cls):
ip = Attribute("ip", "PlanetLab host public IP address",
- flags = Flags.ReadOnly)
+ flags = Flags.Design)
pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
(e.g. www.planet-lab.eu or www.planet-lab.org) ",
pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
(e.g. https://%(hostname)s:443/PLCAPI/ ) ",
default = "https://%(hostname)s:443/PLCAPI/",
- flags = Flags.ExecReadOnly)
+ flags = Flags.Design)
pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
authenticate in the website) ",
"other"],
flags = Flags.Filter)
- #site = Attribute("site", "Constrain the PlanetLab site this node \
- # should reside on.",
- # type = Types.Enumerate,
- # allowed = ["PLE",
- # "PLC",
- # "PLJ"],
- # flags = Flags.Filter)
-
min_reliability = Attribute("minReliability", "Constrain reliability \
while picking PlanetLab nodes. Specifies a lower \
acceptable bound.",
"year"],
flags = Flags.Filter)
+ plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
+ in the user's home directory under .nepi directory. This file \
+ contains a list of PL nodes to blacklist, and at the end \
+ of the experiment execution the new blacklisted nodes are added.",
+ type = Types.Bool,
+ default = False,
+ flags = Flags.Global)
+
cls._register_attribute(ip)
cls._register_attribute(pl_url)
cls._register_attribute(pl_ptn)
cls._register_attribute(pl_user)
cls._register_attribute(pl_password)
- #cls._register_attribute(site)
cls._register_attribute(city)
cls._register_attribute(country)
cls._register_attribute(region)
cls._register_attribute(min_cpu)
cls._register_attribute(max_cpu)
cls._register_attribute(timeframe)
-
+ cls._register_attribute(plblacklist)
def __init__(self, ec, guid):
super(PlanetlabNode, self).__init__(ec, guid)
+ self._ecobj = weakref.ref(ec)
self._plapi = None
self._node_to_provision = None
+ self._slicenode = False
+ self._hostname = False
+
+ if self.get("gateway") or self.get("gatewayUser"):
+ self.set("gateway", None)
+ self.set("gatewayUser", None)
+
+ # Blacklist file
+ nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
+ plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
+ if not os.path.exists(plblacklist_file):
+ if os.path.isdir(nepi_home):
+ with open(plblacklist_file, 'w') as clear:
+ pass
+ else:
+ os.makedirs(nepi_home)
+ with open(plblacklist_file, 'w') as clear:
+ pass
+
+ def _skip_provision(self):
+ pl_user = self.get("pluser")
+ pl_pass = self.get("plpassword")
+ if not pl_user and not pl_pass:
+ return True
+ else: return False
@property
def plapi(self):
pl_pass = self.get("plpassword")
pl_url = self.get("plcApiUrl")
pl_ptn = self.get("plcApiPattern")
-
- self._plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
- pl_ptn)
+ _plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
+ pl_ptn, self._ecobj())
- return self._plapi
+ if not _plapi:
+ self.fail_plapi()
+
+ self._plapi = weakref.ref(_plapi)
- def discover(self):
+ return self._plapi()
+
+ def do_discover(self):
"""
- Based on the attributes defined by the user, discover the suitable nodes
+ Based on the attributes defined by the user, discover the suitable
+ nodes for provision.
"""
- hostname = self.get("hostname")
+ if self._skip_provision():
+ super(PlanetlabNode, self).do_discover()
+ return
+
+ hostname = self._get_hostname()
if hostname:
# the user specified one particular node to be provisioned
- # check with PLCAPI if it is alvive
- node_id = self._query_if_alive(hostname=hostname)
- node_id = node_id.pop()
+ self._hostname = True
+ node_id = self._get_nodes_id({'hostname':hostname})
+ node_id = node_id.pop()['node_id']
# check that the node is not blacklisted or being provisioned
# by other RM
self._blacklist_node(node_id)
self.fail_node_not_alive(hostname)
else:
+ if self._check_if_in_slice([node_id]):
+ self._slicenode = True
self._put_node_in_provision(node_id)
self._node_to_provision = node_id
- super(PlanetlabNode, self).discover()
-
else:
self.fail_node_not_available(hostname)
+ super(PlanetlabNode, self).do_discover()
else:
# the user specifies constraints based on attributes, zero, one or
# more nodes can match these constraints
nodes = self._filter_based_on_attributes()
- nodes_alive = self._query_if_alive(nodes)
-
+
# nodes that are already part of user's slice have the priority to
# provisioned
- nodes_inslice = self._check_if_in_slice(nodes_alive)
- nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
+ nodes_inslice = self._check_if_in_slice(nodes)
+ nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
node_id = None
if nodes_inslice:
node_id = self._choose_random_node(nodes_inslice)
+ self._slicenode = True
if not node_id:
# Either there were no matching nodes in the user's slice, or
# the nodes in the slice were blacklisted or being provisioned
# by other RM. Note nodes_not_inslice is never empty
node_id = self._choose_random_node(nodes_not_inslice)
+ self._slicenode = False
if node_id:
self._node_to_provision = node_id
- super(PlanetlabNode, self).discover()
+ try:
+ self._set_hostname_attr(node_id)
+ self.info(" Selected node to provision ")
+ super(PlanetlabNode, self).do_discover()
+ except:
+ with PlanetlabNode.lock:
+ self._blacklist_node(node_id)
+ self.do_discover()
else:
self.fail_not_enough_nodes()
- def provision(self):
+ def do_provision(self):
"""
Add node to user's slice after verifing that the node is functioning
correctly
"""
+ if self._skip_provision():
+ super(PlanetlabNode, self).do_provision()
+ return
+
provision_ok = False
ssh_ok = False
proc_ok = False
- timeout = 1200
+ timeout = 1800
while not provision_ok:
node = self._node_to_provision
- # Adding try catch to set hostname because sometimes MyPLC fails
- # when trying to retrive node's hostname
- try:
- self._set_hostname_attr(node)
- except:
- self.discover()
- self._add_node_to_slice(node)
-
- # check ssh connection
- t = 0
- while t < timeout and not ssh_ok:
+ if not self._slicenode:
+ self._add_node_to_slice(node)
+ if self._check_if_in_slice([node]):
+ self.debug( "Node added to slice" )
+ else:
+ self.warning(" Could not add to slice ")
+ with PlanetlabNode.lock:
+ self._blacklist_node(node)
+ self.do_discover()
+ continue
+ # check ssh connection
+ t = 0
+ while t < timeout and not ssh_ok:
+
+ cmd = 'echo \'GOOD NODE\''
+ ((out, err), proc) = self.execute(cmd)
+ if out.find("GOOD NODE") < 0:
+ self.debug( "No SSH connection, waiting 60s" )
+ t = t + 60
+ time.sleep(60)
+ continue
+ else:
+ self.debug( "SSH OK" )
+ ssh_ok = True
+ continue
+ else:
cmd = 'echo \'GOOD NODE\''
((out, err), proc) = self.execute(cmd)
- if out.find("GOOD NODE") < 0:
- t = t + 60
- time.sleep(60)
- continue
- else:
+ if not out.find("GOOD NODE") < 0:
ssh_ok = True
- continue
if not ssh_ok:
# the timeout was reach without establishing ssh connection
# the node is blacklisted, deleted from the slice, and a new
# node to provision is discovered
with PlanetlabNode.lock:
+ self.warning(" Could not SSH login ")
self._blacklist_node(node)
- self._delete_node_from_slice(node)
- self.discover()
+ #self._delete_node_from_slice(node)
+ self.do_discover()
continue
# check /proc directory is mounted (ssh_ok = True)
+ # and file system is not read only
else:
cmd = 'mount |grep proc'
- ((out, err), proc) = self.execute(cmd)
- if out.find("/proc type proc") < 0:
+ ((out1, err1), proc1) = self.execute(cmd)
+ cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
+ ((out2, err2), proc2) = self.execute(cmd)
+ if out1.find("/proc type proc") < 0 or \
+ "Read-only file system".lower() in err2.lower():
with PlanetlabNode.lock:
+ self.warning(" Corrupted file system ")
self._blacklist_node(node)
- self._delete_node_from_slice(node)
- self.discover()
+ #self._delete_node_from_slice(node)
+ self.do_discover()
continue
else:
provision_ok = True
+ if not self.get('hostname'):
+ self._set_hostname_attr(node)
# set IP attribute
ip = self._get_ip(node)
self.set("ip", ip)
+ self.info(" Node provisioned ")
- super(PlanetlabNode, self).provision()
+ super(PlanetlabNode, self).do_provision()
+
def do_release(self):
    """Release this RM; once fully RELEASED (and PLC credentials were
    in use), also release this RM's hold on the shared PLC API client.

    The _skip_provision() guard mirrors do_discover/do_provision: when
    no pluser/plpassword were given, no PLC API client was acquired,
    so there is nothing to release.
    """
    super(PlanetlabNode, self).do_release()
    if self.state == ResourceState.RELEASED and not self._skip_provision():
        self.debug(" Releasing PLC API ")
        self.plapi.release()
def _filter_based_on_attributes(self):
"""
'region' : 'region',
'architecture' : 'arch',
'operatingSystem' : 'fcdistro',
- #'site' : 'pldistro',
'minReliability' : 'reliability%s' % timeframe,
'maxReliability' : 'reliability%s' % timeframe,
'minBandwidth' : 'bw%s' % timeframe,
for attr_name, attr_obj in self._attrs.iteritems():
attr_value = self.get(attr_name)
- if attr_value is not None and attr_obj.flags == 8 and \
+ if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
attr_name != 'timeframe':
attr_tag = attr_to_tags[attr_name]
nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
if not filters:
- nodes = self.plapi.get_nodes()
+ nodes = self._get_nodes_id()
for node in nodes:
nodes_id.append(node['node_id'])
-
return nodes_id
-
def _filter_by_fixed_attr(self, filters, nodes_id):
"""
Query PLCAPI for nodes ids matching fixed attributes defined by the
range, by the user
"""
node_tags = self.plapi.get_node_tags(filters)
- if node_tags is not None:
+ if node_tags:
if len(nodes_id) == 0:
# first attribute being matched
return nodes_id
- def _query_if_alive(self, nodes_id=None, hostname=None):
- """
- Query PLCAPI for nodes that register activity recently, using filters
- related to the state of the node, e.g. last time it was contacted
- """
- if nodes_id is None and hostname is None:
- msg = "Specify nodes_id or hostname"
- raise RuntimeError, msg
-
- if nodes_id is not None and hostname is not None:
- msg = "Specify either nodes_id or hostname"
- raise RuntimeError, msg
-
- # define PL filters to check the node is alive
- filters = dict()
- filters['run_level'] = 'boot'
- filters['boot_state'] = 'boot'
- filters['node_type'] = 'regular'
- filters['>last_contact'] = int(time.time()) - 2*3600
-
- # adding node_id or hostname to the filters to check for the particular
- # node
- if nodes_id:
- filters['node_id'] = list(nodes_id)
- alive_nodes_id = self._get_nodes_id(filters)
- elif hostname:
- filters['hostname'] = hostname
- alive_nodes_id = self._get_nodes_id(filters)
-
- if len(alive_nodes_id) == 0:
- self.fail_discovery()
- else:
- nodes_id = list()
- for node_id in alive_nodes_id:
- nid = node_id['node_id']
- nodes_id.append(nid)
-
- return nodes_id
-
def _choose_random_node(self, nodes):
"""
From the possible nodes for provision, choose randomly to decrese the
if node_id not in blist and node_id not in plist:
ping_ok = self._do_ping(node_id)
if not ping_ok:
+ self._set_hostname_attr(node_id)
+ self.warning(" Node not responding PING ")
self._blacklist_node(node_id)
else:
# discovered node for provision, added to provision list
self._put_node_in_provision(node_id)
return node_id
def _get_nodes_id(self, filters=None):
    # Query the PLC API for node ids matching *filters* (all nodes when
    # filters is None). Returns a list of {'node_id': ...} dicts — note
    # callers index the result with ['node_id'] (see do_discover).
    return self.plapi.get_nodes(filters, fields=['node_id'])
def _add_node_to_slice(self, node_id):
    """Add *node_id* to the user's slice through the PLC API.

    The read-append-write sequence runs under the class-level lock so
    concurrent PlanetlabNode RMs do not clobber each other's updates
    to the slice node list.
    """
    self.info(" Adding node to slice ")
    slicename = self.get("username")
    with PlanetlabNode.lock:
        slice_nodes = self.plapi.get_slice_nodes(slicename)
        self.debug(" Previous slice nodes %s " % slice_nodes)
        slice_nodes.append(node_id)
        self.plapi.add_slice_nodes(slicename, slice_nodes)
def _delete_node_from_slice(self, node):
    # Remove *node* from the user's slice; the slice name comes from
    # the "username" attribute, as in _add_node_to_slice.
    self.warning(" Deleting node from slice ")
    slicename = self.get("username")
    self.plapi.delete_slice_node(slicename, [node])
+ def _get_hostname(self):
+ hostname = self.get("hostname")
+ if hostname:
+ return hostname
+ ip = self.get("ip")
+ if ip:
+ hostname = socket.gethostbyaddr(ip)[0]
+ self.set('hostname', hostname)
+ return hostname
+ else:
+ return None
+
def _set_hostname_attr(self, node):
"""
Query PLCAPI for the hostname of a certain node id and sets the
"""
ping_ok = False
ip = self._get_ip(node_id)
- command = "ping -c2 %s | echo \"PING OK\"" % ip
-
- (out, err) = lexec(command)
- if not out.find("PING OK") < 0:
- ping_ok = True
-
+ if ip:
+ command = "ping -c4 %s" % ip
+ (out, err) = lexec(command)
+
+ m = re.search("(\d+)% packet loss", str(out))
+ if m and int(m.groups()[0]) < 50:
+ ping_ok = True
+
return ping_ok
def _blacklist_node(self, node):
    """Add a malfunctioning node to the shared blacklist.

    When the hostname was chosen by discovery rather than supplied by
    the user (self._hostname is False), clear the "hostname" attribute
    so the next discovery attempt can pick a different node.
    """
    self.warning(" Blacklisting malfunctioning node ")
    self.plapi.blacklist_host(node)
    if not self._hostname:
        self.set('hostname', None)
def _put_node_in_provision(self, node):
    """
    Add node to the list of nodes being provisioned, in order for other RMs
    to not try to provision the same one again.
    """
    # Reservation is tracked by the shared PLC API client, which is the
    # state common to all PlanetlabNode RMs in this process.
    self.plapi.reserve_host(node)
def _get_ip(self, node_id):
    """Resolve and return the IP address of the node, or None on failure.

    Uses the user-supplied "hostname" attribute when present, otherwise
    asks the PLC API for the hostname of *node_id*, then resolves it.
    """
    hostname = self.get("hostname") or \
        self.plapi.get_nodes(node_id, ['hostname'])[0]['hostname']
    try:
        ip = sshfuncs.gethostbyname(hostname)
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
        # are no longer swallowed; resolution failure is still reported
        # by returning None (best-effort contract kept for callers).
        return None
    return ip
def fail_discovery(self):
    """Log and raise: discovery produced no candidate nodes."""
    message = "Discovery failed. No candidates found for node"
    self.error(message)
    raise RuntimeError(message)
def fail_node_not_alive(self, hostname=None):
    """Raise: the given host did not pass the liveness checks."""
    raise RuntimeError("Node %s not alive" % hostname)
def fail_node_not_available(self, hostname):
    """Raise: the given host cannot be provisioned."""
    raise RuntimeError("Node %s not available for provisioning" % hostname)
def fail_not_enough_nodes(self):
    """Raise: attribute filtering left no nodes to provision."""
    raise RuntimeError("Not enough nodes available for provisioning")
+
def fail_plapi(self):
    """Raise: the PLC API client could not be created.

    Typically means the "pluser"/"plpassword" attributes are missing
    or wrong (see the plapi property, which calls this on failure).
    """
    # Fixed typo in the user-facing message: "instanciate" -> "instantiate".
    msg = "Failing while trying to instantiate the PLC API.\nSet the" + \
        " attributes pluser and plpassword."
    raise RuntimeError(msg)
def valid_connection(self, guid):
# TODO: Validate!