2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License version 2 as
7 # published by the Free Software Foundation;
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 # Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.resources.planetlab.plcapi import PLCAPIFactory
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
28 from random import randint
# Resource manager for a PlanetLab host: extends LinuxNode (SSH control)
# with PLCAPI-driven discovery/provisioning of nodes in the user's slice.
38 class PlanetlabNode(LinuxNode):
39 _rtype = "planetlab::Node"
40 _help = "Controls a PlanetLab host accessible using a SSH key " \
41 "associated to a PlanetLab user account"
42 _platform = "planetlab"
# Class-level lock shared by all PlanetlabNode RMs: serializes access to
# the shared blacklist/reserved lists so concurrent RMs do not pick the
# same physical node.
44 lock = threading.Lock()
# Declares the attributes a user can set on this RM.
# NOTE(review): the decorator line (presumably @classmethod, original
# line 46) is elided in this dump, as are several attribute-definition
# lines (default values, allowed Enumerate values, Flags).
47 def _register_attributes(cls):
# Identity / credentials for reaching the node and the PLCAPI service.
48 ip = Attribute("ip", "PlanetLab host public IP address",
51 pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
52 (e.g. www.planet-lab.eu or www.planet-lab.org) ",
53 default = "www.planet-lab.eu",
54 flags = Flags.Credential)
56 pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
57 (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
58 default = "https://%(hostname)s:443/PLCAPI/",
61 pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
62 authenticate in the website) ",
63 flags = Flags.Credential)
65 pl_password = Attribute("plpassword",
66 "PlanetLab account password, as \
67 the one to authenticate in the website) ",
68 flags = Flags.Credential)
# Discovery filters: location constraints (wildcards allowed).
70 city = Attribute("city", "Constrain location (city) during resource \
71 discovery. May use wildcards.",
74 country = Attribute("country", "Constrain location (country) during \
75 resource discovery. May use wildcards.",
78 region = Attribute("region", "Constrain location (region) during \
79 resource discovery. May use wildcards.",
# Discovery filters: platform constraints (enumerated values elided here).
82 architecture = Attribute("architecture", "Constrain architecture \
83 during resource discovery.",
84 type = Types.Enumerate,
89 operating_system = Attribute("operatingSystem", "Constrain operating \
90 system during resource discovery.",
91 type = Types.Enumerate,
# Discovery filters: min/max bounds on node health metrics. Each pair
# maps onto a PLC tag (reliability/bw/load/cpu) scoped by "timeframe"
# in _filter_based_on_attributes.
99 min_reliability = Attribute("minReliability", "Constrain reliability \
100 while picking PlanetLab nodes. Specifies a lower \
104 flags = Flags.Filter)
106 max_reliability = Attribute("maxReliability", "Constrain reliability \
107 while picking PlanetLab nodes. Specifies an upper \
111 flags = Flags.Filter)
113 min_bandwidth = Attribute("minBandwidth", "Constrain available \
114 bandwidth while picking PlanetLab nodes. \
115 Specifies a lower acceptable bound.",
118 flags = Flags.Filter)
120 max_bandwidth = Attribute("maxBandwidth", "Constrain available \
121 bandwidth while picking PlanetLab nodes. \
122 Specifies an upper acceptable bound.",
125 flags = Flags.Filter)
127 min_load = Attribute("minLoad", "Constrain node load average while \
128 picking PlanetLab nodes. Specifies a lower acceptable \
132 flags = Flags.Filter)
134 max_load = Attribute("maxLoad", "Constrain node load average while \
135 picking PlanetLab nodes. Specifies an upper acceptable \
139 flags = Flags.Filter)
141 min_cpu = Attribute("minCpu", "Constrain available cpu time while \
142 picking PlanetLab nodes. Specifies a lower acceptable \
146 flags = Flags.Filter)
148 max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
149 picking PlanetLab nodes. Specifies an upper acceptable \
153 flags = Flags.Filter)
155 timeframe = Attribute("timeframe", "Past time period in which to check\
156 information about the node. Values are year,month, \
159 type = Types.Enumerate,
164 flags = Flags.Filter)
# Global switch: honor/update the persistent ~/.nepi/plblacklist file.
166 plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
167 in the user's home directory under .nepi directory. This file \
168 contains a list of PL nodes to blacklist, and at the end \
169 of the experiment execution the new blacklisted nodes are added.",
172 flags = Flags.Global)
# Register every attribute declared above with the RM class.
174 cls._register_attribute(ip)
175 cls._register_attribute(pl_url)
176 cls._register_attribute(pl_ptn)
177 cls._register_attribute(pl_user)
178 cls._register_attribute(pl_password)
179 cls._register_attribute(city)
180 cls._register_attribute(country)
181 cls._register_attribute(region)
182 cls._register_attribute(architecture)
183 cls._register_attribute(operating_system)
184 cls._register_attribute(min_reliability)
185 cls._register_attribute(max_reliability)
186 cls._register_attribute(min_bandwidth)
187 cls._register_attribute(max_bandwidth)
188 cls._register_attribute(min_load)
189 cls._register_attribute(max_load)
190 cls._register_attribute(min_cpu)
191 cls._register_attribute(max_cpu)
192 cls._register_attribute(timeframe)
193 cls._register_attribute(plblacklist)
195 def __init__(self, ec, guid):
196 super(PlanetlabNode, self).__init__(ec, guid)
# Weak reference to the EC so this RM does not keep it alive.
198 self._ecobj = weakref.ref(ec)
# Provisioning state: chosen node id, whether it was already in the
# user's slice, and whether the user pinned a specific hostname.
200 self._node_to_provision = None
201 self._slicenode = False
202 self._hostname = False
# PlanetLab nodes are reached directly: drop any configured gateway.
204 if self.get("gateway") or self.get("gatewayUser"):
205 self.set("gateway", None)
206 self.set("gatewayUser", None)
# Ensure ~/.nepi/plblacklist.txt exists (created empty when missing).
# NOTE(review): an `else:` line appears to be elided between the lines
# numbered 213 and 215, so the makedirs branch reads as unconditional
# here — confirm against the complete file.
209 nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
210 plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
211 if not os.path.exists(plblacklist_file):
212 if os.path.isdir(nepi_home):
213 open(plblacklist_file, 'w').close()
215 os.makedirs(nepi_home)
216 open(plblacklist_file, 'w').close()
# Decide whether PLCAPI-based discovery/provisioning should be skipped:
# when neither pluser nor plpassword is set, the RM falls back to plain
# LinuxNode behavior. NOTE(review): the return statements (original
# lines after 221) are elided in this dump.
218 def _skip_provision(self):
219 pl_user = self.get("pluser")
220 pl_pass = self.get("plpassword")
221 if not pl_user and not pl_pass:
# NOTE(review): fragment of a lazy accessor (presumably the `plapi`
# property — its decorator/def lines are elided in this dump). Builds a
# PLCAPI client from the credential attributes via PLCAPIFactory and
# keeps only a weak reference; the factory presumably owns the instance
# — confirm against the complete file.
228 pl_user = self.get("pluser")
229 pl_pass = self.get("plpassword")
230 pl_url = self.get("plcApiUrl")
231 pl_ptn = self.get("plcApiPattern")
232 _plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
233 pl_ptn, self._ecobj())
238 self._plapi = weakref.ref(_plapi)
242 def do_discover(self):
# Docstring (delimiters elided in this dump):
244 Based on the attributes defined by the user, discover the suitable
# No credentials -> plain LinuxNode discovery, no PLCAPI involved.
247 if self._skip_provision():
248 super(PlanetlabNode, self).do_discover()
# NOTE(review): an early return after the super() call appears to be
# elided here.
251 hostname = self._get_hostname()
# Branch 1 (hostname given): provision exactly that node.
253 # the user specified one particular node to be provisioned
254 self._hostname = True
255 node_id = self._get_nodes_id({'hostname':hostname})
256 node_id = node_id.pop()['node_id']
258 # check that the node is not blacklisted or being provisioned
# Shared lock: reserved/blacklisted lists are global across RMs.
260 with PlanetlabNode.lock:
261 plist = self.plapi.reserved()
262 blist = self.plapi.blacklisted()
263 if node_id not in blist and node_id not in plist:
265 # check that is really alive, by performing ping
266 ping_ok = self._do_ping(node_id)
# NOTE(review): the `if not ping_ok:` guard line appears elided before
# the blacklist/fail pair below.
268 self._blacklist_node(node_id)
269 self.fail_node_not_alive(hostname)
271 if self._check_if_in_slice([node_id]):
272 self._slicenode = True
273 self._put_node_in_provision(node_id)
274 self._node_to_provision = node_id
# Node was blacklisted or reserved by another RM.
276 self.fail_node_not_available(hostname)
277 super(PlanetlabNode, self).do_discover()
# Branch 2 (no hostname): pick a node matching the filter attributes,
# preferring nodes already in the user's slice.
280 # the user specifies constraints based on attributes, zero, one or
281 # more nodes can match these constraints
282 nodes = self._filter_based_on_attributes()
284 # nodes that are already part of user's slice have the priority to
286 nodes_inslice = self._check_if_in_slice(nodes)
287 nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
291 node_id = self._choose_random_node(nodes_inslice)
292 self._slicenode = True
295 # Either there were no matching nodes in the user's slice, or
296 # the nodes in the slice were blacklisted or being provisioned
297 # by other RM. Note nodes_not_inslice is never empty
298 node_id = self._choose_random_node(nodes_not_inslice)
299 self._slicenode = False
302 self._node_to_provision = node_id
304 self._set_hostname_attr(node_id)
305 self.info(" Selected node to provision ")
306 super(PlanetlabNode, self).do_discover()
# Failure path: hostname lookup raised while picking — blacklist the
# candidate and abort (surrounding try/except lines elided in dump).
308 with PlanetlabNode.lock:
309 self._blacklist_node(node_id)
312 self.fail_not_enough_nodes()
314 def do_provision(self):
# Docstring (delimiters elided in this dump):
316 Add node to user's slice after verifying that the node is functioning
# No credentials -> plain LinuxNode provisioning.
319 if self._skip_provision():
320 super(PlanetlabNode, self).do_provision()
# NOTE(review): an early return and the provision_ok/timeout setup
# lines (original 321-327) are elided here.
328 while not provision_ok:
329 node = self._node_to_provision
# Step 1: add the node to the slice unless it was already there.
330 if not self._slicenode:
331 self._add_node_to_slice(node)
332 if self._check_if_in_slice([node]):
333 self.debug( "Node added to slice" )
335 self.warning(" Could not add to slice ")
336 with PlanetlabNode.lock:
337 self._blacklist_node(node)
# Step 2: poll for a working SSH login within the timeout.
341 # check ssh connection
343 while t < timeout and not ssh_ok:
345 cmd = 'echo \'GOOD NODE\''
346 ((out, err), proc) = self.execute(cmd)
347 if out.find("GOOD NODE") < 0:
348 self.debug( "No SSH connection, waiting 60s" )
# NOTE(review): the sleep/retry bookkeeping lines are elided here.
353 self.debug( "SSH OK" )
# Re-check once after the loop (covers exiting on timeout boundary).
357 cmd = 'echo \'GOOD NODE\''
358 ((out, err), proc) = self.execute(cmd)
359 if not out.find("GOOD NODE") < 0:
# SSH never came up: blacklist and loop back to discover a new node.
363 # the timeout was reach without establishing ssh connection
364 # the node is blacklisted, deleted from the slice, and a new
365 # node to provision is discovered
366 with PlanetlabNode.lock:
367 self.warning(" Could not SSH login ")
368 self._blacklist_node(node)
369 #self._delete_node_from_slice(node)
# Step 3: sanity-check the node's filesystem (/proc mounted, root FS
# writable); corrupted nodes are blacklisted and retried.
373 # check /proc directory is mounted (ssh_ok = True)
374 # and file system is not read only
376 cmd = 'mount |grep proc'
377 ((out1, err1), proc1) = self.execute(cmd)
378 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
379 ((out2, err2), proc2) = self.execute(cmd)
380 if out1.find("/proc type proc") < 0 or \
381 "Read-only file system".lower() in err2.lower():
382 with PlanetlabNode.lock:
383 self.warning(" Corrupted file system ")
384 self._blacklist_node(node)
385 #self._delete_node_from_slice(node)
# Success: record hostname/ip attributes and finish provisioning.
391 if not self.get('hostname'):
392 self._set_hostname_attr(node)
394 ip = self._get_ip(node)
396 self.info(" Node provisioned ")
398 super(PlanetlabNode, self).do_provision()
# Release the RM; once fully RELEASED (and PLCAPI was actually used),
# also release the PLC API handle. NOTE(review): the actual release
# call after the debug line (original 404+) is elided in this dump.
400 def do_release(self):
401 super(PlanetlabNode, self).do_release()
402 if self.state == ResourceState.RELEASED and not self._skip_provision():
403 self.debug(" Releasing PLC API ")
406 def _filter_based_on_attributes(self):
# Docstring (delimiters elided in this dump):
408 Retrieve the list of nodes ids that match user's constraints
410 # Map user's defined attributes with tagnames of PlanetLab
# "timeframe"[0] -> 'y'/'m'/'w' suffix used to scope the PLC tag names.
411 timeframe = self.get("timeframe")[0]
# NOTE(review): the dict opening line and the 'city'/'region' entries
# (original 412-413, 415) are elided in this dump.
414 'country' : 'country',
416 'architecture' : 'arch',
417 'operatingSystem' : 'fcdistro',
418 'minReliability' : 'reliability%s' % timeframe,
419 'maxReliability' : 'reliability%s' % timeframe,
420 'minBandwidth' : 'bw%s' % timeframe,
421 'maxBandwidth' : 'bw%s' % timeframe,
422 'minLoad' : 'load%s' % timeframe,
423 'maxLoad' : 'load%s' % timeframe,
424 'minCpu' : 'cpu%s' % timeframe,
425 'maxCpu' : 'cpu%s' % timeframe,
# Intersect node-id sets across every Filter-flagged attribute the user
# actually set (timeframe itself is only a tag-name modifier).
431 for attr_name, attr_obj in self._attrs.iteritems():
432 attr_value = self.get(attr_name)
434 if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
435 attr_name != 'timeframe':
437 attr_tag = attr_to_tags[attr_name]
438 filters['tagname'] = attr_tag
440 # filter nodes by fixed constraints e.g. operating system
441 if not 'min' in attr_name and not 'max' in attr_name:
442 filters['value'] = attr_value
443 nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
445 # filter nodes by range constraints e.g. max bandwidth
# NOTE(review): `('min' or 'max') in attr_name` evaluates as
# `'min' in attr_name` only ('min' is truthy); reachable because the
# fixed-attr branch above already handled non-min/max names — still
# worth confirming against the complete file before changing.
446 elif ('min' or 'max') in attr_name:
447 nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
# No filters set: fall back to every node id known to PLCAPI.
450 nodes = self._get_nodes_id()
452 nodes_id.append(node['node_id'])
455 def _filter_by_fixed_attr(self, filters, nodes_id):
# Docstring (delimiters elided in this dump):
457 Query PLCAPI for nodes ids matching fixed attributes defined by the
460 node_tags = self.plapi.get_node_tags(filters)
461 if node_tags is not None:
# First filter applied: seed the candidate set from the tag matches.
463 if len(nodes_id) == 0:
464 # first attribute being matched
465 for node_tag in node_tags:
466 nodes_id.append(node_tag['node_id'])
# Subsequent filters: intersect with the previous candidate set.
468 # remove the nodes ids that don't match the new attribute
469 # that is being match
472 for node_tag in node_tags:
473 if node_tag['node_id'] in nodes_id:
474 nodes_id_tmp.append(node_tag['node_id'])
476 if len(nodes_id_tmp):
477 nodes_id = set(nodes_id) & set(nodes_id_tmp)
# Empty intersection: discovery cannot satisfy all constraints.
479 # no node from before match the new constraint
480 self.fail_discovery()
482 # no nodes match the filter applied
483 self.fail_discovery()
487 def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
# Docstring (delimiters elided in this dump):
489 Query PLCAPI for nodes ids matching attributes defined in a certain
492 node_tags = self.plapi.get_node_tags(filters)
# First filter applied: seed candidates from tags satisfying the bound.
# 'n/a' tag values are skipped; min* keeps values strictly above the
# bound, max* strictly below.
495 if len(nodes_id) == 0:
496 # first attribute being matched
497 for node_tag in node_tags:
499 # check that matches the min or max restriction
500 if 'min' in attr_name and node_tag['value'] != 'n/a' and \
501 float(node_tag['value']) > attr_value:
502 nodes_id.append(node_tag['node_id'])
504 elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
505 float(node_tag['value']) < attr_value:
506 nodes_id.append(node_tag['node_id'])
# Subsequent filters: keep only previous candidates that also satisfy
# the new bound.
509 # remove the nodes ids that don't match the new attribute
510 # that is being match
512 for node_tag in node_tags:
514 # check that matches the min or max restriction and was a
515 # matching previous filters
516 if 'min' in attr_name and node_tag['value'] != 'n/a' and \
517 float(node_tag['value']) > attr_value and \
518 node_tag['node_id'] in nodes_id:
519 nodes_id_tmp.append(node_tag['node_id'])
521 elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
522 float(node_tag['value']) < attr_value and \
523 node_tag['node_id'] in nodes_id:
524 nodes_id_tmp.append(node_tag['node_id'])
526 if len(nodes_id_tmp):
527 nodes_id = set(nodes_id) & set(nodes_id_tmp)
# Empty intersection or no tags at all: constraints unsatisfiable.
529 # no node from before match the new constraint
530 self.fail_discovery()
533 # no nodes match the filter applied
534 self.fail_discovery()
538 def _choose_random_node(self, nodes):
# Docstring (delimiters elided in this dump):
540 From the possible nodes for provision, choose randomly to decrease the
541 probability of different RMs choosing the same node for provision
# Fisher-Yates-style draw: pick a random index, then swap the last
# element into its slot so the candidate pool shrinks each iteration.
# NOTE(review): the surrounding while/size bookkeeping lines are elided
# in this dump.
546 index = randint(0, size)
547 node_id = nodes[index]
548 nodes[index] = nodes[size]
550 # check the node is not blacklisted or being provision by other RM
551 # and perform ping to check that is really alive
552 with PlanetlabNode.lock:
554 blist = self.plapi.blacklisted()
555 plist = self.plapi.reserved()
556 if node_id not in blist and node_id not in plist:
557 ping_ok = self._do_ping(node_id)
# NOTE(review): the `if not ping_ok:` guard appears elided before the
# warning/blacklist pair below.
559 self._set_hostname_attr(node_id)
560 self.warning(" Node not responding PING ")
561 self._blacklist_node(node_id)
# Reserve the node so other RMs skip it, then (elided) return its id.
563 # discovered node for provision, added to provision list
564 self._put_node_in_provision(node_id)
# Thin PLCAPI wrapper: fetch node ids (as dicts with a 'node_id' key)
# matching the optional filters dict.
567 def _get_nodes_id(self, filters=None):
568 return self.plapi.get_nodes(filters, fields=['node_id'])
# Append node_id to the slice's node list via PLCAPI. Read-modify-write
# under the class lock so concurrent RMs do not clobber each other's
# additions. The slice name is taken from the "username" attribute.
570 def _add_node_to_slice(self, node_id):
571 self.info(" Adding node to slice ")
572 slicename = self.get("username")
573 with PlanetlabNode.lock:
574 slice_nodes = self.plapi.get_slice_nodes(slicename)
575 self.debug(" Previous slice nodes %s " % slice_nodes)
576 slice_nodes.append(node_id)
577 self.plapi.add_slice_nodes(slicename, slice_nodes)
# Remove a (malfunctioning) node from the user's slice via PLCAPI.
# Currently only invoked from commented-out call sites in do_provision.
579 def _delete_node_from_slice(self, node):
580 self.warning(" Deleting node from slice ")
581 slicename = self.get("username")
582 self.plapi.delete_slice_node(slicename, [node])
# Resolve the hostname to use for discovery: prefer the "hostname"
# attribute; otherwise reverse-resolve from the "ip" attribute and
# cache the result. NOTE(review): several lines (the early return,
# the ip lookup, the exception handling around gethostbyaddr) are
# elided in this dump.
584 def _get_hostname(self):
585 hostname = self.get("hostname")
590 hostname = socket.gethostbyaddr(ip)[0]
591 self.set('hostname', hostname)
596 def _set_hostname_attr(self, node):
# Docstring (delimiters elided in this dump):
598 Query PLCAPI for the hostname of a certain node id and sets the
599 attribute hostname, it will overwrite the previous value
601 hostname = self.plapi.get_nodes(node, ['hostname'])
602 self.set("hostname", hostname[0]['hostname'])
604 def _check_if_in_slice(self, nodes_id):
# Docstring (delimiters elided in this dump):
606 Query PLCAPI to find out if any node id from nodes_id is in the user's
# Intersect the candidate ids with the slice's current node list.
# NOTE(review): the `return nodes_inslice` line appears elided here.
609 slicename = self.get("username")
610 slice_nodes = self.plapi.get_slice_nodes(slicename)
611 nodes_inslice = list(set(nodes_id) & set(slice_nodes))
614 def _do_ping(self, node_id):
# Docstring (delimiters elided in this dump):
616 Perform ping command on node's IP matching node id
# Run a local `ping -c4` and consider the node alive when packet loss
# is under 50%. NOTE(review): the return-value lines and the ip-None
# guard appear elided in this dump.
619 ip = self._get_ip(node_id)
621 command = "ping -c4 %s" % ip
622 (out, err) = lexec(command)
624 m = re.search("(\d+)% packet loss", str(out))
625 if m and int(m.groups()[0]) < 50:
630 def _blacklist_node(self, node):
# Docstring (delimiters elided in this dump):
632 Add node mal functioning node to blacklist
634 self.warning(" Blacklisting malfunctioning node ")
635 self.plapi.blacklist_host(node)
# Only clear the hostname attribute when the user did not pin one
# (self._hostname is set in do_discover when a hostname was given).
636 if not self._hostname:
637 self.set('hostname', None)
639 def _put_node_in_provision(self, node):
# Docstring (delimiters elided in this dump):
641 Add node to the list of nodes being provisioned, in order for other RMs
642 to not try to provision the same one again
644 self.plapi.reserve_host(node)
646 def _get_ip(self, node_id):
# Docstring (delimiters elided in this dump):
648 Query PLCAPI for the IP of a node with certain node id
# Prefer the hostname attribute; fall back to a PLCAPI lookup, then
# resolve it to an IP over ssh helpers. NOTE(review): the surrounding
# try/except and return lines appear elided in this dump.
650 hostname = self.get("hostname") or \
651 self.plapi.get_nodes(node_id, ['hostname'])[0]['hostname']
653 ip = sshfuncs.gethostbyname(hostname)
655 # Fail while trying to find the IP
# Abort discovery: no node satisfies all the user's filter attributes.
# (Python 2 raise syntax, consistent with the rest of this file.)
659 def fail_discovery(self):
660 msg = "Discovery failed. No candidates found for node"
662 raise RuntimeError, msg
# Abort: the user-specified node did not answer ping.
664 def fail_node_not_alive(self, hostname=None):
665 msg = "Node %s not alive" % hostname
666 raise RuntimeError, msg
# Abort: the user-specified node is blacklisted or reserved by another RM.
668 def fail_node_not_available(self, hostname):
669 msg = "Node %s not available for provisioning" % hostname
670 raise RuntimeError, msg
# Abort: every matching candidate was exhausted during discovery.
672 def fail_not_enough_nodes(self):
673 msg = "Not enough nodes available for provisioning"
674 raise RuntimeError, msg
# Abort: the PLCAPI client could not be instantiated (bad/missing
# pluser/plpassword credentials).
676 def fail_plapi(self):
677 msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
678 " attributes pluser and plpassword."
679 raise RuntimeError, msg
681 def valid_connection(self, guid):