2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19 from __future__ import print_function
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sfaapi import SFAAPIFactory
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
29 from random import randint
class PlanetlabSfaNode(LinuxNode):
    """
    Resource manager for a PlanetLab host reached over SSH and
    provisioned through SFA (sfi-based client).
    """
    # Type string used by the RM factory to look this class up.
    _rtype = "planetlab::sfa::Node"
    _help = "Controls a PlanetLab host accessible using a SSH key " \
            "and provisioned using SFA"
    _platform = "planetlab"
    def _register_attributes(cls):
        """
        Declare the attributes that drive SFA-based discovery and
        provisioning of a PlanetLab node: SFA credentials, location and
        hardware filters, reliability/bandwidth/load/cpu bounds, the
        timeframe for node statistics, and the persistent blacklist.

        NOTE(review): this view of the file is missing lines -- several
        Attribute(...) calls below lack trailing arguments such as
        flags=/allowed=; verify against the full source.
        """
        # Credentials for the SFA registry / slice manager.
        sfa_user = Attribute("sfauser", "SFA user",
                    flags = Flags.Credential)
        sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
                            used to generate the user credential")
        # Location and hardware filters applied during discovery.
        city = Attribute("city", "Constrain location (city) during resource \
                discovery. May use wildcards.",
        country = Attribute("country", "Constrain location (country) during \
                    resource discovery. May use wildcards.",
        region = Attribute("region", "Constrain location (region) during \
                    resource discovery. May use wildcards.",
        architecture = Attribute("architecture", "Constrain architecture \
                        during resource discovery.",
                        type = Types.Enumerate,
        operating_system = Attribute("operatingSystem", "Constrain operating \
                            system during resource discovery.",
                            type = Types.Enumerate,
        min_reliability = Attribute("minReliability", "Constrain reliability \
                            while picking PlanetLab nodes. Specifies a lower \
        max_reliability = Attribute("maxReliability", "Constrain reliability \
                            while picking PlanetLab nodes. Specifies an upper \
        min_bandwidth = Attribute("minBandwidth", "Constrain available \
                        bandwidth while picking PlanetLab nodes. \
                        Specifies a lower acceptable bound.",
                        flags = Flags.Filter)
        max_bandwidth = Attribute("maxBandwidth", "Constrain available \
                        bandwidth while picking PlanetLab nodes. \
                        Specifies an upper acceptable bound.",
                        flags = Flags.Filter)
        min_load = Attribute("minLoad", "Constrain node load average while \
                    picking PlanetLab nodes. Specifies a lower acceptable \
                    flags = Flags.Filter)
        max_load = Attribute("maxLoad", "Constrain node load average while \
                    picking PlanetLab nodes. Specifies an upper acceptable \
                    flags = Flags.Filter)
        min_cpu = Attribute("minCpu", "Constrain available cpu time while \
                    picking PlanetLab nodes. Specifies a lower acceptable \
                    flags = Flags.Filter)
        max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
                    picking PlanetLab nodes. Specifies an upper acceptable \
                    flags = Flags.Filter)
        # Timeframe over which the node statistics above are evaluated.
        timeframe = Attribute("timeframe", "Past time period in which to check\
                        information about the node. Values are year,month, \
                        type = Types.Enumerate,
                        flags = Flags.Filter)
        # Persistent blacklist kept in ~/.nepi (see __init__).
        plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
                        in the user's home directory under .nepi directory. This file \
                        contains a list of PL nodes to blacklist, and at the end \
                        of the experiment execution the new blacklisted nodes are added.",
                        flags = Flags.Global)
        # Register every attribute so the EC exposes it to the user.
        cls._register_attribute(sfa_user)
        cls._register_attribute(sfa_private_key)
        cls._register_attribute(city)
        cls._register_attribute(country)
        cls._register_attribute(region)
        cls._register_attribute(architecture)
        cls._register_attribute(operating_system)
        cls._register_attribute(min_reliability)
        cls._register_attribute(max_reliability)
        cls._register_attribute(min_bandwidth)
        cls._register_attribute(max_bandwidth)
        cls._register_attribute(min_load)
        cls._register_attribute(max_load)
        cls._register_attribute(min_cpu)
        cls._register_attribute(max_cpu)
        cls._register_attribute(timeframe)
        cls._register_attribute(plblacklist)
176 def __init__(self, ec, guid):
177 super(PlanetlabSfaNode, self).__init__(ec, guid)
179 self._ecobj = weakref.ref(ec)
181 self._node_to_provision = None
182 self._slicenode = False
183 self._hostname = False
185 if self.get("gateway") or self.get("gatewayUser"):
186 self.set("gateway", None)
187 self.set("gatewayUser", None)
189 # Blacklist file for PL nodes
190 nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
191 plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
192 if not os.path.exists(plblacklist_file):
193 if os.path.isdir(nepi_home):
194 open(plblacklist_file, 'w').close()
196 os.makedirs(nepi_home)
197 open(plblacklist_file, 'w').close()
    def _skip_provision(self):
        # True when no SFA user is configured; discovery/provision then
        # fall back to the plain LinuxNode behavior.
        # NOTE(review): the return statement of this method and the start
        # of the `sfaapi` property (decorator, def line, docstring quotes,
        # cache check) are missing from this view of the file.
        sfa_user = self.get("sfauser")
        Property to instanciate the SFA API based in sfi client.
        For each SFA method called this instance is used.
        sfa_user = self.get("sfauser")
        # PLE SFA endpoints: slice manager (12346) and registry (12345).
        sfa_sm = "http://sfa3.planet-lab.eu:12346/"
        sfa_auth = '.'.join(sfa_user.split('.')[:2])
        sfa_registry = "http://sfa3.planet-lab.eu:12345/"
        sfa_private_key = self.get("sfaPrivateKey")
        _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
            sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
        # Keep only a weak reference here; presumably the factory holds
        # the strong reference -- TODO confirm against SFAAPIFactory.
        self._sfaapi = weakref.ref(_sfaapi)
        return self._sfaapi()
    def do_discover(self):
        """
        Based on the attributes defined by the user, discover the suitable
        nodes for provision.

        NOTE(review): several control-flow headers (if/else lines) are
        missing from this view; the indentation below follows the apparent
        intent and must be checked against the full source.
        """
        # When no SFA credentials are set, behave as a plain LinuxNode.
        if self._skip_provision():
            super(PlanetlabSfaNode, self).do_discover()
        # hostname -> hrn map of every PLE resource known to the SFA API.
        nodes = self.sfaapi.get_resources_hrn()
        hostname = self._get_hostname()
            # the user specified one particular node to be provisioned
            self._hostname = True
            host_hrn = nodes[hostname]
            # check that the node is not blacklisted or being provisioned
            if not self._blacklisted(host_hrn) and not self._reserved(host_hrn):
                # Node in reservation
                ping_ok = self._do_ping(hostname)
                    # Unreachable node: blacklist it and abort.
                    self._blacklist_node(host_hrn)
                    self.fail_node_not_alive(hostname)
                    if self._check_if_in_slice([host_hrn]):
                        self.debug("The node %s is already in the slice" % hostname)
                        self._slicenode = True
                    self._node_to_provision = host_hrn
                self.fail_node_not_available(hostname)
            super(PlanetlabSfaNode, self).do_discover()
            # No hostname given: pick a node, preferring ones already in
            # the user's slice.
            hosts_hrn = nodes.values()
            nodes_inslice = self._check_if_in_slice(hosts_hrn)
            nodes_not_inslice = list(set(hosts_hrn) - set(nodes_inslice))
                host_hrn = self._choose_random_node(nodes, nodes_inslice)
                self._slicenode = True
                # Either there were no matching nodes in the user's slice, or
                # the nodes in the slice were blacklisted or being provisioned
                # by other RM. Note nodes_not_inslice is never empty
                host_hrn = self._choose_random_node(nodes, nodes_not_inslice)
                self._slicenode = False
            self._node_to_provision = host_hrn
                self._set_hostname_attr(host_hrn)
                self.info(" Selected node to provision ")
                super(PlanetlabSfaNode, self).do_discover()
                self._blacklist_node(host_hrn)
            self.fail_not_enough_nodes()
    def _blacklisted(self, host_hrn):
        Check in the SFA API that the node is not in the blacklist.
        # NOTE(review): the branch body and return value are missing from
        # this view of the file.
        if self.sfaapi.blacklisted(host_hrn):
    def _reserved(self, host_hrn):
        Check in the SFA API that the node is not in the reserved
        # NOTE(review): the branch body and return value are missing from
        # this view of the file.
        if self.sfaapi.reserved(host_hrn):
    def do_provision(self):
        """
        Add node to user's slice and verifing that the node is functioning
        correctly. Check ssh, file system.

        NOTE(review): loop/flag initializations (provision_ok, ssh_ok, t,
        timeout) and several else/sleep lines are missing from this view;
        indentation below follows the apparent intent.
        """
        if self._skip_provision():
            super(PlanetlabSfaNode, self).do_provision()
        # Retry until a node passes the SSH and file-system checks.
        while not provision_ok:
            node = self._node_to_provision
            if not self._slicenode:
                self._add_node_to_slice(node)
                # check ssh connection
                while t < timeout and not ssh_ok:
                    cmd = 'echo \'GOOD NODE\''
                    ((out, err), proc) = self.execute(cmd)
                    if out.find("GOOD NODE") < 0:
                        self.debug( "No SSH connection, waiting 60s" )
                        self.debug( "SSH OK" )
                # Node was already in the slice: a single SSH probe.
                cmd = 'echo \'GOOD NODE\''
                ((out, err), proc) = self.execute(cmd)
                if not out.find("GOOD NODE") < 0:
                # the timeout was reach without establishing ssh connection
                # the node is blacklisted, deleted from the slice, and a new
                # node to provision is discovered
                self.warning(" Could not SSH login ")
                self._blacklist_node(node)
                # check /proc directory is mounted (ssh_ok = True)
                # and file system is not read only
                cmd = 'mount |grep proc'
                ((out1, err1), proc1) = self.execute(cmd)
                cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
                ((out2, err2), proc2) = self.execute(cmd)
                if out1.find("/proc type proc") < 0 or \
                    "Read-only file system".lower() in err2.lower():
                    self.warning(" Corrupted file system ")
                    self._blacklist_node(node)
                    # Healthy node: record its hostname if not set by user.
                    if not self.get('hostname'):
                        self._set_hostname_attr(node)
                    self.info(" Node provisioned ")
        super(PlanetlabSfaNode, self).do_provision()
378 def do_release(self):
379 super(PlanetlabSfaNode, self).do_release()
380 if self.state == ResourceState.RELEASED and not self._skip_provision():
381 self.debug(" Releasing SFA API ")
382 self.sfaapi.release()
384 # def _filter_based_on_attributes(self):
#        Retrieve the list of nodes hrn that match user's constraints
388 # # Map user's defined attributes with tagnames of PlanetLab
389 # timeframe = self.get("timeframe")[0]
392 # 'country' : 'country',
393 # 'region' : 'region',
394 # 'architecture' : 'arch',
395 # 'operatingSystem' : 'fcdistro',
396 # 'minReliability' : 'reliability%s' % timeframe,
397 # 'maxReliability' : 'reliability%s' % timeframe,
398 # 'minBandwidth' : 'bw%s' % timeframe,
399 # 'maxBandwidth' : 'bw%s' % timeframe,
400 # 'minLoad' : 'load%s' % timeframe,
401 # 'maxLoad' : 'load%s' % timeframe,
402 # 'minCpu' : 'cpu%s' % timeframe,
403 # 'maxCpu' : 'cpu%s' % timeframe,
409 # for attr_name, attr_obj in self._attrs.iteritems():
410 # attr_value = self.get(attr_name)
412 # if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
413 # attr_name != 'timeframe':
415 # attr_tag = attr_to_tags[attr_name]
416 # filters['tagname'] = attr_tag
418 # # filter nodes by fixed constraints e.g. operating system
419 # if not 'min' in attr_name and not 'max' in attr_name:
420 # filters['value'] = attr_value
421 # nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
423 # # filter nodes by range constraints e.g. max bandwidth
424 # elif ('min' or 'max') in attr_name:
425 # nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
428 # nodes = self.sfaapi.get_resources_hrn()
430 # nodes_hrn.append(node[node.key()])
433 # def _filter_by_fixed_attr(self, filters, nodes_hrn):
435 # Query SFA API for nodes matching fixed attributes defined by the
439 ## node_tags = self.sfaapi.get_resources_tags(filters)
440 ## if node_tags is not None:
442 ## if len(nodes_id) == 0:
443 ## # first attribute being matched
444 ## for node_tag in node_tags:
445 ## nodes_id.append(node_tag['node_id'])
447 ## # remove the nodes ids that don't match the new attribute
448 ## # that is being match
451 ## for node_tag in node_tags:
452 ## if node_tag['node_id'] in nodes_id:
453 ## nodes_id_tmp.append(node_tag['node_id'])
455 ## if len(nodes_id_tmp):
456 ## nodes_id = set(nodes_id) & set(nodes_id_tmp)
458 ## # no node from before match the new constraint
459 ## self.fail_discovery()
461 ## # no nodes match the filter applied
462 ## self.fail_discovery()
466 # def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
468 # Query PLCAPI for nodes ids matching attributes defined in a certain
472 ## node_tags = self.plapi.get_node_tags(filters)
475 ## if len(nodes_id) == 0:
476 ## # first attribute being matched
477 ## for node_tag in node_tags:
479 ## # check that matches the min or max restriction
480 ## if 'min' in attr_name and node_tag['value'] != 'n/a' and \
481 ## float(node_tag['value']) > attr_value:
482 ## nodes_id.append(node_tag['node_id'])
484 ## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
485 ## float(node_tag['value']) < attr_value:
486 ## nodes_id.append(node_tag['node_id'])
489 ## # remove the nodes ids that don't match the new attribute
490 ## # that is being match
492 ## for node_tag in node_tags:
494 ## # check that matches the min or max restriction and was a
495 ## # matching previous filters
496 ## if 'min' in attr_name and node_tag['value'] != 'n/a' and \
497 ## float(node_tag['value']) > attr_value and \
498 ## node_tag['node_id'] in nodes_id:
499 ## nodes_id_tmp.append(node_tag['node_id'])
501 ## elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
502 ## float(node_tag['value']) < attr_value and \
503 ## node_tag['node_id'] in nodes_id:
504 ## nodes_id_tmp.append(node_tag['node_id'])
506 ## if len(nodes_id_tmp):
507 ## nodes_id = set(nodes_id) & set(nodes_id_tmp)
509 ## # no node from before match the new constraint
510 ## self.fail_discovery()
513 ## # no nodes match the filter applied
514 ## self.fail_discovery()
    def _choose_random_node(self, nodes, hosts_hrn):
        From the possible nodes for provision, choose randomly to decrese the
        probability of different RMs choosing the same node for provision
        # NOTE(review): the shuffle loop header and decrement (e.g.
        # `while size:` / `size = size - 1`) are missing from this view;
        # as shown, randint(0, size) and hosts_hrn[size] would be
        # off-by-one -- confirm against the full source.
        size = len(hosts_hrn)
            index = randint(0, size)
            host_hrn = hosts_hrn[index]
            hosts_hrn[index] = hosts_hrn[size]
            # check the node is not blacklisted or being provision by other RM
            # and perform ping to check that is really alive
            if not self._blacklisted(host_hrn):
                if not self._reserved(host_hrn):
                    print(self.sfaapi._reserved ,self.guid)
                    # Map the chosen hrn back to its hostname.
                    # NOTE(review): py2-only dict.iteritems(); the guard
                    # matching hrn against host_hrn is missing here.
                    for hostname, hrn in nodes.iteritems():
                        print('hostname' ,hostname)
                        ping_ok = self._do_ping(hostname)
                            self._set_hostname_attr(hostname)
                            self.warning(" Node not responding PING ")
                            self._blacklist_node(host_hrn)
                        # discovered node for provision, added to provision list
                        self._node_to_provision = host_hrn
549 # def _get_nodes_id(self, filters=None):
550 # return self.plapi.get_nodes(filters, fields=['node_id'])
552 def _add_node_to_slice(self, host_hrn):
554 Add node to slice, using SFA API.
556 self.info(" Adding node to slice ")
557 slicename = self.get("username").replace('_', '.')
558 slicename = 'ple.' + slicename
559 self.sfaapi.add_resource_to_slice(slicename, host_hrn)
561 def _delete_from_slice(self):
563 Delete every node from slice, using SFA API.
564 Sfi client doesn't work for particular node urns.
566 self.warning(" Deleting node from slice ")
567 slicename = self.get("username").replace('_', '.')
568 slicename = 'ple.' + slicename
569 self.sfaapi.remove_all_from_slice(slicename)
    def _get_hostname(self):
        Get the attribute hostname.
        # NOTE(review): the remainder of this method (return logic) is
        # missing from this view of the file.
        hostname = self.get("hostname")
    def _set_hostname_attr(self, node):
        Query SFAAPI for the hostname of a certain host hrn and sets the
        attribute hostname, it will over write the previous value.
        hosts_hrn = self.sfaapi.get_resources_hrn()
        # NOTE(review): py2-only dict.iteritems(); the guard comparing
        # hrn against node is missing from this view.
        for hostname, hrn in hosts_hrn.iteritems():
                self.set("hostname", hostname)
    def _check_if_in_slice(self, hosts_hrn):
        Check using SFA API if any host hrn from hosts_hrn is in the user's
        # Slice name: 'ple.' + username with underscores mapped to dots.
        slicename = self.get("username").replace('_', '.')
        slicename = 'ple.' + slicename
        slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
        # NOTE(review): the `if slice_nodes:` header and the final return
        # are missing from this view.
            slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
        else: slice_nodes_hrn = []
        nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
    def _do_ping(self, hostname):
        Perform ping command on node's IP matching hostname.
        ip = self._get_ip(hostname)
        # NOTE(review): the ping_ok initialization / `if ip:` guard and
        # the return statement are missing from this view.
        command = "ping -c4 %s" % ip
        (out, err) = lexec(command)
        # Consider the node alive when packet loss is below 50%.
        m = re.search("(\d+)% packet loss", str(out))
        if m and int(m.groups()[0]) < 50:
621 def _blacklist_node(self, host_hrn):
623 Add mal functioning node to blacklist (in SFA API).
625 self.warning(" Blacklisting malfunctioning node ")
626 self.sfaapi.blacklist_resource(host_hrn)
627 if not self._hostname:
628 self.set('hostname', None)
630 def _reserve(self, host_hrn):
632 Add node to the list of nodes being provisioned, in order for other RMs
633 to not try to provision the same one again.
635 self.sfaapi.reserve_resource(host_hrn)
    def _get_ip(self, hostname):
        Query cache for the IP of a node with certain hostname
        # NOTE(review): presumably wrapped in try/except in the full
        # source -- the except branch and return are missing from this view.
        ip = sshfuncs.gethostbyname(hostname)
            # Fail while trying to find the IP
648 def fail_discovery(self):
649 msg = "Discovery failed. No candidates found for node"
651 raise RuntimeError, msg
653 def fail_node_not_alive(self, hostname=None):
654 msg = "Node %s not alive" % hostname
655 raise RuntimeError, msg
657 def fail_node_not_available(self, hostname):
658 msg = "Node %s not available for provisioning" % hostname
659 raise RuntimeError, msg
661 def fail_not_enough_nodes(self):
662 msg = "Not enough nodes available for provisioning"
663 raise RuntimeError, msg
665 def fail_sfaapi(self):
666 msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
667 " attributes sfauser and sfaPrivateKey."
668 raise RuntimeError, msg
670 def valid_connection(self, guid):