2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
import threading
import time
from random import randint

from nepi.execution.attribute import Attribute, Flags, Types
from nepi.execution.resource import ResourceManager, clsinit_copy, \
        ResourceState, reschedule_delay, failtrap
from nepi.resources.linux.node import LinuxNode
from nepi.resources.planetlab.plcapi import PLCAPIFactory
from nepi.util.execfuncs import lexec
@clsinit_copy
class PlanetlabNode(LinuxNode):
    """Resource manager for a PlanetLab host reached over SSH.

    Extends LinuxNode with PLCAPI-driven discovery and provisioning.
    """
    _rtype = "PlanetlabNode"
    _help = "Controls a PlanetLab host accessible using a SSH key " \
            "associated to a PlanetLab user account"
    _backend = "planetlab"

    # Class-level state shared by every PlanetlabNode RM in the process,
    # so concurrent RMs do not pick the same (or a known-bad) node.
    # blacklist was referenced by _blacklist_node/discover but missing here.
    blacklist = list()          # node ids known to malfunction
    provisionlist = list()      # node ids currently being provisioned

    # Locks guarding the shared lists and slice updates across RM threads
    lock_blist = threading.Lock()
    lock_plist = threading.Lock()
    lock_slice = threading.Lock()
49 def _register_attributes(cls):
50 ip = Attribute("ip", "PlanetLab host public IP address",
51 flags = Flags.ReadOnly)
53 pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
54 (e.g. www.planet-lab.eu or www.planet-lab.org) ",
55 default = "www.planet-lab.eu",
56 flags = Flags.Credential)
58 pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
59 (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
60 default = "https://%(hostname)s:443/PLCAPI/",
61 flags = Flags.ExecReadOnly)
63 pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
64 authenticate in the website) ",
65 flags = Flags.Credential)
67 pl_password = Attribute("password",
68 "PlanetLab account password, as \
69 the one to authenticate in the website) ",
70 flags = Flags.Credential)
72 city = Attribute("city", "Constrain location (city) during resource \
73 discovery. May use wildcards.",
76 country = Attribute("country", "Constrain location (country) during \
77 resource discovery. May use wildcards.",
80 region = Attribute("region", "Constrain location (region) during \
81 resource discovery. May use wildcards.",
84 architecture = Attribute("architecture", "Constrain architecture \
85 during resource discovery.",
86 type = Types.Enumerate,
91 operating_system = Attribute("operatingSystem", "Constrain operating \
92 system during resource discovery.",
93 type = Types.Enumerate,
101 site = Attribute("site", "Constrain the PlanetLab site this node \
103 type = Types.Enumerate,
107 flags = Flags.Filter)
109 min_reliability = Attribute("minReliability", "Constrain reliability \
110 while picking PlanetLab nodes. Specifies a lower \
114 flags = Flags.Filter)
116 max_reliability = Attribute("maxReliability", "Constrain reliability \
117 while picking PlanetLab nodes. Specifies an upper \
121 flags = Flags.Filter)
123 min_bandwidth = Attribute("minBandwidth", "Constrain available \
124 bandwidth while picking PlanetLab nodes. \
125 Specifies a lower acceptable bound.",
128 flags = Flags.Filter)
130 max_bandwidth = Attribute("maxBandwidth", "Constrain available \
131 bandwidth while picking PlanetLab nodes. \
132 Specifies an upper acceptable bound.",
135 flags = Flags.Filter)
137 min_load = Attribute("minLoad", "Constrain node load average while \
138 picking PlanetLab nodes. Specifies a lower acceptable \
142 flags = Flags.Filter)
144 max_load = Attribute("maxLoad", "Constrain node load average while \
145 picking PlanetLab nodes. Specifies an upper acceptable \
149 flags = Flags.Filter)
151 min_cpu = Attribute("minCpu", "Constrain available cpu time while \
152 picking PlanetLab nodes. Specifies a lower acceptable \
156 flags = Flags.Filter)
158 max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
159 picking PlanetLab nodes. Specifies an upper acceptable \
163 flags = Flags.Filter)
165 timeframe = Attribute("timeframe", "Past time period in which to check\
166 information about the node. Values are year,month, \
169 type = Types.Enumerate,
174 flags = Flags.Filter)
176 cls._register_attribute(ip)
177 cls._register_attribute(pl_url)
178 cls._register_attribute(pl_ptn)
179 cls._register_attribute(pl_user)
180 cls._register_attribute(pl_password)
181 cls._register_attribute(site)
182 cls._register_attribute(city)
183 cls._register_attribute(country)
184 cls._register_attribute(region)
185 cls._register_attribute(architecture)
186 cls._register_attribute(operating_system)
187 cls._register_attribute(min_reliability)
188 cls._register_attribute(max_reliability)
189 cls._register_attribute(min_bandwidth)
190 cls._register_attribute(max_bandwidth)
191 cls._register_attribute(min_load)
192 cls._register_attribute(max_load)
193 cls._register_attribute(min_cpu)
194 cls._register_attribute(max_cpu)
195 cls._register_attribute(timeframe)
197 def __init__(self, ec, guid):
198 super(PlanetlabNode, self).__init__(ec, guid)
201 self._node_to_provision = None
206 pl_user = self.get("pluser")
207 pl_pass = self.get("password")
208 pl_url = self.get("plcApiUrl")
209 pl_ptn = self.get("plcApiPattern")
211 self._plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
218 Based on the attributes defined by the user, discover the suitable nodes
220 hostname = self.get("hostname")
222 # the user specified one particular node to be provisioned
223 # check with PLCAPI if it is alvive
224 node_id = self._query_if_alive(hostname=hostname)
225 node_id = node_id.pop()
227 # check that the node is not blacklisted or already being provision
229 blist = PlanetlabNode.blacklist
230 plist = PlanetlabNode.provisionlist
231 if node_id not in blist and node_id not in plist:
233 # check that is really alive, by performing ping
234 ping_ok = self._do_ping(node_id)
236 self._blacklist_node(node_id)
237 self.fail_node_not_alive(hostname)
239 self._node_to_provision = node_id
240 self._put_node_in_provision(node_id)
241 super(PlanetlabNode, self).discover()
244 self.fail_node_not_available(hostname)
247 # the user specifies constraints based on attributes, zero, one or
248 # more nodes can match these constraints
249 nodes = self._filter_based_on_attributes()
250 nodes_alive = self._query_if_alive(nodes)
252 # nodes that are already part of user's slice have the priority to
254 nodes_inslice = self._check_if_in_slice(nodes_alive)
255 nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
259 node_id = self._choose_random_node(nodes_inslice)
261 if not node_id and nodes_not_inslice:
262 # Either there were no matching nodes in the user's slice, or
263 # the nodes in the slice were blacklisted or being provisioned
264 # by other RM. Note nodes_not_inslice is never empty
265 node_id = self._choose_random_node(nodes_not_inslice)
267 self.fail_not_enough_nodes()
269 self._node_to_provision = node_id
270 super(PlanetlabNode, self).discover()
272 def provisionl(self):
274 Add node to user's slice after verifing that the node is functioning
282 while not provision_ok:
283 node = self._node_to_provision
284 self._set_hostname_attr(node)
285 self._add_node_to_slice(node)
287 # check ssh connection
289 while t < timeout and not ssh_ok:
291 cmd = 'echo \'GOOD NODE\''
292 ((out, err), proc) = self.execute(cmd)
293 if out.find("GOOD NODE") < 0:
302 # the timeout was reach without establishing ssh connection
303 # the node is blacklisted, deleted from the slice, and a new
304 # node to provision is discovered
305 self._blacklist_node(node)
306 self._delete_node_from_slice(node)
310 # check /proc directory is mounted (ssh_ok = True)
312 cmd = 'mount |grep proc'
313 ((out, err), proc) = self.execute(cmd)
314 if out.find("/proc type proc") < 0:
315 self._blacklist_node(node)
316 self._delete_node_from_slice(node)
323 ip = self._get_ip(node)
326 super(PlanetlabNode, self).provision()
328 def _filter_based_on_attributes(self):
330 Retrive the list of nodes ids that match user's constraints
332 # Map user's defined attributes with tagnames of PlanetLab
333 timeframe = self.get("timeframe")[0]
336 'country' : 'country',
338 'architecture' : 'arch',
339 'operatingSystem' : 'fcdistro',
340 #'site' : 'pldistro',
341 'minReliability' : 'reliability%s' % timeframe,
342 'maxReliability' : 'reliability%s' % timeframe,
343 'minBandwidth' : 'bw%s' % timeframe,
344 'maxBandwidth' : 'bw%s' % timeframe,
345 'minLoad' : 'load%s' % timeframe,
346 'maxLoad' : 'load%s' % timeframe,
347 'minCpu' : 'cpu%s' % timeframe,
348 'maxCpu' : 'cpu%s' % timeframe,
354 for attr_name, attr_obj in self._attrs.iteritems():
355 attr_value = self.get(attr_name)
357 if attr_value is not None and attr_obj.flags == 8 and \
358 attr_name != 'timeframe':
360 attr_tag = attr_to_tags[attr_name]
361 filters['tagname'] = attr_tag
363 # filter nodes by fixed constraints e.g. operating system
364 if not 'min' in attr_name and not 'max' in attr_name:
365 filters['value'] = attr_value
366 nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
368 # filter nodes by range constraints e.g. max bandwidth
369 elif ('min' or 'max') in attr_name:
370 nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
375 def _filter_by_fixed_attr(self, filters, nodes_id):
377 Query PLCAPI for nodes ids matching fixed attributes defined by the
380 node_tags = self.plapi.get_node_tags(filters)
381 if node_tags is not None:
383 if len(nodes_id) == 0:
384 # first attribute being matched
385 for node_tag in node_tags:
386 nodes_id.append(node_tag['node_id'])
388 # remove the nodes ids that don't match the new attribute
389 # that is being match
392 for node_tag in node_tags:
393 if node_tag['node_id'] in nodes_id:
394 nodes_id_tmp.append(node_tag['node_id'])
396 if len(nodes_id_tmp):
397 nodes_id = set(nodes_id) & set(nodes_id_tmp)
399 # no node from before match the new constraint
400 self.fail_discovery()
402 # no nodes match the filter applied
403 self.fail_discovery()
407 def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
409 Query PLCAPI for nodes ids matching attributes defined in a certain
412 node_tags = self.plapi.get_node_tags(filters)
413 if node_tags is not None:
415 if len(nodes_id) == 0:
416 # first attribute being matched
417 for node_tag in node_tags:
419 # check that matches the min or max restriction
420 if 'min' in attr_name and node_tag['value'] != 'n/a' and \
421 float(node_tag['value']) > attr_value:
422 nodes_id.append(node_tag['node_id'])
424 elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
425 float(node_tag['value']) < attr_value:
426 nodes_id.append(node_tag['node_id'])
429 # remove the nodes ids that don't match the new attribute
430 # that is being match
432 for node_tag in node_tags:
434 # check that matches the min or max restriction and was a
435 # matching previous filters
436 if 'min' in attr_name and node_tag['value'] != 'n/a' and \
437 float(node_tag['value']) > attr_value and \
438 node_tag['node_id'] in nodes_id:
439 nodes_id_tmp.append(node_tag['node_id'])
441 elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
442 float(node_tag['value']) < attr_value and \
443 node_tag['node_id'] in nodes_id:
444 nodes_id_tmp.append(node_tag['node_id'])
446 if len(nodes_id_tmp):
447 nodes_id = set(nodes_id) & set(nodes_id_tmp)
449 # no node from before match the new constraint
450 self.fail_discovery()
453 # no nodes match the filter applied
454 self.fail_discovery()
458 def _query_if_alive(self, nodes_id=None, hostname=None):
460 Query PLCAPI for nodes that register activity recently, using filters
461 related to the state of the node, e.g. last time it was contacted
463 if nodes_id is None and hostname is None:
464 msg = "Specify nodes_id or hostname"
465 raise RuntimeError, msg
467 if nodes_id is not None and hostname is not None:
468 msg = "Specify either nodes_id or hostname"
469 raise RuntimeError, msg
471 # define PL filters to check the node is alive
473 filters['run_level'] = 'boot'
474 filters['boot_state'] = 'boot'
475 filters['node_type'] = 'regular'
476 filters['>last_contact'] = int(time.time()) - 2*3600
478 # adding node_id or hostname to the filters to check for the particular
481 filters['node_id'] = list(nodes_id)
482 alive_nodes_id = self._get_nodes_id(filters)
484 filters['hostname'] = hostname
485 alive_nodes_id = self._get_nodes_id(filters)
487 if len(alive_nodes_id) == 0:
488 self.fail_discovery()
491 for node_id in alive_nodes_id:
492 nid = node_id['node_id']
497 def _choose_random_node(self, nodes):
499 From the possible nodes for provision, choose randomly to decrese the
500 probability of different RMs choosing the same node for provision
502 blist = PlanetlabNode.blacklist
503 plist = PlanetlabNode.provisionlist
508 index = randint(0, size)
509 node_id = nodes[index]
510 nodes[index] = nodes[size]
512 # check the node is not blacklisted or being provision by other RM
513 # and perform ping to check that is really alive
514 if node_id not in blist and node_id not in plist:
515 ping_ok = self._do_ping(node_id)
517 self._blacklist_node(node_id)
519 # discovered node for provision, added to provision list
520 self._put_node_in_provision(node_id)
523 def _get_nodes_id(self, filters):
524 return self.plapi.get_nodes(filters, fields=['node_id'])
526 def _add_node_to_slice(self, node_id):
527 self.warn(" Adding node to slice ")
528 slicename = self.get("username")
529 with PlanetlabNode.lock_slice:
530 slice_nodes = self.plapi.get_slice_nodes(slicename)
531 slice_nodes.append(node_id)
532 self.plapi.add_slice_nodes(slicename, slice_nodes)
534 def _delete_node_from_slice(self, node):
535 self.warn(" Deleting node from slice ")
536 slicename = self.get("username")
537 self.plapi.delete_slice_node(slicename, [node])
539 def _set_hostname_attr(self, node):
541 Query PLCAPI for the hostname of a certain node id and sets the
542 attribute hostname, it will over write the previous value
544 hostname = self.plapi.get_nodes(node, ['hostname'])
545 self.set("hostname", hostname[0]['hostname'])
547 def _check_if_in_slice(self, nodes_id):
549 Query PLCAPI to find out if any node id from nodes_id is in the user's
552 slicename = self.get("username")
553 slice_nodes = self.plapi.get_slice_nodes(slicename)
554 nodes_inslice = list(set(nodes_id) & set(slice_nodes))
558 def _do_ping(self, node_id):
560 Perform ping command on node's IP matching node id
563 ip = self._get_ip(node_id)
564 command = "ping -c2 %s | echo \"PING OK\"" % ip
566 (out, err) = lexec(command)
567 if not out.find("PING OK") < 0:
572 def _blacklist_node(self, node):
574 Add node mal functioning node to blacklist
576 blist = PlanetlabNode.blacklist
578 self.warn(" Blacklisting malfunctioning node ")
579 with PlanetlabNode.lock_blist:
582 def _put_node_in_provision(self, node):
584 Add node to the list of nodes being provisioned, in order for other RMs
585 to not try to provision the same one again
587 plist = PlanetlabNode.provisionlist
589 self.warn(" Provisioning node ")
590 with PlanetlabNode.lock_plist:
593 def _get_ip(self, node_id):
595 Query PLCAPI for the IP of a node with certain node id
597 ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
601 def fail_discovery(self):
603 msg = "Discovery failed. No candidates found for node"
605 raise RuntimeError, msg
607 def fail_node_not_alive(self, hostname):
608 msg = "Node %s not alive, pick another node" % hostname
609 raise RuntimeError, msg
611 def fail_node_not_available(self, hostname):
612 msg = "Node %s not available for provisioning, pick another \
614 raise RuntimeError, msg
616 def fail_not_enough_nodes(self):
617 msg = "Not enough nodes available for provisioning"
618 raise RuntimeError, msg
620 def valid_connection(self, guid):