2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23 ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
29 from random import randint
class PlanetlabNode(LinuxNode):
    """Resource Manager controlling a PlanetLab host.

    Extends LinuxNode with PLCAPI-driven discovery (filter candidate
    nodes by location/hardware/performance tags), provisioning (add the
    node to the user's slice, verify SSH and filesystem health) and
    blacklisting of malfunctioning nodes.
    """
    # Resource type name under which this RM is registered.
    _rtype = "PlanetlabNode"
    _help = "Controls a PlanetLab host accessible using a SSH key " \
        "associated to a PlanetLab user account"
    _backend = "planetlab"

    # Class-level lock: serializes reservation/blacklist bookkeeping
    # across all PlanetlabNode RMs in the process (see do_discover).
    lock = threading.Lock()
    # NOTE(review): the decorator expected on this registration hook
    # (classmethod) appears elided in this copy -- confirm upstream.
    def _register_attributes(cls):
        """Declare this RM's attributes: PLCAPI endpoint and credentials,
        plus the discovery filters (location, hardware, performance).

        NOTE(review): several Attribute(...) calls below are missing
        their closing lines (values/flags/defaults) in this copy of the
        file -- confirm against upstream before relying on them.
        """
        # Pin a specific host by its public IP address.
        ip = Attribute("ip", "PlanetLab host public IP address",

        # PLCAPI endpoint host and URL pattern.
        pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
            (e.g. www.planet-lab.eu or www.planet-lab.org) ",
            default = "www.planet-lab.eu",
            flags = Flags.Credential)

        pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
            (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
            default = "https://%(hostname)s:443/PLCAPI/",

        # PlanetLab website account credentials used against PLCAPI.
        pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
            authenticate in the website) ",
            flags = Flags.Credential)

        pl_password = Attribute("plpassword",
            "PlanetLab account password, as \
            the one to authenticate in the website) ",
            flags = Flags.Credential)

        # Location constraints (wildcards allowed).
        city = Attribute("city", "Constrain location (city) during resource \
            discovery. May use wildcards.",

        country = Attribute("country", "Constrain location (country) during \
            resource discovery. May use wildcards.",

        region = Attribute("region", "Constrain location (region) during \
            resource discovery. May use wildcards.",

        # Hardware / OS constraints (enumerated values).
        architecture = Attribute("architecture", "Constrain architecture \
            during resource discovery.",
            type = Types.Enumerate,

        operating_system = Attribute("operatingSystem", "Constrain operating \
            system during resource discovery.",
            type = Types.Enumerate,

        # Performance bounds; the matching PLC tag names are suffixed
        # with the selected timeframe (see _filter_based_on_attributes).
        min_reliability = Attribute("minReliability", "Constrain reliability \
            while picking PlanetLab nodes. Specifies a lower \
            flags = Flags.Filter)

        max_reliability = Attribute("maxReliability", "Constrain reliability \
            while picking PlanetLab nodes. Specifies an upper \
            flags = Flags.Filter)

        min_bandwidth = Attribute("minBandwidth", "Constrain available \
            bandwidth while picking PlanetLab nodes. \
            Specifies a lower acceptable bound.",
            flags = Flags.Filter)

        max_bandwidth = Attribute("maxBandwidth", "Constrain available \
            bandwidth while picking PlanetLab nodes. \
            Specifies an upper acceptable bound.",
            flags = Flags.Filter)

        min_load = Attribute("minLoad", "Constrain node load average while \
            picking PlanetLab nodes. Specifies a lower acceptable \
            flags = Flags.Filter)

        max_load = Attribute("maxLoad", "Constrain node load average while \
            picking PlanetLab nodes. Specifies an upper acceptable \
            flags = Flags.Filter)

        min_cpu = Attribute("minCpu", "Constrain available cpu time while \
            picking PlanetLab nodes. Specifies a lower acceptable \
            flags = Flags.Filter)

        max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
            picking PlanetLab nodes. Specifies an upper acceptable \
            flags = Flags.Filter)

        # Time window the performance tags refer to (year/month/...).
        timeframe = Attribute("timeframe", "Past time period in which to check\
            information about the node. Values are year,month, \
            type = Types.Enumerate,
            flags = Flags.Filter)

        # Persist blacklisted nodes across experiments via
        # ~/.nepi/plblacklist (see also __init__).
        plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
            in the user's home directory under .nepi directory. This file \
            contains a list of PL nodes to blacklist, and at the end \
            of the experiment execution the new blacklisted nodes are added.",
            flags = Flags.Global)

        # Register everything declared above with the RM machinery.
        cls._register_attribute(ip)
        cls._register_attribute(pl_url)
        cls._register_attribute(pl_ptn)
        cls._register_attribute(pl_user)
        cls._register_attribute(pl_password)
        cls._register_attribute(city)
        cls._register_attribute(country)
        cls._register_attribute(region)
        cls._register_attribute(architecture)
        cls._register_attribute(operating_system)
        cls._register_attribute(min_reliability)
        cls._register_attribute(max_reliability)
        cls._register_attribute(min_bandwidth)
        cls._register_attribute(max_bandwidth)
        cls._register_attribute(min_load)
        cls._register_attribute(max_load)
        cls._register_attribute(min_cpu)
        cls._register_attribute(max_cpu)
        cls._register_attribute(timeframe)
        cls._register_attribute(plblacklist)
    def __init__(self, ec, guid):
        """Initialize the RM and make sure ~/.nepi/plblacklist.txt exists."""
        super(PlanetlabNode, self).__init__(ec, guid)

        # Weak reference to the EC so this RM does not keep it alive.
        self._ecobj = weakref.ref(ec)

        # Node id selected during discovery, to be provisioned later.
        self._node_to_provision = None
        # True when the chosen node was already in the user's slice.
        self._slicenode = False
        # True when the user pinned a specific hostname (see do_discover).
        self._hostname = False

        # PlanetLab nodes are reached directly: drop any inherited
        # gateway configuration.
        if self.get("gateway") or self.get("gatewayUser"):
            self.set("gateway", None)
            self.set("gatewayUser", None)

        # Create an empty persistent blacklist file if it is missing.
        nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
        plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
        if not os.path.exists(plblacklist_file):
            if os.path.isdir(nepi_home):
                open(plblacklist_file, 'w').close()
            # NOTE(review): an `else:` appears elided here in this copy --
            # makedirs should only run when ~/.nepi does not yet exist.
            os.makedirs(nepi_home)
            open(plblacklist_file, 'w').close()
    def _skip_provision(self):
        """Tell whether PLCAPI-based discovery/provision should be skipped.

        Skipped when the user supplied no PLC credentials at all.
        """
        pl_user = self.get("pluser")
        pl_pass = self.get("plpassword")
        if not pl_user and not pl_pass:
        # NOTE(review): the return statements of _skip_provision and the
        # `def`/decorator of the `plapi` accessor appear elided in this
        # copy; the lines below build the cached PLCAPI client.
        pl_user = self.get("pluser")
        pl_pass = self.get("plpassword")
        pl_url = self.get("plcApiUrl")
        pl_ptn = self.get("plcApiPattern")
        _plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
            pl_ptn, self._ecobj())
        # Hold only a weak reference; the factory owns the API object.
        self._plapi = weakref.ref(_plapi)
    def do_discover(self):
        """
        Based on the attributes defined by the user, discover the suitable
        node(s) for provision: either the exact pinned hostname, or a
        random pick among the ids matching the filter attributes.
        """
        # Without PLC credentials there is nothing to discover via PLCAPI.
        if self._skip_provision():
            super(PlanetlabNode, self).do_discover()
            # NOTE(review): an early `return` appears elided here.

        hostname = self._get_hostname()
        # the user specified one particular node to be provisioned
        # NOTE(review): the `if hostname:` guard appears elided in this copy.
        self._hostname = True
        node_id = self._get_nodes_id({'hostname':hostname})
        node_id = node_id.pop()['node_id']

        # check that the node is not blacklisted or being provisioned
        # by another RM; both lists are shared, hence the class lock.
        with PlanetlabNode.lock:
            plist = self.plapi.reserved()
            blist = self.plapi.blacklisted()
            if node_id not in blist and node_id not in plist:
                # check that is really alive, by performing ping
                ping_ok = self._do_ping(node_id)
                # NOTE(review): an `if not ping_ok:` guard appears elided
                # here -- blacklisting should only happen on ping failure.
                self._blacklist_node(node_id)
                self.fail_node_not_alive(hostname)
                if self._check_if_in_slice([node_id]):
                    self._slicenode = True
                self._put_node_in_provision(node_id)
                self._node_to_provision = node_id
                # NOTE(review): presumably under an elided `else:` -- the
                # failure path for a blacklisted/reserved pinned node.
                self.fail_node_not_available(hostname)
            super(PlanetlabNode, self).do_discover()

        # the user specifies constraints based on attributes, zero, one or
        # more nodes can match these constraints
        nodes = self._filter_based_on_attributes()

        # nodes that are already part of user's slice have the priority to
        # be provisioned first
        nodes_inslice = self._check_if_in_slice(nodes)
        nodes_not_inslice = list(set(nodes) - set(nodes_inslice))

        # NOTE(review): an `if nodes_inslice:` guard appears elided here.
        node_id = self._choose_random_node(nodes_inslice)
        self._slicenode = True

        # Either there were no matching nodes in the user's slice, or
        # the nodes in the slice were blacklisted or being provisioned
        # by other RM. Note nodes_not_inslice is never empty
        node_id = self._choose_random_node(nodes_not_inslice)
        self._slicenode = False

        self._node_to_provision = node_id
        self._set_hostname_attr(node_id)
        self.info(" Selected node to provision ")
        super(PlanetlabNode, self).do_discover()

        with PlanetlabNode.lock:
            self._blacklist_node(node_id)

        # Reached when no candidate could be selected at all.
        self.fail_not_enough_nodes()
    def do_provision(self):
        """
        Add node to user's slice after verifing that the node is functioning
        correctly: reachable over SSH, /proc mounted, filesystem writable.
        Malfunctioning nodes are blacklisted and another node is tried.
        """
        if self._skip_provision():
            super(PlanetlabNode, self).do_provision()
            # NOTE(review): an early `return` appears elided here.

        # NOTE(review): initialization of provision_ok / ssh_ok / timeout /
        # t appears elided in this copy of the file.
        while not provision_ok:
            node = self._node_to_provision
            # Nodes already in the slice need not be added again.
            if not self._slicenode:
                self._add_node_to_slice(node)
                if self._check_if_in_slice([node]):
                    self.debug( "Node added to slice" )
                # NOTE(review): presumably under an elided `else:` --
                # failure path when the node could not be added.
                self.warning(" Could not add to slice ")
                with PlanetlabNode.lock:
                    self._blacklist_node(node)

            # check ssh connection
            while t < timeout and not ssh_ok:
                cmd = 'echo \'GOOD NODE\''
                ((out, err), proc) = self.execute(cmd)
                if out.find("GOOD NODE") < 0:
                    self.debug( "No SSH connection, waiting 60s" )
                # NOTE(review): the sleep/retry bookkeeping and the `else:`
                # branch appear elided here.
                self.debug( "SSH OK" )

            # Final SSH probe after the wait loop.
            cmd = 'echo \'GOOD NODE\''
            ((out, err), proc) = self.execute(cmd)
            if not out.find("GOOD NODE") < 0:
            # the timeout was reach without establishing ssh connection
            # the node is blacklisted, deleted from the slice, and a new
            # node to provision is discovered
            with PlanetlabNode.lock:
                self.warning(" Could not SSH login ")
                self._blacklist_node(node)
                #self._delete_node_from_slice(node)

            # check /proc directory is mounted (ssh_ok = True)
            # and file system is not read only
            cmd = 'mount |grep proc'
            ((out1, err1), proc1) = self.execute(cmd)
            cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
            ((out2, err2), proc2) = self.execute(cmd)
            if out1.find("/proc type proc") < 0 or \
                "Read-only file system".lower() in err2.lower():
                with PlanetlabNode.lock:
                    self.warning(" Corrupted file system ")
                    self._blacklist_node(node)
                    #self._delete_node_from_slice(node)

        # Success: record the hostname/IP actually provisioned.
        if not self.get('hostname'):
            self._set_hostname_attr(node)
        ip = self._get_ip(node)
        self.info(" Node provisioned ")

        super(PlanetlabNode, self).do_provision()
    def do_release(self):
        """Release the node; tear down the PLCAPI client if one was used."""
        super(PlanetlabNode, self).do_release()
        if self.state == ResourceState.RELEASED and not self._skip_provision():
            self.debug(" Releasing PLC API ")
            # NOTE(review): the actual PLCAPI release call appears elided
            # in this copy of the file.
    def _filter_based_on_attributes(self):
        """
        Retrive the list of nodes ids that match user's constraints
        (Flags.Filter attributes), intersecting the matches of each
        filter applied in turn.
        """
        # Map user's defined attributes with tagnames of PlanetLab.
        # Performance tags are suffixed with the first letter of the
        # timeframe attribute (e.g. 'y' for year).
        timeframe = self.get("timeframe")[0]
        # NOTE(review): the opening of the attr_to_tags dict (and the
        # 'city' entry) appears elided in this copy.
            'country' : 'country',
            'architecture' : 'arch',
            'operatingSystem' : 'fcdistro',
            'minReliability' : 'reliability%s' % timeframe,
            'maxReliability' : 'reliability%s' % timeframe,
            'minBandwidth' : 'bw%s' % timeframe,
            'maxBandwidth' : 'bw%s' % timeframe,
            'minLoad' : 'load%s' % timeframe,
            'maxLoad' : 'load%s' % timeframe,
            'minCpu' : 'cpu%s' % timeframe,
            'maxCpu' : 'cpu%s' % timeframe,

        # NOTE(review): initialization of nodes_id/filters appears elided.
        for attr_name, attr_obj in self._attrs.iteritems():
            attr_value = self.get(attr_name)

            # Only set, filterable attributes participate; 'timeframe'
            # is already folded into the tag names above.
            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
                attr_name != 'timeframe':

                attr_tag = attr_to_tags[attr_name]
                filters['tagname'] = attr_tag

                # filter nodes by fixed constraints e.g. operating system
                if not 'min' in attr_name and not 'max' in attr_name:
                    filters['value'] = attr_value
                    nodes_id = self._filter_by_fixed_attr(filters, nodes_id)

                # filter nodes by range constraints e.g. max bandwidth
                # NOTE(review): `('min' or 'max')` evaluates to just 'min';
                # harmless only because the `if` above already excluded
                # non-range names -- the intent is "'min' in name or
                # 'max' in name".
                elif ('min' or 'max') in attr_name:
                    nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)

        # NOTE(review): the no-filters fallback branch and its for-loop
        # header appear elided here: all node ids are collected instead.
        nodes = self._get_nodes_id()
        nodes_id.append(node['node_id'])
        # NOTE(review): the `return nodes_id` line appears elided.
    def _filter_by_fixed_attr(self, filters, nodes_id):
        """
        Query PLCAPI for nodes ids matching fixed attributes defined by the
        user (exact tag value), intersected with previously matched ids.
        """
        node_tags = self.plapi.get_node_tags(filters)
        if node_tags is not None:

            if len(nodes_id) == 0:
                # first attribute being matched
                for node_tag in node_tags:
                    nodes_id.append(node_tag['node_id'])
            # remove the nodes ids that don't match the new attribute
            # that is being match
            # NOTE(review): the `else:` and `nodes_id_tmp = []` lines
            # appear elided in this copy.
                for node_tag in node_tags:
                    if node_tag['node_id'] in nodes_id:
                        nodes_id_tmp.append(node_tag['node_id'])

                if len(nodes_id_tmp):
                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
                # no node from before match the new constraint
                # NOTE(review): presumably under an elided `else:`.
                self.fail_discovery()
        # no nodes match the filter applied
        # NOTE(review): presumably under an elided `else:`; a final
        # `return nodes_id` also appears elided.
        self.fail_discovery()
    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
        """
        Query PLCAPI for nodes ids matching attributes defined in a certain
        range (min*/max* bounds), intersected with previously matched ids.

        'min*' keeps tags with value strictly above attr_value; 'max*'
        keeps tags strictly below. 'n/a' tag values are always skipped.
        """
        node_tags = self.plapi.get_node_tags(filters)
        # NOTE(review): the guard on node_tags appears elided in this copy.

        if len(nodes_id) == 0:
            # first attribute being matched
            for node_tag in node_tags:

                # check that matches the min or max restriction
                if 'min' in attr_name and node_tag['value'] != 'n/a' and \
                    float(node_tag['value']) > attr_value:
                    nodes_id.append(node_tag['node_id'])

                elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
                    float(node_tag['value']) < attr_value:
                    nodes_id.append(node_tag['node_id'])

        # remove the nodes ids that don't match the new attribute
        # that is being match
        # NOTE(review): the `else:` and `nodes_id_tmp = []` lines appear
        # elided in this copy.
            for node_tag in node_tags:

                # check that matches the min or max restriction and was a
                # matching previous filters
                if 'min' in attr_name and node_tag['value'] != 'n/a' and \
                    float(node_tag['value']) > attr_value and \
                    node_tag['node_id'] in nodes_id:
                    nodes_id_tmp.append(node_tag['node_id'])

                elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
                    float(node_tag['value']) < attr_value and \
                    node_tag['node_id'] in nodes_id:
                    nodes_id_tmp.append(node_tag['node_id'])

            if len(nodes_id_tmp):
                nodes_id = set(nodes_id) & set(nodes_id_tmp)
            # no node from before match the new constraint
            # NOTE(review): presumably under an elided `else:`.
            self.fail_discovery()
        # no nodes match the filter applied
        # NOTE(review): presumably under an elided `else:`; a final
        # `return nodes_id` also appears elided.
        self.fail_discovery()
    def _choose_random_node(self, nodes):
        """
        From the possible nodes for provision, choose randomly to decrese the
        probability of different RMs choosing the same node for provision.

        Candidates failing the blacklist/reservation check or the ping
        probe are blacklisted; on success the node is reserved.
        """
        # NOTE(review): the surrounding while-loop header and the `size`
        # bookkeeping appear elided in this copy of the file.
        index = randint(0, size)
        node_id = nodes[index]
        # Move the tail element into the picked slot so the same id is
        # not drawn again on the next iteration.
        nodes[index] = nodes[size]

        # check the node is not blacklisted or being provision by other RM
        # and perform ping to check that is really alive
        with PlanetlabNode.lock:

            blist = self.plapi.blacklisted()
            plist = self.plapi.reserved()
            if node_id not in blist and node_id not in plist:
                ping_ok = self._do_ping(node_id)
                # NOTE(review): the `if ping_ok:`/`else:` branching
                # appears elided here -- the next three statements belong
                # to different outcomes of the ping.
                self._set_hostname_attr(node_id)
                self.warning(" Node not responding PING ")
                self._blacklist_node(node_id)

                # discovered node for provision, added to provision list
                self._put_node_in_provision(node_id)
                # NOTE(review): `return node_id` appears elided here.
568 def _get_nodes_id(self, filters=None):
569 return self.plapi.get_nodes(filters, fields=['node_id'])
571 def _add_node_to_slice(self, node_id):
572 self.info(" Adding node to slice ")
573 slicename = self.get("username")
574 with PlanetlabNode.lock:
575 slice_nodes = self.plapi.get_slice_nodes(slicename)
576 self.debug(" Previous slice nodes %s " % slice_nodes)
577 slice_nodes.append(node_id)
578 self.plapi.add_slice_nodes(slicename, slice_nodes)
580 def _delete_node_from_slice(self, node):
581 self.warning(" Deleting node from slice ")
582 slicename = self.get("username")
583 self.plapi.delete_slice_node(slicename, [node])
    def _get_hostname(self):
        """Return the hostname to use for this node.

        Falls back to reverse-resolving the 'ip' attribute when no
        hostname was set, caching the result in the attribute.
        """
        hostname = self.get("hostname")
        # NOTE(review): the branch reading the 'ip' attribute (and the
        # returns) appears elided in this copy of the file.
        hostname = socket.gethostbyaddr(ip)[0]
        self.set('hostname', hostname)
    def _set_hostname_attr(self, node):
        """
        Query PLCAPI for the hostname of a certain node id and sets the
        attribute hostname, it will over write the previous value.
        """
        hostname = self.plapi.get_nodes(node, ['hostname'])
        self.set("hostname", hostname[0]['hostname'])
    def _check_if_in_slice(self, nodes_id):
        """
        Query PLCAPI to find out if any node id from nodes_id is in the user's
        slice; computes the intersection of nodes_id with the slice nodes.
        """
        slicename = self.get("username")
        slice_nodes = self.plapi.get_slice_nodes(slicename)
        nodes_inslice = list(set(nodes_id) & set(slice_nodes))
        # NOTE(review): `return nodes_inslice` appears elided in this copy.
    def _do_ping(self, node_id):
        """
        Perform ping command on node's IP matching node id.

        The node counts as alive when ping reports less than 50%% packet
        loss over 4 probes.
        """
        ip = self._get_ip(node_id)
        # NOTE(review): a guard on ip and the return statements appear
        # elided in this copy of the file.
        command = "ping -c4 %s" % ip
        (out, err) = lexec(command)

        m = re.search("(\d+)% packet loss", str(out))
        if m and int(m.groups()[0]) < 50:
    def _blacklist_node(self, node):
        """
        Add node mal functioning node to blacklist.
        """
        self.warning(" Blacklisting malfunctioning node ")
        self.plapi.blacklist_host(node)
        # Only clear the hostname attribute when the user did not pin a
        # specific host (see __init__/do_discover).
        if not self._hostname:
            self.set('hostname', None)
    def _put_node_in_provision(self, node):
        """
        Add node to the list of nodes being provisioned, in order for other RMs
        to not try to provision the same one again.
        """
        self.plapi.reserve_host(node)
    def _get_ip(self, node_id):
        """
        Query PLCAPI for the IP of a node with certain node id.

        Resolution goes through the 'hostname' attribute when set,
        otherwise through the hostname reported by PLCAPI.
        """
        hostname = self.get("hostname") or \
            self.plapi.get_nodes(node_id, ['hostname'])[0]['hostname']
        # NOTE(review): the try/except around the resolution and the
        # return statements appear elided in this copy of the file.
        ip = sshfuncs.gethostbyname(hostname)
        # Fail while trying to find the IP
    def fail_discovery(self):
        # Abort: no nodes satisfied the user's discovery filters.
        msg = "Discovery failed. No candidates found for node"
        # NOTE(review): a line (e.g. self.fail()/error logging) appears
        # elided here in this copy of the file.
        raise RuntimeError, msg
665 def fail_node_not_alive(self, hostname=None):
666 msg = "Node %s not alive" % hostname
667 raise RuntimeError, msg
669 def fail_node_not_available(self, hostname):
670 msg = "Node %s not available for provisioning" % hostname
671 raise RuntimeError, msg
673 def fail_not_enough_nodes(self):
674 msg = "Not enough nodes available for provisioning"
675 raise RuntimeError, msg
677 def fail_plapi(self):
678 msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
679 " attributes pluser and plpassword."
680 raise RuntimeError, msg
682 def valid_connection(self, guid):