2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19 from __future__ import print_function
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sfaapi import SFAAPIFactory
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
29 from random import randint
class PlanetlabSfaNode(LinuxNode):
    # Resource type identifier under which this RM is registered with the EC.
    _rtype = "planetlab::sfa::Node"
    _help = "Controls a PlanetLab host accessible using a SSH key " \
        "and provisioned using SFA"
    # Platform tag used to group RMs of the same testbed.
    _platform = "planetlab"
    def _register_attributes(cls):
        """Declare the user-settable attributes of this RM.

        Covers the SFA credentials, node-discovery filter constraints
        (location, architecture, OS, reliability, bandwidth, load, cpu,
        timeframe) and the persistent-blacklist switch.
        """
        # Credentials used against the SFA registry / slice manager.
        sfa_user = Attribute("sfauser", "SFA user",
                    flags = Flags.Credential)

        sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
                            used to generate the user credential")

        # Location filters; wildcard patterns are accepted.
        city = Attribute("city", "Constrain location (city) during resource \
                discovery. May use wildcards.",

        country = Attribute("country", "Constrain location (country) during \
                resource discovery. May use wildcards.",

        region = Attribute("region", "Constrain location (region) during \
                resource discovery. May use wildcards.",

        # Hardware / software filters with enumerated values.
        architecture = Attribute("architecture", "Constrain architecture \
                during resource discovery.",
                type = Types.Enumerate,

        operating_system = Attribute("operatingSystem", "Constrain operating \
                system during resource discovery.",
                type = Types.Enumerate,

        # Statistic bounds, evaluated over the period set by "timeframe".
        min_reliability = Attribute("minReliability", "Constrain reliability \
                while picking PlanetLab nodes. Specifies a lower \

        max_reliability = Attribute("maxReliability", "Constrain reliability \
                while picking PlanetLab nodes. Specifies an upper \

        min_bandwidth = Attribute("minBandwidth", "Constrain available \
                bandwidth while picking PlanetLab nodes. \
                Specifies a lower acceptable bound.",
                flags = Flags.Filter)

        max_bandwidth = Attribute("maxBandwidth", "Constrain available \
                bandwidth while picking PlanetLab nodes. \
                Specifies an upper acceptable bound.",
                flags = Flags.Filter)

        min_load = Attribute("minLoad", "Constrain node load average while \
                picking PlanetLab nodes. Specifies a lower acceptable \
                flags = Flags.Filter)

        max_load = Attribute("maxLoad", "Constrain node load average while \
                picking PlanetLab nodes. Specifies an upper acceptable \
                flags = Flags.Filter)

        min_cpu = Attribute("minCpu", "Constrain available cpu time while \
                picking PlanetLab nodes. Specifies a lower acceptable \
                flags = Flags.Filter)

        max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
                picking PlanetLab nodes. Specifies an upper acceptable \
                flags = Flags.Filter)

        # Past period over which node statistics are checked.
        timeframe = Attribute("timeframe", "Past time period in which to check\
                information about the node. Values are year,month, \
                type = Types.Enumerate,
                flags = Flags.Filter)

        # Global switch: honour ~/.nepi/plblacklist.txt across experiments.
        plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
                in the user's home directory under .nepi directory. This file \
                contains a list of PL nodes to blacklist, and at the end \
                of the experiment execution the new blacklisted nodes are added.",
                flags = Flags.Global)

        cls._register_attribute(sfa_user)
        cls._register_attribute(sfa_private_key)
        cls._register_attribute(city)
        cls._register_attribute(country)
        cls._register_attribute(region)
        cls._register_attribute(architecture)
        cls._register_attribute(operating_system)
        cls._register_attribute(min_reliability)
        cls._register_attribute(max_reliability)
        cls._register_attribute(min_bandwidth)
        cls._register_attribute(max_bandwidth)
        cls._register_attribute(min_load)
        cls._register_attribute(max_load)
        cls._register_attribute(min_cpu)
        cls._register_attribute(max_cpu)
        cls._register_attribute(timeframe)
        cls._register_attribute(plblacklist)
    def __init__(self, ec, guid):
        super(PlanetlabSfaNode, self).__init__(ec, guid)

        # Weak reference to the EC; later handed to the SFA API factory.
        self._ecobj = weakref.ref(ec)

        # Host hrn selected for provisioning plus discovery state flags.
        self._node_to_provision = None
        self._slicenode = False   # node was already present in the slice
        self._hostname = False    # hostname was explicitly set by the user

        # SFA-provisioned PL nodes are reached directly: drop any gateway.
        if self.get("gateway") or self.get("gatewayUser"):
            self.set("gateway", None)
            self.set("gatewayUser", None)

        # Blacklist file for PL nodes
        nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
        plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
        # Make sure the blacklist file exists (created empty if missing).
        if not os.path.exists(plblacklist_file):
            if os.path.isdir(nepi_home):
                with open(plblacklist_file, 'w') as clear:
                os.makedirs(nepi_home)
                with open(plblacklist_file, 'w') as clear:
    def _skip_provision(self):
        """Tell whether SFA discovery/provisioning should be skipped.

        NOTE(review): the visible logic reads the "sfauser" attribute;
        presumably provisioning is skipped when no SFA user is set —
        confirm against the full implementation.
        """
        sfa_user = self.get("sfauser")
        Property to instantiate the SFA API based on the sfi client.
        For each SFA method called this instance is used.
        sfa_user = self.get("sfauser")
        # PlanetLab Europe SFA endpoints: slice manager and registry.
        sfa_sm = "http://sfa3.planet-lab.eu:12346/"
        # The authority is the first two dot-separated fields of the user hrn.
        sfa_auth = '.'.join(sfa_user.split('.')[:2])
        sfa_registry = "http://sfa3.planet-lab.eu:12345/"
        sfa_private_key = self.get("sfaPrivateKey")

        _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
            sfa_registry, sfa_sm, sfa_private_key, self._ecobj())

        # Keep only a weak reference so the RM does not pin the API object.
        self._sfaapi = weakref.ref(_sfaapi)

        return self._sfaapi()
    def do_discover(self):
        Based on the attributes defined by the user, discover the suitable
        # No SFA discovery when provisioning is being skipped.
        if self._skip_provision():
            super(PlanetlabSfaNode, self).do_discover()

        # Map {hostname: hrn} of every node known to the SFA API.
        nodes = self.sfaapi.get_resources_hrn()

        hostname = self._get_hostname()
        # the user specified one particular node to be provisioned
        self._hostname = True
        host_hrn = nodes[hostname]

        # check that the node is not blacklisted or being provisioned
        if not self._blacklisted(host_hrn) and not self._reserved(host_hrn):
            # Node in reservation
            ping_ok = self._do_ping(hostname)
            # Unreachable node: blacklist it before failing discovery.
            self._blacklist_node(host_hrn)
            self.fail_node_not_alive(hostname)
            if self._check_if_in_slice([host_hrn]):
                self.debug("The node %s is already in the slice" % hostname)
                self._slicenode = True
            self._node_to_provision = host_hrn
        self.fail_node_not_available(hostname)
        super(PlanetlabSfaNode, self).do_discover()

        # No fixed hostname: split candidates into in-slice / not-in-slice
        # and prefer nodes already in the user's slice.
        hosts_hrn = nodes.values()
        nodes_inslice = self._check_if_in_slice(hosts_hrn)
        nodes_not_inslice = list(set(hosts_hrn) - set(nodes_inslice))

        host_hrn = self._choose_random_node(nodes, nodes_inslice)
        self._slicenode = True

        # Either there were no matching nodes in the user's slice, or
        # the nodes in the slice were blacklisted or being provisioned
        # by other RM. Note nodes_not_inslice is never empty
        host_hrn = self._choose_random_node(nodes, nodes_not_inslice)
        self._slicenode = False

        self._node_to_provision = host_hrn
        self._set_hostname_attr(host_hrn)
        self.info(" Selected node to provision ")
        super(PlanetlabSfaNode, self).do_discover()
        # Selection failed for this candidate: blacklist it and give up.
        self._blacklist_node(host_hrn)
        self.fail_not_enough_nodes()
    def _blacklisted(self, host_hrn):
        Check in the SFA API that the node is not in the blacklist.
        # NOTE(review): delegates the check to the SFA API helper;
        # presumably returns a boolean — confirm the return paths.
        if self.sfaapi.blacklisted(host_hrn):
    def _reserved(self, host_hrn):
        Check in the SFA API that the node is not in the reserved
        # NOTE(review): delegates the check to the SFA API helper;
        # presumably returns a boolean — confirm the return paths.
        if self.sfaapi.reserved(host_hrn):
    def do_provision(self):
        Add node to user's slice and verifying that the node is functioning
        correctly. Check ssh, file system.
        # Provisioning skipped: behave like a plain LinuxNode.
        if self._skip_provision():
            super(PlanetlabSfaNode, self).do_provision()

        # Retry until a node passes the SSH and file-system checks.
        while not provision_ok:
            node = self._node_to_provision
            if not self._slicenode:
                self._add_node_to_slice(node)

            # check ssh connection
            while t < timeout and not ssh_ok:

                cmd = 'echo \'GOOD NODE\''
                ((out, err), proc) = self.execute(cmd)
                if out.find("GOOD NODE") < 0:
                    self.debug( "No SSH connection, waiting 60s" )
                    self.debug( "SSH OK" )

                # Last-chance SSH probe after the wait loop.
                cmd = 'echo \'GOOD NODE\''
                ((out, err), proc) = self.execute(cmd)
                if not out.find("GOOD NODE") < 0:

                # the timeout was reach without establishing ssh connection
                # the node is blacklisted, deleted from the slice, and a new
                # node to provision is discovered
                self.warning(" Could not SSH login ")
                self._blacklist_node(node)

            # check /proc directory is mounted (ssh_ok = True)
            # and file system is not read only

            cmd = 'mount |grep proc'
            ((out1, err1), proc1) = self.execute(cmd)
            cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
            ((out2, err2), proc2) = self.execute(cmd)
            if out1.find("/proc type proc") < 0 or \
                "Read-only file system".lower() in err2.lower():
                self.warning(" Corrupted file system ")
                self._blacklist_node(node)

            # Record the hostname unless the user already pinned one.
            if not self.get('hostname'):
                self._set_hostname_attr(node)
            self.info(" Node provisioned ")

        super(PlanetlabSfaNode, self).do_provision()
380 def do_release(self):
381 super(PlanetlabSfaNode, self).do_release()
382 if self.state == ResourceState.RELEASED and not self._skip_provision():
383 self.debug(" Releasing SFA API ")
384 self.sfaapi.release()
386 # def _filter_based_on_attributes(self):
    # Retrieve the list of nodes hrn that match user's constraints
390 # # Map user's defined attributes with tagnames of PlanetLab
391 # timeframe = self.get("timeframe")[0]
394 # 'country' : 'country',
395 # 'region' : 'region',
396 # 'architecture' : 'arch',
397 # 'operatingSystem' : 'fcdistro',
398 # 'minReliability' : 'reliability%s' % timeframe,
399 # 'maxReliability' : 'reliability%s' % timeframe,
400 # 'minBandwidth' : 'bw%s' % timeframe,
401 # 'maxBandwidth' : 'bw%s' % timeframe,
402 # 'minLoad' : 'load%s' % timeframe,
403 # 'maxLoad' : 'load%s' % timeframe,
404 # 'minCpu' : 'cpu%s' % timeframe,
405 # 'maxCpu' : 'cpu%s' % timeframe,
411 # for attr_name, attr_obj in self._attrs.iteritems():
412 # attr_value = self.get(attr_name)
414 # if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
415 # attr_name != 'timeframe':
417 # attr_tag = attr_to_tags[attr_name]
418 # filters['tagname'] = attr_tag
420 # # filter nodes by fixed constraints e.g. operating system
421 # if not 'min' in attr_name and not 'max' in attr_name:
422 # filters['value'] = attr_value
423 # nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
425 # # filter nodes by range constraints e.g. max bandwidth
426 # elif ('min' or 'max') in attr_name:
427 # nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
430 # nodes = self.sfaapi.get_resources_hrn()
432 # nodes_hrn.append(node[node.key()])
435 # def _filter_by_fixed_attr(self, filters, nodes_hrn):
437 # Query SFA API for nodes matching fixed attributes defined by the
441 ## node_tags = self.sfaapi.get_resources_tags(filters)
442 ## if node_tags is not None:
444 ## if len(nodes_id) == 0:
445 ## # first attribute being matched
446 ## for node_tag in node_tags:
447 ## nodes_id.append(node_tag['node_id'])
449 ## # remove the nodes ids that don't match the new attribute
450 ## # that is being match
453 ## for node_tag in node_tags:
454 ## if node_tag['node_id'] in nodes_id:
455 ## nodes_id_tmp.append(node_tag['node_id'])
457 ## if len(nodes_id_tmp):
458 ## nodes_id = set(nodes_id) & set(nodes_id_tmp)
460 ## # no node from before match the new constraint
461 ## self.fail_discovery()
463 ## # no nodes match the filter applied
464 ## self.fail_discovery()
468 # def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
470 # Query PLCAPI for nodes ids matching attributes defined in a certain
474 ## node_tags = self.plapi.get_node_tags(filters)
477 ## if len(nodes_id) == 0:
478 ## # first attribute being matched
479 ## for node_tag in node_tags:
481 ## # check that matches the min or max restriction
482 ## if 'min' in attr_name and node_tag['value'] != 'n/a' and \
483 ## float(node_tag['value']) > attr_value:
484 ## nodes_id.append(node_tag['node_id'])
486 ## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
487 ## float(node_tag['value']) < attr_value:
488 ## nodes_id.append(node_tag['node_id'])
491 ## # remove the nodes ids that don't match the new attribute
492 ## # that is being match
494 ## for node_tag in node_tags:
496 ## # check that matches the min or max restriction and was a
497 ## # matching previous filters
498 ## if 'min' in attr_name and node_tag['value'] != 'n/a' and \
499 ## float(node_tag['value']) > attr_value and \
500 ## node_tag['node_id'] in nodes_id:
501 ## nodes_id_tmp.append(node_tag['node_id'])
503 ## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
504 ## float(node_tag['value']) < attr_value and \
505 ## node_tag['node_id'] in nodes_id:
506 ## nodes_id_tmp.append(node_tag['node_id'])
508 ## if len(nodes_id_tmp):
509 ## nodes_id = set(nodes_id) & set(nodes_id_tmp)
511 ## # no node from before match the new constraint
512 ## self.fail_discovery()
515 ## # no nodes match the filter applied
516 ## self.fail_discovery()
    def _choose_random_node(self, nodes, hosts_hrn):
        From the possible nodes for provision, choose randomly to decrease the
        probability of different RMs choosing the same node for provision
        size = len(hosts_hrn)
        # NOTE: randint is inclusive on both endpoints.
        index = randint(0, size)
        host_hrn = hosts_hrn[index]
        # Move the drawn entry towards the tail so it is not re-drawn.
        hosts_hrn[index] = hosts_hrn[size]

        # check the node is not blacklisted or being provision by other RM
        # and perform ping to check that is really alive
        if not self._blacklisted(host_hrn):
            if not self._reserved(host_hrn):
                # NOTE(review): debugging prints left in the code.
                print(self.sfaapi._reserved ,self.guid)
                for hostname, hrn in nodes.items():
                    print('hostname' ,hostname)
                    ping_ok = self._do_ping(hostname)
                    self._set_hostname_attr(hostname)
                    self.warning(" Node not responding PING ")
                    self._blacklist_node(host_hrn)

        # discovered node for provision, added to provision list
        self._node_to_provision = host_hrn
551 # def _get_nodes_id(self, filters=None):
552 # return self.plapi.get_nodes(filters, fields=['node_id'])
554 def _add_node_to_slice(self, host_hrn):
556 Add node to slice, using SFA API.
558 self.info(" Adding node to slice ")
559 slicename = self.get("username").replace('_', '.')
560 slicename = 'ple.' + slicename
561 self.sfaapi.add_resource_to_slice(slicename, host_hrn)
563 def _delete_from_slice(self):
565 Delete every node from slice, using SFA API.
566 Sfi client doesn't work for particular node urns.
568 self.warning(" Deleting node from slice ")
569 slicename = self.get("username").replace('_', '.')
570 slicename = 'ple.' + slicename
571 self.sfaapi.remove_all_from_slice(slicename)
    def _get_hostname(self):
        Get the attribute hostname.
        # NOTE(review): returns the user-set hostname; behavior when the
        # attribute is unset is not visible here — confirm.
        hostname = self.get("hostname")
    def _set_hostname_attr(self, node):
        Query SFAAPI for the hostname of a certain host hrn and sets the
        attribute hostname, it will over write the previous value.
        # Scan the {hostname: hrn} map for the entry matching `node`.
        hosts_hrn = self.sfaapi.get_resources_hrn()
        for hostname, hrn in hosts_hrn.items():
                self.set("hostname", hostname)
    def _check_if_in_slice(self, hosts_hrn):
        Check using SFA API if any host hrn from hosts_hrn is in the user's
        slicename = self.get("username").replace('_', '.')
        slicename = 'ple.' + slicename
        slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
        # Resolve the slice's node records back to hrns, when any exist.
            slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
        else: slice_nodes_hrn = []
        # Intersection of the candidates and the nodes already in the slice.
        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
    def _do_ping(self, hostname):
        Perform ping command on node's IP matching hostname.
        ip = self._get_ip(hostname)
        # Four probes; node is considered alive below 50% packet loss.
        command = "ping -c4 %s" % ip
        (out, err) = lexec(command)

        m = re.search("(\d+)% packet loss", str(out))
        if m and int(m.groups()[0]) < 50:
623 def _blacklist_node(self, host_hrn):
625 Add mal functioning node to blacklist (in SFA API).
627 self.warning(" Blacklisting malfunctioning node ")
628 self.sfaapi.blacklist_resource(host_hrn)
629 if not self._hostname:
630 self.set('hostname', None)
632 def _reserve(self, host_hrn):
634 Add node to the list of nodes being provisioned, in order for other RMs
635 to not try to provision the same one again.
637 self.sfaapi.reserve_resource(host_hrn)
    def _get_ip(self, hostname):
        Query cache for the IP of a node with certain hostname
        # DNS resolution via the ssh helper module.
        ip = sshfuncs.gethostbyname(hostname)
        # Fail while trying to find the IP
    def fail_discovery(self):
        # No candidate node matched the user's constraints.
        msg = "Discovery failed. No candidates found for node"
        raise RuntimeError(msg)
655 def fail_node_not_alive(self, hostname=None):
656 msg = "Node %s not alive" % hostname
657 raise RuntimeError(msg)
659 def fail_node_not_available(self, hostname):
660 msg = "Node %s not available for provisioning" % hostname
661 raise RuntimeError(msg)
663 def fail_not_enough_nodes(self):
664 msg = "Not enough nodes available for provisioning"
665 raise RuntimeError(msg)
667 def fail_sfaapi(self):
668 msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
669 " attributes sfauser and sfaPrivateKey."
670 raise RuntimeError(msg)
672 def valid_connection(self, guid):