Ticket #58: remove the emulation flag, instead, pick correctly configured nodes only
[nepi.git] / src / nepi / testbeds / planetlab / node.py
index 8557a95..99ba2aa 100644 (file)
@@ -10,8 +10,14 @@ import os
 import collections
 import cStringIO
 import resourcealloc
+import socket
+import sys
 
 from nepi.util import server
+from nepi.util import parallel
+
+class UnresponsiveNodeError(RuntimeError):
+    pass
 
 class Node(object):
     BASEFILTERS = {
@@ -34,6 +40,8 @@ class Node(object):
     
     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
+    RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
+    RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
     
     def __init__(self, api=None):
         if not api:
@@ -46,7 +54,6 @@ class Node(object):
         self.operatingSystem = None
         self.pl_distro = None
         self.site = None
-        self.emulation = None
         self.minReliability = None
         self.maxReliability = None
         self.minBandwidth = None
@@ -59,6 +66,7 @@ class Node(object):
         self.required_packages = set()
         self.required_vsys = set()
         self.pythonpath = []
+        self.rpmFusion = False
         self.env = collections.defaultdict(list)
         
         # Testbed-derived attributes
@@ -101,6 +109,8 @@ class Node(object):
         )
     
     def find_candidates(self, filter_slice_id=None):
+        print >>sys.stderr, "Finding candidates for", self.make_filter_description()
+        
         fields = ('node_id',)
         replacements = {'timeframe':self.timeframe}
         
@@ -113,6 +123,8 @@ class Node(object):
         # only pick healthy nodes
         basefilters['run_level'] = 'boot'
         basefilters['boot_state'] = 'boot'
+        basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
+        basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
         
         # keyword-only "pseudofilters"
         extra = {}
@@ -182,6 +194,24 @@ class Node(object):
                     len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
             
             candidates = set(filter(predicate, candidates))
+        
+        # keep only candidates whose hostname resolves to an address (name lookup only; the host itself is not contacted)
+        if candidates:
+            print >>sys.stderr, "  Found", len(candidates), "candidates. Checking for reachability..."
+            
+            hostnames = dict(map(operator.itemgetter('node_id','hostname'),
+                self._api.GetNodes(list(candidates), ['node_id','hostname'])
+            ))
+            def resolvable(node_id):
+                try:
+                    addr = socket.gethostbyname(hostnames[node_id])
+                    return addr is not None
+                except:
+                    return False
+            candidates = set(parallel.pfilter(resolvable, candidates,
+                maxthreads = 16))
+
+            print >>sys.stderr, "  Found", len(candidates), "reachable candidates."
             
         return candidates
     
@@ -225,11 +255,19 @@ class Node(object):
         self._node_id = node_id
         self.fetch_node_info()
     
+    def unassign_node(self):
+        self._node_id = None
+        self.__dict__.update(self.__orig_attrs)
+    
     def fetch_node_info(self):
+        orig_attrs = {}
+        
         info = self._api.GetNodes(self._node_id)[0]
         tags = dict( (t['tagname'],t['value'])
                      for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
 
+        orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
+        orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
         self.timeframe = 'm'
@@ -238,14 +276,19 @@ class Node(object):
         for attr, tag in self.BASEFILTERS.iteritems():
             if tag in info:
                 value = info[tag]
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
                 setattr(self, attr, value)
         for attr, (tag,_) in self.TAGFILTERS.iteritems():
             tag = tag % replacements
             if tag in tags:
                 value = tags[tag]
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
                 setattr(self, attr, value)
         
         if 'peer_id' in info:
+            orig_attrs['site'] = self.site
             self.site = self._api.peer_map[info['peer_id']]
         
         if 'interface_ids' in info:
@@ -253,7 +296,10 @@ class Node(object):
             self.max_num_external_ifaces = len(info['interface_ids'])
         
         if 'ssh_rsa_key' in info:
+            orig_attrs['server_key'] = self.server_key
             self.server_key = info['ssh_rsa_key']
+        
+        self.__orig_attrs = orig_attrs
 
     def validate(self):
         if self.home_path is None:
@@ -269,11 +315,29 @@ class Node(object):
             pidfile = self.DEPENDS_PIDFILE
             logfile = self.DEPENDS_LOGFILE
             
+            # If we need rpmfusion, we must install the repo definition and the gpg keys
+            if self.rpmFusion:
+                if self.operatingSystem == 'f12':
+                    # Fedora 12 requires a different rpmfusion package
+                    RPM_FUSION_URL = self.RPM_FUSION_URL_F12
+                else:
+                    # This one works for f13+
+                    RPM_FUSION_URL = self.RPM_FUSION_URL
+                    
+                rpmFusion = (
+                  '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&'
+                ) % {
+                    'RPM_FUSION_URL' : RPM_FUSION_URL
+                }
+            else:
+                rpmFusion = ''
+            
             # Start process in a "daemonized" way, using nohup and heavy
             # stdin/out redirection to avoid connection issues
             (out,err),proc = rspawn.remote_spawn(
-                "( yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
+                "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
                     'packages' : ' '.join(self.required_packages),
+                    'rpmfusion' : rpmFusion,
                 },
                 pidfile = pidfile,
                 stdout = logfile,
@@ -291,6 +355,21 @@ class Node(object):
             if proc.wait():
                 raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
     
+    def wait_provisioning(self, timeout = 20*60):
+        # recently provisioned nodes may not be up yet
+        sleeptime = 1.0
+        totaltime = 0.0
+        while not self.is_alive():
+            time.sleep(sleeptime)
+            totaltime += sleeptime
+            sleeptime = min(30.0, sleeptime*1.5)
+            
+            if totaltime > timeout:
+                # PlanetLab has a 15-minute delay on configuration propagation.
+                # Once we have waited longer than that, the unresponsiveness
+                # can no longer be attributed to that propagation delay.
+                raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+    
     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
         if self.required_packages:
             pidfile = self.DEPENDS_PIDFILE
@@ -351,7 +430,7 @@ class Node(object):
     def is_alive(self):
         # Make sure all the paths are created where 
         # they have to be created for deployment
-        (out,err),proc = server.popen_ssh_command(
+        (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
             "echo 'ALIVE'",
             host = self.hostname,
             port = None,
@@ -379,7 +458,7 @@ class Node(object):
             for dev in devs:
                 if dev.routes_here(route):
                     # Schedule rule
-                    dest, prefix, nexthop = route
+                    dest, prefix, nexthop, metric = route
                     rules.append(
                         "add %s%s gw %s %s" % (
                             dest,
@@ -395,6 +474,8 @@ class Node(object):
                 raise RuntimeError, "Route %s cannot be bound to any virtual interface " \
                     "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs)
         
+        print >>sys.stderr, "Setting up routes for", self.hostname
+        
         (out,err),proc = server.popen_ssh_command(
             "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
                 home = server.shell_escape(self.home_path)),