X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Ftestbeds%2Fplanetlab%2Fnode.py;h=59485ef2cc0e7127d19833913d4fe807cfced220;hb=62aa9b7a5b846baed4dd304e2d8fa442f9f35b51;hp=10fac055df970e1f18a8025c834ee628eb255f03;hpb=7443c3354b8d9baba337660899fecd7f2eaae310;p=nepi.git diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py index 10fac055..59485ef2 100644 --- a/src/nepi/testbeds/planetlab/node.py +++ b/src/nepi/testbeds/planetlab/node.py @@ -92,7 +92,7 @@ class Node(object): if not api: api = plcapi.PLCAPI() self._api = api - self._sliceapi = sliceapi + self._sliceapi = sliceapi or api # Attributes self.hostname = None @@ -113,7 +113,7 @@ class Node(object): self.maxLoad = None self.min_num_external_ifaces = None self.max_num_external_ifaces = None - self.timeframe = 'm' + self._timeframe = 'w' # Applications and routes add requirements to connected nodes self.required_packages = set() @@ -130,7 +130,8 @@ class Node(object): self.ident_path = None self.server_key = None self.home_path = None - self.enable_cleanup = False + self.enable_proc_cleanup = False + self.enable_home_cleanup = False # Those are filled when an actual node is allocated self._node_id = None @@ -139,6 +140,27 @@ class Node(object): # Logging self._logger = logging.getLogger('nepi.testbeds.planetlab') + + def set_timeframe(self, timeframe): + if timeframe == "latest": + self._timeframe = "" + elif timeframe == "month": + self._timeframe = "m" + elif timeframe == "year": + self._timeframe = "y" + else: + self._timeframe = "w" + + def get_timeframe(self): + if self._timeframe == "": + return "latest" + if self._timeframe == "m": + return "month" + if self._timeframe == "y": + return "year" + return "week" + + timeframe = property(get_timeframe, set_timeframe) def _nepi_testbed_environment_setup_get(self): command = cStringIO.StringIO() @@ -153,8 +175,10 @@ class Node(object): for envval in envvals: command.write(' ; export %s=%s' % (envkey, envval)) return command.getvalue() + def _nepi_testbed_environment_setup_set(self, value): pass + _nepi_testbed_environment_setup = property( _nepi_testbed_environment_setup_get, _nepi_testbed_environment_setup_set) @@ -178,7 +202,7 @@ class Node(object): self._logger.info("Finding candidates for %s", self.make_filter_description()) fields = ('node_id',) - replacements = {'timeframe':self.timeframe} + replacements = {'timeframe':self._timeframe} # get initial candidates (no tag filters) basefilters = self.build_filters({}, self.BASEFILTERS) @@ -199,7 +223,7 @@ class Node(object): candidates = set(map(operator.itemgetter('node_id'), self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra))) - + # filter by tag, one tag at a time applicable = self.applicable_filters for tagfilter in self.TAGFILTERS.iteritems(): @@ -209,12 +233,12 @@ class Node(object): if attr in applicable: tagfilter = rootfilters.copy() tagfilter['tagname'] = tagname % replacements - tagfilter[expr % replacements] = getattr(self,attr) + tagfilter[expr % replacements] = str(getattr(self,attr)) tagfilter['node_id'] = list(candidates) - + candidates &= set(map(operator.itemgetter('node_id'), self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields))) - + # filter by vsys tags - special case since it doesn't follow # the usual semantics if self.required_vsys: @@ -260,14 +284,16 @@ class Node(object): len(ifaces.get(node_id,())) <= self.max_num_external_ifaces ) candidates = set(filter(predicate, candidates)) - + # make sure hostnames are resolvable + hostnames = dict() if candidates: self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates)) - + hostnames = dict(map(operator.itemgetter('node_id','hostname'), - self._api.GetNodes(list(candidates), ['node_id','hostname']) + self._sliceapi.GetNodes(list(candidates), ['node_id','hostname']) )) + def resolvable(node_id): try: addr = socket.gethostbyname(hostnames[node_id]) @@ -278,8 +304,14 @@ class Node(object): maxthreads = 16)) self._logger.info(" Found %s reachable candidates.", len(candidates)) - - return candidates + + for h in hostnames.keys(): + if h not in candidates: + del hostnames[h] + + hostnames = dict((v,k) for k, v in hostnames.iteritems()) + + return hostnames def make_filter_description(self): """ @@ -337,10 +369,10 @@ class Node(object): def rate_nodes(self, nodes): rates = collections.defaultdict(int) tags = collections.defaultdict(dict) - replacements = {'timeframe':self.timeframe} + replacements = {'timeframe':self._timeframe} tagnames = [ tagname % replacements for tagname, weight, default in self.RATE_FACTORS ] - + taginfo = self._sliceapi.GetNodeTags( node_id=list(nodes), tagname=tagnames, @@ -350,7 +382,7 @@ class Node(object): for value in taginfo: node, tagname, value = unpack(value) if value and value.lower() != 'n/a': - tags[tagname][int(node)] = float(value) + tags[tagname][node] = float(value) for tagname, weight, default in self.RATE_FACTORS: taginfo = tags[tagname % replacements].get @@ -372,9 +404,10 @@ class Node(object): orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces self.min_num_external_ifaces = None self.max_num_external_ifaces = None - self.timeframe = 'm' + if not self._timeframe: self._timeframe = 'w' - replacements = {'timeframe':self.timeframe} + replacements = {'timeframe':self._timeframe} + for attr, tag in self.BASEFILTERS.iteritems(): if tag in info: value = info[tag] @@ -393,7 +426,7 @@ class Node(object): if 'peer_id' in info: orig_attrs['site'] = self.site - self.site = self._api.peer_map[info['peer_id']] + self.site = self._sliceapi.peer_map[info['peer_id']] if 'interface_ids' in info: self.min_num_external_ifaces = \ @@ -488,9 +521,11 @@ class Node(object): raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,) # Ensure the node is clean (no apps running that could interfere with operations) - if self.enable_cleanup: - self.do_cleanup() - + if self.enable_proc_cleanup: + self.do_proc_cleanup() + if self.enable_home_cleanup: + self.do_home_cleanup() + def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10): # Wait for the p2p installer if self._yum_dependencies and not self._installed: @@ -525,21 +560,21 @@ class Node(object): return False def destroy(self): - if self.enable_cleanup: - self.do_cleanup() + if self.enable_proc_cleanup: + self.do_proc_cleanup() def blacklist(self): if self._node_id: self._logger.warn("Blacklisting malfunctioning node %s", self.hostname) import util - util.appendBlacklist(self._node_id) + util.appendBlacklist(self.hostname) - def do_cleanup(self): + def do_proc_cleanup(self): if self.testbed().recovering: # WOW - not now return - self._logger.info("Cleaning up %s", self.hostname) + self._logger.info("Cleaning up processes on %s", self.hostname) cmds = [ "sudo -S killall python tcpdump || /bin/true ; " @@ -568,7 +603,36 @@ class Node(object): retry = 3 ) proc.wait() - + + def do_home_cleanup(self): + if self.testbed().recovering: + # WOW - not now + return + + self._logger.info("Cleaning up home on %s", self.hostname) + + cmds = [ + "find . -maxdepth 1 ! -name '.bash*' ! -name '.' -execdir rm -rf {} + " + ] + + for cmd in cmds: + (out,err),proc = server.popen_ssh_command( + # Some apps need two kills + cmd % { + 'slicename' : self.slicename , + }, + host = self.hostname, + port = None, + user = self.slicename, + agent = None, + ident_key = self.ident_path, + server_key = self.server_key, + tty = True, # so that ps -N -T works as advertised... + timeout = 60, + retry = 3 + ) + proc.wait() + def prepare_dependencies(self): # Configure p2p yum dependency installer if self.required_packages and not self._installed: @@ -604,10 +668,10 @@ class Node(object): if len(routes) > MAX_VROUTE_ROUTES: return 'sliceip' - vsys_vnet = ipaddr.IPNetwork(vsys_vnet) + vsys_vnet = ipaddr.IPv4Network(vsys_vnet) for route in routes: - dest, prefix, nexthop, metric = route - dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix)) + dest, prefix, nexthop, metric, device = route + dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix)) nexthop = ipaddr.IPAddress(nexthop) if dest not in vsys_vnet or nexthop not in vsys_vnet: return 'sliceip' @@ -615,7 +679,7 @@ class Node(object): return 'vroute' def format_route(self, route, dev, method, action): - dest, prefix, nexthop, metric = route + dest, prefix, nexthop, metric, device = route if method == 'vroute': return ( "%s %s%s gw %s %s" % (