2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
28 from random import randint
class PlanetlabSfaNode(LinuxNode):
    # Resource type string under which this RM is registered.
    _rtype = "PlanetlabSfaNode"
    # One-line description of what this RM controls.
    _help = "Controls a PlanetLab host accessible using a SSH key " \
        "and provisioned using SFA"
    # Testbed backend this resource belongs to.
    _backend = "planetlab"
    def _register_attributes(cls):
        """Declare the user-settable attributes of this resource.

        NOTE(review): in this excerpt the @classmethod decorator and many
        `flags = ...` / `type = ...` argument lines appear to be missing,
        leaving several Attribute(...) calls unclosed — confirm against the
        upstream NEPI source before relying on this block.
        """
        # SFA credentials used to obtain the user credential.
        sfa_user = Attribute("sfauser", "SFA user",
                    flags = Flags.Credential)
        sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
                            used to generate the user credential")
        # Location / hardware / OS discovery filters.
        city = Attribute("city", "Constrain location (city) during resource \
                discovery. May use wildcards.",
        country = Attribute("country", "Constrain location (country) during \
                    resource discovery. May use wildcards.",
        region = Attribute("region", "Constrain location (region) during \
                    resource discovery. May use wildcards.",
        architecture = Attribute("architecture", "Constrain architecture \
                        during resource discovery.",
                        type = Types.Enumerate,
        operating_system = Attribute("operatingSystem", "Constrain operating \
                            system during resource discovery.",
                            type = Types.Enumerate,
        # Node-quality range filters (reliability / bandwidth / load / cpu).
        min_reliability = Attribute("minReliability", "Constrain reliability \
                            while picking PlanetLab nodes. Specifies a lower \
        max_reliability = Attribute("maxReliability", "Constrain reliability \
                            while picking PlanetLab nodes. Specifies an upper \
        min_bandwidth = Attribute("minBandwidth", "Constrain available \
                            bandwidth while picking PlanetLab nodes. \
                            Specifies a lower acceptable bound.",
                            flags = Flags.Filter)
        max_bandwidth = Attribute("maxBandwidth", "Constrain available \
                            bandwidth while picking PlanetLab nodes. \
                            Specifies an upper acceptable bound.",
                            flags = Flags.Filter)
        min_load = Attribute("minLoad", "Constrain node load average while \
                        picking PlanetLab nodes. Specifies a lower acceptable \
                        flags = Flags.Filter)
        max_load = Attribute("maxLoad", "Constrain node load average while \
                        picking PlanetLab nodes. Specifies an upper acceptable \
                        flags = Flags.Filter)
        min_cpu = Attribute("minCpu", "Constrain available cpu time while \
                    picking PlanetLab nodes. Specifies a lower acceptable \
                    flags = Flags.Filter)
        max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
                    picking PlanetLab nodes. Specifies an upper acceptable \
                    flags = Flags.Filter)
        # Time window for the quality metrics above.
        timeframe = Attribute("timeframe", "Past time period in which to check\
                        information about the node. Values are year,month, \
                        type = Types.Enumerate,
                        flags = Flags.Filter)
        # Global blacklist persisted in ~/.nepi/plblacklist.
        plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
                        in the user's home directory under .nepi directory. This file \
                        contains a list of PL nodes to blacklist, and at the end \
                        of the experiment execution the new blacklisted nodes are added.",
                        flags = Flags.Global)
        # Register every declared attribute with the RM class.
        cls._register_attribute(sfa_user)
        cls._register_attribute(sfa_private_key)
        cls._register_attribute(city)
        cls._register_attribute(country)
        cls._register_attribute(region)
        cls._register_attribute(architecture)
        cls._register_attribute(operating_system)
        cls._register_attribute(min_reliability)
        cls._register_attribute(max_reliability)
        cls._register_attribute(min_bandwidth)
        cls._register_attribute(max_bandwidth)
        cls._register_attribute(min_load)
        cls._register_attribute(max_load)
        cls._register_attribute(min_cpu)
        cls._register_attribute(max_cpu)
        cls._register_attribute(timeframe)
        cls._register_attribute(plblacklist)
174 def __init__(self, ec, guid):
175 super(PlanetlabSfaNode, self).__init__(ec, guid)
177 self._ecobj = weakref.ref(ec)
179 self._node_to_provision = None
180 self._slicenode = False
181 self._hostname = False
183 if self.get("gateway") or self.get("gatewayUser"):
184 self.set("gateway", None)
185 self.set("gatewayUser", None)
    def _skip_provision(self):
        """Decide whether discovery/provisioning should be skipped.

        NOTE(review): the body appears truncated in this excerpt — upstream
        presumably tests whether the sfauser attribute is unset. Confirm
        against the full source.
        """
        sfa_user = self.get("sfauser")
        # NOTE(review): the `def`/`@property` header of this accessor is not
        # visible in this excerpt. These lines build the SFA API client from
        # the user's credentials and cache it behind a weak reference.
        sfa_user = self.get("sfauser")
        sfa_sm = "http://sfa3.planet-lab.eu:12346/"
        # The authority is the first two components of the user hrn.
        sfa_auth = '.'.join(sfa_user.split('.')[:2])
        sfa_registry = "http://sfa3.planet-lab.eu:12345/"
        sfa_private_key = self.get("sfaPrivateKey")
        _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
            sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
        self._sfaapi = weakref.ref(_sfaapi)
        return self._sfaapi()
    def do_discover(self):
        """
        Based on the attributes defined by the user, discover the suitable
        nodes for provision.

        NOTE(review): this excerpt appears truncated (e.g. the `return` after
        the skip branch and the ping/blacklist conditionals are not visible);
        the indentation below is a best-effort reconstruction — confirm
        against the upstream source.
        """
        if self._skip_provision():
            super(PlanetlabSfaNode, self).do_discover()
        nodes = self.sfaapi.get_resources_hrn()
        hostname = self._get_hostname()
        # the user specified one particular node to be provisioned
        self._hostname = True
        host_hrn = nodes[hostname]
        # check that the node is not blacklisted or being provisioned
        if not self._blacklisted(host_hrn):
            if not self._reserved(host_hrn):
                # Node in reservation
                ping_ok = self._do_ping(hostname)
                self._blacklist_node(host_hrn)
                self.fail_node_not_alive(hostname)
                if self._check_if_in_slice([host_hrn]):
                    self.debug("The node %s is already in the slice" % hostname)
                    self._slicenode = True
                self._node_to_provision = host_hrn
                super(PlanetlabSfaNode, self).do_discover()
        # # the user specifies constraints based on attributes, zero, one or
        # # more nodes can match these constraints
        # nodes = self._filter_based_on_attributes()
        # # nodes that are already part of user's slice have the priority to
        # nodes_inslice = self._check_if_in_slice(nodes)
        # nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
        # node_id = self._choose_random_node(nodes_inslice)
        # self._slicenode = True
        # # Either there were no matching nodes in the user's slice, or
        # # the nodes in the slice were blacklisted or being provisioned
        # # by other RM. Note nodes_not_inslice is never empty
        # node_id = self._choose_random_node(nodes_not_inslice)
        # self._slicenode = False
        # self._node_to_provision = node_id
        # self._set_hostname_attr(node_id)
        # self.info(" Selected node to provision ")
        # super(PlanetlabSfaNode, self).do_discover()
        # with PlanetlabSfaNode.lock:
        #     self._blacklist_node(node_id)
        # self.fail_not_enough_nodes()
280 def _blacklisted(self, host_hrn):
281 if self.sfaapi.blacklisted(host_hrn):
282 self.fail_node_not_available(host_hrn)
285 def _reserved(self, host_hrn):
286 if self.sfaapi.reserved(host_hrn):
287 self.fail_node_not_available(host_hrn)
    def do_provision(self):
        """
        Add node to user's slice after verifying that the node is functioning
        correctly.

        NOTE(review): this excerpt appears truncated — the initializations of
        `provision_ok`, `t`, `timeout` and `ssh_ok`, plus several break/else
        branches, are not visible. The indentation below is a best-effort
        reconstruction; confirm against the upstream source.
        """
        if self._skip_provision():
            super(PlanetlabSfaNode, self).do_provision()
        while not provision_ok:
            node = self._node_to_provision
            if not self._slicenode:
                self._add_node_to_slice(node)
            # check ssh connection
            while t < timeout and not ssh_ok:
                cmd = 'echo \'GOOD NODE\''
                ((out, err), proc) = self.execute(cmd)
                if out.find("GOOD NODE") < 0:
                    self.debug( "No SSH connection, waiting 60s" )
            self.debug( "SSH OK" )
            cmd = 'echo \'GOOD NODE\''
            ((out, err), proc) = self.execute(cmd)
            if not out.find("GOOD NODE") < 0:
            # the timeout was reach without establishing ssh connection
            # the node is blacklisted, deleted from the slice, and a new
            # node to provision is discovered
            self.warning(" Could not SSH login ")
            self._blacklist_node(node)
            # check /proc directory is mounted (ssh_ok = True)
            # and file system is not read only
            cmd = 'mount |grep proc'
            ((out1, err1), proc1) = self.execute(cmd)
            cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
            ((out2, err2), proc2) = self.execute(cmd)
            if out1.find("/proc type proc") < 0 or \
                "Read-only file system".lower() in err2.lower():
                self.warning(" Corrupted file system ")
                self._blacklist_node(node)
            # Record the chosen hostname unless the user pinned one.
            if not self.get('hostname'):
                self._set_hostname_attr(node)
            self.info(" Node provisioned ")
        super(PlanetlabSfaNode, self).do_provision()
361 # def _filter_based_on_attributes(self):
363 # Retrive the list of nodes hrn that match user's constraints
365 # # Map user's defined attributes with tagnames of PlanetLab
366 # timeframe = self.get("timeframe")[0]
369 # 'country' : 'country',
370 # 'region' : 'region',
371 # 'architecture' : 'arch',
372 # 'operatingSystem' : 'fcdistro',
373 # 'minReliability' : 'reliability%s' % timeframe,
374 # 'maxReliability' : 'reliability%s' % timeframe,
375 # 'minBandwidth' : 'bw%s' % timeframe,
376 # 'maxBandwidth' : 'bw%s' % timeframe,
377 # 'minLoad' : 'load%s' % timeframe,
378 # 'maxLoad' : 'load%s' % timeframe,
379 # 'minCpu' : 'cpu%s' % timeframe,
380 # 'maxCpu' : 'cpu%s' % timeframe,
386 # for attr_name, attr_obj in self._attrs.iteritems():
387 # attr_value = self.get(attr_name)
389 # if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
390 # attr_name != 'timeframe':
392 # attr_tag = attr_to_tags[attr_name]
393 # filters['tagname'] = attr_tag
395 # # filter nodes by fixed constraints e.g. operating system
396 # if not 'min' in attr_name and not 'max' in attr_name:
397 # filters['value'] = attr_value
398 # nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
400 # # filter nodes by range constraints e.g. max bandwidth
401 # elif ('min' or 'max') in attr_name:
402 # nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
405 # nodes = self.sfaapi.get_resources_hrn()
407 # nodes_hrn.append(node[node.key()])
410 # def _filter_by_fixed_attr(self, filters, nodes_hrn):
412 # Query SFA API for nodes matching fixed attributes defined by the
416 ## node_tags = self.sfaapi.get_resources_tags(filters)
417 ## if node_tags is not None:
419 ## if len(nodes_id) == 0:
420 ## # first attribute being matched
421 ## for node_tag in node_tags:
422 ## nodes_id.append(node_tag['node_id'])
424 ## # remove the nodes ids that don't match the new attribute
425 ## # that is being match
428 ## for node_tag in node_tags:
429 ## if node_tag['node_id'] in nodes_id:
430 ## nodes_id_tmp.append(node_tag['node_id'])
432 ## if len(nodes_id_tmp):
433 ## nodes_id = set(nodes_id) & set(nodes_id_tmp)
435 ## # no node from before match the new constraint
436 ## self.fail_discovery()
438 ## # no nodes match the filter applied
439 ## self.fail_discovery()
443 # def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
445 # Query PLCAPI for nodes ids matching attributes defined in a certain
449 ## node_tags = self.plapi.get_node_tags(filters)
452 ## if len(nodes_id) == 0:
453 ## # first attribute being matched
454 ## for node_tag in node_tags:
456 ## # check that matches the min or max restriction
457 ## if 'min' in attr_name and node_tag['value'] != 'n/a' and \
458 ## float(node_tag['value']) > attr_value:
459 ## nodes_id.append(node_tag['node_id'])
461 ## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
462 ## float(node_tag['value']) < attr_value:
463 ## nodes_id.append(node_tag['node_id'])
466 ## # remove the nodes ids that don't match the new attribute
467 ## # that is being match
469 ## for node_tag in node_tags:
471 ## # check that matches the min or max restriction and was a
472 ## # matching previous filters
473 ## if 'min' in attr_name and node_tag['value'] != 'n/a' and \
474 ## float(node_tag['value']) > attr_value and \
475 ## node_tag['node_id'] in nodes_id:
476 ## nodes_id_tmp.append(node_tag['node_id'])
478 ## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
479 ## float(node_tag['value']) < attr_value and \
480 ## node_tag['node_id'] in nodes_id:
481 ## nodes_id_tmp.append(node_tag['node_id'])
483 ## if len(nodes_id_tmp):
484 ## nodes_id = set(nodes_id) & set(nodes_id_tmp)
486 ## # no node from before match the new constraint
487 ## self.fail_discovery()
490 ## # no nodes match the filter applied
491 ## self.fail_discovery()
495 # def _choose_random_node(self, nodes):
497 # From the possible nodes for provision, choose randomly to decrese the
498 # probability of different RMs choosing the same node for provision
503 # index = randint(0, size)
504 # node_id = nodes[index]
505 # nodes[index] = nodes[size]
507 # # check the node is not blacklisted or being provision by other RM
508 # # and perform ping to check that is really alive
509 # with PlanetlabNode.lock:
511 # blist = self.plapi.blacklisted()
512 # plist = self.plapi.reserved()
513 # if node_id not in blist and node_id not in plist:
514 # ping_ok = self._do_ping(node_id)
516 # self._set_hostname_attr(node_id)
517 # self.warn(" Node not responding PING ")
518 # self._blacklist_node(node_id)
520 # # discovered node for provision, added to provision list
521 # self._put_node_in_provision(node_id)
524 # def _get_nodes_id(self, filters=None):
525 # return self.plapi.get_nodes(filters, fields=['node_id'])
527 def _add_node_to_slice(self, host_hrn):
528 self.info(" Adding node to slice ")
529 slicename = self.get("username").replace('_', '.')
530 slicename = 'ple.' + slicename
531 self.sfaapi.add_resource_to_slice(slicename, host_hrn)
533 def _delete_from_slice(self):
534 self.warning(" Deleting node from slice ")
535 slicename = self.get("username").replace('_', '.')
536 slicename = 'ple.' + slicename
537 self.sfaapi.remove_all_from_slice(slicename)
    def _get_hostname(self):
        """Return the node hostname to use for discovery.

        NOTE(review): the remainder of this method (validation / return) is
        not visible in this excerpt — confirm against the upstream source.
        """
        hostname = self.get("hostname")
    def _set_hostname_attr(self, node):
        """
        Query SFAAPI for the hostname of a certain host hrn and sets the
        attribute hostname, it will over write the previous value.
        """
        hosts_hrn = self.sfaapi.get_resources_hrn()
        # NOTE(review): an `if hrn == node:` guard appears to be missing
        # before the self.set(...) line in this excerpt (hence the extra
        # indentation). iteritems() is the Python 2 dict API.
        for hostname, hrn in hosts_hrn.iteritems():
                self.set("hostname", hostname)
    def _check_if_in_slice(self, hosts_hrn):
        """
        Check using SFA API if any host hrn from hosts_hrn is in the user's
        slice.
        """
        slicename = self.get("username").replace('_', '.')
        slicename = 'ple.' + slicename
        slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
        # NOTE(review): the `if slice_nodes:` line matching the `else:` below,
        # and the final return of nodes_inslice, appear truncated here.
        slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
        else: slice_nodes_hrn = []
        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
    def _do_ping(self, hostname):
        """
        Perform ping command on node's IP matching hostname.
        """
        ip = self._get_ip(hostname)
        command = "ping -c4 %s" % ip
        (out, err) = lexec(command)
        # Node counts as alive when packet loss is below 50%.
        # NOTE(review): the body of this `if` (setting/returning the ping
        # result) is not visible in this excerpt.
        m = re.search("(\d+)% packet loss", str(out))
        if m and int(m.groups()[0]) < 50:
586 def _blacklist_node(self, host_hrn):
588 Add node mal functioning node to blacklist
590 self.warning(" Blacklisting malfunctioning node ")
591 self.sfaapi.blacklist_resource(host_hrn)
592 if not self._hostname:
593 self.set('hostname', None)
595 def _reserve(self, host_hrn):
597 Add node to the list of nodes being provisioned, in order for other RMs
598 to not try to provision the same one again.
600 self.sfaapi.reserve_resource(host_hrn)
    def _get_ip(self, hostname):
        """
        Query cache for the IP of a node with certain hostname.
        """
        # NOTE(review): the surrounding try/except and the return statements
        # appear truncated in this excerpt — confirm against upstream.
        ip = sshfuncs.gethostbyname(hostname)
        # Fail while trying to find the IP
613 def fail_discovery(self):
614 msg = "Discovery failed. No candidates found for node"
616 raise RuntimeError, msg
618 def fail_node_not_alive(self, hostname=None):
619 msg = "Node %s not alive" % hostname
620 raise RuntimeError, msg
622 def fail_node_not_available(self, hostname):
623 msg = "Node %s not available for provisioning" % hostname
624 raise RuntimeError, msg
626 def fail_not_enough_nodes(self):
627 msg = "Not enough nodes available for provisioning"
628 raise RuntimeError, msg
630 def fail_sfaapi(self):
631 msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
632 " attributes sfauser and sfaPrivateKey."
633 raise RuntimeError, msg
635 def valid_connection(self, guid):