X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Fresources%2Fplanetlab%2Fnode.py;h=fe7f058176d6fc8a7c843a5d903455e9ba602407;hb=e55924b6886bd7382a28e1ae235c4810f852e163;hp=57196b35f75446f6717563e5675fcc9c3e0d16d7;hpb=99d8b2a4431d8fafd0385e189375106d46f1abd9;p=nepi.git

diff --git a/src/nepi/resources/planetlab/node.py b/src/nepi/resources/planetlab/node.py
index 57196b35..fe7f0581 100644
--- a/src/nepi/resources/planetlab/node.py
+++ b/src/nepi/resources/planetlab/node.py
@@ -3,9 +3,8 @@
 # Copyright (C) 2013 INRIA
 #
 # This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation;
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -20,35 +19,34 @@
 from nepi.execution.attribute import Attribute, Flags, Types
 from nepi.execution.resource import ResourceManager, clsinit_copy, \
-        ResourceState, reschedule_delay, failtrap
+        ResourceState
 from nepi.resources.linux.node import LinuxNode
 from nepi.resources.planetlab.plcapi import PLCAPIFactory
 from nepi.util.execfuncs import lexec
+from nepi.util import sshfuncs
 from random import randint
+import re
+import os
 import time
+import socket
 import threading
+import datetime
+import weakref
 
 @clsinit_copy
 class PlanetlabNode(LinuxNode):
-    _rtype = "PlanetlabNode"
+    _rtype = "planetlab::Node"
     _help = "Controls a PlanetLab host accessible using a SSH key " \
             "associated to a PlanetLab user account"
-    _backend = "planetlab"
-
-    blacklist = list()
-    provisionlist = list()
-
-    lock_blist = threading.Lock()
-    lock_plist = threading.Lock()
-
-    lock_slice = threading.Lock()
+    _platform = "planetlab"
+    lock = threading.Lock()
 
     @classmethod
     def _register_attributes(cls):
         ip = Attribute("ip", "PlanetLab host public IP address",
-                flags = Flags.ReadOnly)
+                flags = Flags.Design)
 
         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
@@ -58,13 +56,13 @@ class PlanetlabNode(LinuxNode):
         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
                     default = "https://%(hostname)s:443/PLCAPI/",
-                    flags = Flags.ExecReadOnly)
+                    flags = Flags.Design)
 
         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
                     authenticate in the website) ",
                     flags = Flags.Credential)
 
-        pl_password = Attribute("password",
+        pl_password = Attribute("plpassword",
                         "PlanetLab account password, as \
                         the one to authenticate in the website) ",
                         flags = Flags.Credential)
@@ -98,14 +96,6 @@ class PlanetlabNode(LinuxNode):
                 "other"],
                 flags = Flags.Filter)
 
-        site = Attribute("site", "Constrain the PlanetLab site this node \
-                should reside on.",
-                type = Types.Enumerate,
-                allowed = ["PLE",
-                        "PLC",
-                        "PLJ"],
-                flags = Flags.Filter)
-
         min_reliability = Attribute("minReliability", "Constrain reliability \
                             while picking PlanetLab nodes. Specifies a lower \
                             acceptable bound.",
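Two changes in the imports and class attributes above set the tone for the rest of the patch: the three fine-grained locks (lock_blist, lock_plist, lock_slice) collapse into a single class-level lock, and weakref is imported so the RM can refer to the experiment controller and the PLCAPI session without keeping them alive. A minimal sketch of both idioms together; the class and method names here are illustrative, not part of nepi:

    import threading
    import weakref

    class NodeManager(object):
        # One lock guarding all shared bookkeeping: check-and-update
        # sequences stay atomic, and lock-ordering deadlocks between
        # concurrent managers become impossible.
        lock = threading.Lock()
        blacklist = set()
        reserved = set()

        def __init__(self, controller):
            # Weak reference: calling self._ecobj() returns the
            # controller, or None once it has been garbage collected.
            self._ecobj = weakref.ref(controller)

        def try_reserve(self, node_id):
            with NodeManager.lock:
                if node_id in self.blacklist or node_id in self.reserved:
                    return False
                self.reserved.add(node_id)
                return True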
@@ -173,12 +163,19 @@ class PlanetlabNode(LinuxNode):
                         "year"],
                 flags = Flags.Filter)
 
+        plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
+                            in the user's home directory under the .nepi directory. This file \
+                            contains a list of PL nodes to blacklist, and at the end \
+                            of the experiment execution the new blacklisted nodes are added.",
+                    type = Types.Bool,
+                    default = False,
+                    flags = Flags.Global)
+
         cls._register_attribute(ip)
         cls._register_attribute(pl_url)
         cls._register_attribute(pl_ptn)
         cls._register_attribute(pl_user)
         cls._register_attribute(pl_password)
-        cls._register_attribute(site)
         cls._register_attribute(city)
         cls._register_attribute(country)
         cls._register_attribute(region)
@@ -193,137 +190,218 @@ class PlanetlabNode(LinuxNode):
         cls._register_attribute(min_cpu)
         cls._register_attribute(max_cpu)
         cls._register_attribute(timeframe)
+        cls._register_attribute(plblacklist)
 
     def __init__(self, ec, guid):
         super(PlanetlabNode, self).__init__(ec, guid)
+        self._ecobj = weakref.ref(ec)
         self._plapi = None
         self._node_to_provision = None
+        self._slicenode = False
+        self._hostname = False
+
+        if self.get("gateway") or self.get("gatewayUser"):
+            self.set("gateway", None)
+            self.set("gatewayUser", None)
+
+        # Blacklist file
+        nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
+        plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
+        if not os.path.exists(plblacklist_file):
+            if os.path.isdir(nepi_home):
+                open(plblacklist_file, 'w').close()
+            else:
+                os.makedirs(nepi_home)
+                open(plblacklist_file, 'w').close()
+
+    def _skip_provision(self):
+        pl_user = self.get("pluser")
+        pl_pass = self.get("plpassword")
+        if not pl_user and not pl_pass:
+            return True
+        else: return False
 
     @property
     def plapi(self):
         if not self._plapi:
             pl_user = self.get("pluser")
-            pl_pass = self.get("password")
+            pl_pass = self.get("plpassword")
             pl_url = self.get("plcApiUrl")
             pl_ptn = self.get("plcApiPattern")
-
-            self._plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
-                pl_ptn)
+            _plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
+                pl_ptn, self._ecobj())
 
-        return self._plapi
+            if not _plapi:
+                self.fail_plapi()
+
+            self._plapi = weakref.ref(_plapi)
 
-    def discoverl(self):
+        return self._plapi()
+
+    def do_discover(self):
         """
-        Based on the attributes defined by the user, discover the suitable nodes
+        Based on the attributes defined by the user, discover the suitable
+        nodes for provision.
        """
-        hostname = self.get("hostname")
+        if self._skip_provision():
+            super(PlanetlabNode, self).do_discover()
+            return
+
+        hostname = self._get_hostname()
         if hostname:
             # the user specified one particular node to be provisioned
-            # check with PLCAPI if it is alvive
-            node_id = self._query_if_alive(hostname=hostname)
-            node_id = node_id.pop()
+            self._hostname = True
+            node_id = self._get_nodes_id({'hostname':hostname})
+            node_id = node_id.pop()['node_id']
 
-            # check that the node is not blacklisted or already being provision
+            # check that the node is not blacklisted or being provisioned
             # by other RM
-            blist = PlanetlabNode.blacklist
-            plist = PlanetlabNode.provisionlist
-            if node_id not in blist and node_id not in plist:
+            with PlanetlabNode.lock:
+                plist = self.plapi.reserved()
+                blist = self.plapi.blacklisted()
+                if node_id not in blist and node_id not in plist:
 
-                # check that is really alive, by performing ping
-                ping_ok = self._do_ping(node_id)
-                if not ping_ok:
-                    self._blacklist_node(node_id)
-                    self.fail_node_not_alive(hostname)
+                    # check that it is really alive, by performing ping
+                    ping_ok = self._do_ping(node_id)
+                    if not ping_ok:
+                        self._blacklist_node(node_id)
+                        self.fail_node_not_alive(hostname)
+                    else:
+                        if self._check_if_in_slice([node_id]):
+                            self._slicenode = True
+                        self._put_node_in_provision(node_id)
+                        self._node_to_provision = node_id
                 else:
-                    self._node_to_provision = node_id
-                    self._put_node_in_provision(node_id)
-                    super(PlanetlabNode, self).discover()
-
-            else:
-                self.fail_node_not_available(hostname)
+                    self.fail_node_not_available(hostname)
+            super(PlanetlabNode, self).do_discover()
 
         else:
             # the user specifies constraints based on attributes, zero, one or
             # more nodes can match these constraints
             nodes = self._filter_based_on_attributes()
-            nodes_alive = self._query_if_alive(nodes)
-
+
             # nodes that are already part of user's slice have the priority to
             # be provisioned
-            nodes_inslice = self._check_if_in_slice(nodes_alive)
-            nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
+            nodes_inslice = self._check_if_in_slice(nodes)
+            nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
 
             node_id = None
             if nodes_inslice:
                 node_id = self._choose_random_node(nodes_inslice)
+                self._slicenode = True
 
-            if not node_id and nodes_not_inslice:
+            if not node_id:
                 # Either there were no matching nodes in the user's slice, or
                 # the nodes in the slice were blacklisted or being provisioned
                 # by other RM. Note nodes_not_inslice is never empty
                 node_id = self._choose_random_node(nodes_not_inslice)
-                if not node_id:
-                    self.fail_not_enough_nodes()
-
-            self._node_to_provision = node_id
-            super(PlanetlabNode, self).discover()
+                self._slicenode = False
+
+            if node_id:
+                self._node_to_provision = node_id
+                try:
+                    self._set_hostname_attr(node_id)
+                    self.info(" Selected node to provision ")
+                    super(PlanetlabNode, self).do_discover()
+                except:
+                    with PlanetlabNode.lock:
+                        self._blacklist_node(node_id)
+                    self.do_discover()
+            else:
+                self.fail_not_enough_nodes()
 
-    def provisionl(self):
+    def do_provision(self):
         """
         Add node to user's slice after verifying that the node is functioning
         correctly
         """
+        if self._skip_provision():
+            super(PlanetlabNode, self).do_provision()
+            return
+
         provision_ok = False
         ssh_ok = False
         proc_ok = False
-        timeout = 1200
+        timeout = 1800
 
         while not provision_ok:
             node = self._node_to_provision
-            self._set_hostname_attr(node)
-            self._add_node_to_slice(node)
-
-            # check ssh connection
-            t = 0
-            while t < timeout and not ssh_ok:
+            if not self._slicenode:
+                self._add_node_to_slice(node)
+                if self._check_if_in_slice([node]):
+                    self.debug( "Node added to slice" )
+                else:
+                    self.warning(" Could not add to slice ")
+                    with PlanetlabNode.lock:
+                        self._blacklist_node(node)
+                    self.do_discover()
+                    continue
+                # check ssh connection
+                t = 0
+                while t < timeout and not ssh_ok:
+
+                    cmd = 'echo \'GOOD NODE\''
+                    ((out, err), proc) = self.execute(cmd)
+                    if out.find("GOOD NODE") < 0:
+                        self.debug( "No SSH connection, waiting 60s" )
+                        t = t + 60
+                        time.sleep(60)
+                        continue
+                    else:
+                        self.debug( "SSH OK" )
+                        ssh_ok = True
+                        continue
+            else:
                 cmd = 'echo \'GOOD NODE\''
                 ((out, err), proc) = self.execute(cmd)
-                if out.find("GOOD NODE") < 0:
-                    t = t + 60
-                    time.sleep(60)
-                    continue
-                else:
+                if not out.find("GOOD NODE") < 0:
                     ssh_ok = True
-                    continue
 
             if not ssh_ok:
                 # the timeout was reached without establishing ssh connection
                 # the node is blacklisted, deleted from the slice, and a new
                 # node to provision is discovered
-                self._blacklist_node(node)
-                self._delete_node_from_slice(node)
-                self.discover()
+                with PlanetlabNode.lock:
+                    self.warning(" Could not SSH login ")
+                    self._blacklist_node(node)
+                    #self._delete_node_from_slice(node)
+                self.do_discover()
                 continue
 
             # check /proc directory is mounted (ssh_ok = True)
+            # and file system is not read only
             else:
                 cmd = 'mount |grep proc'
-                ((out, err), proc) = self.execute(cmd)
-                if out.find("/proc type proc") < 0:
-                    self._blacklist_node(node)
-                    self._delete_node_from_slice(node)
-                    self.discover()
+                ((out1, err1), proc1) = self.execute(cmd)
+                cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
+                ((out2, err2), proc2) = self.execute(cmd)
+                if out1.find("/proc type proc") < 0 or \
+                    "Read-only file system".lower() in err2.lower():
+                    with PlanetlabNode.lock:
+                        self.warning(" Corrupted file system ")
+                        self._blacklist_node(node)
+                        #self._delete_node_from_slice(node)
+                    self.do_discover()
                     continue
 
                 else:
                     provision_ok = True
+                    if not self.get('hostname'):
+                        self._set_hostname_attr(node)
                     # set IP attribute
                     ip = self._get_ip(node)
                     self.set("ip", ip)
+                    self.info(" Node provisioned ")
 
-        super(PlanetlabNode, self).provision()
+        super(PlanetlabNode, self).do_provision()
+
+    def do_release(self):
+        super(PlanetlabNode, self).do_release()
+        if self.state == ResourceState.RELEASED and not self._skip_provision():
+            self.debug(" Releasing PLC API ")
+            self.plapi.release()
 
     def _filter_based_on_attributes(self):
         """
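do_provision above polls the node once a minute (an echo over SSH) until either it answers or the 1800-second budget runs out. The shape of that loop, as a self-contained sketch where run_check stands in for the remote 'echo GOOD NODE' test:

    import time

    def wait_until(run_check, timeout=1800, interval=60):
        # Poll run_check() until it succeeds or the time budget is
        # exhausted; True on success, False if we timed out.
        waited = 0
        while waited < timeout:
            if run_check():
                return True
            time.sleep(interval)
            waited = waited + interval
        return False

One design point worth noting: as in the loop above, only the sleep time is counted, so a check that itself blocks (a hanging SSH handshake, say) can push the real elapsed time past the nominal 1800 seconds; comparing against time.time() instead would bound wall-clock time.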
@@ -337,7 +415,6 @@ class PlanetlabNode(LinuxNode):
             'region' : 'region',
             'architecture' : 'arch',
             'operatingSystem' : 'fcdistro',
-            #'site' : 'pldistro',
             'minReliability' : 'reliability%s' % timeframe,
             'maxReliability' : 'reliability%s' % timeframe,
             'minBandwidth' : 'bw%s' % timeframe,
@@ -354,7 +431,7 @@ class PlanetlabNode(LinuxNode):
         for attr_name, attr_obj in self._attrs.iteritems():
             attr_value = self.get(attr_name)
 
-            if attr_value is not None and attr_obj.flags == 8 and \
+            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
                 attr_name != 'timeframe':
 
                 attr_tag = attr_to_tags[attr_name]
@@ -368,10 +445,13 @@ class PlanetlabNode(LinuxNode):
                 # filter nodes by range constraints e.g. max bandwidth
                 elif ('min' or 'max') in attr_name:
                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
-
+
+        if not filters:
+            nodes = self._get_nodes_id()
+            for node in nodes:
+                nodes_id.append(node['node_id'])
         return nodes_id
-
     def _filter_by_fixed_attr(self, filters, nodes_id):
         """
         Query PLCAPI for nodes ids matching fixed attributes defined by the
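One Python subtlety in the elif branch shown above is worth spelling out: ('min' or 'max') in attr_name does not test both prefixes. The or operator returns its first truthy operand, so 'min' or 'max' evaluates to 'min', and the whole condition reduces to 'min' in attr_name. A quick interpreter session makes it visible, together with the usual spelling of a two-substring test:

    >>> 'min' or 'max'
    'min'
    >>> ('min' or 'max') in 'maxCpu'
    False
    >>> any(prefix in 'maxCpu' for prefix in ('min', 'max'))
    True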
@@ -410,7 +490,7 @@ class PlanetlabNode(LinuxNode):
         range, by the user
         """
         node_tags = self.plapi.get_node_tags(filters)
-        if node_tags is not None:
+        if node_tags:
 
             if len(nodes_id) == 0:
                 # first attribute being matched
@@ -455,53 +535,11 @@ class PlanetlabNode(LinuxNode):
 
         return nodes_id
 
-    def _query_if_alive(self, nodes_id=None, hostname=None):
-        """
-        Query PLCAPI for nodes that register activity recently, using filters
-        related to the state of the node, e.g. last time it was contacted
-        """
-        if nodes_id is None and hostname is None:
-            msg = "Specify nodes_id or hostname"
-            raise RuntimeError, msg
-
-        if nodes_id is not None and hostname is not None:
-            msg = "Specify either nodes_id or hostname"
-            raise RuntimeError, msg
-
-        # define PL filters to check the node is alive
-        filters = dict()
-        filters['run_level'] = 'boot'
-        filters['boot_state'] = 'boot'
-        filters['node_type'] = 'regular'
-        filters['>last_contact'] = int(time.time()) - 2*3600
-
-        # adding node_id or hostname to the filters to check for the particular
-        # node
-        if nodes_id:
-            filters['node_id'] = list(nodes_id)
-            alive_nodes_id = self._get_nodes_id(filters)
-        elif hostname:
-            filters['hostname'] = hostname
-            alive_nodes_id = self._get_nodes_id(filters)
-
-        if len(alive_nodes_id) == 0:
-            self.fail_discovery()
-        else:
-            nodes_id = list()
-            for node_id in alive_nodes_id:
-                nid = node_id['node_id']
-                nodes_id.append(nid)
-
-            return nodes_id
-
     def _choose_random_node(self, nodes):
         """
         From the possible nodes for provision, choose randomly to decrease the
         probability of different RMs choosing the same node for provision
         """
-        blist = PlanetlabNode.blacklist
-        plist = PlanetlabNode.provisionlist
-
         size = len(nodes)
         while size:
             size = size - 1
@@ -511,31 +549,50 @@ class PlanetlabNode(LinuxNode):
 
             # check the node is not blacklisted or being provisioned by other RM
             # and perform ping to check that it is really alive
-            if node_id not in blist and node_id not in plist:
-                ping_ok = self._do_ping(node_id)
-                if not ping_ok:
-                    self._blacklist_node(node_id)
-                else:
-                    # discovered node for provision, added to provision list
-                    self._put_node_in_provision(node_id)
-                    return node_id
-
-    def _get_nodes_id(self, filters):
+            with PlanetlabNode.lock:
+
+                blist = self.plapi.blacklisted()
+                plist = self.plapi.reserved()
+                if node_id not in blist and node_id not in plist:
+                    ping_ok = self._do_ping(node_id)
+                    if not ping_ok:
+                        self._set_hostname_attr(node_id)
+                        self.warning(" Node not responding PING ")
+                        self._blacklist_node(node_id)
+                    else:
+                        # discovered node for provision, added to provision list
+                        self._put_node_in_provision(node_id)
+                        return node_id
+
+    def _get_nodes_id(self, filters=None):
         return self.plapi.get_nodes(filters, fields=['node_id'])
 
     def _add_node_to_slice(self, node_id):
-        self.warn(" Adding node to slice ")
+        self.info(" Adding node to slice ")
         slicename = self.get("username")
-        with PlanetlabNode.lock_slice:
+        with PlanetlabNode.lock:
             slice_nodes = self.plapi.get_slice_nodes(slicename)
+            self.debug(" Previous slice nodes %s " % slice_nodes)
             slice_nodes.append(node_id)
             self.plapi.add_slice_nodes(slicename, slice_nodes)
 
     def _delete_node_from_slice(self, node):
-        self.warn(" Deleting node from slice ")
+        self.warning(" Deleting node from slice ")
         slicename = self.get("username")
         self.plapi.delete_slice_node(slicename, [node])
 
+    def _get_hostname(self):
+        hostname = self.get("hostname")
+        if hostname:
+            return hostname
+        ip = self.get("ip")
+        if ip:
+            hostname = socket.gethostbyaddr(ip)[0]
+            self.set('hostname', hostname)
+            return hostname
+        else:
+            return None
+
     def _set_hostname_attr(self, node):
         """
         Query PLCAPI for the hostname of a certain node id and sets the
@@ -552,7 +609,6 @@ class PlanetlabNode(LinuxNode):
         slicename = self.get("username")
         slice_nodes = self.plapi.get_slice_nodes(slicename)
         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
-
         return nodes_inslice
 
     def _do_ping(self, node_id):
@@ -561,62 +617,67 @@ class PlanetlabNode(LinuxNode):
         """
         ping_ok = False
         ip = self._get_ip(node_id)
-        command = "ping -c2 %s | echo \"PING OK\"" % ip
-
-        (out, err) = lexec(command)
-        if not out.find("PING OK") < 0:
-            ping_ok = True
-
+        if ip:
+            command = "ping -c4 %s" % ip
+            (out, err) = lexec(command)
+
+            m = re.search("(\d+)% packet loss", str(out))
+            if m and int(m.groups()[0]) < 50:
+                ping_ok = True
+
         return ping_ok
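The rewritten _do_ping above parses ping's own statistics instead of piping through echo; the old command ("ping -c2 %s | echo \"PING OK\"") printed PING OK unconditionally, so the check could never fail. A self-contained version of the new approach using the same packet-loss regexp; the function name and the subprocess plumbing are illustrative, while the 50% threshold mirrors the patch:

    import re
    import subprocess

    def host_responds(ip, max_loss=50):
        # ping -c4 ends with a summary line such as:
        #   4 packets transmitted, 4 received, 0% packet loss, time 3004ms
        proc = subprocess.Popen(["ping", "-c4", ip],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = proc.communicate()
        m = re.search(r"(\d+)% packet loss", str(out))
        return bool(m) and int(m.group(1)) < max_loss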
 
     def _blacklist_node(self, node):
         """
         Add a malfunctioning node to the blacklist
         """
-        blist = PlanetlabNode.blacklist
-
-        self.warn(" Blacklisting malfunctioning node ")
-        with PlanetlabNode.lock_blist:
-            blist.append(node)
+        self.warning(" Blacklisting malfunctioning node ")
+        self.plapi.blacklist_host(node)
+        if not self._hostname:
+            self.set('hostname', None)
 
     def _put_node_in_provision(self, node):
         """
         Add node to the list of nodes being provisioned, in order for other RMs
         to not try to provision the same one again
         """
-        plist = PlanetlabNode.provisionlist
-
-        self.warn(" Provisioning node ")
-        with PlanetlabNode.lock_plist:
-            plist.append(node)
+        self.plapi.reserve_host(node)
 
     def _get_ip(self, node_id):
         """
         Query PLCAPI for the IP of a node with certain node id
         """
-        ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
-        ip = ip[0]['ip']
+        hostname = self.get("hostname") or \
+            self.plapi.get_nodes(node_id, ['hostname'])[0]['hostname']
+        try:
+            ip = sshfuncs.gethostbyname(hostname)
+        except:
+            # Fail while trying to find the IP
+            return None
         return ip
 
     def fail_discovery(self):
-        self.fail()
         msg = "Discovery failed. No candidates found for node"
         self.error(msg)
         raise RuntimeError, msg
 
-    def fail_node_not_alive(self, hostname):
-        msg = "Node %s not alive, pick another node" % hostname
+    def fail_node_not_alive(self, hostname=None):
+        msg = "Node %s not alive" % hostname
         raise RuntimeError, msg
 
     def fail_node_not_available(self, hostname):
-        msg = "Node %s not available for provisioning, pick another \
-                node" % hostname
+        msg = "Node %s not available for provisioning" % hostname
         raise RuntimeError, msg
 
     def fail_not_enough_nodes(self):
         msg = "Not enough nodes available for provisioning"
         raise RuntimeError, msg
 
+    def fail_plapi(self):
+        msg = "Failing while trying to instantiate the PLC API.\nSet the" + \
+            " attributes pluser and plpassword."
+        raise RuntimeError, msg
+
     def valid_connection(self, guid):
         # TODO: Validate!
         return True
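For context on how this RM is meant to be driven: an experiment registers a resource of type planetlab::Node (the new _rtype above) and sets the attributes this patch renames. A rough usage sketch; the ExperimentController calls follow NEPI's 3.x API as I recall it, and details may differ between versions:

    from nepi.execution.ec import ExperimentController

    ec = ExperimentController(exp_id = "pl-demo")

    node = ec.register_resource("planetlab::Node")
    ec.set(node, "username", "your_slice_name")        # PlanetLab slice (SSH login)
    ec.set(node, "pluser", "user@example.org")         # PLC account: enables discovery
    ec.set(node, "plpassword", "secret")               #   and provisioning via PLCAPI
    ec.set(node, "hostname", "planetlab1.example.org") # optional: pin one node

    ec.deploy()
    ec.wait_deployed(node)
    ec.shutdown()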