2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22 ResourceState, reschedule_delay
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
28 from random import randint
class PlanetlabSfaNode(LinuxNode):
    # Resource type identifier registered with the ResourceFactory.
    _rtype = "PlanetlabSfaNode"
    _help = "Controls a PlanetLab host accessible using a SSH key " \
        "and provisioned using SFA"
    _backend = "planetlab"

    # Class-level lock shared by all instances: serializes access to the
    # common blacklist / reserved-node bookkeeping so concurrent RMs do not
    # pick the same host.
    # NOTE(review): `threading` is not among the visible imports — confirm
    # it is imported at the top of the file.
    lock = threading.Lock()
    def _register_attributes(cls):
        """
        Declare every user-settable attribute of this resource manager:
        SFA credentials plus the node-selection filters.
        """
        # NOTE(review): several Attribute(...) argument lists below appear
        # truncated (unbalanced parentheses / missing flags arguments) —
        # compare against the upstream NEPI sources before relying on them.
        sfa_user = Attribute("sfauser", "SFA user",
                    flags = Flags.Credential)

        sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
                            used to generate the user credential")

        # Location filters — wildcard matching on PlanetLab site metadata.
        city = Attribute("city", "Constrain location (city) during resource \
                discovery. May use wildcards.",

        country = Attribute("country", "Constrain location (country) during \
                    resource discovery. May use wildcards.",

        region = Attribute("region", "Constrain location (region) during \
                    resource discovery. May use wildcards.",

        # Hardware / software filters.
        architecture = Attribute("architecture", "Constrain architecture \
                        during resource discovery.",
                        type = Types.Enumerate,

        operating_system = Attribute("operatingSystem", "Constrain operating \
                            system during resource discovery.",
                            type = Types.Enumerate,

        # Reliability / bandwidth / load / cpu range filters; the min*/max*
        # pairs bound the matching PlanetLab monitoring tags.
        min_reliability = Attribute("minReliability", "Constrain reliability \
                            while picking PlanetLab nodes. Specifies a lower \

        max_reliability = Attribute("maxReliability", "Constrain reliability \
                            while picking PlanetLab nodes. Specifies an upper \

        min_bandwidth = Attribute("minBandwidth", "Constrain available \
                            bandwidth while picking PlanetLab nodes. \
                            Specifies a lower acceptable bound.",
                            flags = Flags.Filter)

        max_bandwidth = Attribute("maxBandwidth", "Constrain available \
                            bandwidth while picking PlanetLab nodes. \
                            Specifies an upper acceptable bound.",
                            flags = Flags.Filter)

        min_load = Attribute("minLoad", "Constrain node load average while \
                        picking PlanetLab nodes. Specifies a lower acceptable \
                        flags = Flags.Filter)

        max_load = Attribute("maxLoad", "Constrain node load average while \
                        picking PlanetLab nodes. Specifies an upper acceptable \
                        flags = Flags.Filter)

        min_cpu = Attribute("minCpu", "Constrain available cpu time while \
                        picking PlanetLab nodes. Specifies a lower acceptable \
                        flags = Flags.Filter)

        max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
                        picking PlanetLab nodes. Specifies an upper acceptable \
                        flags = Flags.Filter)

        # Monitoring window used to suffix the tag names above (year/month/...).
        timeframe = Attribute("timeframe", "Past time period in which to check\
                        information about the node. Values are year,month, \
                        type = Types.Enumerate,
                        flags = Flags.Filter)

        # plblacklist = Attribute("blacklist", "Take into account the file plblacklist \
        #                    in the user's home directory under .nepi directory. This file \
        #                    contains a list of PL nodes to blacklist, and at the end \
        #                    of the experiment execution the new blacklisted nodes are added.",
        #                    flags = Flags.ReadOnly)

        cls._register_attribute(sfa_user)
        cls._register_attribute(sfa_private_key)
        cls._register_attribute(city)
        cls._register_attribute(country)
        cls._register_attribute(region)
        cls._register_attribute(architecture)
        cls._register_attribute(operating_system)
        cls._register_attribute(min_reliability)
        cls._register_attribute(max_reliability)
        cls._register_attribute(min_bandwidth)
        cls._register_attribute(max_bandwidth)
        cls._register_attribute(min_load)
        cls._register_attribute(max_load)
        cls._register_attribute(min_cpu)
        cls._register_attribute(max_cpu)
        cls._register_attribute(timeframe)
    def __init__(self, ec, guid):
        """
        :param ec: the ExperimentController orchestrating this RM
        :param guid: globally-unique id of this resource
        """
        super(PlanetlabSfaNode, self).__init__(ec, guid)

        # hrn of the host selected for provisioning; set by do_discover().
        self._node_to_provision = None
        # True when the selected node was already part of the user's slice.
        self._slicenode = False
        # True when the user pinned one specific node via the hostname attr.
        self._hostname = False

        # SFA-provisioned hosts are reached directly over SSH; discard any
        # gateway configuration inherited from LinuxNode defaults.
        if self.get("gateway") or self.get("gatewayUser"):
            self.set("gateway", None)
            self.set("gatewayUser", None)
    def _skip_provision(self):
        # Provisioning is skipped when no SFA user credential is configured
        # (the decision logic based on sfa_user follows in elided lines).
        sfa_user = self.get("sfauser")
        # Lazily build the SFA API proxy from the user's credentials.
        sfa_user = self.get("sfauser")
        sfa_sm = "http://sfa3.planet-lab.eu:12346/"
        # The authority hrn is the first two components of the user hrn,
        # e.g. "ple.upmc" out of "ple.upmc.jdoe".
        sfa_auth = '.'.join(sfa_user.split('.')[:2])
        sfa_registry = "http://sfa3.planet-lab.eu:12345/"
        sfa_private_key = self.get("sfaPrivateKey")

        # Cache the proxy so subsequent accesses reuse one connection.
        self._sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth,
            sfa_registry, sfa_sm, sfa_private_key)
    def do_discover(self):
        """
        Based on the attributes defined by the user, discover the suitable
        node for provisioning.
        """
        if self._skip_provision():
            super(PlanetlabSfaNode, self).do_discover()

        # hostname -> hrn mapping of every node known to the SFA registry.
        nodes = self.sfaapi.get_resources_hrn()

        hostname = self._get_hostname()
        # the user specified one particular node to be provisioned
        self._hostname = True
        host_hrn = nodes[hostname]

        # check that the node is not blacklisted or being provisioned
        # by another RM
        with PlanetlabSfaNode.lock:
            plist = self.sfaapi.reserved()
            blist = self.sfaapi.blacklisted()
            if host_hrn not in blist and host_hrn not in plist:

                # check that is really alive, by performing ping
                ping_ok = self._do_ping(hostname)
                # unreachable node: blacklist it and abort discovery
                self._blacklist_node(host_hrn)
                self.fail_node_not_alive(hostname)
                # a node already in the slice needs no new reservation
                if self._check_if_in_slice([host_hrn]):
                    self._slicenode = True
                self._put_node_in_provision(host_hrn)
                self._node_to_provision = host_hrn
            self.fail_node_not_available(hostname)
        super(PlanetlabSfaNode, self).do_discover()

        # the user specifies constraints based on attributes, zero, one or
        # more nodes can match these constraints
        nodes = self._filter_based_on_attributes()

        # nodes that are already part of user's slice have the priority to
        # be provisioned
        nodes_inslice = self._check_if_in_slice(nodes)
        nodes_not_inslice = list(set(nodes) - set(nodes_inslice))

        node_id = self._choose_random_node(nodes_inslice)
        self._slicenode = True

        # Either there were no matching nodes in the user's slice, or
        # the nodes in the slice were blacklisted or being provisioned
        # by other RM. Note nodes_not_inslice is never empty
        node_id = self._choose_random_node(nodes_not_inslice)
        self._slicenode = False

        self._node_to_provision = node_id
        self._set_hostname_attr(node_id)
        self.info(" Selected node to provision ")
        super(PlanetlabSfaNode, self).do_discover()

        # no node could be selected: blacklist the candidate and give up
        with PlanetlabSfaNode.lock:
            self._blacklist_node(node_id)
        self.fail_not_enough_nodes()
    def do_provision(self):
        """
        Add node to user's slice after verifying that the node is functioning
        correctly.
        """
        if self._skip_provision():
            super(PlanetlabSfaNode, self).do_provision()

        # Retry until a working node is provisioned; each failed candidate
        # is blacklisted and a replacement is discovered.
        while not provision_ok:
            node = self._node_to_provision
            if not self._slicenode:
                self._add_node_to_slice(node)

            # check ssh connection
            while t < timeout and not ssh_ok:

                cmd = 'echo \'GOOD NODE\''
                ((out, err), proc) = self.execute(cmd)
                if out.find("GOOD NODE") < 0:

            # last attempt after the polling loop
            cmd = 'echo \'GOOD NODE\''
            ((out, err), proc) = self.execute(cmd)
            if not out.find("GOOD NODE") < 0:

            # the timeout was reach without establishing ssh connection
            # the node is blacklisted, deleted from the slice, and a new
            # node to provision is discovered
            with PlanetlabSfaNode.lock:
                self.warn(" Could not SSH login ")
                self._blacklist_node(node)
                #self._delete_node_from_slice(node)

            # check /proc directory is mounted (ssh_ok = True)
            # and file system is not read only
            cmd = 'mount |grep proc'
            ((out1, err1), proc1) = self.execute(cmd)
            cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
            ((out2, err2), proc2) = self.execute(cmd)
            if out1.find("/proc type proc") < 0 or \
                "Read-only file system".lower() in err2.lower():
                # NOTE(review): PlanetlabNode is not imported in this file;
                # this should almost certainly be PlanetlabSfaNode.lock
                # (copy-paste leftover from the PLC-based RM).
                with PlanetlabNode.lock:
                    self.warn(" Corrupted file system ")
                    self._blacklist_node(node)
                    #self._delete_node_from_slice(node)

        # keep the hostname attribute in sync with the provisioned node
        if not self.get('hostname'):
            self._set_hostname_attr(node)
        self.info(" Node provisioned ")

        super(PlanetlabSfaNode, self).do_provision()
    def _filter_based_on_attributes(self):
        """
        Retrieve the list of node hrns that match the user's constraints.
        """
        # Map user's defined attributes with tagnames of PlanetLab
        # First letter of the timeframe value ('y'/'m'/'w') — presumably the
        # suffix PlanetLab uses on its monitoring tag names; verify.
        timeframe = self.get("timeframe")[0]
            'country' : 'country',
            'architecture' : 'arch',
            'operatingSystem' : 'fcdistro',
            'minReliability' : 'reliability%s' % timeframe,
            'maxReliability' : 'reliability%s' % timeframe,
            'minBandwidth' : 'bw%s' % timeframe,
            'maxBandwidth' : 'bw%s' % timeframe,
            'minLoad' : 'load%s' % timeframe,
            'maxLoad' : 'load%s' % timeframe,
            'minCpu' : 'cpu%s' % timeframe,
            'maxCpu' : 'cpu%s' % timeframe,

        # Walk every declared attribute; only Filter-flagged ones with a
        # value participate in node selection. (iteritems is Python 2.)
        for attr_name, attr_obj in self._attrs.iteritems():
            attr_value = self.get(attr_name)

            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
                attr_name != 'timeframe':

                attr_tag = attr_to_tags[attr_name]
                filters['tagname'] = attr_tag

                # filter nodes by fixed constraints e.g. operating system
                if not 'min' in attr_name and not 'max' in attr_name:
                    filters['value'] = attr_value
                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)

                # filter nodes by range constraints e.g. max bandwidth
                # NOTE(review): ('min' or 'max') evaluates to just 'min', so
                # this elif never matches 'max*' attributes — upper-bound
                # filters are silently skipped. Should test both substrings.
                elif ('min' or 'max') in attr_name:
                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)

        # no filter matched anything: fall back to the full registry listing
        nodes = self.sfaapi.get_resources_hrn()
            # NOTE(review): node.key() is not a dict method — presumably
            # meant node.keys(); confirm against the elided loop context.
            nodes_hrn.append(node[node.key()])
    def _filter_by_fixed_attr(self, filters, nodes_hrn):
        """
        Query SFA API for nodes matching fixed attributes defined by the
        user.
        """
        # The tag-based implementation below is disabled (commented out);
        # as written the method narrows nodes_hrn by intersecting each new
        # tag match with the previously matched set.
        # node_tags = self.sfaapi.get_resources_tags(filters)
        # if node_tags is not None:

        #     if len(nodes_id) == 0:
        #         # first attribute being matched
        #         for node_tag in node_tags:
        #             nodes_id.append(node_tag['node_id'])
        #     else:
        #         # remove the nodes ids that don't match the new attribute
        #         # that is being match

        #         for node_tag in node_tags:
        #             if node_tag['node_id'] in nodes_id:
        #                 nodes_id_tmp.append(node_tag['node_id'])

        #         if len(nodes_id_tmp):
        #             nodes_id = set(nodes_id) & set(nodes_id_tmp)
        #         else:
        #             # no node from before match the new constraint
        #             self.fail_discovery()
        # else:
        #     # no nodes match the filter applied
        #     self.fail_discovery()
    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
        """
        Query PLCAPI for nodes ids matching attributes defined in a certain
        range.
        """
        # Disabled PLC-tag implementation: 'min*' attributes keep nodes whose
        # tag value is above attr_value, 'max*' those below; successive calls
        # intersect with the previously matched node set.
        # node_tags = self.plapi.get_node_tags(filters)

        #     if len(nodes_id) == 0:
        #         # first attribute being matched
        #         for node_tag in node_tags:

        #             # check that matches the min or max restriction
        #             if 'min' in attr_name and node_tag['value'] != 'n/a' and \
        #                 float(node_tag['value']) > attr_value:
        #                 nodes_id.append(node_tag['node_id'])

        #             elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
        #                 float(node_tag['value']) < attr_value:
        #                 nodes_id.append(node_tag['node_id'])
        #     else:
        #         # remove the nodes ids that don't match the new attribute
        #         # that is being match

        #         for node_tag in node_tags:

        #             # check that matches the min or max restriction and was a
        #             # matching previous filters
        #             if 'min' in attr_name and node_tag['value'] != 'n/a' and \
        #                 float(node_tag['value']) > attr_value and \
        #                 node_tag['node_id'] in nodes_id:
        #                 nodes_id_tmp.append(node_tag['node_id'])

        #             elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
        #                 float(node_tag['value']) < attr_value and \
        #                 node_tag['node_id'] in nodes_id:
        #                 nodes_id_tmp.append(node_tag['node_id'])

        #         if len(nodes_id_tmp):
        #             nodes_id = set(nodes_id) & set(nodes_id_tmp)
        #         else:
        #             # no node from before match the new constraint
        #             self.fail_discovery()
        # else:
        #     # no nodes match the filter applied
        #     self.fail_discovery()
    def _choose_random_node(self, nodes):
        """
        From the possible nodes for provision, choose randomly to decrease the
        probability of different RMs choosing the same node for provision.
        """
        # Fisher-Yates-style draw: pick a random index, then overwrite it
        # with the last unpicked element so it is not drawn again.
        index = randint(0, size)
        node_id = nodes[index]
        nodes[index] = nodes[size]

        # check the node is not blacklisted or being provision by other RM
        # and perform ping to check that is really alive
        # NOTE(review): PlanetlabNode is not imported in this file — this
        # should presumably be PlanetlabSfaNode.lock; likewise self.plapi
        # below looks like a leftover from the PLC-based RM (this class
        # exposes self.sfaapi instead). Confirm before use.
        with PlanetlabNode.lock:

            blist = self.plapi.blacklisted()
            plist = self.plapi.reserved()
            if node_id not in blist and node_id not in plist:
                ping_ok = self._do_ping(node_id)
                self._set_hostname_attr(node_id)
                self.warn(" Node not responding PING ")
                self._blacklist_node(node_id)

                # discovered node for provision, added to provision list
                self._put_node_in_provision(node_id)
    def _get_nodes_id(self, filters=None):
        # Fetch node_id records matching `filters`.
        # NOTE(review): self.plapi is never defined in this SFA-based RM
        # (it belongs to the PLC-based PlanetlabNode); this helper would
        # raise AttributeError if called — port to self.sfaapi or remove.
        return self.plapi.get_nodes(filters, fields=['node_id'])
522 def _add_node_to_slice(self, host_hrn):
523 self.info(" Adding node to slice ")
524 slicename = self.get("username").replace('_', '.')
525 slicename = 'ple.' + slicename
526 self.sfaapi.add_resource_to_slice(slicename, host_hrn)
528 def _delete_node_from_slice(self, node):
529 self.warn(" Deleting node from slice ")
530 slicename = self.get("username")
531 self.plapi.delete_slice_node(slicename, [node])
    def _get_hostname(self):
        # Hostname explicitly pinned by the user via the 'hostname'
        # attribute, if any (the remainder of the lookup is elided here).
        hostname = self.get("hostname")
    def _set_hostname_attr(self, node):
        """
        Query PLCAPI for the hostname of a certain node id and sets the
        attribute hostname, it will over write the previous value.
        """
        # NOTE(review): self.plapi belongs to the PLC-based RM; this class
        # only defines self.sfaapi — confirm this path is ever reachable.
        hostname = self.plapi.get_nodes(node, ['hostname'])
        self.set("hostname", hostname[0]['hostname'])
548 def _check_if_in_slice(self, hosts_hrn):
550 Check using SFA API if any host hrn from hosts_hrn is in the user's
553 slicename = self.get("username").replace('_', '.')
554 slicename = 'ple.' + slicename
555 slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
556 slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes)
557 nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
    def _do_ping(self, hostname):
        """
        Perform ping command on node's IP matching hostname.
        """
        ip = self._get_ip(hostname)
        # no resolvable IP: report the (elided) default ping_ok value
        if not ip: return ping_ok

        command = "ping -c4 %s" % ip
        (out, err) = lexec(command)
        # Alive if at least 2 of the 4 probes were answered ("2 received",
        # "3 received" or "4 received" appears in the ping summary).
        if not str(out).find("2 received") < 0 or not str(out).find("3 received") < 0 or not \
            str(out).find("4 received") < 0:
577 def _blacklist_node(self, host_hrn):
579 Add node mal functioning node to blacklist
581 self.warn(" Blacklisting malfunctioning node ")
582 self.sfaapi.blacklist_resource(host_hrn)
583 if not self._hostname:
584 self.set('hostname', None)
    def _put_node_in_provision(self, host_hrn):
        """
        Add node to the list of nodes being provisioned, in order for other RMs
        to not try to provision the same one again.
        """
        self.sfaapi.reserve_resource(host_hrn)
    def _get_ip(self, hostname):
        """
        Resolve the IP address of the given hostname.

        (Despite the original wording, this does not query PLCAPI: it
        resolves via sshfuncs.gethostbyname.)
        """
        ip = sshfuncs.gethostbyname(hostname)
        # Fail while trying to find the IP
604 def _get_nodes_hrn(self):
605 nodes = self.sfaapi.get_resouces_hrn()
609 def fail_discovery(self):
610 msg = "Discovery failed. No candidates found for node"
612 raise RuntimeError, msg
614 def fail_node_not_alive(self, hostname=None):
615 msg = "Node %s not alive" % hostname
616 raise RuntimeError, msg
618 def fail_node_not_available(self, hostname):
619 msg = "Node %s not available for provisioning" % hostname
620 raise RuntimeError, msg
622 def fail_not_enough_nodes(self):
623 msg = "Not enough nodes available for provisioning"
624 raise RuntimeError, msg
626 def fail_plapi(self):
627 msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
628 " attributes pluser and plpassword."
629 raise RuntimeError, msg
631 def valid_connection(self, guid):