2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23 ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
29 from random import randint
class PlanetlabNode(LinuxNode):
    """Resource manager for a PlanetLab host reachable over SSH.

    Extends LinuxNode with PLCAPI-driven node discovery (attribute-based
    filtering, liveness checks) and provisioning (adding the node to the
    user's slice, blacklisting malfunctioning hosts).
    """
    _rtype = "PlanetlabNode"
    _help = "Controls a PlanetLab host accessible using a SSH key " \
            "associated to a PlanetLab user account"
    _backend = "planetlab"

    # Class-wide lock serializing the blacklist/reservation bookkeeping done
    # through PLCAPI, so concurrent RMs do not select the same node.
    # NOTE(review): relies on `threading` being imported at module level;
    # the import is not visible in this excerpt -- confirm.
    lock = threading.Lock()
    def _register_attributes(cls):
        """Declare the user-settable attributes of a PlanetlabNode RM.

        Three groups: PLCAPI endpoint/credentials, node-selection filters
        (Flags.Filter) consumed by _filter_based_on_attributes(), and the
        read-only "ip" attribute filled in during provisioning.
        """
        # NOTE(review): takes `cls`, so presumably decorated with
        # @classmethod in the full source (decorator not visible here).

        # Filled by the RM once a node is selected; not user-settable.
        ip = Attribute("ip", "PlanetLab host public IP address",
                flags = Flags.ReadOnly)

        # --- PLCAPI endpoint and account credentials ---
        pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
                (e.g. www.planet-lab.eu or www.planet-lab.org) ",
                default = "www.planet-lab.eu",
                flags = Flags.Credential)

        pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
                (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
                default = "https://%(hostname)s:443/PLCAPI/",
                flags = Flags.ExecReadOnly)

        pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
                authenticate in the website) ",
                flags = Flags.Credential)

        pl_password = Attribute("plpassword",
                "PlanetLab account password, as \
                the one to authenticate in the website) ",
                flags = Flags.Credential)

        # --- discovery filters: location / platform constraints ---
        city = Attribute("city", "Constrain location (city) during resource \
                discovery. May use wildcards.",

        country = Attribute("country", "Constrain location (country) during \
                resource discovery. May use wildcards.",

        region = Attribute("region", "Constrain location (region) during \
                resource discovery. May use wildcards.",

        architecture = Attribute("architecture", "Constrain architecture \
                during resource discovery.",
                type = Types.Enumerate,

        operating_system = Attribute("operatingSystem", "Constrain operating \
                system during resource discovery.",
                type = Types.Enumerate,

        #site = Attribute("site", "Constrain the PlanetLab site this node \
        #    type = Types.Enumerate,
        #    flags = Flags.Filter)

        # --- discovery filters: numeric bounds, matched against PLC node
        # tags (reliability<timeframe>, bw<timeframe>, load<timeframe>,
        # cpu<timeframe>) by _filter_by_range_attr() ---
        min_reliability = Attribute("minReliability", "Constrain reliability \
                while picking PlanetLab nodes. Specifies a lower \
                flags = Flags.Filter)

        max_reliability = Attribute("maxReliability", "Constrain reliability \
                while picking PlanetLab nodes. Specifies an upper \
                flags = Flags.Filter)

        min_bandwidth = Attribute("minBandwidth", "Constrain available \
                bandwidth while picking PlanetLab nodes. \
                Specifies a lower acceptable bound.",
                flags = Flags.Filter)

        max_bandwidth = Attribute("maxBandwidth", "Constrain available \
                bandwidth while picking PlanetLab nodes. \
                Specifies an upper acceptable bound.",
                flags = Flags.Filter)

        min_load = Attribute("minLoad", "Constrain node load average while \
                picking PlanetLab nodes. Specifies a lower acceptable \
                flags = Flags.Filter)

        max_load = Attribute("maxLoad", "Constrain node load average while \
                picking PlanetLab nodes. Specifies an upper acceptable \
                flags = Flags.Filter)

        min_cpu = Attribute("minCpu", "Constrain available cpu time while \
                picking PlanetLab nodes. Specifies a lower acceptable \
                flags = Flags.Filter)

        max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
                picking PlanetLab nodes. Specifies an upper acceptable \
                flags = Flags.Filter)

        # Only the first character of this value is used when building tag
        # names (see _filter_based_on_attributes: self.get("timeframe")[0]).
        timeframe = Attribute("timeframe", "Past time period in which to check\
                information about the node. Values are year,month, \
                type = Types.Enumerate,
                flags = Flags.Filter)

        cls._register_attribute(ip)
        cls._register_attribute(pl_url)
        cls._register_attribute(pl_ptn)
        cls._register_attribute(pl_user)
        cls._register_attribute(pl_password)
        #cls._register_attribute(site)
        cls._register_attribute(city)
        cls._register_attribute(country)
        cls._register_attribute(region)
        cls._register_attribute(architecture)
        cls._register_attribute(operating_system)
        cls._register_attribute(min_reliability)
        cls._register_attribute(max_reliability)
        cls._register_attribute(min_bandwidth)
        cls._register_attribute(max_bandwidth)
        cls._register_attribute(min_load)
        cls._register_attribute(max_load)
        cls._register_attribute(min_cpu)
        cls._register_attribute(max_cpu)
        cls._register_attribute(timeframe)
    def __init__(self, ec, guid):
        """Initialize the RM and its provisioning bookkeeping.

        :param ec: experiment controller owning this resource
        :param guid: globally unique id of this resource
        """
        super(PlanetlabNode, self).__init__(ec, guid)
        # Node id selected by do_discover() for do_provision() to use.
        self._node_to_provision = None
        # True when the selected node was already part of the user's slice.
        self._slicenode = False
    def _skip_provision(self):
        """Tell whether PLCAPI-based discovery/provisioning should be skipped.

        When neither "pluser" nor "plpassword" is set there is no way to talk
        to the PLC API, so the RM falls back to plain LinuxNode behaviour.
        """
        pl_user = self.get("pluser")
        pl_pass = self.get("plpassword")
        if not pl_user and not pl_pass:
            # NOTE(review): interior of the lazy `plapi` accessor property;
            # its `def`/@property header falls in an elided span of this
            # excerpt. Builds the PLC API client once, from the credential
            # attributes, and caches it in self._plapi.
            pl_user = self.get("pluser")
            pl_pass = self.get("plpassword")
            pl_url = self.get("plcApiUrl")
            pl_ptn = self.get("plcApiPattern")

            self._plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
    def do_discover(self):
        """Select a PlanetLab node to provision.

        Based on the attributes defined by the user, discover the suitable
        node: either the exact host named by "hostname", or a random pick
        among the nodes matching the filter attributes. Nodes already in the
        user's slice are preferred.
        """
        if self._skip_provision():
            super(PlanetlabNode, self).do_discover()

        hostname = self._get_hostname()
            # the user specified one particular node to be provisioned
            # check with PLCAPI if it is alive
            node_id = self._query_if_alive(hostname=hostname)
            node_id = node_id.pop()

            # check that the node is not blacklisted or being provisioned
            # by another RM (shared lists guarded by the class lock)
            with PlanetlabNode.lock:
                plist = self.plapi.reserved()
                blist = self.plapi.blacklisted()
                if node_id not in blist and node_id not in plist:

                    # check that it is really alive, by performing a ping
                    ping_ok = self._do_ping(node_id)
                        self._blacklist_node(node_id)
                        self.fail_node_not_alive(hostname)
                        if self._check_if_in_slice([node_id]):
                            self._slicenode = True
                        self._put_node_in_provision(node_id)
                        self._node_to_provision = node_id
                        super(PlanetlabNode, self).do_discover()

                    self.fail_node_not_available(hostname)

            # the user specifies constraints based on attributes; zero, one or
            # more nodes can match these constraints
            nodes = self._filter_based_on_attributes()
            nodes_alive = self._query_if_alive(nodes)

            # nodes that are already part of the user's slice have priority to
            # be provisioned
            nodes_inslice = self._check_if_in_slice(nodes_alive)
            nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))

                node_id = self._choose_random_node(nodes_inslice)
                self._slicenode = True

                # Either there were no matching nodes in the user's slice, or
                # the nodes in the slice were blacklisted or being provisioned
                # by other RM. Note nodes_not_inslice is never empty
                node_id = self._choose_random_node(nodes_not_inslice)
                self._slicenode = False

                self._node_to_provision = node_id
                    self._set_hostname_attr(node_id)
                    self.info(" Selected node to provision ")
                    # on failure, blacklist the candidate and retry discovery
                    with PlanetlabNode.lock:
                        self._blacklist_node(node_id)

                super(PlanetlabNode, self).do_discover()

                self.fail_not_enough_nodes()
    def do_provision(self):
        """Provision the node selected by do_discover().

        Add node to user's slice after verifying that the node is functioning
        correctly (SSH reachable, /proc mounted); otherwise blacklist it and
        re-run discovery for a replacement.
        """
        if self._skip_provision():
            super(PlanetlabNode, self).do_provision()

        while not provision_ok:
            node = self._node_to_provision
            if not self._slicenode:
                self._add_node_to_slice(node)

            # check ssh connection (polling until a timeout elapses)
            while t < timeout and not ssh_ok:

                cmd = 'echo \'GOOD NODE\''
                ((out, err), proc) = self.execute(cmd)
                if out.find("GOOD NODE") < 0:
                    # retry probe after the wait, then re-evaluate
                    cmd = 'echo \'GOOD NODE\''
                    ((out, err), proc) = self.execute(cmd)
                    if not out.find("GOOD NODE") < 0:

                # the timeout was reached without establishing an ssh
                # connection; the node is blacklisted, deleted from the
                # slice, and a new node to provision is discovered
                with PlanetlabNode.lock:
                    self.warn(" Could not SSH login ")
                    self._blacklist_node(node)
                    #self._delete_node_from_slice(node)
                self.set('hostname', None)

                # check /proc directory is mounted (ssh_ok = True)
                cmd = 'mount |grep proc'
                ((out, err), proc) = self.execute(cmd)
                if out.find("/proc type proc") < 0:
                    with PlanetlabNode.lock:
                        self.warn(" Could not find directory /proc ")
                        self._blacklist_node(node)
                        #self._delete_node_from_slice(node)
                    self.set('hostname', None)

                # record the node's public IP in the "ip" attribute
                ip = self._get_ip(node)

        super(PlanetlabNode, self).do_provision()
    def _filter_based_on_attributes(self):
        """Retrieve the list of node ids that match the user's constraints."""
        # Map user-defined attribute names to PlanetLab tag names. Only the
        # first character of "timeframe" (e.g. 'y'/'m'/'w') is appended to
        # build tag names like "reliabilityy".
        timeframe = self.get("timeframe")[0]
            'country' : 'country',
            'architecture' : 'arch',
            'operatingSystem' : 'fcdistro',
            #'site' : 'pldistro',
            'minReliability' : 'reliability%s' % timeframe,
            'maxReliability' : 'reliability%s' % timeframe,
            'minBandwidth' : 'bw%s' % timeframe,
            'maxBandwidth' : 'bw%s' % timeframe,
            'minLoad' : 'load%s' % timeframe,
            'maxLoad' : 'load%s' % timeframe,
            'minCpu' : 'cpu%s' % timeframe,
            'maxCpu' : 'cpu%s' % timeframe,

        for attr_name, attr_obj in self._attrs.iteritems():
            attr_value = self.get(attr_name)

            # NOTE(review): `attr_obj.flags == 8` is a magic number,
            # presumably the numeric value of Flags.Filter -- confirm.
            if attr_value is not None and attr_obj.flags == 8 and \
                attr_name != 'timeframe':

                attr_tag = attr_to_tags[attr_name]
                filters['tagname'] = attr_tag

                # filter nodes by fixed constraints e.g. operating system
                if not 'min' in attr_name and not 'max' in attr_name:
                    filters['value'] = attr_value
                    nodes_id = self._filter_by_fixed_attr(filters, nodes_id)

                # filter nodes by range constraints e.g. max bandwidth
                # BUG(review): `('min' or 'max')` evaluates to just 'min',
                # so names containing only 'max' (maxReliability, maxLoad,
                # ...) never reach this branch and those filters are
                # silently ignored. Should read:
                #   elif 'min' in attr_name or 'max' in attr_name:
                elif ('min' or 'max') in attr_name:
                    nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)

            # no filter attribute was set: consider every PLC node
            nodes = self.plapi.get_nodes()
                nodes_id.append(node['node_id'])
    def _filter_by_fixed_attr(self, filters, nodes_id):
        """Query PLCAPI for node ids matching a fixed (exact-value) attribute
        defined by the user, intersected with the ids matched so far.

        :param filters: PLC tag filter dict (tagname/value already set)
        :param nodes_id: ids that matched all previously applied filters
        """
        node_tags = self.plapi.get_node_tags(filters)
        if node_tags is not None:

            if len(nodes_id) == 0:
                # first attribute being matched
                for node_tag in node_tags:
                    nodes_id.append(node_tag['node_id'])
                # remove the node ids that don't match the new attribute
                # that is being matched
                for node_tag in node_tags:
                    if node_tag['node_id'] in nodes_id:
                        nodes_id_tmp.append(node_tag['node_id'])

                if len(nodes_id_tmp):
                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
                    # no node from before matches the new constraint
                    self.fail_discovery()
            # no nodes match the filter applied
            self.fail_discovery()
    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
        """Query PLCAPI for node ids whose tag value satisfies a min/max bound,
        intersected with the ids matched so far.

        'min*' attributes keep nodes with tag value strictly above
        *attr_value*; 'max*' attributes keep nodes strictly below it.
        Tag values reported as 'n/a' are skipped.
        """
        node_tags = self.plapi.get_node_tags(filters)

            if len(nodes_id) == 0:
                # first attribute being matched
                for node_tag in node_tags:

                    # check that it matches the min or max restriction
                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
                        float(node_tag['value']) > attr_value:
                        nodes_id.append(node_tag['node_id'])

                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
                        float(node_tag['value']) < attr_value:
                        nodes_id.append(node_tag['node_id'])

                # remove the node ids that don't match the new attribute
                # that is being matched
                for node_tag in node_tags:

                    # check that it matches the min or max restriction and
                    # matched the previous filters
                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
                        float(node_tag['value']) > attr_value and \
                        node_tag['node_id'] in nodes_id:
                        nodes_id_tmp.append(node_tag['node_id'])

                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
                        float(node_tag['value']) < attr_value and \
                        node_tag['node_id'] in nodes_id:
                        nodes_id_tmp.append(node_tag['node_id'])

                if len(nodes_id_tmp):
                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
                    # no node from before matches the new constraint
                    self.fail_discovery()
            # no nodes match the filter applied
            self.fail_discovery()
    def _query_if_alive(self, nodes_id=None, hostname=None):
        """Query PLCAPI for nodes that registered activity recently, using
        filters related to the state of the node, e.g. boot state.

        Exactly one of *nodes_id* (iterable of ids) or *hostname* must be
        given; raises RuntimeError otherwise.
        """
        if nodes_id is None and hostname is None:
            msg = "Specify nodes_id or hostname"
            raise RuntimeError, msg

        if nodes_id is not None and hostname is not None:
            msg = "Specify either nodes_id or hostname"
            raise RuntimeError, msg

        # define PL filters to check the node is alive
        filters['run_level'] = 'boot'
        filters['boot_state'] = 'boot'
        filters['node_type'] = 'regular'
        #filters['>last_contact'] = int(time.time()) - 2*3600

        # adding node_id or hostname to the filters to check for the
        # particular node(s)
            filters['node_id'] = list(nodes_id)
            alive_nodes_id = self._get_nodes_id(filters)
            filters['hostname'] = hostname
            alive_nodes_id = self._get_nodes_id(filters)

        if len(alive_nodes_id) == 0:
            self.fail_node_not_alive(hostname)

        # unwrap the PLCAPI result dicts into plain node ids
        for node_id in alive_nodes_id:
            nid = node_id['node_id']
    def _choose_random_node(self, nodes):
        """Pick one node at random from *nodes* and reserve it.

        From the possible nodes for provision, choose randomly to decrease the
        probability of different RMs choosing the same node for provision.
        Blacklisted, reserved, or unpingable candidates are rejected (the
        last also being blacklisted).
        """
            # swap the drawn element to the end so it is not drawn again
            index = randint(0, size)
            node_id = nodes[index]
            nodes[index] = nodes[size]

            # check the node is not blacklisted or being provisioned by
            # another RM, and perform a ping to check that it is really alive
            with PlanetlabNode.lock:

                blist = self.plapi.blacklisted()
                plist = self.plapi.reserved()
                if node_id not in blist and node_id not in plist:
                    ping_ok = self._do_ping(node_id)
                        # hostname must be set so the blacklist entry is useful
                        self._set_hostname_attr(node_id)
                        self.warn(" Node not responding PING ")
                        self._blacklist_node(node_id)
                        self.set('hostname', None)
                        # discovered node for provision, added to provision list
                        self._put_node_in_provision(node_id)
572 def _get_nodes_id(self, filters):
573 return self.plapi.get_nodes(filters, fields=['node_id'])
575 def _add_node_to_slice(self, node_id):
576 self.info(" Adding node to slice ")
577 slicename = self.get("username")
578 with PlanetlabNode.lock:
579 slice_nodes = self.plapi.get_slice_nodes(slicename)
580 slice_nodes.append(node_id)
581 self.plapi.add_slice_nodes(slicename, slice_nodes)
583 def _delete_node_from_slice(self, node):
584 self.warn(" Deleting node from slice ")
585 slicename = self.get("username")
586 self.plapi.delete_slice_node(slicename, [node])
    def _get_hostname(self):
        """Return the node's hostname, from the "hostname" attribute or by
        reverse-resolving the configured address.
        """
        hostname = self.get("hostname")
            # NOTE(review): `ip` comes from an elided branch, presumably
            # `ip = self.get("ip")` -- confirm against the full source.
            hostname = sshfuncs.gethostbyname(ip)
    def _set_hostname_attr(self, node):
        """
        Query PLCAPI for the hostname of a certain node id and sets the
        attribute hostname, it will overwrite the previous value.
        """
        hostname = self.plapi.get_nodes(node, ['hostname'])
        self.set("hostname", hostname[0]['hostname'])
    def _check_if_in_slice(self, nodes_id):
        """
        Query PLCAPI to find out which of the ids in *nodes_id* are already
        part of the user's slice; returns that intersection.
        """
        slicename = self.get("username")
        slice_nodes = self.plapi.get_slice_nodes(slicename)
        nodes_inslice = list(set(nodes_id) & set(slice_nodes))
    def _do_ping(self, node_id):
        """
        Perform a local ping against the IP matching *node_id*; returns a
        boolean success flag.
        """
        ip = self._get_ip(node_id)
        if not ip: return ping_ok

        command = "ping -c4 %s" % ip
        (out, err) = lexec(command)
        # BUG(review): operator precedence makes this condition dubious.
        # `not out.find("2 received")` is True only when the substring is at
        # position 0, and `not out.find("4 received") < 0` parses as
        # `(not out.find(...)) < 0`, which is always False. The intent is
        # presumably "any of '2/3/4 received' appears in the output", i.e.
        #   out.find("2 received") >= 0 or ... -- confirm and fix upstream.
        if not out.find("2 received") or not out.find("3 received") or not \
            out.find("4 received") < 0:
    def _blacklist_node(self, node):
        """
        Add a malfunctioning node to the blacklist shared via PLCAPI.
        """
        self.warn(" Blacklisting malfunctioning node ")
        # uses the cached client directly (self._plapi, not the accessor)
        self._plapi.blacklist_host(node)
    def _put_node_in_provision(self, node):
        """
        Add node to the list of nodes being provisioned, in order for other RMs
        to not try to provision the same one again.
        """
        self._plapi.reserve_host(node)
    def _get_ip(self, node_id):
        """
        Query PLCAPI for the hostname of the node with id *node_id* and
        resolve it to an IP address.
        """
        hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
            ip = sshfuncs.gethostbyname(hostname['hostname'])
            # Failed while trying to find the IP
    def fail_discovery(self):
        """Abort discovery: no candidate node satisfied the user's filters."""
        msg = "Discovery failed. No candidates found for node"
        raise RuntimeError, msg
665 def fail_node_not_alive(self, hostname=None):
666 msg = "Node %s not alive" % hostname
667 raise RuntimeError, msg
669 def fail_node_not_available(self, hostname):
670 msg = "Node %s not available for provisioning" % hostname
671 raise RuntimeError, msg
673 def fail_not_enough_nodes(self):
674 msg = "Not enough nodes available for provisioning"
675 raise RuntimeError, msg
677 def fail_plapi(self):
678 msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
679 " attributes pluser and plpassword."
680 raise RuntimeError, msg
682 def valid_connection(self, guid):