From: Alina Quereilhac
Date: Tue, 5 Nov 2013 10:53:40 +0000 (+0100)
Subject: Merging heads
X-Git-Tag: nepi-3.0.0~20
X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=ea2531757263ee74a1bee54172932186faadd4e8;hp=5afea61e207061be0647ac4ea98f8c9683b03f24;p=nepi.git

Merging heads
---

diff --git a/examples/omf/nepi_omf_stdin_iminds.py b/examples/omf/nepi_omf_stdin_iminds.py
index 1c6d91e7..df95a247 100644
--- a/examples/omf/nepi_omf_stdin_iminds.py
+++ b/examples/omf/nepi_omf_stdin_iminds.py
@@ -42,6 +42,8 @@
 from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
 from nepi.execution.ec import ExperimentController
 
+import time
+
 # Create the EC
 ec = ExperimentController()
 
@@ -58,11 +60,11 @@ ec.set(node1, 'xmppPassword', "1234")
 # Create and Configure the Application
 app1 = ec.register_resource("OMFRobotApplication")
 ec.set(app1, 'appid', "robot")
-ec.set(app1, 'path', "/users/jtribino/RobotCTRLComm.rb")
-ec.set(app1, 'args', "/users/jtribino/coordinate.csv")
+ec.set(app1, 'path', "/users/jtribino/RobotCTRLComm.rb") # /users/username/RobotCTRLComm.rb
+ec.set(app1, 'args', "/users/jtribino/coordinate.csv") # /users/username/coordinate.csv
 ec.set(app1, 'env', " ")
-ec.set(app1, 'sources', "/home/wlab18/Desktop/coordinate.csv")
-ec.set(app1, 'sshUser', "jtribino")
+ec.set(app1, 'sources', "/home/wlab18/Desktop/coordinate.csv") # local path
+ec.set(app1, 'sshUser', "jtribino") # username
 
 # Connection
 ec.register_connection(app1, node1)
@@ -85,5 +87,5 @@ ec.set(app1, 'stdin', "2;openlefteye")
 
 ec.wait_finished([app1])
 
- Stop Experiment
+# Stop Experiment
 ec.shutdown()
diff --git a/examples/planetlab/scalability.py b/examples/planetlab/scalability.py
new file mode 100755
index 00000000..4a028b46
--- /dev/null
+++ b/examples/planetlab/scalability.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+#
+# NEPI, a framework to manage network experiments
+# Copyright (C) 2013 INRIA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Lucia Guevgeozian
+
+from nepi.execution.ec import ExperimentController
+from nepi.execution.resource import ResourceAction, ResourceState
+
+import os
+import time
+
+def create_node(ec, username, pl_user, pl_password, hostname=None, country=None,
+        operatingSystem=None, minBandwidth=None, minCpu=None):
+
+    node = ec.register_resource("PlanetlabNode")
+
+    if username:
+        ec.set(node, "username", username)
+    if pl_user:
+        ec.set(node, "pluser", pl_user)
+    if pl_password:
+        ec.set(node, "plpassword", pl_password)
+
+    if hostname:
+        ec.set(node, "hostname", hostname)
+    if country:
+        ec.set(node, "country", country)
+    if operatingSystem:
+        ec.set(node, "operatingSystem", operatingSystem)
+    if minBandwidth:
+        ec.set(node, "minBandwidth", minBandwidth)
+    if minCpu:
+        ec.set(node, "minCpu", minCpu)
+
+    ec.set(node, "cleanHome", True)
+    ec.set(node, "cleanProcesses", True)
+
+    return node
+
+exp_id = "scalability_exp"
+
+# Create the Experiment Controller:
+ec = ExperimentController(exp_id)
+
+# Register the node resources:
+
+# The username in this case is the slice name, the one used to log in
+# via ssh into PlanetLab nodes. Replace with your own slice name.
+username = os.environ.get("PL_SLICE")
+
+# The pluser and plpassword are the ones used to log in to the PlanetLab web
+# site. Replace with your own user and password account information.
+pl_user = os.environ.get("PL_USER")
+pl_password = os.environ.get("PL_PASS")
+
+# Choose the PlanetLab nodes for the experiment. In this example three sets
+# of nodes are used, picked according to different criteria.
+
+first_set_nodes = []
+second_set_nodes = []
+third_set_nodes = []
+
+# The first set of nodes is identified by hostname.
+hostnames = ['planetlab2.utt.fr',
+'planetlab-2.man.poznan.pl',
+'planetlab-1.ing.unimo.it',
+'gschembra3.diit.unict.it',
+'planetlab2.ionio.gr',
+'planetlab-1.imag.fr',
+'node2pl.planet-lab.telecom-lille1.eu',
+'planetlab1.xeno.cl.cam.ac.uk',
+'planetlab3.hiit.fi',
+'planetlab2.fri.uni-lj.si',
+'planetlab1.informatik.uni-erlangen.de',
+'node1pl.planet-lab.telecom-lille1.eu',
+'planet2.servers.ua.pt',
+'iraplab1.iralab.uni-karlsruhe.de',
+'planetlab-node3.it-sudparis.eu',
+'planet1.elte.hu',
+'planet1.l3s.uni-hannover.de',
+'planetlab1.fct.ualg.pt',
+'host1.planetlab.informatik.tu-darmstadt.de',
+'vicky.planetlab.ntua.gr',
+'planetlab1.rutgers.edu']
+
+for hostname in hostnames:
+    node = create_node(ec, username, pl_user, pl_password, hostname=hostname)
+    first_set_nodes.append(node)
+
+# The second set of nodes is restricted to nodes located in the United States.
+country = "United States"
+for i in range(15):
+    node = create_node(ec, username, pl_user, pl_password, country=country)
+    second_set_nodes.append(node)
+
+# The third set of nodes can be any node in the PlanetLab testbed.
+for i in range(70):
+    node = create_node(ec, username, pl_user, pl_password)
+    third_set_nodes.append(node)
+
+# Register conditions
+
+# The nodes that are completely identified by their hostnames have to be
+# provisioned before the rest of the nodes. This ensures that no other
+# resource will use an identified node even if the constraints match.
+# In this example the nodes from the first group need to be provisioned
+# before the ones in the second and third groups. By the same logic, it is
+# convenient that nodes from the second group are provisioned before nodes
+# from the third group.
+
+ec.register_condition(second_set_nodes, ResourceAction.DEPLOY, first_set_nodes, ResourceState.PROVISIONED)
+ec.register_condition(third_set_nodes, ResourceAction.DEPLOY, second_set_nodes, ResourceState.PROVISIONED)
+
+# Deploy the experiment:
+ec.deploy()
+
+# Wait until all nodes are provisioned:
+ec.wait_deployed(first_set_nodes)
+ec.wait_deployed(second_set_nodes)
+ec.wait_deployed(third_set_nodes)
+
+# Shut down the experiment controller:
+ec.shutdown()
+
+# END
diff --git a/src/nepi/resources/omf/application.py b/src/nepi/resources/omf/application.py
index 4a3f027f..75a277f1 100644
--- a/src/nepi/resources/omf/application.py
+++ b/src/nepi/resources/omf/application.py
@@ -25,6 +25,8 @@
 from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.node import OMFNode
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
+from nepi.util import sshfuncs
+
 @clsinit_copy
 class OMFApplication(OMFResource):
     """
@@ -56,11 +58,20 @@ class OMFApplication(OMFResource):
         args = Attribute("args", "Argument of the application")
         env = Attribute("env", "Environnement variable of the application")
         stdin = Attribute("stdin", "Input of the application", default = "")
+        sources = Attribute("sources", "Sources of the application",
+                flags = Flags.ExecReadOnly)
+        sshuser = Attribute("sshUser", "user to connect with ssh",
+                flags = Flags.ExecReadOnly)
+        sshkey = Attribute("sshKey", "key to use for ssh",
+                flags = Flags.ExecReadOnly)
 
         cls._register_attribute(appid)
         cls._register_attribute(path)
         cls._register_attribute(args)
         cls._register_attribute(env)
         cls._register_attribute(stdin)
+        cls._register_attribute(sources)
+        cls._register_attribute(sshuser)
+        cls._register_attribute(sshkey)
 
     def __init__(self, ec, guid):
         """
@@ -142,15 +153,27 @@ class OMFApplication(OMFResource):
         It becomes DEPLOYED after getting the xmpp client.
 
         """
+        self.set('xmppSlice',self.node.get('xmppSlice'))
+        self.set('xmppHost',self.node.get('xmppHost'))
+        self.set('xmppPort',self.node.get('xmppPort'))
+        self.set('xmppPassword',self.node.get('xmppPassword'))
+
+        if not (self.get('xmppSlice') and self.get('xmppHost')
+                and self.get('xmppPort') and self.get('xmppPassword')):
+            msg = "Credentials are not initialized. XMPP Connections impossible"
+            self.error(msg)
+            raise RuntimeError, msg
+
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'),
                 self.get('xmppHost'), self.get('xmppPort'),
                 self.get('xmppPassword'), exp_id = self.exp_id)
 
-        if not self._omf_api :
-            msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.error(msg)
-            raise RuntimeError, msg
+        if self.get('sources'):
+            gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
+            user = self.get('sshUser') or self.get('xmppSlice')
+            dst = user + "@"+ gateway + ":"
+            (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
 
         super(OMFApplication, self).do_deploy()
diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py
index c8039a1c..c25f2d7a 100644
--- a/src/nepi/resources/planetlab/node.py
+++ b/src/nepi/resources/planetlab/node.py
@@ -20,7 +20,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
-        ResourceState, reschedule_delay
+    ResourceState, reschedule_delay
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.planetlab.plcapi import PLCAPIFactory
 from nepi.util.execfuncs import lexec
@@ -29,6 +29,7 @@ from nepi.util import sshfuncs
 from random import randint
 import time
 import threading
+import datetime
 
 @clsinit_copy
 class PlanetlabNode(LinuxNode):
@@ -37,8 +38,6 @@
         "associated to a PlanetLab user account"
 
     _backend = "planetlab"
-    ## XXX A.Q. This lock could use a more descriptive name and
-    # an explanatory comment
     lock = threading.Lock()
 
     @classmethod
@@ -195,6 +194,7 @@
 
         self._plapi = None
         self._node_to_provision = None
+        self._slicenode = False
 
     @property
     def plapi(self):
@@ -207,28 +207,28 @@
                 self._plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
                     pl_ptn)
 
+            if not self._plapi:
+                self.fail_plapi()
+
         return self._plapi
 
     def do_discover(self):
         """
-        Based on the attributes defined by the user, discover the suitable nodes
+        Based on the attributes defined by the user, discover the suitable
+        nodes
         for provision.
         """
         hostname = self._get_hostname()
-        print self.guid, hostname
         if hostname:
             # the user specified one particular node to be provisioned
             # check with PLCAPI if it is alive
             node_id = self._query_if_alive(hostname=hostname)
             node_id = node_id.pop()
-            print self.guid, node_id
 
             # check that the node is not blacklisted or being provisioned
             # by other RM
             with PlanetlabNode.lock:
                 plist = self.plapi.reserved()
                 blist = self.plapi.blacklisted()
-                print self.guid,plist
-                print self.guid,blist
 
                 if node_id not in blist and node_id not in plist:
                     # check that it is really alive, by performing ping
@@ -237,6 +237,8 @@
                         self._blacklist_node(node_id)
                         self.fail_node_not_alive(hostname)
                     else:
+                        if self._check_if_in_slice([node_id]):
+                            self._slicenode = True
                         self._put_node_in_provision(node_id)
             self._node_to_provision = node_id
             super(PlanetlabNode, self).do_discover()
@@ -258,15 +260,25 @@
             node_id = None
             if nodes_inslice:
                 node_id = self._choose_random_node(nodes_inslice)
+                self._slicenode = True
 
             if not node_id:
                 # Either there were no matching nodes in the user's slice, or
                 # the nodes in the slice were blacklisted or being provisioned
                # by other RM. Note nodes_not_inslice is never empty
                 node_id = self._choose_random_node(nodes_not_inslice)
+                self._slicenode = False
 
             if node_id:
                 self._node_to_provision = node_id
+                try:
+                    self._set_hostname_attr(node_id)
+                    self.info(" Selected node to provision ")
+                except:
+                    with PlanetlabNode.lock:
+                        self._blacklist_node(node_id)
+                    self.do_discover()
+
                 super(PlanetlabNode, self).do_discover()
             else:
                 self.fail_not_enough_nodes()
@@ -279,43 +291,40 @@
         provision_ok = False
         ssh_ok = False
         proc_ok = False
-        timeout = 120
+        timeout = 1800
 
         while not provision_ok:
             node = self._node_to_provision
-            # Adding try catch to set hostname because sometimes MyPLC fails
-            # when trying to retrieve node's hostname
-            try:
-                self._set_hostname_attr(node)
-            except:
-                with PlanetlabNode.lock:
-                    self._blacklist_node(node)
-                self.do_discover()
-                continue
-
-            self._add_node_to_slice(node)
+            if not self._slicenode:
+                self._add_node_to_slice(node)
 
-            # check ssh connection
-            t = 0
-            while t < timeout and not ssh_ok:
-
+                # check ssh connection
+                t = 0
+                while t < timeout and not ssh_ok:
+
+                    cmd = 'echo \'GOOD NODE\''
+                    ((out, err), proc) = self.execute(cmd)
+                    if out.find("GOOD NODE") < 0:
+                        t = t + 60
+                        time.sleep(60)
+                        continue
+                    else:
+                        ssh_ok = True
+                        continue
+            else:
                 cmd = 'echo \'GOOD NODE\''
                 ((out, err), proc) = self.execute(cmd)
-                if out.find("GOOD NODE") < 0:
-                    t = t + 60
-                    time.sleep(60)
-                    continue
-                else:
+                if not out.find("GOOD NODE") < 0:
                     ssh_ok = True
-                    continue
 
             if not ssh_ok:
                 # the timeout was reached without establishing ssh connection
                 # the node is blacklisted, deleted from the slice, and a new
                 # node to provision is discovered
                 with PlanetlabNode.lock:
+                    self.warn(" Could not SSH login ")
                     self._blacklist_node(node)
-                    self._delete_node_from_slice(node)
+                    #self._delete_node_from_slice(node)
                     self.set('hostname', None)
                 self.do_discover()
                 continue
@@ -326,8 +335,9 @@
             ((out, err), proc) = self.execute(cmd)
             if out.find("/proc type proc") < 0:
                 with PlanetlabNode.lock:
+                    self.warn(" Could not find directory /proc ")
                     self._blacklist_node(node)
-                    self._delete_node_from_slice(node)
+                    #self._delete_node_from_slice(node)
                     self.set('hostname', None)
                 self.do_discover()
                 continue
@@ -344,7 +354,6 @@
         """
         Retrieve the list of nodes ids that match user's constraints
         """
-
         # Map user's defined attributes with tagnames of PlanetLab
         timeframe = self.get("timeframe")[0]
         attr_to_tags = {
@@ -389,7 +398,7 @@
             nodes = self.plapi.get_nodes()
             for node in nodes:
                 nodes_id.append(node['node_id'])
-        
+
         return nodes_id
@@ -516,7 +525,7 @@
         return nodes_id
 
     def _choose_random_node(self, nodes):
-        """ 
+        """
         From the possible nodes for provision, choose randomly to decrease
         the probability of different RMs choosing the same node for
         provision
         """
@@ -535,20 +544,21 @@
                 plist = self.plapi.reserved()
 
                 if node_id not in blist and node_id not in plist:
                     ping_ok = self._do_ping(node_id)
-                    print " ### ping_ok #### %s guid %s" % (ping_ok, self.guid)
 
                     if not ping_ok:
+                        self._set_hostname_attr(node_id)
+                        self.warn(" Node not responding PING ")
                         self._blacklist_node(node_id)
+                        self.set('hostname', None)
                     else:
                         # discovered node for provision, added to provision list
                         self._put_node_in_provision(node_id)
-            print "node_id %s , guid %s" % (node_id, self.guid)
 
             return node_id
 
     def _get_nodes_id(self, filters):
         return self.plapi.get_nodes(filters, fields=['node_id'])
 
     def _add_node_to_slice(self, node_id):
-        self.info(" Selected node to provision ")
+        self.info(" Adding node to slice ")
         slicename = self.get("username")
         with PlanetlabNode.lock:
             slice_nodes = self.plapi.get_slice_nodes(slicename)
@@ -595,17 +605,14 @@
         """
         ping_ok = False
         ip = self._get_ip(node_id)
-        print "ip de do_ping %s, guid %s" % (ip, self.guid)
 
         if not ip:
             return ping_ok
 
         command = "ping -c2 %s" % ip
 
         (out, err) = lexec(command)
-        print "out de do_ping %s, guid %s" % (out, self.guid)
         if not out.find("2 received") < 0:
             ping_ok = True
-            print "ping_ok de do_ping %s, guid %s" % (ping_ok, self.guid)
 
         return ping_ok
 
     def _blacklist_node(self, node):
@@ -627,30 +634,39 @@
         Query PLCAPI for the IP of a node with certain node id
         """
         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
-        print "#### HOSTNAME ##### %s ### guid %s " % (hostname['hostname'], self.guid)
-        ip = sshfuncs.gethostbyname(hostname['hostname'])
-        if not ip:
+        try:
+            ip = sshfuncs.gethostbyname(hostname['hostname'])
+        except:
             # Fail while trying to find the IP
             return None
         return ip
 
     def fail_discovery(self):
+        self.fail()
         msg = "Discovery failed. No candidates found for node"
         self.error(msg)
         raise RuntimeError, msg
 
     def fail_node_not_alive(self, hostname=None):
+        self.fail()
         msg = "Node %s not alive" % hostname
         raise RuntimeError, msg
 
     def fail_node_not_available(self, hostname):
+        self.fail()
         msg = "Node %s not available for provisioning" % hostname
         raise RuntimeError, msg
 
     def fail_not_enough_nodes(self):
+        self.fail()
         msg = "Not enough nodes available for provisioning"
         raise RuntimeError, msg
 
+    def fail_plapi(self):
+        self.fail()
+        msg = "Failing while trying to instantiate the PLC API"
+        raise RuntimeError, msg
+
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
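
For readers of this patch, the flow of the new OMFApplication.do_deploy can be
summarized as follows. This is a sketch only, not part of the patch: it reuses
the names from the hunks above, and `app` is a stand-in for the OMFApplication
instance.

    # Sketch of OMFApplication.do_deploy after this patch (reading aid only).
    from nepi.resources.omf.omf_api import OMFAPIFactory
    from nepi.resources.omf.omf_resource import ResourceGateway
    from nepi.util import sshfuncs

    def do_deploy_sketch(app):
        # 1. Pull the XMPP credentials down from the node RM.
        for attr in ('xmppSlice', 'xmppHost', 'xmppPort', 'xmppPassword'):
            app.set(attr, app.node.get(attr))

        # 2. Refuse to deploy with an incomplete credential set.
        if not all(app.get(attr) for attr in
                ('xmppSlice', 'xmppHost', 'xmppPort', 'xmppPassword')):
            raise RuntimeError("Credentials are not initialized. "
                    "XMPP Connections impossible")

        # 3. Lazily create the XMPP client.
        if not app._omf_api:
            app._omf_api = OMFAPIFactory.get_api(app.get('xmppSlice'),
                    app.get('xmppHost'), app.get('xmppPort'),
                    app.get('xmppPassword'), exp_id=app.exp_id)

        # 4. If 'sources' is set, scp the local file(s) to the testbed
        #    gateway, logging in as sshUser (or the slice name if unset).
        if app.get('sources'):
            gateway = ResourceGateway.AMtoGateway[app.get('xmppHost')]
            user = app.get('sshUser') or app.get('xmppSlice')
            (out, err), proc = sshfuncs.rcopy(app.get('sources'),
                    "%s@%s:" % (user, gateway))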
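
Likewise, the rewritten SSH liveness check in PlanetlabNode.do_provision is
easier to follow in outline. Again a sketch under the same caveats: `rm` stands
for the PlanetlabNode instance, and `probe_ssh` is a hypothetical helper for
the inlined `echo 'GOOD NODE'` test in the hunk above.

    import time

    def probe_ssh(rm):
        # Run `echo 'GOOD NODE'` over the RM's SSH channel and look for
        # the marker in stdout, as the patched code does inline.
        ((out, err), proc) = rm.execute("echo 'GOOD NODE'")
        return out.find("GOOD NODE") >= 0

    def provision_sketch(rm, timeout=1800, step=60):
        node = rm._node_to_provision

        if rm._slicenode:
            # Node was already in the slice: a single probe suffices.
            ssh_ok = probe_ssh(rm)
        else:
            # Node freshly added to the slice: poll until the sliver comes
            # up or the timeout (raised from 120 s to 1800 s) expires.
            rm._add_node_to_slice(node)
            ssh_ok, waited = False, 0
            while waited < timeout and not ssh_ok:
                ssh_ok = probe_ssh(rm)
                if not ssh_ok:
                    waited += step
                    time.sleep(step)

        if not ssh_ok:
            # Blacklist the node and restart discovery; note the patch now
            # keeps the node in the slice (_delete_node_from_slice is
            # commented out).
            rm._blacklist_node(node)
            rm.set('hostname', None)
            rm.do_discover()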