Fixing RM.DEPLOY being executed after/during RM.RELEASE by adding a release_lock...
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import threading
32
33 @clsinit_copy
34 class PlanetlabNode(LinuxNode):
35     _rtype = "PlanetlabNode"
36     _help = "Controls a PlanetLab host accessible using a SSH key " \
37             "associated to a PlanetLab user account"
38     _backend = "planetlab"
39
40     ## XXX A.Q. This lock could use a more descriptive name and 
41     #           an explanatory comment
42     lock = threading.Lock()
43
44     @classmethod
45     def _register_attributes(cls):
46         ip = Attribute("ip", "PlanetLab host public IP address",
47                 flags = Flags.ReadOnly)
48
49         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
50                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
51                     default = "www.planet-lab.eu",
52                     flags = Flags.Credential)
53
54         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
55                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
56                     default = "https://%(hostname)s:443/PLCAPI/",
57                     flags = Flags.ExecReadOnly)
58     
59         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
60                     authenticate in the website) ",
61                     flags = Flags.Credential)
62
63         pl_password = Attribute("plpassword", 
64                         "PlanetLab account password, as \
65                         the one to authenticate in the website) ",
66                         flags = Flags.Credential)
67
68         city = Attribute("city", "Constrain location (city) during resource \
69                 discovery. May use wildcards.",
70                 flags = Flags.Filter)
71
72         country = Attribute("country", "Constrain location (country) during \
73                     resource discovery. May use wildcards.",
74                     flags = Flags.Filter)
75
76         region = Attribute("region", "Constrain location (region) during \
77                     resource discovery. May use wildcards.",
78                     flags = Flags.Filter)
79
80         architecture = Attribute("architecture", "Constrain architecture \
81                         during resource discovery.",
82                         type = Types.Enumerate,
83                         allowed = ["x86_64", 
84                                     "i386"],
85                         flags = Flags.Filter)
86
87         operating_system = Attribute("operatingSystem", "Constrain operating \
88                             system during resource discovery.",
89                             type = Types.Enumerate,
90                             allowed =  ["f8",
91                                         "f12",
92                                         "f14",
93                                         "centos",
94                                         "other"],
95                             flags = Flags.Filter)
96
97         #site = Attribute("site", "Constrain the PlanetLab site this node \
98         #        should reside on.",
99         #        type = Types.Enumerate,
100         #        allowed = ["PLE",
101         #                    "PLC",
102         #                    "PLJ"],
103         #        flags = Flags.Filter)
104
105         min_reliability = Attribute("minReliability", "Constrain reliability \
106                             while picking PlanetLab nodes. Specifies a lower \
107                             acceptable bound.",
108                             type = Types.Double,
109                             range = (1, 100),
110                             flags = Flags.Filter)
111
112         max_reliability = Attribute("maxReliability", "Constrain reliability \
113                             while picking PlanetLab nodes. Specifies an upper \
114                             acceptable bound.",
115                             type = Types.Double,
116                             range = (1, 100),
117                             flags = Flags.Filter)
118
119         min_bandwidth = Attribute("minBandwidth", "Constrain available \
120                             bandwidth while picking PlanetLab nodes. \
121                             Specifies a lower acceptable bound.",
122                             type = Types.Double,
123                             range = (0, 2**31),
124                             flags = Flags.Filter)
125
126         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
127                             bandwidth while picking PlanetLab nodes. \
128                             Specifies an upper acceptable bound.",
129                             type = Types.Double,
130                             range = (0, 2**31),
131                             flags = Flags.Filter)
132
133         min_load = Attribute("minLoad", "Constrain node load average while \
134                     picking PlanetLab nodes. Specifies a lower acceptable \
135                     bound.",
136                     type = Types.Double,
137                     range = (0, 2**31),
138                     flags = Flags.Filter)
139
140         max_load = Attribute("maxLoad", "Constrain node load average while \
141                     picking PlanetLab nodes. Specifies an upper acceptable \
142                     bound.",
143                     type = Types.Double,
144                     range = (0, 2**31),
145                     flags = Flags.Filter)
146
147         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
148                     picking PlanetLab nodes. Specifies a lower acceptable \
149                     bound.",
150                     type = Types.Double,
151                     range = (0, 100),
152                     flags = Flags.Filter)
153
154         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
155                     picking PlanetLab nodes. Specifies an upper acceptable \
156                     bound.",
157                     type = Types.Double,
158                     range = (0, 100),
159                     flags = Flags.Filter)
160
161         timeframe = Attribute("timeframe", "Past time period in which to check\
162                         information about the node. Values are year,month, \
163                         week, latest",
164                         default = "week",
165                         type = Types.Enumerate,
166                         allowed = ["latest",
167                                     "week",
168                                     "month",
169                                     "year"],
170                         flags = Flags.Filter)
171
172         cls._register_attribute(ip)
173         cls._register_attribute(pl_url)
174         cls._register_attribute(pl_ptn)
175         cls._register_attribute(pl_user)
176         cls._register_attribute(pl_password)
177         #cls._register_attribute(site)
178         cls._register_attribute(city)
179         cls._register_attribute(country)
180         cls._register_attribute(region)
181         cls._register_attribute(architecture)
182         cls._register_attribute(operating_system)
183         cls._register_attribute(min_reliability)
184         cls._register_attribute(max_reliability)
185         cls._register_attribute(min_bandwidth)
186         cls._register_attribute(max_bandwidth)
187         cls._register_attribute(min_load)
188         cls._register_attribute(max_load)
189         cls._register_attribute(min_cpu)
190         cls._register_attribute(max_cpu)
191         cls._register_attribute(timeframe)
192
193     def __init__(self, ec, guid):
194         super(PlanetlabNode, self).__init__(ec, guid)
195
196         self._plapi = None
197         self._node_to_provision = None
198     
199     @property
200     def plapi(self):
201         if not self._plapi:
202             pl_user = self.get("pluser")
203             pl_pass = self.get("plpassword")
204             pl_url = self.get("plcApiUrl")
205             pl_ptn = self.get("plcApiPattern")
206
207             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
208                     pl_ptn)
209             
210         return self._plapi
211
212     def do_discover(self):
213         """
214         Based on the attributes defined by the user, discover the suitable nodes
215         """
216         hostname = self._get_hostname()
217         print self.guid, hostname 
218         if hostname:
219             # the user specified one particular node to be provisioned
220             # check with PLCAPI if it is alvive
221             node_id = self._query_if_alive(hostname=hostname)
222             node_id = node_id.pop()
223             print self.guid, node_id
224
225             # check that the node is not blacklisted or being provisioned
226             # by other RM
227             with PlanetlabNode.lock:
228                 plist = self.plapi.reserved()
229                 blist = self.plapi.blacklisted()
230                 print self.guid,plist
231                 print self.guid,blist
232                 if node_id not in blist and node_id not in plist:
233                 
234                     # check that is really alive, by performing ping
235                     ping_ok = self._do_ping(node_id)
236                     if not ping_ok:
237                         self._blacklist_node(node_id)
238                         self.fail_node_not_alive(hostname)
239                     else:
240                         self._put_node_in_provision(node_id)
241                         self._node_to_provision = node_id
242                         super(PlanetlabNode, self).do_discover()
243                 
244                 else:
245                     self.fail_node_not_available(hostname)
246         
247         else:
248             # the user specifies constraints based on attributes, zero, one or 
249             # more nodes can match these constraints 
250             nodes = self._filter_based_on_attributes()
251             nodes_alive = self._query_if_alive(nodes)
252     
253             # nodes that are already part of user's slice have the priority to
254             # provisioned
255             nodes_inslice = self._check_if_in_slice(nodes_alive)
256             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
257             
258             node_id = None
259             if nodes_inslice:
260                 node_id = self._choose_random_node(nodes_inslice)
261                 
262             if not node_id:
263                 # Either there were no matching nodes in the user's slice, or
264                 # the nodes in the slice  were blacklisted or being provisioned
265                 # by other RM. Note nodes_not_inslice is never empty
266                 node_id = self._choose_random_node(nodes_not_inslice)
267
268             if node_id:
269                 self._node_to_provision = node_id
270                 super(PlanetlabNode, self).do_discover()
271             else:
272                self.fail_not_enough_nodes() 
273             
274     def do_provision(self):
275         """
276         Add node to user's slice after verifing that the node is functioning
277         correctly
278         """
279         provision_ok = False
280         ssh_ok = False
281         proc_ok = False
282         timeout = 120
283
284         while not provision_ok:
285             node = self._node_to_provision
286             # Adding try catch to set hostname because sometimes MyPLC fails
287             # when trying to retrive node's hostname
288             try:
289                 self._set_hostname_attr(node)
290             except:
291                 with PlanetlabNode.lock:
292                     self._blacklist_node(node)
293                 self.do_discover()
294                 continue
295
296             self._add_node_to_slice(node)
297             
298             # check ssh connection
299             t = 0 
300             while t < timeout and not ssh_ok:
301
302                 cmd = 'echo \'GOOD NODE\''
303                 ((out, err), proc) = self.execute(cmd)
304                 if out.find("GOOD NODE") < 0:
305                     t = t + 60
306                     time.sleep(60)
307                     continue
308                 else:
309                     ssh_ok = True
310                     continue
311
312             if not ssh_ok:
313                 # the timeout was reach without establishing ssh connection
314                 # the node is blacklisted, deleted from the slice, and a new
315                 # node to provision is discovered
316                 with PlanetlabNode.lock:
317                     self._blacklist_node(node)
318                     self._delete_node_from_slice(node)
319                 self.set('hostname', None)
320                 self.do_discover()
321                 continue
322             
323             # check /proc directory is mounted (ssh_ok = True)
324             else:
325                 cmd = 'mount |grep proc'
326                 ((out, err), proc) = self.execute(cmd)
327                 if out.find("/proc type proc") < 0:
328                     with PlanetlabNode.lock:
329                         self._blacklist_node(node)
330                         self._delete_node_from_slice(node)
331                     self.set('hostname', None)
332                     self.do_discover()
333                     continue
334             
335                 else:
336                     provision_ok = True
337                     # set IP attribute
338                     ip = self._get_ip(node)
339                     self.set("ip", ip)
340             
341         super(PlanetlabNode, self).do_provision()
342
343     def _filter_based_on_attributes(self):
344         """
345         Retrive the list of nodes ids that match user's constraints 
346         """
347
348         # Map user's defined attributes with tagnames of PlanetLab
349         timeframe = self.get("timeframe")[0]
350         attr_to_tags = {
351             'city' : 'city',
352             'country' : 'country',
353             'region' : 'region',
354             'architecture' : 'arch',
355             'operatingSystem' : 'fcdistro',
356             #'site' : 'pldistro',
357             'minReliability' : 'reliability%s' % timeframe,
358             'maxReliability' : 'reliability%s' % timeframe,
359             'minBandwidth' : 'bw%s' % timeframe,
360             'maxBandwidth' : 'bw%s' % timeframe,
361             'minLoad' : 'load%s' % timeframe,
362             'maxLoad' : 'load%s' % timeframe,
363             'minCpu' : 'cpu%s' % timeframe,
364             'maxCpu' : 'cpu%s' % timeframe,
365         }
366         
367         nodes_id = []
368         filters = {}
369
370         for attr_name, attr_obj in self._attrs.iteritems():
371             attr_value = self.get(attr_name)
372             
373             if attr_value is not None and attr_obj.flags == 8 and \
374                 attr_name != 'timeframe':
375         
376                 attr_tag = attr_to_tags[attr_name]
377                 filters['tagname'] = attr_tag
378
379                 # filter nodes by fixed constraints e.g. operating system
380                 if not 'min' in attr_name and not 'max' in attr_name:
381                     filters['value'] = attr_value
382                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
383
384                 # filter nodes by range constraints e.g. max bandwidth
385                 elif ('min' or 'max') in attr_name:
386                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
387
388         if not filters:
389             nodes = self.plapi.get_nodes()
390             for node in nodes:
391                 nodes_id.append(node['node_id'])
392                 
393         return nodes_id
394                     
395
396     def _filter_by_fixed_attr(self, filters, nodes_id):
397         """
398         Query PLCAPI for nodes ids matching fixed attributes defined by the
399         user
400         """
401         node_tags = self.plapi.get_node_tags(filters)
402         if node_tags is not None:
403
404             if len(nodes_id) == 0:
405                 # first attribute being matched
406                 for node_tag in node_tags:
407                     nodes_id.append(node_tag['node_id'])
408             else:
409                 # remove the nodes ids that don't match the new attribute
410                 # that is being match
411
412                 nodes_id_tmp = []
413                 for node_tag in node_tags:
414                     if node_tag['node_id'] in nodes_id:
415                         nodes_id_tmp.append(node_tag['node_id'])
416
417                 if len(nodes_id_tmp):
418                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
419                 else:
420                     # no node from before match the new constraint
421                     self.fail_discovery()
422         else:
423             # no nodes match the filter applied
424             self.fail_discovery()
425
426         return nodes_id
427
428     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
429         """
430         Query PLCAPI for nodes ids matching attributes defined in a certain
431         range, by the user
432         """
433         node_tags = self.plapi.get_node_tags(filters)
434         if node_tags is not None:
435             
436             if len(nodes_id) == 0:
437                 # first attribute being matched
438                 for node_tag in node_tags:
439  
440                    # check that matches the min or max restriction
441                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
442                         float(node_tag['value']) > attr_value:
443                         nodes_id.append(node_tag['node_id'])
444
445                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
446                         float(node_tag['value']) < attr_value:
447                         nodes_id.append(node_tag['node_id'])
448             else:
449
450                 # remove the nodes ids that don't match the new attribute
451                 # that is being match
452                 nodes_id_tmp = []
453                 for node_tag in node_tags:
454
455                     # check that matches the min or max restriction and was a
456                     # matching previous filters
457                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
458                         float(node_tag['value']) > attr_value and \
459                         node_tag['node_id'] in nodes_id:
460                         nodes_id_tmp.append(node_tag['node_id'])
461
462                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
463                         float(node_tag['value']) < attr_value and \
464                         node_tag['node_id'] in nodes_id:
465                         nodes_id_tmp.append(node_tag['node_id'])
466
467                 if len(nodes_id_tmp):
468                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
469                 else:
470                     # no node from before match the new constraint
471                     self.fail_discovery()
472
473         else: #TODO CHECK
474             # no nodes match the filter applied
475             self.fail_discovery()
476
477         return nodes_id
478         
479     def _query_if_alive(self, nodes_id=None, hostname=None):
480         """
481         Query PLCAPI for nodes that register activity recently, using filters 
482         related to the state of the node, e.g. last time it was contacted
483         """
484         if nodes_id is None and hostname is None:
485             msg = "Specify nodes_id or hostname"
486             raise RuntimeError, msg
487
488         if nodes_id is not None and hostname is not None:
489             msg = "Specify either nodes_id or hostname"
490             raise RuntimeError, msg
491
492         # define PL filters to check the node is alive
493         filters = dict()
494         filters['run_level'] = 'boot'
495         filters['boot_state'] = 'boot'
496         filters['node_type'] = 'regular' 
497         #filters['>last_contact'] =  int(time.time()) - 2*3600
498
499         # adding node_id or hostname to the filters to check for the particular
500         # node
501         if nodes_id:
502             filters['node_id'] = list(nodes_id)
503             alive_nodes_id = self._get_nodes_id(filters)
504         elif hostname:
505             filters['hostname'] = hostname
506             alive_nodes_id = self._get_nodes_id(filters)
507
508         if len(alive_nodes_id) == 0:
509             self.fail_node_not_alive(self, hostname)
510         else:
511             nodes_id = list()
512             for node_id in alive_nodes_id:
513                 nid = node_id['node_id']
514                 nodes_id.append(nid)
515
516             return nodes_id
517
518     def _choose_random_node(self, nodes):
519         """ 
520         From the possible nodes for provision, choose randomly to decrese the
521         probability of different RMs choosing the same node for provision
522         """
523         size = len(nodes)
524         while size:
525             size = size - 1
526             index = randint(0, size)
527             node_id = nodes[index]
528             nodes[index] = nodes[size]
529
530             # check the node is not blacklisted or being provision by other RM
531             # and perform ping to check that is really alive
532             with PlanetlabNode.lock:
533
534                 blist = self.plapi.blacklisted()
535                 plist = self.plapi.reserved()
536                 if node_id not in blist and node_id not in plist:
537                     ping_ok = self._do_ping(node_id)
538                     print " ### ping_ok #### %s guid %s" % (ping_ok, self.guid)
539                     if not ping_ok:
540                         self._blacklist_node(node_id)
541                     else:
542                         # discovered node for provision, added to provision list
543                         self._put_node_in_provision(node_id)
544                         print "node_id %s , guid %s" % (node_id, self.guid)
545                         return node_id
546
547     def _get_nodes_id(self, filters):
548         return self.plapi.get_nodes(filters, fields=['node_id'])
549
550     def _add_node_to_slice(self, node_id):
551         self.info(" Selected node to provision ")
552         slicename = self.get("username")
553         with PlanetlabNode.lock:
554             slice_nodes = self.plapi.get_slice_nodes(slicename)
555             slice_nodes.append(node_id)
556             self.plapi.add_slice_nodes(slicename, slice_nodes)
557
558     def _delete_node_from_slice(self, node):
559         self.warn(" Deleting node from slice ")
560         slicename = self.get("username")
561         self.plapi.delete_slice_node(slicename, [node])
562
563     def _get_hostname(self):
564         hostname = self.get("hostname")
565         ip = self.get("ip")
566         if hostname:
567             return hostname
568         elif ip:
569             hostname = sshfuncs.gethostbyname(ip)
570             return hostname
571         else:
572             return None
573
574     def _set_hostname_attr(self, node):
575         """
576         Query PLCAPI for the hostname of a certain node id and sets the
577         attribute hostname, it will over write the previous value
578         """
579         hostname = self.plapi.get_nodes(node, ['hostname'])
580         self.set("hostname", hostname[0]['hostname'])
581
582     def _check_if_in_slice(self, nodes_id):
583         """
584         Query PLCAPI to find out if any node id from nodes_id is in the user's
585         slice
586         """
587         slicename = self.get("username")
588         slice_nodes = self.plapi.get_slice_nodes(slicename)
589         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
590         return nodes_inslice
591
592     def _do_ping(self, node_id):
593         """
594         Perform ping command on node's IP matching node id
595         """
596         ping_ok = False
597         ip = self._get_ip(node_id)
598         print "ip de do_ping %s, guid %s" % (ip, self.guid)
599         if not ip: return ping_ok
600
601         command = "ping -c2 %s" % ip
602
603         (out, err) = lexec(command)
604         print "out de do_ping %s, guid %s" % (out, self.guid)
605         if not out.find("2 received") < 0:
606             ping_ok = True
607         
608         print "ping_ok de do_ping %s, guid %s" % (ping_ok, self.guid)
609         return ping_ok 
610
611     def _blacklist_node(self, node):
612         """
613         Add node mal functioning node to blacklist
614         """
615         self.warn(" Blacklisting malfunctioning node ")
616         self._plapi.blacklist_host(node)
617
618     def _put_node_in_provision(self, node):
619         """
620         Add node to the list of nodes being provisioned, in order for other RMs
621         to not try to provision the same one again
622         """
623         self._plapi.reserve_host(node)
624
625     def _get_ip(self, node_id):
626         """
627         Query PLCAPI for the IP of a node with certain node id
628         """
629         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
630         print "#### HOSTNAME ##### %s ### guid %s " % (hostname['hostname'], self.guid)
631         ip = sshfuncs.gethostbyname(hostname['hostname'])
632         if not ip:
633             # Fail while trying to find the IP
634             return None
635         return ip
636
637     def fail_discovery(self):
638         msg = "Discovery failed. No candidates found for node"
639         self.error(msg)
640         raise RuntimeError, msg
641
642     def fail_node_not_alive(self, hostname=None):
643         msg = "Node %s not alive" % hostname
644         raise RuntimeError, msg
645     
646     def fail_node_not_available(self, hostname):
647         msg = "Node %s not available for provisioning" % hostname
648         raise RuntimeError, msg
649
650     def fail_not_enough_nodes(self):
651         msg = "Not enough nodes available for provisioning"
652         raise RuntimeError, msg
653
654     def valid_connection(self, guid):
655         # TODO: Validate!
656         return True
657
658