Fixing concurrency problems in PlanetlabNode
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
23         reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27
28 from random import randint
29 import time
30 import threading
31
32 @clsinit_copy
33 class PlanetlabNode(LinuxNode):
34     _rtype = "PlanetlabNode"
35     _help = "Controls a PlanetLab host accessible using a SSH key " \
36             "associated to a PlanetLab user account"
37     _backend = "planetlab"
38
39     lock = threading.Lock()
40
41     @classmethod
42     def _register_attributes(cls):
43         ip = Attribute("ip", "PlanetLab host public IP address",
44                 flags = Flags.ReadOnly)
45
46         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
47                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
48                     default = "www.planet-lab.eu",
49                     flags = Flags.Credential)
50
51         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
52                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
53                     default = "https://%(hostname)s:443/PLCAPI/",
54                     flags = Flags.ExecReadOnly)
55     
56         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
57                     authenticate in the website) ",
58                     flags = Flags.Credential)
59
60         pl_password = Attribute("plpassword", 
61                         "PlanetLab account password, as \
62                         the one to authenticate in the website) ",
63                         flags = Flags.Credential)
64
65         city = Attribute("city", "Constrain location (city) during resource \
66                 discovery. May use wildcards.",
67                 flags = Flags.Filter)
68
69         country = Attribute("country", "Constrain location (country) during \
70                     resource discovery. May use wildcards.",
71                     flags = Flags.Filter)
72
73         region = Attribute("region", "Constrain location (region) during \
74                     resource discovery. May use wildcards.",
75                     flags = Flags.Filter)
76
77         architecture = Attribute("architecture", "Constrain architecture \
78                         during resource discovery.",
79                         type = Types.Enumerate,
80                         allowed = ["x86_64", 
81                                     "i386"],
82                         flags = Flags.Filter)
83
84         operating_system = Attribute("operatingSystem", "Constrain operating \
85                             system during resource discovery.",
86                             type = Types.Enumerate,
87                             allowed =  ["f8",
88                                         "f12",
89                                         "f14",
90                                         "centos",
91                                         "other"],
92                             flags = Flags.Filter)
93
94         #site = Attribute("site", "Constrain the PlanetLab site this node \
95         #        should reside on.",
96         #        type = Types.Enumerate,
97         #        allowed = ["PLE",
98         #                    "PLC",
99         #                    "PLJ"],
100         #        flags = Flags.Filter)
101
102         min_reliability = Attribute("minReliability", "Constrain reliability \
103                             while picking PlanetLab nodes. Specifies a lower \
104                             acceptable bound.",
105                             type = Types.Double,
106                             range = (1, 100),
107                             flags = Flags.Filter)
108
109         max_reliability = Attribute("maxReliability", "Constrain reliability \
110                             while picking PlanetLab nodes. Specifies an upper \
111                             acceptable bound.",
112                             type = Types.Double,
113                             range = (1, 100),
114                             flags = Flags.Filter)
115
116         min_bandwidth = Attribute("minBandwidth", "Constrain available \
117                             bandwidth while picking PlanetLab nodes. \
118                             Specifies a lower acceptable bound.",
119                             type = Types.Double,
120                             range = (0, 2**31),
121                             flags = Flags.Filter)
122
123         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
124                             bandwidth while picking PlanetLab nodes. \
125                             Specifies an upper acceptable bound.",
126                             type = Types.Double,
127                             range = (0, 2**31),
128                             flags = Flags.Filter)
129
130         min_load = Attribute("minLoad", "Constrain node load average while \
131                     picking PlanetLab nodes. Specifies a lower acceptable \
132                     bound.",
133                     type = Types.Double,
134                     range = (0, 2**31),
135                     flags = Flags.Filter)
136
137         max_load = Attribute("maxLoad", "Constrain node load average while \
138                     picking PlanetLab nodes. Specifies an upper acceptable \
139                     bound.",
140                     type = Types.Double,
141                     range = (0, 2**31),
142                     flags = Flags.Filter)
143
144         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
145                     picking PlanetLab nodes. Specifies a lower acceptable \
146                     bound.",
147                     type = Types.Double,
148                     range = (0, 100),
149                     flags = Flags.Filter)
150
151         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
152                     picking PlanetLab nodes. Specifies an upper acceptable \
153                     bound.",
154                     type = Types.Double,
155                     range = (0, 100),
156                     flags = Flags.Filter)
157
158         timeframe = Attribute("timeframe", "Past time period in which to check\
159                         information about the node. Values are year,month, \
160                         week, latest",
161                         default = "week",
162                         type = Types.Enumerate,
163                         allowed = ["latest",
164                                     "week",
165                                     "month",
166                                     "year"],
167                         flags = Flags.Filter)
168
169         cls._register_attribute(ip)
170         cls._register_attribute(pl_url)
171         cls._register_attribute(pl_ptn)
172         cls._register_attribute(pl_user)
173         cls._register_attribute(pl_password)
174         #cls._register_attribute(site)
175         cls._register_attribute(city)
176         cls._register_attribute(country)
177         cls._register_attribute(region)
178         cls._register_attribute(architecture)
179         cls._register_attribute(operating_system)
180         cls._register_attribute(min_reliability)
181         cls._register_attribute(max_reliability)
182         cls._register_attribute(min_bandwidth)
183         cls._register_attribute(max_bandwidth)
184         cls._register_attribute(min_load)
185         cls._register_attribute(max_load)
186         cls._register_attribute(min_cpu)
187         cls._register_attribute(max_cpu)
188         cls._register_attribute(timeframe)
189         
190
191     def __init__(self, ec, guid):
192         super(PlanetlabNode, self).__init__(ec, guid)
193
194         self._plapi = None
195         self._node_to_provision = None
196     
197     @property
198     def plapi(self):
199         if not self._plapi:
200             pl_user = self.get("pluser")
201             pl_pass = self.get("plpassword")
202             pl_url = self.get("plcApiUrl")
203             pl_ptn = self.get("plcApiPattern")
204
205             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
206                     pl_ptn)
207             
208         return self._plapi
209
210     def discover(self):
211         """
212         Based on the attributes defined by the user, discover the suitable nodes
213         """
214         hostname = self.get("hostname")
215         if hostname:
216             # the user specified one particular node to be provisioned
217             # check with PLCAPI if it is alvive
218             node_id = self._query_if_alive(hostname=hostname)
219             node_id = node_id.pop()
220
221             # check that the node is not blacklisted or being provisioned
222             # by other RM
223             with PlanetlabNode.lock:
224                 plist = self.plapi.reserved()
225                 blist = self.plapi.blacklisted()
226                 if node_id not in blist and node_id not in plist:
227                 
228                     # check that is really alive, by performing ping
229                     ping_ok = self._do_ping(node_id)
230                     if not ping_ok:
231                         self._blacklist_node(node_id)
232                         self.fail_node_not_alive(hostname)
233                     else:
234                         self._put_node_in_provision(node_id)
235                         self._node_to_provision = node_id
236                         super(PlanetlabNode, self).discover()
237                 
238                 else:
239                     self.fail_node_not_available(hostname)
240         
241         else:
242             # the user specifies constraints based on attributes, zero, one or 
243             # more nodes can match these constraints 
244             nodes = self._filter_based_on_attributes()
245             nodes_alive = self._query_if_alive(nodes)
246     
247             # nodes that are already part of user's slice have the priority to
248             # provisioned
249             nodes_inslice = self._check_if_in_slice(nodes_alive)
250             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
251             
252             node_id = None
253             if nodes_inslice:
254                 node_id = self._choose_random_node(nodes_inslice)
255                 
256             if not node_id:
257                 # Either there were no matching nodes in the user's slice, or
258                 # the nodes in the slice  were blacklisted or being provisioned
259                 # by other RM. Note nodes_not_inslice is never empty
260                 node_id = self._choose_random_node(nodes_not_inslice)
261
262             if node_id:
263                 self._node_to_provision = node_id
264                 super(PlanetlabNode, self).discover()
265             else:
266                self.fail_not_enough_nodes() 
267             
268     def provision(self):
269         """
270         Add node to user's slice after verifing that the node is functioning
271         correctly
272         """
273         provision_ok = False
274         ssh_ok = False
275         proc_ok = False
276         timeout = 1200
277
278         while not provision_ok:
279             node = self._node_to_provision
280             # Adding try catch to set hostname because sometimes MyPLC fails
281             # when trying to retrive node's hostname
282             try:
283                 self._set_hostname_attr(node)
284             except:
285                 self.discover()
286             self._add_node_to_slice(node)
287             
288             # check ssh connection
289             t = 0 
290             while t < timeout and not ssh_ok:
291
292                 cmd = 'echo \'GOOD NODE\''
293                 ((out, err), proc) = self.execute(cmd)
294                 if out.find("GOOD NODE") < 0:
295                     t = t + 60
296                     time.sleep(60)
297                     continue
298                 else:
299                     ssh_ok = True
300                     continue
301
302             if not ssh_ok:
303                 # the timeout was reach without establishing ssh connection
304                 # the node is blacklisted, deleted from the slice, and a new
305                 # node to provision is discovered
306                 with PlanetlabNode.lock:
307                     self._blacklist_node(node)
308                     self._delete_node_from_slice(node)
309                 self.discover()
310                 continue
311             
312             # check /proc directory is mounted (ssh_ok = True)
313             else:
314                 cmd = 'mount |grep proc'
315                 ((out, err), proc) = self.execute(cmd)
316                 if out.find("/proc type proc") < 0:
317                     with PlanetlabNode.lock:
318                         self._blacklist_node(node)
319                         self._delete_node_from_slice(node)
320                     self.discover()
321                     continue
322             
323                 else:
324                     provision_ok = True
325                     # set IP attribute
326                     ip = self._get_ip(node)
327                     self.set("ip", ip)
328             
329         super(PlanetlabNode, self).provision()
330
331     def _filter_based_on_attributes(self):
332         """
333         Retrive the list of nodes ids that match user's constraints 
334         """
335         # Map user's defined attributes with tagnames of PlanetLab
336         timeframe = self.get("timeframe")[0]
337         attr_to_tags = {
338             'city' : 'city',
339             'country' : 'country',
340             'region' : 'region',
341             'architecture' : 'arch',
342             'operatingSystem' : 'fcdistro',
343             #'site' : 'pldistro',
344             'minReliability' : 'reliability%s' % timeframe,
345             'maxReliability' : 'reliability%s' % timeframe,
346             'minBandwidth' : 'bw%s' % timeframe,
347             'maxBandwidth' : 'bw%s' % timeframe,
348             'minLoad' : 'load%s' % timeframe,
349             'maxLoad' : 'load%s' % timeframe,
350             'minCpu' : 'cpu%s' % timeframe,
351             'maxCpu' : 'cpu%s' % timeframe,
352         }
353         
354         nodes_id = []
355         filters = {}
356
357         for attr_name, attr_obj in self._attrs.iteritems():
358             attr_value = self.get(attr_name)
359             
360             if attr_value is not None and attr_obj.flags == 8 and \
361                 attr_name != 'timeframe':
362         
363                 attr_tag = attr_to_tags[attr_name]
364                 filters['tagname'] = attr_tag
365
366                 # filter nodes by fixed constraints e.g. operating system
367                 if not 'min' in attr_name and not 'max' in attr_name:
368                     filters['value'] = attr_value
369                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
370
371                 # filter nodes by range constraints e.g. max bandwidth
372                 elif ('min' or 'max') in attr_name:
373                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
374
375         if not filters:
376             nodes = self.plapi.get_nodes()
377             for node in nodes:
378                 nodes_id.append(node['node_id'])
379                 
380         return nodes_id
381                     
382
383     def _filter_by_fixed_attr(self, filters, nodes_id):
384         """
385         Query PLCAPI for nodes ids matching fixed attributes defined by the
386         user
387         """
388         node_tags = self.plapi.get_node_tags(filters)
389         if node_tags is not None:
390
391             if len(nodes_id) == 0:
392                 # first attribute being matched
393                 for node_tag in node_tags:
394                     nodes_id.append(node_tag['node_id'])
395             else:
396                 # remove the nodes ids that don't match the new attribute
397                 # that is being match
398
399                 nodes_id_tmp = []
400                 for node_tag in node_tags:
401                     if node_tag['node_id'] in nodes_id:
402                         nodes_id_tmp.append(node_tag['node_id'])
403
404                 if len(nodes_id_tmp):
405                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
406                 else:
407                     # no node from before match the new constraint
408                     self.fail_discovery()
409         else:
410             # no nodes match the filter applied
411             self.fail_discovery()
412
413         return nodes_id
414
415     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
416         """
417         Query PLCAPI for nodes ids matching attributes defined in a certain
418         range, by the user
419         """
420         node_tags = self.plapi.get_node_tags(filters)
421         if node_tags is not None:
422             
423             if len(nodes_id) == 0:
424                 # first attribute being matched
425                 for node_tag in node_tags:
426  
427                    # check that matches the min or max restriction
428                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
429                         float(node_tag['value']) > attr_value:
430                         nodes_id.append(node_tag['node_id'])
431
432                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
433                         float(node_tag['value']) < attr_value:
434                         nodes_id.append(node_tag['node_id'])
435             else:
436
437                 # remove the nodes ids that don't match the new attribute
438                 # that is being match
439                 nodes_id_tmp = []
440                 for node_tag in node_tags:
441
442                     # check that matches the min or max restriction and was a
443                     # matching previous filters
444                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
445                         float(node_tag['value']) > attr_value and \
446                         node_tag['node_id'] in nodes_id:
447                         nodes_id_tmp.append(node_tag['node_id'])
448
449                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
450                         float(node_tag['value']) < attr_value and \
451                         node_tag['node_id'] in nodes_id:
452                         nodes_id_tmp.append(node_tag['node_id'])
453
454                 if len(nodes_id_tmp):
455                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
456                 else:
457                     # no node from before match the new constraint
458                     self.fail_discovery()
459
460         else: #TODO CHECK
461             # no nodes match the filter applied
462             self.fail_discovery()
463
464         return nodes_id
465         
466     def _query_if_alive(self, nodes_id=None, hostname=None):
467         """
468         Query PLCAPI for nodes that register activity recently, using filters 
469         related to the state of the node, e.g. last time it was contacted
470         """
471         if nodes_id is None and hostname is None:
472             msg = "Specify nodes_id or hostname"
473             raise RuntimeError, msg
474
475         if nodes_id is not None and hostname is not None:
476             msg = "Specify either nodes_id or hostname"
477             raise RuntimeError, msg
478
479         # define PL filters to check the node is alive
480         filters = dict()
481         filters['run_level'] = 'boot'
482         filters['boot_state'] = 'boot'
483         filters['node_type'] = 'regular' 
484         filters['>last_contact'] =  int(time.time()) - 2*3600
485
486         # adding node_id or hostname to the filters to check for the particular
487         # node
488         if nodes_id:
489             filters['node_id'] = list(nodes_id)
490             alive_nodes_id = self._get_nodes_id(filters)
491         elif hostname:
492             filters['hostname'] = hostname
493             alive_nodes_id = self._get_nodes_id(filters)
494
495         if len(alive_nodes_id) == 0:
496             self.fail_discovery()
497         else:
498             nodes_id = list()
499             for node_id in alive_nodes_id:
500                 nid = node_id['node_id']
501                 nodes_id.append(nid)
502
503             return nodes_id
504
505     def _choose_random_node(self, nodes):
506         """
507         From the possible nodes for provision, choose randomly to decrese the
508         probability of different RMs choosing the same node for provision
509         """
510         size = len(nodes)
511         while size:
512             size = size - 1
513             index = randint(0, size)
514             node_id = nodes[index]
515             nodes[index] = nodes[size]
516
517             # check the node is not blacklisted or being provision by other RM
518             # and perform ping to check that is really alive
519             with PlanetlabNode.lock:
520
521                 blist = self.plapi.blacklisted()
522                 plist = self.plapi.reserved()
523                 if node_id not in blist and node_id not in plist:
524                     ping_ok = self._do_ping(node_id)
525                     if not ping_ok:
526                         self._blacklist_node(node_id)
527                     else:
528                         # discovered node for provision, added to provision list
529                         self._put_node_in_provision(node_id)
530                         return node_id
531
532     def _get_nodes_id(self, filters):
533         return self.plapi.get_nodes(filters, fields=['node_id'])
534
535     def _add_node_to_slice(self, node_id):
536         self.info(" Selecting node ... ")
537         slicename = self.get("username")
538         with PlanetlabNode.lock:
539             slice_nodes = self.plapi.get_slice_nodes(slicename)
540             slice_nodes.append(node_id)
541             self.plapi.add_slice_nodes(slicename, slice_nodes)
542
543     def _delete_node_from_slice(self, node):
544         self.warn(" Deleting node from slice ")
545         slicename = self.get("username")
546         self.plapi.delete_slice_node(slicename, [node])
547
548     def _set_hostname_attr(self, node):
549         """
550         Query PLCAPI for the hostname of a certain node id and sets the
551         attribute hostname, it will over write the previous value
552         """
553         hostname = self.plapi.get_nodes(node, ['hostname'])
554         self.set("hostname", hostname[0]['hostname'])
555
556     def _check_if_in_slice(self, nodes_id):
557         """
558         Query PLCAPI to find out if any node id from nodes_id is in the user's
559         slice
560         """
561         slicename = self.get("username")
562         slice_nodes = self.plapi.get_slice_nodes(slicename)
563         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
564         return nodes_inslice
565
566     def _do_ping(self, node_id):
567         """
568         Perform ping command on node's IP matching node id
569         """
570         ping_ok = False
571         ip = self._get_ip(node_id)
572         command = "ping -c2 %s | echo \"PING OK\"" % ip
573
574         (out, err) = lexec(command)
575         if not out.find("PING OK") < 0:
576             ping_ok = True
577
578         return ping_ok 
579
580     def _blacklist_node(self, node):
581         """
582         Add node mal functioning node to blacklist
583         """
584         self.warn(" Blacklisting malfunctioning node ")
585         self._plapi.blacklist_host(node)
586
587     def _put_node_in_provision(self, node):
588         """
589         Add node to the list of nodes being provisioned, in order for other RMs
590         to not try to provision the same one again
591         """
592         self._plapi.reserve_host(node)
593
594     def _get_ip(self, node_id):
595         """
596         Query PLCAPI for the IP of a node with certain node id
597         """
598         ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
599         ip = ip[0]['ip']
600         return ip
601
602     def fail_discovery(self):
603         self.fail()
604         msg = "Discovery failed. No candidates found for node"
605         self.error(msg)
606         raise RuntimeError, msg
607
608     def fail_node_not_alive(self, hostname):
609         msg = "Node %s not alive, pick another node" % hostname
610         raise RuntimeError, msg
611     
612     def fail_node_not_available(self, hostname):
613         msg = "Node %s not available for provisioning, pick another \
614                 node" % hostname
615         raise RuntimeError, msg
616
617     def fail_not_enough_nodes(self):
618         msg = "Not enough nodes available for provisioning"
619         raise RuntimeError, msg
620
621     def valid_connection(self, guid):
622         # TODO: Validate!
623         return True
624
625