From 87c178efc9b852bcbda3fac1e1159f842da17e48 Mon Sep 17 00:00:00 2001 From: Lucia Guevgeozian Odizzio Date: Wed, 16 Apr 2014 19:12:47 +0200 Subject: [PATCH] Adding persistent blacklist for PL node --- src/nepi/execution/attribute.py | 2 + src/nepi/execution/ec.py | 42 +++++++++++++ src/nepi/execution/resource.py | 30 +++++++++- src/nepi/resources/planetlab/node.py | 78 ++++++++++++++----------- src/nepi/resources/planetlab/plcapi.py | 81 +++++++++++++++++--------- 5 files changed, 169 insertions(+), 64 deletions(-) diff --git a/src/nepi/execution/attribute.py b/src/nepi/execution/attribute.py index bda72e2c..fa2b1043 100644 --- a/src/nepi/execution/attribute.py +++ b/src/nepi/execution/attribute.py @@ -52,6 +52,8 @@ class Flags: # transparent to the user) Reserved = 1 << 6 # 64 + # Attribute global is set to all resources of rtype + Global = 1 << 7 # 128 class Attribute(object): """ diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 1be5c3cf..44b95780 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -196,6 +196,16 @@ class ExperimentController(object): # EC state self._state = ECState.RUNNING + # Blacklist file for PL nodes + nepi_home = os.path.join(os.path.expanduser("~"), ".nepi") + plblacklist_file = os.path.join(nepi_home, "plblacklist.txt") + if not os.path.exists(plblacklist_file): + if os.path.isdir(nepi_home): + open(plblacklist_file, 'w').close() + else: + os.makedirs(nepi_home) + open(plblacklist_file, 'w').close() + # The runner is a pool of threads used to parallelize # execution of tasks nthreads = int(os.environ.get("NEPI_NTHREADS", "50")) @@ -609,6 +619,38 @@ class ExperimentController(object): rm = self.get_resource(guid) rm.set(name, value) + def get_global(self, rtype, name): + """ Returns the value of the global attribute with name 'name' on the + RMs of rtype 'rtype'. + + :param guid: Guid of the RM + :type guid: int + + :param name: Name of the attribute + :type name: str + + :return: The value of the attribute with name 'name' + + """ + rclass = ResourceFactory.get_resource_type(rtype) + return rclass.get_global(name) + + def set_global(self, rtype, name, value): + """ Modifies the value of the global attribute with name 'name' on the + RMs of with rtype 'rtype'. + + :param guid: Guid of the RM + :type guid: int + + :param name: Name of the attribute + :type name: str + + :param value: Value of the attribute + + """ + rclass = ResourceFactory.get_resource_type(rtype) + return rclass.set_global(name, value) + def state(self, guid, hr = False): """ Returns the state of a resource diff --git a/src/nepi/execution/resource.py b/src/nepi/execution/resource.py index 5ad084da..d32a3dd9 100644 --- a/src/nepi/execution/resource.py +++ b/src/nepi/execution/resource.py @@ -186,7 +186,6 @@ class ResourceManager(Logger): attributes. """ - critical = Attribute("critical", "Defines whether the resource is critical. " "A failure on a critical resource will interrupt " @@ -289,6 +288,32 @@ class ResourceManager(Logger): """ return cls._backend + @classmethod + def get_global(cls, name): + """ Returns the value of a global attribute + Global attribute meaning an attribute for + all the resources from a rtype + + :param name: Name of the attribute + :type name: str + :rtype: str + """ + global_attr = cls._attributes[name] + return global_attr.value + + @classmethod + def set_global(cls, name, value): + """ Set value for a global attribute + + :param name: Name of the attribute + :type name: str + :param name: Value of the attribute + :type name: str + """ + global_attr = cls._attributes[name] + global_attr.value = value + return value + def __init__(self, ec, guid): super(ResourceManager, self).__init__(self.get_rtype()) @@ -564,6 +589,9 @@ class ResourceManager(Logger): :rtype: str """ attr = self._attrs[name] + if attr.has_flag(Flags.Global): + self.warning( "Attribute %s is global. Use get_global instead." % name) + return attr.value def has_changed(self, name): diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index d2461bcc..1f706a8d 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -27,10 +27,12 @@ from nepi.util.execfuncs import lexec from nepi.util import sshfuncs from random import randint +import re import time import socket import threading import datetime +import weakref @clsinit_copy class PlanetlabNode(LinuxNode): @@ -94,14 +96,6 @@ class PlanetlabNode(LinuxNode): "other"], flags = Flags.Filter) - #site = Attribute("site", "Constrain the PlanetLab site this node \ - # should reside on.", - # type = Types.Enumerate, - # allowed = ["PLE", - # "PLC", - # "PLJ"], - # flags = Flags.Filter) - min_reliability = Attribute("minReliability", "Constrain reliability \ while picking PlanetLab nodes. Specifies a lower \ acceptable bound.", @@ -169,21 +163,20 @@ class PlanetlabNode(LinuxNode): "year"], flags = Flags.Filter) -# plblacklist = Attribute("blacklist", "Take into account the file plblacklist \ -# in the user's home directory under .nepi directory. This file \ -# contains a list of PL nodes to blacklist, and at the end \ -# of the experiment execution the new blacklisted nodes are added.", -# type = Types.Bool, -# default = True, -# flags = Flags.ReadOnly) -# + plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \ + in the user's home directory under .nepi directory. This file \ + contains a list of PL nodes to blacklist, and at the end \ + of the experiment execution the new blacklisted nodes are added.", + type = Types.Bool, + default = False, + flags = Flags.Global) + cls._register_attribute(ip) cls._register_attribute(pl_url) cls._register_attribute(pl_ptn) cls._register_attribute(pl_user) cls._register_attribute(pl_password) - #cls._register_attribute(site) cls._register_attribute(city) cls._register_attribute(country) cls._register_attribute(region) @@ -198,10 +191,12 @@ class PlanetlabNode(LinuxNode): cls._register_attribute(min_cpu) cls._register_attribute(max_cpu) cls._register_attribute(timeframe) + cls._register_attribute(plblacklist) def __init__(self, ec, guid): super(PlanetlabNode, self).__init__(ec, guid) + self._ecobj = weakref.ref(ec) self._plapi = None self._node_to_provision = None self._slicenode = False @@ -225,14 +220,15 @@ class PlanetlabNode(LinuxNode): pl_pass = self.get("plpassword") pl_url = self.get("plcApiUrl") pl_ptn = self.get("plcApiPattern") - - self._plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url, - pl_ptn) + _plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url, + pl_ptn, self._ecobj()) - if not self._plapi: + if not _plapi: self.fail_plapi() + + self._plapi = weakref.ref(_plapi) - return self._plapi + return self._plapi() def do_discover(self): """ @@ -318,14 +314,21 @@ class PlanetlabNode(LinuxNode): provision_ok = False ssh_ok = False proc_ok = False - timeout = 3600 + timeout = 1800 while not provision_ok: node = self._node_to_provision if not self._slicenode: self._add_node_to_slice(node) - self.info( " Node added to slice ") - + if self._check_if_in_slice([node]): + self.debug( "Node added to slice" ) + else: + self.warning(" Could not add to slice ") + with PlanetlabNode.lock: + self._blacklist_node(node) + self.do_discover() + continue + # check ssh connection t = 0 while t < timeout and not ssh_ok: @@ -333,12 +336,12 @@ class PlanetlabNode(LinuxNode): cmd = 'echo \'GOOD NODE\'' ((out, err), proc) = self.execute(cmd) if out.find("GOOD NODE") < 0: - self.info(" No SSH login, sleeping for 60 seconds ") + self.debug( "No SSH connection, waiting 60s" ) t = t + 60 time.sleep(60) continue else: - self.info(" SSH login OK ") + self.debug( "SSH OK" ) ssh_ok = True continue else: @@ -385,6 +388,12 @@ class PlanetlabNode(LinuxNode): super(PlanetlabNode, self).do_provision() + def do_release(self): + super(PlanetlabNode, self).do_release() + if self.state == ResourceState.RELEASED: + self.debug(" Releasing PLC API ") + self.plapi.release() + def _filter_based_on_attributes(self): """ Retrive the list of nodes ids that match user's constraints @@ -397,7 +406,6 @@ class PlanetlabNode(LinuxNode): 'region' : 'region', 'architecture' : 'arch', 'operatingSystem' : 'fcdistro', - #'site' : 'pldistro', 'minReliability' : 'reliability%s' % timeframe, 'maxReliability' : 'reliability%s' % timeframe, 'minBandwidth' : 'bw%s' % timeframe, @@ -555,6 +563,7 @@ class PlanetlabNode(LinuxNode): slicename = self.get("username") with PlanetlabNode.lock: slice_nodes = self.plapi.get_slice_nodes(slicename) + self.debug(" Previous slice nodes %s " % slice_nodes) slice_nodes.append(node_id) self.plapi.add_slice_nodes(slicename, slice_nodes) @@ -599,14 +608,13 @@ class PlanetlabNode(LinuxNode): """ ping_ok = False ip = self._get_ip(node_id) - if not ip: return ping_ok - - command = "ping -c4 %s" % ip + if ip: + command = "ping -c4 %s" % ip + (out, err) = lexec(command) - (out, err) = lexec(command) - if not str(out).find("2 received") < 0 or not str(out).find("3 received") < 0 or not \ - str(out).find("4 received") < 0: - ping_ok = True + m = re.search("(\d+)% packet loss", str(out)) + if m and int(m.groups()[0]) < 50: + ping_ok = True return ping_ok diff --git a/src/nepi/resources/planetlab/plcapi.py b/src/nepi/resources/planetlab/plcapi.py index 97c48462..4f4bf5cd 100644 --- a/src/nepi/resources/planetlab/plcapi.py +++ b/src/nepi/resources/planetlab/plcapi.py @@ -20,6 +20,7 @@ import functools import hashlib import socket +import os import time import threading import xmlrpclib @@ -135,16 +136,15 @@ class PLCAPI(object): _required_methods = set() - def __init__(self, username = None, password = None, session_key = None, - proxy = None, - hostname = "www.planet-lab.eu", - urlpattern = "https://%(hostname)s:443/PLCAPI/", + def __init__(self, username, password, hostname, urlpattern, ec, proxy, session_key = None, local_peer = "PLE"): self._blacklist = set() self._reserved = set() self._nodes_cache = None self._already_cached = False + self._ecobj = ec + self.count = 1 if session_key is not None: self.auth = dict(AuthMethod='session', session=session_key) @@ -175,7 +175,11 @@ class PLCAPI(object): self._proxy_transport = lambda : None self.threadlocal = threading.local() - + + # Load blacklist from file + if self._ecobj.get_global('PlanetlabNode', 'persist_blacklist'): + self._set_blacklist() + @property def api(self): # Cannot reuse same proxy in all threads, py2.7 is not threadsafe @@ -213,6 +217,16 @@ class PLCAPI(object): warnings.warn(str(e)) return True + + def _set_blacklist(self): + nepi_home = os.path.join(os.path.expanduser("~"), ".nepi") + plblacklist_file = os.path.join(nepi_home, "plblacklist.txt") + with open(plblacklist_file, 'r') as f: + hosts_tobl = f.read().splitlines() + if hosts_tobl: + nodes_id = self.get_nodes(hosts_tobl, ['node_id']) + for node_id in nodes_id: + self._blacklist.add(node_id['node_id']) @property def network_types(self): @@ -429,8 +443,8 @@ class PLCAPI(object): def get_slice_nodes(self, slicename): return self.get_slices(slicename, ['node_ids'])[0]['node_ids'] - def add_slice_nodes(self, slicename, nodes = None): - self.update_slice(slicename, nodes = nodes) + def add_slice_nodes(self, slicename, nodes): + self.update_slice(slicename, nodes=nodes) def get_node_info(self, node_id): self.start_multicall() @@ -459,24 +473,39 @@ class PLCAPI(object): else: return None - def blacklist_host(self, hostname): - self._blacklist.add(hostname) + def blacklist_host(self, node_id): + self._blacklist.add(node_id) def blacklisted(self): - return self._blacklist + return self._blacklist - def unblacklist_host(self, hostname): - del self._blacklist[hostname] + def unblacklist_host(self, node_id): + del self._blacklist[node_id] - def reserve_host(self, hostname): - self._reserved.add(hostname) + def reserve_host(self, node_id): + self._reserved.add(node_id) def reserved(self): return self._reserved - def unreserve_host(self, hostname): - del self._reserved[hostname] + def unreserve_host(self, node_id): + del self._reserved[node_id] + def release(self): + self.count -= 1 + if self._ecobj.get_global('PlanetlabNode', 'persist_blacklist') and self.count == 0: + if self._blacklist: + to_blacklist = list() + hostnames = self.get_nodes(list(self._blacklist), ['hostname']) + for hostname in hostnames: + to_blacklist.append(hostname['hostname']) + + nepi_home = os.path.join(os.path.expanduser("~"), ".nepi") + plblacklist_file = os.path.join(nepi_home, "plblacklist.txt") + + with open(plblacklist_file, 'w') as f: + for host in to_blacklist: + f.write("%s\n" % host) class PLCAPIFactory(object): """ @@ -492,8 +521,7 @@ class PLCAPIFactory(object): @classmethod def get_api(cls, pl_user, pl_pass, pl_host, - pl_ptn = "https://%(hostname)s:443/PLCAPI/", - proxy = None): + pl_ptn, ec, proxy = None): """ Get existing PLCAPI instance :param pl_user: Planelab user name (used for web login) @@ -512,14 +540,15 @@ class PLCAPIFactory(object): with cls._lock: api = cls._apis.get(key) if not api: - api = cls.create_api(pl_user, pl_pass, pl_host, pl_ptn, proxy) + api = cls.create_api(pl_user, pl_pass, pl_host, pl_ptn, ec, proxy) + else: + api.count += 1 return api return None @classmethod def create_api(cls, pl_user, pl_pass, pl_host, - pl_ptn = "https://%(hostname)s:443/PLCAPI/", - proxy = None): + pl_ptn, ec, proxy = None): """ Create an PLCAPI instance :param pl_user: Planelab user name (used for web login) @@ -533,13 +562,8 @@ class PLCAPIFactory(object): :param proxy: Proxy service url :type pl_ptn: str """ - api = PLCAPI( - username = pl_user, - password = pl_pass, - hostname = pl_host, - urlpattern = pl_ptn, - proxy = proxy - ) + api = PLCAPI(username = pl_user, password = pl_pass, hostname = pl_host, + urlpattern = pl_ptn, ec = ec, proxy = proxy) key = cls._make_key(pl_user, pl_host) cls._apis[key] = api return api @@ -555,3 +579,4 @@ class PLCAPIFactory(object): skey = "".join(map(str, args)) return hashlib.md5(skey).hexdigest() + -- 2.43.0