'maxLoad' : ('load%(timeframe)s', '[value'),
'minCpu' : ('cpu%(timeframe)s', ']value'),
'maxCpu' : ('cpu%(timeframe)s', '[value'),
- }
+ }
+
+ RATE_FACTORS = (
+     # (<tag name>, <weight>, <default>)
+     # Used by rate_nodes(): a node's score is the weighted sum of these
+     # tag values; positive weights reward higher values, negative weights
+     # penalize them.  <default> substitutes for missing or 'n/a' tags.
+     # NOTE(review): bw carries a negative weight -- presumably the tag
+     # measures utilization rather than capacity; confirm against the
+     # PLC tag definitions.
+     ('bw%(timeframe)s', -0.001, 1024.0),
+     ('cpu%(timeframe)s', 0.1, 40.0),
+     ('load%(timeframe)s', -0.2, 3.0),
+     ('reliability%(timeframe)s', 1, 100.0),
+ )
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
def unassign_node(self):
self._node_id = None
- self.__dict__.update(self.__orig_attrs)
+
+ try:
+ orig_attrs = self.__orig_attrs
+ except AttributeError:
+ return
+
+ for key, value in __orig_attrs.iteritems():
+ setattr(self, key, value)
+ del self.__orig_attrs
+     def rate_nodes(self, nodes):
+         """Score each node in *nodes* by a weighted sum of its PLC tag
+         values (see RATE_FACTORS); missing or 'n/a' tags fall back to the
+         factor's default.  Returns a list of floats aligned with *nodes*."""
+         rates = collections.defaultdict(int)
+         tags = collections.defaultdict(dict)
+         # Tag names are templates parameterized by the configured timeframe.
+         replacements = {'timeframe':self.timeframe}
+         tagnames = [ tagname % replacements
+                      for tagname, weight, default in self.RATE_FACTORS ]
+
+         # One bulk query for all nodes and all rating tags.
+         taginfo = self._api.GetNodeTags(
+             node_id=list(nodes),
+             tagname=tagnames,
+             fields=('node_id','tagname','value'))
+
+         unpack = operator.itemgetter('node_id','tagname','value')
+         for value in taginfo:
+             node, tagname, value = unpack(value)
+             # Skip empty / 'n/a' values so the defaults apply below.
+             if value and value.lower() != 'n/a':
+                 tags[tagname][int(node)] = float(value)
+
+         for tagname, weight, default in self.RATE_FACTORS:
+             # Bound .get so the inner loop is a single fast call.
+             taginfo = tags[tagname % replacements].get
+             for node in nodes:
+                 rates[node] += weight * taginfo(node,default)
+
+         # Preserve the caller's node ordering in the result.
+         return map(rates.__getitem__, nodes)
+
+
    def fetch_node_info(self):
        orig_attrs = {}
-         info = self._api.GetNodes(self._node_id)[0]
+         # Batch both PLC queries into a single round-trip: the values
+         # returned by the queued calls are placeholders, and the real
+         # results come back together from FinishMulticall().
+         self._api.StartMulticall()
+         info = self._api.GetNodes(self._node_id)
+         tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
+         info, tags = self._api.FinishMulticall()
+         info = info[0]
+
        tags = dict( (t['tagname'],t['value'])
-             for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
+             for t in tags )
        orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
        orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
        value = tags[tag]
        if hasattr(self, attr):
            orig_attrs[attr] = getattr(self, attr)
+             # Normalize empty / 'n/a' tag values to None before assigning.
+             if not value or value.lower() == 'n/a':
+                 value = None
        setattr(self, attr, value)
        if 'peer_id' in info:
            orig_attrs['server_key'] = self.server_key
            self.server_key = info['ssh_rsa_key']
-         self.__orig_attrs = orig_attrs
+         # Keep only the first snapshot, so repeated fetches don't clobber
+         # the pristine values that unassign_node() must restore.
+         try:
+             self.__orig_attrs
+         except AttributeError:
+             self.__orig_attrs = orig_attrs
    def validate(self):
        if self.home_path is None:
        # PlanetLab has a 15' delay on configuration propagation
        # If we're above that delay, the unresponsiveness is not due
        # to this delay.
-         raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+         # Re-probe with verbose=True so the failing ssh output is logged
+         # before the node is declared unresponsive.
+         if not self.is_alive(verbose=True):
+             raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
        # Ensure the node is clean (no apps running that could interfere with operations)
        if self.enable_cleanup:
            self._yum_dependencies.async_setup_wait()
        self._installed = True
-     def is_alive(self):
+     def is_alive(self, verbose = False):
+         # verbose: when True, log the ssh output/error of a failed probe
+         # (validate() passes verbose=True before giving up on a node).
        # Make sure all the paths are created where
        # they have to be created for deployment
        (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
            ident_key = self.ident_path,
            server_key = self.server_key,
            timeout = 60,
-             err_on_timeout = False
+             err_on_timeout = False,
+             # NOTE(review): presumably avoids reusing a cached/persistent
+             # ssh connection so liveness reflects the node's current
+             # state -- confirm against popen_ssh_command's semantics.
+             persistent = False
            )
        if proc.wait():
+             if verbose:
+                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
            return False
        elif not err and out.strip() == 'ALIVE':
            return True
        else:
+             if verbose:
+                 self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err)
            return False
    def destroy(self):
        # Tear down the node; the cleanup pass is opt-in per node.
        if self.enable_cleanup:
            self.do_cleanup()
+     def blacklist(self):
+         """Add this node's id to the persistent blacklist of bad nodes.
+
+         No-op when the element has no node assigned."""
+         if not self._node_id:
+             return
+         self._logger.warn("Blacklisting malfunctioning node %s", self.hostname)
+         import util
+         util.appendBlacklist(self._node_id)
+
def do_cleanup(self):
if self.testbed().recovering:
# WOW - not now