From c01d40579e6d77f12327072dec0fb82c41676bb0 Mon Sep 17 00:00:00 2001 From: Lucia Guevgeozian Odizzio Date: Thu, 3 Jul 2014 18:38:07 +0200 Subject: [PATCH] SFA for PL without specifing hostname --- examples/planetlab/ping_sfa.py | 18 ++- src/nepi/execution/ec.py | 13 +- src/nepi/resources/planetlab/node.py | 11 ++ src/nepi/resources/planetlab/sfa_node.py | 169 ++++++++++++----------- 4 files changed, 115 insertions(+), 96 deletions(-) diff --git a/examples/planetlab/ping_sfa.py b/examples/planetlab/ping_sfa.py index 9e9974d8..287e83b4 100755 --- a/examples/planetlab/ping_sfa.py +++ b/examples/planetlab/ping_sfa.py @@ -19,6 +19,7 @@ # Author: Lucia Guevgeozian from nepi.execution.ec import ExperimentController +from nepi.execution.resource import ResourceAction, ResourceState import os # Create the EC @@ -32,7 +33,7 @@ sfaPrivateKey = os.environ.get('SFA_PK') # server node1 = ec.register_resource("PlanetlabSfaNode") -ec.set(node1, "hostname", 'planetlab1.cs.vu.nl') +ec.set(node1, "hostname", 'planetlab3.xeno.cl.cam.ac.uk') ec.set(node1, "username", username) ec.set(node1, "sfauser", sfauser) ec.set(node1, "sfaPrivateKey", sfaPrivateKey) @@ -40,13 +41,17 @@ ec.set(node1, "cleanHome", True) ec.set(node1, "cleanProcesses", True) node2 = ec.register_resource("PlanetlabSfaNode") -ec.set(node2, "hostname", 'onelab1.info.ucl.ac.be') ec.set(node2, "username", username) ec.set(node2, "sfauser", sfauser) ec.set(node2, "sfaPrivateKey", sfaPrivateKey) ec.set(node2, "cleanHome", True) ec.set(node2, "cleanProcesses", True) +node3 = ec.register_resource("PlanetlabSfaNode") +ec.set(node3, "username", username) +ec.set(node3, "sfauser", sfauser) +ec.set(node3, "sfaPrivateKey", sfaPrivateKey) + app1 = ec.register_resource("LinuxApplication") command = "ping -c5 google.com" ec.set(app1, "command", command) @@ -57,11 +62,18 @@ command = "ping -c5 google.com" ec.set(app2, "command", command) ec.register_connection(app2, node2) +app3 = ec.register_resource("LinuxApplication") +command = "ping -c5 google.com" +ec.set(app3, "command", command) +ec.register_connection(app3, node3) + +ec.register_condition(node2, ResourceAction.DEPLOY, node1, ResourceState.PROVISIONED) +ec.register_condition(node3, ResourceAction.DEPLOY, node1, ResourceState.PROVISIONED) # Deploy ec.deploy() -ec.wait_finished([app1, app2]) +ec.wait_finished([app1, app2, app3]) ec.shutdown() diff --git a/src/nepi/execution/ec.py b/src/nepi/execution/ec.py index 5584c681..ad698932 100644 --- a/src/nepi/execution/ec.py +++ b/src/nepi/execution/ec.py @@ -201,19 +201,9 @@ class ExperimentController(object): # EC state self._state = ECState.RUNNING - # Blacklist file for PL nodes - nepi_home = os.path.join(os.path.expanduser("~"), ".nepi") - plblacklist_file = os.path.join(nepi_home, "plblacklist.txt") - if not os.path.exists(plblacklist_file): - if os.path.isdir(nepi_home): - open(plblacklist_file, 'w').close() - else: - os.makedirs(nepi_home) - open(plblacklist_file, 'w').close() - # The runner is a pool of threads used to parallelize # execution of tasks - nthreads = int(os.environ.get("NEPI_NTHREADS", "50")) + nthreads = int(os.environ.get("NEPI_NTHREADS", "3")) self._runner = ParallelRun(maxthreads = nthreads) # Event processing thread @@ -332,7 +322,6 @@ class ExperimentController(object): :type guids: list """ - if isinstance(guids, int): guids = [guids] diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py index 7a8d5a72..7f07d057 100644 --- a/src/nepi/resources/planetlab/node.py +++ b/src/nepi/resources/planetlab/node.py @@ -28,6 +28,7 @@ from nepi.util import sshfuncs from random import randint import re +import os import time import socket import threading @@ -206,6 +207,16 @@ class PlanetlabNode(LinuxNode): self.set("gateway", None) self.set("gatewayUser", None) + # Blacklist file + nepi_home = os.path.join(os.path.expanduser("~"), ".nepi") + plblacklist_file = os.path.join(nepi_home, "plblacklist.txt") + if not os.path.exists(plblacklist_file): + if os.path.isdir(nepi_home): + open(plblacklist_file, 'w').close() + else: + os.makedirs(nepi_home) + open(plblacklist_file, 'w').close() + def _skip_provision(self): pl_user = self.get("pluser") pl_pass = self.get("plpassword") diff --git a/src/nepi/resources/planetlab/sfa_node.py b/src/nepi/resources/planetlab/sfa_node.py index d18d248a..a8bad103 100644 --- a/src/nepi/resources/planetlab/sfa_node.py +++ b/src/nepi/resources/planetlab/sfa_node.py @@ -27,6 +27,7 @@ from nepi.util import sshfuncs from random import randint import re +import os import weakref import time import socket @@ -184,6 +185,16 @@ class PlanetlabSfaNode(LinuxNode): self.set("gateway", None) self.set("gatewayUser", None) + # Blacklist file for PL nodes + nepi_home = os.path.join(os.path.expanduser("~"), ".nepi") + plblacklist_file = os.path.join(nepi_home, "plblacklist.txt") + if not os.path.exists(plblacklist_file): + if os.path.isdir(nepi_home): + open(plblacklist_file, 'w').close() + else: + os.makedirs(nepi_home) + open(plblacklist_file, 'w').close() + def _skip_provision(self): sfa_user = self.get("sfauser") if not sfa_user: @@ -232,61 +243,55 @@ class PlanetlabSfaNode(LinuxNode): # check that the node is not blacklisted or being provisioned # by other RM - if not self._blacklisted(host_hrn): - if not self._reserved(host_hrn): - # Node in reservation - ping_ok = self._do_ping(hostname) - if not ping_ok: - self._blacklist_node(host_hrn) - self.fail_node_not_alive(hostname) - else: - if self._check_if_in_slice([host_hrn]): - self.debug("The node %s is already in the slice" % hostname) - self._slicenode = True - self._node_to_provision = host_hrn - super(PlanetlabSfaNode, self).do_discover() - -# else: -# # the user specifies constraints based on attributes, zero, one or -# # more nodes can match these constraints -# nodes = self._filter_based_on_attributes() -# -# # nodes that are already part of user's slice have the priority to -# # provisioned -# nodes_inslice = self._check_if_in_slice(nodes) -# nodes_not_inslice = list(set(nodes) - set(nodes_inslice)) -# -# node_id = None -# if nodes_inslice: -# node_id = self._choose_random_node(nodes_inslice) -# self._slicenode = True -# -# if not node_id: -# # Either there were no matching nodes in the user's slice, or -# # the nodes in the slice were blacklisted or being provisioned -# # by other RM. Note nodes_not_inslice is never empty -# node_id = self._choose_random_node(nodes_not_inslice) -# self._slicenode = False -# -# if node_id: -# self._node_to_provision = node_id -# try: -# self._set_hostname_attr(node_id) -# self.info(" Selected node to provision ") -# super(PlanetlabSfaNode, self).do_discover() -# except: -# with PlanetlabSfaNode.lock: -# self._blacklist_node(node_id) -# self.do_discover() -# else: -# self.fail_not_enough_nodes() -# + if not self._blacklisted(host_hrn) and not self._reserved(host_hrn): + # Node in reservation + ping_ok = self._do_ping(hostname) + if not ping_ok: + self._blacklist_node(host_hrn) + self.fail_node_not_alive(hostname) + else: + if self._check_if_in_slice([host_hrn]): + self.debug("The node %s is already in the slice" % hostname) + self._slicenode = True + self._node_to_provision = host_hrn + else: + self.fail_node_not_available(hostname) + super(PlanetlabSfaNode, self).do_discover() + + else: + hosts_hrn = nodes.values() + nodes_inslice = self._check_if_in_slice(hosts_hrn) + nodes_not_inslice = list(set(hosts_hrn) - set(nodes_inslice)) + host_hrn = None + if nodes_inslice: + host_hrn = self._choose_random_node(nodes, nodes_inslice) + self._slicenode = True + + if not host_hrn: + # Either there were no matching nodes in the user's slice, or + # the nodes in the slice were blacklisted or being provisioned + # by other RM. Note nodes_not_inslice is never empty + host_hrn = self._choose_random_node(nodes, nodes_not_inslice) + self._slicenode = False + + if host_hrn: + self._node_to_provision = host_hrn + try: + self._set_hostname_attr(host_hrn) + self.info(" Selected node to provision ") + super(PlanetlabSfaNode, self).do_discover() + except: + self._blacklist_node(host_hrn) + self.do_discover() + else: + self.fail_not_enough_nodes() + def _blacklisted(self, host_hrn): """ Check in the SFA API that the node is not in the blacklist. """ if self.sfaapi.blacklisted(host_hrn): - self.fail_node_not_available(host_hrn) + return True return False def _reserved(self, host_hrn): @@ -295,7 +300,7 @@ class PlanetlabSfaNode(LinuxNode): list. """ if self.sfaapi.reserved(host_hrn): - self.fail_node_not_available(host_hrn) + return True return False def do_provision(self): @@ -508,36 +513,38 @@ class PlanetlabSfaNode(LinuxNode): ## self.fail_discovery() ## ## return nodes_id -# -# def _choose_random_node(self, nodes): -# """ -# From the possible nodes for provision, choose randomly to decrese the -# probability of different RMs choosing the same node for provision -# """ -# size = len(nodes) -# while size: -# size = size - 1 -# index = randint(0, size) -# node_id = nodes[index] -# nodes[index] = nodes[size] -# -# # check the node is not blacklisted or being provision by other RM -# # and perform ping to check that is really alive -# with PlanetlabNode.lock: -# -# blist = self.plapi.blacklisted() -# plist = self.plapi.reserved() -# if node_id not in blist and node_id not in plist: -# ping_ok = self._do_ping(node_id) -# if not ping_ok: -# self._set_hostname_attr(node_id) -# self.warn(" Node not responding PING ") -# self._blacklist_node(node_id) -# else: -# # discovered node for provision, added to provision list -# self._put_node_in_provision(node_id) -# return node_id -# + + def _choose_random_node(self, nodes, hosts_hrn): + """ + From the possible nodes for provision, choose randomly to decrese the + probability of different RMs choosing the same node for provision + """ + size = len(hosts_hrn) + while size: + size = size - 1 + index = randint(0, size) + host_hrn = hosts_hrn[index] + hosts_hrn[index] = hosts_hrn[size] + + # check the node is not blacklisted or being provision by other RM + # and perform ping to check that is really alive + if not self._blacklisted(host_hrn): + if not self._reserved(host_hrn): + print self.sfaapi._reserved ,self.guid + for hostname, hrn in nodes.iteritems(): + if host_hrn == hrn: + print 'hostname' ,hostname + ping_ok = self._do_ping(hostname) + + if not ping_ok: + self._set_hostname_attr(hostname) + self.warning(" Node not responding PING ") + self._blacklist_node(host_hrn) + else: + # discovered node for provision, added to provision list + self._node_to_provision = host_hrn + return host_hrn + # def _get_nodes_id(self, filters=None): # return self.plapi.get_nodes(filters, fields=['node_id']) # -- 2.43.0