def fetch_node_info(self):
orig_attrs = {}
- info = self._api.GetNodes(self._node_id)[0]
+ self._api.StartMulticall()
+ info = self._api.GetNodes(self._node_id)
+ tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
+ info, tags = self._api.FinishMulticall()
+ info = info[0]
+
tags = dict( (t['tagname'],t['value'])
- for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
+ for t in tags )
orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
import xmlrpclib
import functools
import socket
+import time
def _retry(fn):
def rv(*p, **kw):
- for x in xrange(3):
+ for x in xrange(5):
try:
return fn(*p, **kw)
except (socket.error, IOError, OSError):
- pass
+ time.sleep(x*5+5)
else:
return fn (*p, **kw)
return rv
urlpattern % {'hostname':hostname},
allow_none = True)
+ self._multi = False
+
def test(self):
import warnings
return _retry(self.api.UpdateSlice)(self.auth, sliceIdOrName, kw)
+ def StartMulticall(self):
+ if not self._multi:
+ self._api = self.api
+ self.api = xmlrpclib.MultiCall(self._api)
+ self._multi = True
+
+ def FinishMulticall(self):
+ if self._multi:
+ rv = self.api()
+ self.api = self._api
+ self._multi = False
+ return rv
+ else:
+ return []
def dump_stats(self, astats=astats, dstats=dstats, dump_count=dump_count):
if dump_count[0] >= 10000:
- dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
- astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
- fd = open('dropped_stats', 'w')
- iovec.writev(fd.fileno(), "Dropped:\n", dstatsstr, "Accepted:\n", astatsstr)
- fd.close()
+ try:
+ dstatsstr = "".join(['%s:%s\n' % (key, value) for key, value in dstats.items()])
+ astatsstr = "".join(['%s:%s\n' % (key, value) for key, value in astats.items()])
+ fd = open('dropped_stats', 'w')
+ iovec.writev(fd.fileno(), "Classes: ", _classes, "\nDropped:\n", dstatsstr, "Accepted:\n", astatsstr)
+ fd.close()
+ except:
+ # who cares
+ pass
dump_count[0] = 0
else:
dump_count[0] += 1
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from constants import TESTBED_ID, TESTBED_VERSION
+from nepi.core import testbed_impl
+from nepi.core.metadata import Parallel
+from nepi.util.constants import TIME_NOW
+from nepi.util.graphtools import mst
+from nepi.util import ipaddr2
+from nepi.util import environ
+from nepi.util.parallel import ParallelRun
+import sys
+import os
+import os.path
+import time
+import resourcealloc
+import collections
+import operator
+import functools
+import socket
+import struct
+import tempfile
+import subprocess
+import random
+import shutil
+import logging
+import metadata
+import weakref
+
+def getAPI(user, pass_):
+ import plcapi
+ return plcapi.PLCAPI(username=user, password=pass_)
+
+def getNodes(api, num, **constraints):
+ # Now do the backtracking search for a suitable solution
+ # First with existing slice nodes
+ reqs = []
+ nodes = []
+
+ import node as Node
+
+ for i in xrange(num):
+ node = Node.Node(api)
+ node.min_num_external_interface = 1
+ nodes.append(node)
+
+ node = nodes[0]
+ candidates = node.find_candidates()
+ reqs = [candidates] * num
+
+ def pickbest(fullset, nreq, node=nodes[0]):
+ if len(fullset) > nreq:
+ fullset = zip(node.rate_nodes(fullset),fullset)
+ fullset.sort(reverse=True)
+ del fullset[nreq:]
+ return set(map(operator.itemgetter(1),fullset))
+ else:
+ return fullset
+
+ solution = resourcealloc.alloc(reqs, sample=pickbest)
+
+ # Do assign nodes
+ for node, node_id in zip(nodes, solution):
+ node.assign_node_id(node_id)
+
+ return nodes
+
+def getSpanningTree(nodes, root = None, maxbranching = 2, hostgetter = operator.attrgetter('hostname')):
+ if not root:
+ # Pick root (deterministically)
+ root = min(nodes, key=hostgetter)
+
+ # Obtain all IPs in numeric format
+ # (which means faster distance computations)
+ for node in nodes:
+ node._ip = socket.gethostbyname(hostgetter(node))
+ node._ip_n = struct.unpack('!L', socket.inet_aton(node._ip))[0]
+
+ # Compute plan
+ # NOTE: the plan is an iterator
+ plan = mst.mst(
+ nodes,
+ lambda a,b : ipaddr2.ipdistn(a._ip_n, b._ip_n),
+ root = root,
+ maxbranching = maxbranching)
+
+ return plan
+
print "ssh", host, command
tmp_known_hosts = None
- args = ['ssh',
+ connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
+ args = ['ssh', '-C',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
'-o', 'ConnectTimeout=%d' % (int(connect_timeout),),
+ '-o', 'ConnectionAttempts=3',
+ '-o', 'ServerAliveInterval=30',
+ '-o', 'TCPKeepAlive=yes',
+ '-o', 'ControlMaster=auto',
+ '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
+ '-o', 'ControlPersist=60',
'-l', user, host]
if agent:
args.append('-A')
user,host = remspec.rsplit('@',1)
tmp_known_hosts = None
+ connkey = repr((user,host,port)).encode("base64").strip().replace('/','.')
args = ['ssh', '-l', user, '-C',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
+ '-o', 'ConnectTimeout=30',
+ '-o', 'ConnectionAttempts=3',
+ '-o', 'ServerAliveInterval=30',
+ '-o', 'TCPKeepAlive=yes',
+ '-o', 'ControlMaster=auto',
+ '-o', 'ControlPath=/tmp/nepi_ssh_pl_%s' % ( connkey, ),
+ '-o', 'ControlPersist=60',
host ]
if port:
args.append('-P%d' % port)
if communication == DC.ACCESS_SSH:
tmp_known_hosts = None
- args = ['ssh',
+ args = ['ssh', '-C',
# Don't bother with localhost. Makes test easier
'-o', 'NoHostAuthenticationForLocalhost=yes',
+ '-o', 'ConnectionAttempts=3',
+ '-o', 'ServerAliveInterval=30',
+ '-o', 'TCPKeepAlive=yes',
'-l', user, host]
if agent:
args.append('-A')
else:
select_timeout = timelimit - curtime + 0.1
else:
- select_timeout = None
+ select_timeout = 1.0
+
+ if select_timeout > 1.0:
+ select_timeout = 1.0
try:
rlist, wlist, xlist = select.select(read_set, write_set, [], select_timeout)
raise
else:
continue
+
+ if not rlist and not wlist and not xlist and self.poll() is not None:
+ # timeout and process exited, say bye
+ break
if self.stdin in wlist:
# When select has indicated that the file is writable,