Merging heads
author     Alina Quereilhac <alina.quereilhac@inria.fr>  Tue, 5 Nov 2013 10:53:40 +0000 (11:53 +0100)
committer  Alina Quereilhac <alina.quereilhac@inria.fr>  Tue, 5 Nov 2013 10:53:40 +0000 (11:53 +0100)
examples/omf/nepi_omf_stdin_iminds.py
examples/planetlab/scalability.py [new file with mode: 0755]
src/nepi/resources/omf/application.py
src/nepi/resources/planetlab/node.py

diff --git a/examples/omf/nepi_omf_stdin_iminds.py b/examples/omf/nepi_omf_stdin_iminds.py
index 1c6d91e..df95a24 100644 (file)
@@ -42,6 +42,8 @@
 from nepi.execution.resource import ResourceFactory, ResourceAction, ResourceState
 from nepi.execution.ec import ExperimentController
 
+import time
+
 # Create the EC
 ec = ExperimentController()
 
@@ -58,11 +60,11 @@ ec.set(node1, 'xmppPassword', "1234")
 # Create and Configure the Application
 app1 = ec.register_resource("OMFRobotApplication")
 ec.set(app1, 'appid', "robot")
-ec.set(app1, 'path', "/users/jtribino/RobotCTRLComm.rb")
-ec.set(app1, 'args', "/users/jtribino/coordinate.csv")
+ec.set(app1, 'path', "/users/jtribino/RobotCTRLComm.rb") # /users/username/RobotCTRLComm.rb
+ec.set(app1, 'args', "/users/jtribino/coordinate.csv")   #/users/username/coordinate.csv
 ec.set(app1, 'env', " ")
-ec.set(app1, 'sources', "/home/wlab18/Desktop/coordinate.csv")
-ec.set(app1, 'sshUser', "jtribino")
+ec.set(app1, 'sources', "/home/wlab18/Desktop/coordinate.csv")  # local path
+ec.set(app1, 'sshUser', "jtribino")  # username
 
 # Connection
 ec.register_connection(app1, node1)
@@ -85,5 +87,5 @@ ec.set(app1, 'stdin', "2;openlefteye")
 
 ec.wait_finished([app1])
 
- Stop Experiment
+# Stop Experiment
 ec.shutdown()
diff --git a/examples/planetlab/scalability.py b/examples/planetlab/scalability.py
new file mode 100755 (executable)
index 0000000..4a028b4
--- /dev/null
+++ b/examples/planetlab/scalability.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+#
+#    NEPI, a framework to manage network experiments
+#    Copyright (C) 2013 INRIA
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License as published by
+#    the Free Software Foundation, either version 3 of the License, or
+#    (at your option) any later version.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+# Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
+
+from nepi.execution.ec import ExperimentController
+from nepi.execution.resource import ResourceAction, ResourceState
+
+import os
+import time
+
+def create_node(ec, username, pl_user, pl_password, hostname=None, country=None,
+                operatingSystem=None, minBandwidth=None, minCpu=None):
+
+    node = ec.register_resource("PlanetlabNode")
+
+    if username:
+        ec.set(node, "username", username)
+    if pl_user:
+        ec.set(node, "pluser", pl_user)
+    if pl_password:
+        ec.set(node, "plpassword", pl_password)
+
+    if hostname:
+        ec.set(node, "hostname", hostname)
+    if country:
+        ec.set(node, "country", country)
+    if operatingSystem:
+        ec.set(node, "operatingSystem", operatingSystem)
+    if minBandwidth:
+        ec.set(node, "minBandwidth", minBandwidth)
+    if minCpu:
+        ec.set(node, "minCpu", minCpu)
+
+    ec.set(node, "cleanHome", True)
+    ec.set(node, "cleanProcesses", True)
+    
+    return node
+
+exp_id = "scalability_exp"
+
+# Create the Experiment Controller:
+ec = ExperimentController(exp_id)
+
+# Register the node resources:
+
+# The username in this case is the slice name, the one used to log in 
+# via ssh to the PlanetLab nodes. Replace it with your own slice name.
+username = os.environ.get("PL_SLICE")
+
+# The pluser and plpassword are the ones used to log in to the PlanetLab 
+# website. Replace them with your own account's user and password.
+pl_user = os.environ.get("PL_USER")
+pl_password = os.environ.get("PL_PASS")
+
+# Choose the PlanetLab nodes for the experiment. In this example three sets
+# of nodes are used, picked according to different criteria.
+
+first_set_nodes = []
+second_set_nodes = []
+third_set_nodes = []
+
+# The first set of nodes is identified by hostname.
+hostnames = ['planetlab2.utt.fr',
+             'planetlab-2.man.poznan.pl',
+             'planetlab-1.ing.unimo.it',
+             'gschembra3.diit.unict.it',
+             'planetlab2.ionio.gr',
+             'planetlab-1.imag.fr',
+             'node2pl.planet-lab.telecom-lille1.eu',
+             'planetlab1.xeno.cl.cam.ac.uk',
+             'planetlab3.hiit.fi',
+             'planetlab2.fri.uni-lj.si',
+             'planetlab1.informatik.uni-erlangen.de',
+             'node1pl.planet-lab.telecom-lille1.eu',
+             'planet2.servers.ua.pt',
+             'iraplab1.iralab.uni-karlsruhe.de',
+             'planetlab-node3.it-sudparis.eu',
+             'planet1.elte.hu',
+             'planet1.l3s.uni-hannover.de',
+             'planetlab1.fct.ualg.pt',
+             'host1.planetlab.informatik.tu-darmstadt.de',
+             'vicky.planetlab.ntua.gr',
+             'planetlab1.rutgers.edu']
+
+for hostname in hostnames:
+    node = create_node(ec, username, pl_user, pl_password, hostname=hostname)
+    first_set_nodes.append(node)
+
+# The second set of nodes is restricted to nodes in the United States.
+country = "United States"
+for i in range(15):
+    node = create_node(ec, username, pl_user, pl_password, country=country)
+    second_set_nodes.append(node)
+
+# The third set of nodes can be any node in the PlanetLab testbed.
+for i in range(70):
+    node = create_node(ec, username, pl_user, pl_password)
+    third_set_nodes.append(node)
+
+# Register conditions
+
+# The nodes that are fully identified by their hostname have to be provisioned 
+# before the rest of the nodes. This ensures that no other resource will use
+# the identified node even if the constraints match. 
+# In this example the nodes from the first group need to be provisioned before
+# the ones in the second and third groups. By the same logic, it is convenient
+# that nodes from the second group are provisioned before those in the third.
+
+ec.register_condition(second_set_nodes, ResourceAction.DEPLOY, first_set_nodes, ResourceState.PROVISIONED)
+ec.register_condition(third_set_nodes, ResourceAction.DEPLOY, second_set_nodes, ResourceState.PROVISIONED)
+    
+# Deploy the experiment:
+ec.deploy()
+
+# Wait until all nodes are provisioned:
+ec.wait_deployed(first_set_nodes)
+ec.wait_deployed(second_set_nodes)
+ec.wait_deployed(third_set_nodes)
+
+# Shut down the experiment controller:
+ec.shutdown()
+
+# END
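
The script above takes all of its credentials from the environment. A minimal
sketch of a defensive check that could precede the controller creation, using
the same PL_SLICE, PL_USER and PL_PASS variable names as the script (the error
message is illustrative):

    import os

    # Fail early with a clear message instead of deploying with None credentials.
    for var in ("PL_SLICE", "PL_USER", "PL_PASS"):
        if not os.environ.get(var):
            raise RuntimeError("environment variable %s must be set "
                               "before running scalability.py" % var)
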
diff --git a/src/nepi/resources/omf/application.py b/src/nepi/resources/omf/application.py
index 4a3f027..75a277f 100644 (file)
@@ -25,6 +25,8 @@ from nepi.resources.omf.omf_resource import ResourceGateway, OMFResource
 from nepi.resources.omf.node import OMFNode
 from nepi.resources.omf.omf_api import OMFAPIFactory
 
+from nepi.util import sshfuncs
+
 @clsinit_copy
 class OMFApplication(OMFResource):
     """
@@ -56,11 +58,20 @@ class OMFApplication(OMFResource):
         args = Attribute("args", "Argument of the application")
         env = Attribute("env", "Environment variable of the application")
         stdin = Attribute("stdin", "Input of the application", default = "")
+        sources = Attribute("sources", "Sources of the application", 
+                     flags = Flags.ExecReadOnly)
+        sshuser = Attribute("sshUser", "User for the ssh connection", 
+                     flags = Flags.ExecReadOnly)
+        sshkey = Attribute("sshKey", "Key to use for the ssh connection", 
+                     flags = Flags.ExecReadOnly)
         cls._register_attribute(appid)
         cls._register_attribute(path)
         cls._register_attribute(args)
         cls._register_attribute(env)
         cls._register_attribute(stdin)
+        cls._register_attribute(sources)
+        cls._register_attribute(sshuser)
+        cls._register_attribute(sshkey)
 
     def __init__(self, ec, guid):
         """
@@ -142,15 +153,27 @@ class OMFApplication(OMFResource):
         It becomes DEPLOYED after getting the xmpp client.
 
         """
+        self.set('xmppSlice', self.node.get('xmppSlice'))
+        self.set('xmppHost', self.node.get('xmppHost'))
+        self.set('xmppPort', self.node.get('xmppPort'))
+        self.set('xmppPassword', self.node.get('xmppPassword'))
+
+        if not (self.get('xmppSlice') and self.get('xmppHost')
+              and self.get('xmppPort') and self.get('xmppPassword')):
+            msg = "Credentials are not initialzed. XMPP Connections impossible"
+            self.error(msg)
+            raise RuntimeError, msg
+
         if not self._omf_api :
             self._omf_api = OMFAPIFactory.get_api(self.get('xmppSlice'), 
                 self.get('xmppHost'), self.get('xmppPort'), 
                 self.get('xmppPassword'), exp_id = self.exp_id)
 
-        if not self._omf_api :
-            msg = "Credentials are not initialzed. XMPP Connections impossible"
-            self.error(msg)
-            raise RuntimeError, msg
+        if self.get('sources'):
+            gateway = ResourceGateway.AMtoGateway[self.get('xmppHost')]
+            user = self.get('sshUser') or self.get('xmppSlice')
+            dst = user + "@" + gateway + ":"
+            (out, err), proc = sshfuncs.rcopy(self.get('sources'), dst)
 
         super(OMFApplication, self).do_deploy()
 
diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py
index c8039a1..c25f2d7 100644 (file)
@@ -20,7 +20,7 @@
 
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
-        ResourceState, reschedule_delay
+        ResourceState, reschedule_delay 
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
 from nepi.util.execfuncs import lexec
@@ -29,6 +29,7 @@ from nepi.util import sshfuncs
 from random import randint
 import time
 import threading
+import datetime
 
 @clsinit_copy
 class PlanetlabNode(LinuxNode):
@@ -37,8 +38,6 @@ class PlanetlabNode(LinuxNode):
             "associated to a PlanetLab user account"
     _backend = "planetlab"
 
-    ## XXX A.Q. This lock could use a more descriptive name and 
-    #           an explanatory comment
     lock = threading.Lock()
 
     @classmethod
@@ -195,6 +194,7 @@ class PlanetlabNode(LinuxNode):
 
         self._plapi = None
         self._node_to_provision = None
+        self._slicenode = False
     
     @property
     def plapi(self):
@@ -207,28 +207,28 @@ class PlanetlabNode(LinuxNode):
             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
                     pl_ptn)
             
+            if not self._plapi:
+                self.fail_plapi()
+
         return self._plapi
 
     def do_discover(self):
         """
-        Based on the attributes defined by the user, discover the suitable nodes
+        Based on the attributes defined by the user, discover the suitable 
+        nodes for provisioning.
         """
         hostname = self._get_hostname()
-        print self.guid, hostname 
         if hostname:
             # the user specified one particular node to be provisioned
             # check with PLCAPI if it is alive
             node_id = self._query_if_alive(hostname=hostname)
             node_id = node_id.pop()
-            print self.guid, node_id
 
             # check that the node is not blacklisted or being provisioned
             # by other RM
             with PlanetlabNode.lock:
                 plist = self.plapi.reserved()
                 blist = self.plapi.blacklisted()
-                print self.guid,plist
-                print self.guid,blist
                 if node_id not in blist and node_id not in plist:
                 
                     # check that is really alive, by performing ping
@@ -237,6 +237,8 @@ class PlanetlabNode(LinuxNode):
                         self._blacklist_node(node_id)
                         self.fail_node_not_alive(hostname)
                     else:
+                        if self._check_if_in_slice([node_id]):
+                            self._slicenode = True
                         self._put_node_in_provision(node_id)
                         self._node_to_provision = node_id
                         super(PlanetlabNode, self).do_discover()
@@ -258,15 +260,25 @@ class PlanetlabNode(LinuxNode):
             node_id = None
             if nodes_inslice:
                 node_id = self._choose_random_node(nodes_inslice)
+                self._slicenode = True                
                 
             if not node_id:
                 # Either there were no matching nodes in the user's slice, or
                 # the nodes in the slice were blacklisted or being provisioned
                 # by another RM. Note that nodes_not_inslice is never empty.
                 node_id = self._choose_random_node(nodes_not_inslice)
+                self._slicenode = False
 
             if node_id:
                 self._node_to_provision = node_id
+                try:
+                    self._set_hostname_attr(node_id)
+                    self.info(" Selected node to provision ")
+                except:
+                    with PlanetlabNode.lock:
+                        self._blacklist_node(node_id)
+                        self.do_discover()
                 super(PlanetlabNode, self).do_discover()
             else:
                self.fail_not_enough_nodes() 
@@ -279,43 +291,40 @@ class PlanetlabNode(LinuxNode):
         provision_ok = False
         ssh_ok = False
         proc_ok = False
-        timeout = 120
+        timeout = 1800
 
         while not provision_ok:
             node = self._node_to_provision
-            # Adding try catch to set hostname because sometimes MyPLC fails
-            # when trying to retrive node's hostname
-            try:
-                self._set_hostname_attr(node)
-            except:
-                with PlanetlabNode.lock:
-                    self._blacklist_node(node)
-                self.do_discover()
-                continue
-
-            self._add_node_to_slice(node)
+            if not self._slicenode:
+                self._add_node_to_slice(node)
             
-            # check ssh connection
-            t = 0 
-            while t < timeout and not ssh_ok:
-
+                # check ssh connection
+                t = 0 
+                while t < timeout and not ssh_ok:
+
+                    cmd = 'echo \'GOOD NODE\''
+                    ((out, err), proc) = self.execute(cmd)
+                    if out.find("GOOD NODE") < 0:
+                        t = t + 60
+                        time.sleep(60)
+                        continue
+                    else:
+                        ssh_ok = True
+                        continue
+            else:
                 cmd = 'echo \'GOOD NODE\''
                 ((out, err), proc) = self.execute(cmd)
-                if out.find("GOOD NODE") < 0:
-                    t = t + 60
-                    time.sleep(60)
-                    continue
-                else:
+                if not out.find("GOOD NODE") < 0:
                     ssh_ok = True
-                    continue
 
             if not ssh_ok:
                 # the timeout was reached without establishing an ssh connection;
                 # the node is blacklisted, deleted from the slice, and a new
                 # node to provision is discovered
                 with PlanetlabNode.lock:
+                    self.warn(" Could not log in via ssh ")
                     self._blacklist_node(node)
-                    self._delete_node_from_slice(node)
+                    #self._delete_node_from_slice(node)
                 self.set('hostname', None)
                 self.do_discover()
                 continue
@@ -326,8 +335,9 @@ class PlanetlabNode(LinuxNode):
                 ((out, err), proc) = self.execute(cmd)
                 if out.find("/proc type proc") < 0:
                     with PlanetlabNode.lock:
+                        self.warn(" Could not find directory /proc ")
                         self._blacklist_node(node)
-                        self._delete_node_from_slice(node)
+                        #self._delete_node_from_slice(node)
                     self.set('hostname', None)
                     self.do_discover()
                     continue
@@ -344,7 +354,6 @@ class PlanetlabNode(LinuxNode):
         """
         Retrieve the list of node ids that match the user's constraints 
         """
-
         # Map user's defined attributes with tagnames of PlanetLab
         timeframe = self.get("timeframe")[0]
         attr_to_tags = {
@@ -389,7 +398,7 @@ class PlanetlabNode(LinuxNode):
             nodes = self.plapi.get_nodes()
             for node in nodes:
                 nodes_id.append(node['node_id'])
-                
+        
         return nodes_id
                     
 
@@ -516,7 +525,7 @@ class PlanetlabNode(LinuxNode):
             return nodes_id
 
     def _choose_random_node(self, nodes):
-        """ 
+        """
         From the possible nodes for provisioning, choose one randomly to decrease
         the probability of different RMs choosing the same node for provisioning
         """
@@ -535,20 +544,21 @@ class PlanetlabNode(LinuxNode):
                 plist = self.plapi.reserved()
                 if node_id not in blist and node_id not in plist:
                     ping_ok = self._do_ping(node_id)
-                    print " ### ping_ok #### %s guid %s" % (ping_ok, self.guid)
                     if not ping_ok:
+                        self._set_hostname_attr(node_id)
+                        self.warn(" Node not responding to PING ")
                         self._blacklist_node(node_id)
+                        self.set('hostname', None)
                     else:
                         # discovered node for provision, added to provision list
                         self._put_node_in_provision(node_id)
-                        print "node_id %s , guid %s" % (node_id, self.guid)
                         return node_id
 
     def _get_nodes_id(self, filters):
         return self.plapi.get_nodes(filters, fields=['node_id'])
 
     def _add_node_to_slice(self, node_id):
-        self.info(" Selected node to provision ")
+        self.info(" Adding node to slice ")
         slicename = self.get("username")
         with PlanetlabNode.lock:
             slice_nodes = self.plapi.get_slice_nodes(slicename)
@@ -595,17 +605,14 @@ class PlanetlabNode(LinuxNode):
         """
         ping_ok = False
         ip = self._get_ip(node_id)
-        print "ip de do_ping %s, guid %s" % (ip, self.guid)
         if not ip: return ping_ok
 
         command = "ping -c2 %s" % ip
 
         (out, err) = lexec(command)
-        print "out de do_ping %s, guid %s" % (out, self.guid)
         if not out.find("2 received") < 0:
             ping_ok = True
         
-        print "ping_ok de do_ping %s, guid %s" % (ping_ok, self.guid)
         return ping_ok 
 
     def _blacklist_node(self, node):
@@ -627,30 +634,39 @@ class PlanetlabNode(LinuxNode):
         Query PLCAPI for the IP of a node with certain node id
         """
         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
-        print "#### HOSTNAME ##### %s ### guid %s " % (hostname['hostname'], self.guid)
-        ip = sshfuncs.gethostbyname(hostname['hostname'])
-        if not ip:
+        try:
+            ip = sshfuncs.gethostbyname(hostname['hostname'])
+        except:
             # Fail while trying to find the IP
             return None
         return ip
 
     def fail_discovery(self):
+        self.fail()
         msg = "Discovery failed. No candidates found for node"
         self.error(msg)
         raise RuntimeError, msg
 
     def fail_node_not_alive(self, hostname=None):
+        self.fail()
         msg = "Node %s not alive" % hostname
         raise RuntimeError, msg
     
     def fail_node_not_available(self, hostname):
+        self.fail()
         msg = "Node %s not available for provisioning" % hostname
         raise RuntimeError, msg
 
     def fail_not_enough_nodes(self):
+        self.fail()
         msg = "Not enough nodes available for provisioning"
         raise RuntimeError, msg
 
+    def fail_plapi(self):
+        self.fail()
+        msg = "Failing while trying to instanciate the PLC API"
+        raise RuntimeError, msg
+
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
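
As a summary of the provisioning change above: when the discovered node is
already in the slice (_slicenode is True), do_provision now performs a single
ssh probe instead of adding the node to the slice and polling. A condensed
sketch of that control flow, with the timeout and interval values taken from
the hunk (method names other than execute and _add_node_to_slice are
illustrative; this is not a drop-in replacement for the real method):

    def _check_ssh(self):
        # Probe the node once over ssh; self.execute is inherited from LinuxNode.
        ((out, err), proc) = self.execute("echo 'GOOD NODE'")
        return out.find("GOOD NODE") >= 0

    def provision_sketch(self):
        node = self._node_to_provision
        if self._slicenode:
            return self._check_ssh()   # node already usable, probe once
        self._add_node_to_slice(node)
        t, timeout = 0, 1800           # poll for up to 30 minutes
        while t < timeout:
            if self._check_ssh():
                return True
            t += 60
            time.sleep(60)             # the node may still be booting
        return False                   # caller blacklists and rediscovers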