merge
author Alina Quereilhac <alina.quereilhac@inria.fr>
Mon, 26 Sep 2011 11:04:36 +0000 (13:04 +0200)
committer Alina Quereilhac <alina.quereilhac@inria.fr>
Mon, 26 Sep 2011 11:04:36 +0000 (13:04 +0200)
src/nepi/testbeds/planetlab/node.py
src/nepi/testbeds/planetlab/plcapi.py
src/nepi/testbeds/planetlab/scripts/classqueue.py
src/nepi/testbeds/planetlab/util.py [new file with mode: 0644]
src/nepi/util/server.py

index c6ef6aa..ed8ddbd 100644 (file)
@@ -352,9 +352,14 @@ class Node(object):
     def fetch_node_info(self):
         orig_attrs = {}
         
-        info = self._api.GetNodes(self._node_id)[0]
+        self._api.StartMulticall()
+        info = self._api.GetNodes(self._node_id)
+        tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
+        info, tags = self._api.FinishMulticall()
+        info = info[0]
+        
         tags = dict( (t['tagname'],t['value'])
-                     for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
+                     for t in tags )
 
         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
index f302f84..4e6b89d 100644 (file)
@@ -1,14 +1,15 @@
 import xmlrpclib
 import functools
 import socket
+import time
 
 def _retry(fn):
     def rv(*p, **kw):
-        for x in xrange(3):
+        for x in xrange(5):
             try:
                 return fn(*p, **kw)
             except (socket.error, IOError, OSError):
-                pass
+                time.sleep(x*5+5)
         else:
             return fn (*p, **kw)
     return rv
@@ -86,6 +87,8 @@ class PLCAPI(object):
             urlpattern % {'hostname':hostname},
             allow_none = True)
         
+        self._multi = False
+        
     def test(self):
         import warnings
         
@@ -276,3 +279,17 @@ class PLCAPI(object):
         return _retry(self.api.UpdateSlice)(self.auth, sliceIdOrName, kw)
         
 
+    def StartMulticall(self):
+        """Route self.api through an xmlrpclib.MultiCall until FinishMulticall; no-op if already batching."""
+        if self._multi:
+            return
+        self._api, self.api, self._multi = self.api, xmlrpclib.MultiCall(self.api), True
+    
+    def FinishMulticall(self):
+        """Send the batched calls and return the MultiCall result; [] if no batch is open."""
+        if not self._multi:
+            return []
+        try:
+            return self.api()
+        finally:  # leave batch mode even when the batched request fails
+            self.api, self._multi = self._api, False
index cfc1c2b..412a0a4 100644 (file)
@@ -229,11 +229,15 @@ class ClassQueue(object):
 
     def dump_stats(self, astats=astats, dstats=dstats, dump_count=dump_count):
         if dump_count[0] >= 10000:
-            dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
-            astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
-            fd = open('dropped_stats', 'w')
-            iovec.writev(fd.fileno(), "Dropped:\n", dstatsstr, "Accepted:\n", astatsstr)
-            fd.close()
+            try:
+                dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
+                astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
+                fd = open('dropped_stats', 'w')
+                iovec.writev(fd.fileno(), "Classes: ", _classes, "\nDropped:\n", dstatsstr, "Accepted:\n", astatsstr)
+                fd.close()
+            except:
+                # who cares
+                pass
             dump_count[0] = 0
         else:
             dump_count[0] += 1
diff --git a/src/nepi/testbeds/planetlab/util.py b/src/nepi/testbeds/planetlab/util.py
new file mode 100644 (file)
index 0000000..dbe8018
--- /dev/null
@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from constants import TESTBED_ID, TESTBED_VERSION
+from nepi.core import testbed_impl
+from nepi.core.metadata import Parallel
+from nepi.util.constants import TIME_NOW
+from nepi.util.graphtools import mst
+from nepi.util import ipaddr2
+from nepi.util import environ
+from nepi.util.parallel import ParallelRun
+import sys
+import os
+import os.path
+import time
+import resourcealloc
+import collections
+import operator
+import functools
+import socket
+import struct
+import tempfile
+import subprocess
+import random
+import shutil
+import logging
+import metadata
+import weakref
+
+def getAPI(user, pass_):
+    from plcapi import PLCAPI  # imported lazily, only when an API handle is requested
+    return PLCAPI(username=user, password=pass_)
+
+def getNodes(api, num, **constraints):
+    """
+    Build `num` Node objects on `api` and assign each a node id.
+    
+    Candidates are queried once (all nodes are configured identically) and
+    resourcealloc.alloc picks the ids, sampling the best-rated candidates.
+    Returns the list of nodes, empty if num <= 0.
+    NOTE(review): **constraints is accepted but never applied to the nodes.
+    """
+    if num <= 0:
+        return []
+    
+    import node as Node
+    
+    nodes = []
+    for i in xrange(num):
+        node = Node.Node(api)
+        node.min_num_external_ifaces = 1
+        nodes.append(node)
+    
+    reqs = [nodes[0].find_candidates()] * num
+
+    def pickbest(fullset, nreq, node=nodes[0]):
+        if len(fullset) <= nreq:
+            return fullset
+        rated = sorted(zip(node.rate_nodes(fullset), fullset), reverse=True)
+        return set(map(operator.itemgetter(1), rated[:nreq]))
+    
+    solution = resourcealloc.alloc(reqs, sample=pickbest)
+    for node, node_id in zip(nodes, solution):
+        node.assign_node_id(node_id)
+    
+    return nodes
+
+def getSpanningTree(nodes, root = None, maxbranching = 2, hostgetter = operator.attrgetter('hostname')):
+    """
+    Return mst.mst's plan (an iterator) spanning `nodes` from `root`.
+    
+    Distances come from ipaddr2.ipdistn over each node's IPv4 address as an
+    integer; resolving them stores `_ip` and `_ip_n` on every node.
+    With no root given, the node with the smallest hostgetter value is used.
+    """
+    if not root:
+        root = min(nodes, key=hostgetter)
+    
+    for member in nodes:
+        dotted = socket.gethostbyname(hostgetter(member))
+        member._ip = dotted
+        member._ip_n = struct.unpack('!L', socket.inet_aton(dotted))[0]
+    
+    def distance(a, b):
+        return ipaddr2.ipdistn(a._ip_n, b._ip_n)
+    
+    return mst.mst(nodes, distance, root = root, maxbranching = maxbranching)
+
index 2d15b38..5a37fab 100644 (file)
@@ -587,10 +587,17 @@ def popen_ssh_command(command, host, port, user, agent,
         print "ssh", host, command
     
     tmp_known_hosts = None
-    args = ['ssh',
+    connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
+    args = ['ssh', '-C',
             # Don't bother with localhost. Makes test easier
             '-o', 'NoHostAuthenticationForLocalhost=yes',
             '-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
+            '-o', 'ConnectionAttempts=3',
+            '-o', 'ServerAliveInterval=30',
+            '-o', 'TCPKeepAlive=yes',
+            '-o', 'ControlMaster=auto',
+            '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
+            '-o', 'ControlPersist=60',
             '-l', user, host]
     if agent:
         args.append('-A')
@@ -689,9 +696,17 @@ def popen_scp(source, dest,
         user,host = remspec.rsplit('@',1)
         tmp_known_hosts = None
         
+        connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
         args = ['ssh', '-l', user, '-C',
                 # Don't bother with localhost. Makes test easier
                 '-o', 'NoHostAuthenticationForLocalhost=yes',
+                '-o', 'ConnectTimeout=30',
+                '-o', 'ConnectionAttempts=3',
+                '-o', 'ServerAliveInterval=30',
+                '-o', 'TCPKeepAlive=yes',
+                '-o', 'ControlMaster=auto',
+                '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
+                '-o', 'ControlPersist=60',
                 host ]
         if port:
             args.append('-P%d' % port)
@@ -906,9 +921,12 @@ def popen_python(python_code,
 
     if communication == DC.ACCESS_SSH:
         tmp_known_hosts = None
-        args = ['ssh',
+        args = ['ssh', '-C',
                 # Don't bother with localhost. Makes test easier
                 '-o', 'NoHostAuthenticationForLocalhost=yes',
+                '-o', 'ConnectionAttempts=3',
+                '-o', 'ServerAliveInterval=30',
+                '-o', 'TCPKeepAlive=yes',
                 '-l', user, host]
         if agent:
             args.append('-A')
@@ -1002,7 +1020,10 @@ def _communicate(self, input, timeout=None, err_on_timeout=True):
             else:
                 select_timeout = timelimit - curtime + 0.1
         else:
-            select_timeout = None
+            select_timeout = 1.0
+        
+        if select_timeout > 1.0:
+            select_timeout = 1.0
             
         try:
             rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
@@ -1011,6 +1032,10 @@ def _communicate(self, input, timeout=None, err_on_timeout=True):
                 raise
             else:
                 continue
+        
+        if not rlist and not wlist and not xlist and self.poll() is not None:
+            # timeout and process exited, say bye
+            break
 
         if self.stdin in wlist:
             # When select has indicated that the file is writable,