Removing querying PLC for alive nodes, optimizing waitings
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import socket
32 import threading
33 import datetime
34
35 @clsinit_copy
36 class PlanetlabNode(LinuxNode):
37     _rtype = "PlanetlabNode"
38     _help = "Controls a PlanetLab host accessible using a SSH key " \
39             "associated to a PlanetLab user account"
40     _backend = "planetlab"
41
42     lock = threading.Lock()
43
44     @classmethod
45     def _register_attributes(cls):
46         ip = Attribute("ip", "PlanetLab host public IP address",
47                 flags = Flags.ReadOnly)
48
49         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
50                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
51                     default = "www.planet-lab.eu",
52                     flags = Flags.Credential)
53
54         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
55                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
56                     default = "https://%(hostname)s:443/PLCAPI/",
57                     flags = Flags.ExecReadOnly)
58     
59         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
60                     authenticate in the website) ",
61                     flags = Flags.Credential)
62
63         pl_password = Attribute("plpassword", 
64                         "PlanetLab account password, as \
65                         the one to authenticate in the website) ",
66                         flags = Flags.Credential)
67
68         city = Attribute("city", "Constrain location (city) during resource \
69                 discovery. May use wildcards.",
70                 flags = Flags.Filter)
71
72         country = Attribute("country", "Constrain location (country) during \
73                     resource discovery. May use wildcards.",
74                     flags = Flags.Filter)
75
76         region = Attribute("region", "Constrain location (region) during \
77                     resource discovery. May use wildcards.",
78                     flags = Flags.Filter)
79
80         architecture = Attribute("architecture", "Constrain architecture \
81                         during resource discovery.",
82                         type = Types.Enumerate,
83                         allowed = ["x86_64", 
84                                     "i386"],
85                         flags = Flags.Filter)
86
87         operating_system = Attribute("operatingSystem", "Constrain operating \
88                             system during resource discovery.",
89                             type = Types.Enumerate,
90                             allowed =  ["f8",
91                                         "f12",
92                                         "f14",
93                                         "centos",
94                                         "other"],
95                             flags = Flags.Filter)
96
97         #site = Attribute("site", "Constrain the PlanetLab site this node \
98         #        should reside on.",
99         #        type = Types.Enumerate,
100         #        allowed = ["PLE",
101         #                    "PLC",
102         #                    "PLJ"],
103         #        flags = Flags.Filter)
104
105         min_reliability = Attribute("minReliability", "Constrain reliability \
106                             while picking PlanetLab nodes. Specifies a lower \
107                             acceptable bound.",
108                             type = Types.Double,
109                             range = (1, 100),
110                             flags = Flags.Filter)
111
112         max_reliability = Attribute("maxReliability", "Constrain reliability \
113                             while picking PlanetLab nodes. Specifies an upper \
114                             acceptable bound.",
115                             type = Types.Double,
116                             range = (1, 100),
117                             flags = Flags.Filter)
118
119         min_bandwidth = Attribute("minBandwidth", "Constrain available \
120                             bandwidth while picking PlanetLab nodes. \
121                             Specifies a lower acceptable bound.",
122                             type = Types.Double,
123                             range = (0, 2**31),
124                             flags = Flags.Filter)
125
126         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
127                             bandwidth while picking PlanetLab nodes. \
128                             Specifies an upper acceptable bound.",
129                             type = Types.Double,
130                             range = (0, 2**31),
131                             flags = Flags.Filter)
132
133         min_load = Attribute("minLoad", "Constrain node load average while \
134                     picking PlanetLab nodes. Specifies a lower acceptable \
135                     bound.",
136                     type = Types.Double,
137                     range = (0, 2**31),
138                     flags = Flags.Filter)
139
140         max_load = Attribute("maxLoad", "Constrain node load average while \
141                     picking PlanetLab nodes. Specifies an upper acceptable \
142                     bound.",
143                     type = Types.Double,
144                     range = (0, 2**31),
145                     flags = Flags.Filter)
146
147         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
148                     picking PlanetLab nodes. Specifies a lower acceptable \
149                     bound.",
150                     type = Types.Double,
151                     range = (0, 100),
152                     flags = Flags.Filter)
153
154         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
155                     picking PlanetLab nodes. Specifies an upper acceptable \
156                     bound.",
157                     type = Types.Double,
158                     range = (0, 100),
159                     flags = Flags.Filter)
160
161         timeframe = Attribute("timeframe", "Past time period in which to check\
162                         information about the node. Values are year,month, \
163                         week, latest",
164                         default = "week",
165                         type = Types.Enumerate,
166                         allowed = ["latest",
167                                     "week",
168                                     "month",
169                                     "year"],
170                         flags = Flags.Filter)
171
172         cls._register_attribute(ip)
173         cls._register_attribute(pl_url)
174         cls._register_attribute(pl_ptn)
175         cls._register_attribute(pl_user)
176         cls._register_attribute(pl_password)
177         #cls._register_attribute(site)
178         cls._register_attribute(city)
179         cls._register_attribute(country)
180         cls._register_attribute(region)
181         cls._register_attribute(architecture)
182         cls._register_attribute(operating_system)
183         cls._register_attribute(min_reliability)
184         cls._register_attribute(max_reliability)
185         cls._register_attribute(min_bandwidth)
186         cls._register_attribute(max_bandwidth)
187         cls._register_attribute(min_load)
188         cls._register_attribute(max_load)
189         cls._register_attribute(min_cpu)
190         cls._register_attribute(max_cpu)
191         cls._register_attribute(timeframe)
192
193     def __init__(self, ec, guid):
194         super(PlanetlabNode, self).__init__(ec, guid)
195
196         self._plapi = None
197         self._node_to_provision = None
198         self._slicenode = False
199         self._hostname = False
200
201     def _skip_provision(self):
202         pl_user = self.get("pluser")
203         pl_pass = self.get("plpassword")
204         if not pl_user and not pl_pass:
205             return True
206         else: return False
207     
208     @property
209     def plapi(self):
210         if not self._plapi:
211             pl_user = self.get("pluser")
212             pl_pass = self.get("plpassword")
213             pl_url = self.get("plcApiUrl")
214             pl_ptn = self.get("plcApiPattern")
215
216             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
217                 pl_ptn)
218             
219             if not self._plapi:
220                 self.fail_plapi()
221
222         return self._plapi
223
224     def do_discover(self):
225         """
226         Based on the attributes defined by the user, discover the suitable 
227         nodes for provision.
228         """
229         if self._skip_provision():
230             super(PlanetlabNode, self).do_discover()
231             return
232
233         hostname = self._get_hostname()
234         if hostname:
235             # the user specified one particular node to be provisioned
236             self._hostname = True
237             node_id = self._get_nodes_id({'hostname':hostname})
238             node_id = node_id.pop()['node_id']
239
240             # check that the node is not blacklisted or being provisioned
241             # by other RM
242             with PlanetlabNode.lock:
243                 plist = self.plapi.reserved()
244                 blist = self.plapi.blacklisted()
245                 if node_id not in blist and node_id not in plist:
246                 
247                     # check that is really alive, by performing ping
248                     ping_ok = self._do_ping(node_id)
249                     if not ping_ok:
250                         self._blacklist_node(node_id)
251                         self.fail_node_not_alive(hostname)
252                     else:
253                         if self._check_if_in_slice([node_id]):
254                             self._slicenode = True
255                         self._put_node_in_provision(node_id)
256                         self._node_to_provision = node_id
257                 else:
258                     self.fail_node_not_available(hostname)
259             super(PlanetlabNode, self).do_discover()
260         
261         else:
262             # the user specifies constraints based on attributes, zero, one or 
263             # more nodes can match these constraints 
264             nodes = self._filter_based_on_attributes()
265
266             # nodes that are already part of user's slice have the priority to
267             # provisioned
268             nodes_inslice = self._check_if_in_slice(nodes)
269             nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
270             
271             node_id = None
272             if nodes_inslice:
273                 node_id = self._choose_random_node(nodes_inslice)
274                 self._slicenode = True                
275                 
276             if not node_id:
277                 # Either there were no matching nodes in the user's slice, or
278                 # the nodes in the slice  were blacklisted or being provisioned
279                 # by other RM. Note nodes_not_inslice is never empty
280                 node_id = self._choose_random_node(nodes_not_inslice)
281                 self._slicenode = False
282
283             if node_id:
284                 self._node_to_provision = node_id
285                 try:
286                     self._set_hostname_attr(node_id)
287                     self.info(" Selected node to provision ")
288                     super(PlanetlabNode, self).do_discover()
289                 except:
290                     with PlanetlabNode.lock:
291                         self._blacklist_node(node_id)
292                     self.do_discover()
293             else:
294                self.fail_not_enough_nodes() 
295             
296     def do_provision(self):
297         """
298         Add node to user's slice after verifing that the node is functioning
299         correctly
300         """
301         if self._skip_provision():
302             super(PlanetlabNode, self).do_provision()
303             return
304
305         provision_ok = False
306         ssh_ok = False
307         proc_ok = False
308         timeout = 1800
309
310         while not provision_ok:
311             node = self._node_to_provision
312             if not self._slicenode:
313                 self._add_node_to_slice(node)
314             
315                 # check ssh connection
316                 t = 0 
317                 while t < timeout and not ssh_ok:
318
319                     cmd = 'echo \'GOOD NODE\''
320                     ((out, err), proc) = self.execute(cmd)
321                     if out.find("GOOD NODE") < 0:
322                         t = t + 60
323                         time.sleep(60)
324                         continue
325                     else:
326                         ssh_ok = True
327                         continue
328             else:
329                 cmd = 'echo \'GOOD NODE\''
330                 ((out, err), proc) = self.execute(cmd)
331                 if not out.find("GOOD NODE") < 0:
332                     ssh_ok = True
333
334             if not ssh_ok:
335                 # the timeout was reach without establishing ssh connection
336                 # the node is blacklisted, deleted from the slice, and a new
337                 # node to provision is discovered
338                 with PlanetlabNode.lock:
339                     self.warn(" Could not SSH login ")
340                     self._blacklist_node(node)
341                     #self._delete_node_from_slice(node)
342                 #self.set('hostname', None)
343                 self.do_discover()
344                 continue
345             
346             # check /proc directory is mounted (ssh_ok = True)
347             else:
348                 cmd = 'mount |grep proc'
349                 ((out, err), proc) = self.execute(cmd)
350                 if out.find("/proc type proc") < 0:
351                     with PlanetlabNode.lock:
352                         self.warn(" Could not find directory /proc ")
353                         self._blacklist_node(node)
354                         #self._delete_node_from_slice(node)
355                     #self.set('hostname', None)
356                     self.do_discover()
357                     continue
358             
359                 else:
360                     provision_ok = True
361                     if not self.get('hostname'):
362                         self._set_hostname_attr(node)            
363                     # set IP attribute
364                     ip = self._get_ip(node)
365                     self.set("ip", ip)
366             
367         super(PlanetlabNode, self).do_provision()
368
369     def _filter_based_on_attributes(self):
370         """
371         Retrive the list of nodes ids that match user's constraints 
372         """
373         # Map user's defined attributes with tagnames of PlanetLab
374         timeframe = self.get("timeframe")[0]
375         attr_to_tags = {
376             'city' : 'city',
377             'country' : 'country',
378             'region' : 'region',
379             'architecture' : 'arch',
380             'operatingSystem' : 'fcdistro',
381             #'site' : 'pldistro',
382             'minReliability' : 'reliability%s' % timeframe,
383             'maxReliability' : 'reliability%s' % timeframe,
384             'minBandwidth' : 'bw%s' % timeframe,
385             'maxBandwidth' : 'bw%s' % timeframe,
386             'minLoad' : 'load%s' % timeframe,
387             'maxLoad' : 'load%s' % timeframe,
388             'minCpu' : 'cpu%s' % timeframe,
389             'maxCpu' : 'cpu%s' % timeframe,
390         }
391         
392         nodes_id = []
393         filters = {}
394
395         for attr_name, attr_obj in self._attrs.iteritems():
396             attr_value = self.get(attr_name)
397             
398             if attr_value is not None and attr_obj.flags == 8 and \
399                 attr_name != 'timeframe':
400         
401                 attr_tag = attr_to_tags[attr_name]
402                 filters['tagname'] = attr_tag
403
404                 # filter nodes by fixed constraints e.g. operating system
405                 if not 'min' in attr_name and not 'max' in attr_name:
406                     filters['value'] = attr_value
407                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
408
409                 # filter nodes by range constraints e.g. max bandwidth
410                 elif ('min' or 'max') in attr_name:
411                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
412
413         if not filters:
414             nodes = self._get_nodes_id()
415             for node in nodes:
416                 nodes_id.append(node['node_id'])
417         return nodes_id
418                     
419     def _filter_by_fixed_attr(self, filters, nodes_id):
420         """
421         Query PLCAPI for nodes ids matching fixed attributes defined by the
422         user
423         """
424         node_tags = self.plapi.get_node_tags(filters)
425         if node_tags is not None:
426
427             if len(nodes_id) == 0:
428                 # first attribute being matched
429                 for node_tag in node_tags:
430                     nodes_id.append(node_tag['node_id'])
431             else:
432                 # remove the nodes ids that don't match the new attribute
433                 # that is being match
434
435                 nodes_id_tmp = []
436                 for node_tag in node_tags:
437                     if node_tag['node_id'] in nodes_id:
438                         nodes_id_tmp.append(node_tag['node_id'])
439
440                 if len(nodes_id_tmp):
441                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
442                 else:
443                     # no node from before match the new constraint
444                     self.fail_discovery()
445         else:
446             # no nodes match the filter applied
447             self.fail_discovery()
448
449         return nodes_id
450
451     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
452         """
453         Query PLCAPI for nodes ids matching attributes defined in a certain
454         range, by the user
455         """
456         node_tags = self.plapi.get_node_tags(filters)
457         if node_tags:
458             
459             if len(nodes_id) == 0:
460                 # first attribute being matched
461                 for node_tag in node_tags:
462  
463                    # check that matches the min or max restriction
464                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
465                         float(node_tag['value']) > attr_value:
466                         nodes_id.append(node_tag['node_id'])
467
468                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
469                         float(node_tag['value']) < attr_value:
470                         nodes_id.append(node_tag['node_id'])
471             else:
472
473                 # remove the nodes ids that don't match the new attribute
474                 # that is being match
475                 nodes_id_tmp = []
476                 for node_tag in node_tags:
477
478                     # check that matches the min or max restriction and was a
479                     # matching previous filters
480                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
481                         float(node_tag['value']) > attr_value and \
482                         node_tag['node_id'] in nodes_id:
483                         nodes_id_tmp.append(node_tag['node_id'])
484
485                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
486                         float(node_tag['value']) < attr_value and \
487                         node_tag['node_id'] in nodes_id:
488                         nodes_id_tmp.append(node_tag['node_id'])
489
490                 if len(nodes_id_tmp):
491                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
492                 else:
493                     # no node from before match the new constraint
494                     self.fail_discovery()
495
496         else: #TODO CHECK
497             # no nodes match the filter applied
498             self.fail_discovery()
499
500         return nodes_id
501         
502     def _choose_random_node(self, nodes):
503         """
504         From the possible nodes for provision, choose randomly to decrese the
505         probability of different RMs choosing the same node for provision
506         """
507         size = len(nodes)
508         while size:
509             size = size - 1
510             index = randint(0, size)
511             node_id = nodes[index]
512             nodes[index] = nodes[size]
513
514             # check the node is not blacklisted or being provision by other RM
515             # and perform ping to check that is really alive
516             with PlanetlabNode.lock:
517
518                 blist = self.plapi.blacklisted()
519                 plist = self.plapi.reserved()
520                 if node_id not in blist and node_id not in plist:
521                     ping_ok = self._do_ping(node_id)
522                     if not ping_ok:
523                         self._set_hostname_attr(node_id)
524                         self.warn(" Node not responding PING ")
525                         self._blacklist_node(node_id)
526                         #self.set('hostname', None)
527                     else:
528                         # discovered node for provision, added to provision list
529                         self._put_node_in_provision(node_id)
530                         return node_id
531
532     def _get_nodes_id(self, filters=None):
533         return self.plapi.get_nodes(filters, fields=['node_id'])
534
535     def _add_node_to_slice(self, node_id):
536         self.info(" Adding node to slice ")
537         slicename = self.get("username")
538         with PlanetlabNode.lock:
539             slice_nodes = self.plapi.get_slice_nodes(slicename)
540             slice_nodes.append(node_id)
541             self.plapi.add_slice_nodes(slicename, slice_nodes)
542
543     def _delete_node_from_slice(self, node):
544         self.warn(" Deleting node from slice ")
545         slicename = self.get("username")
546         self.plapi.delete_slice_node(slicename, [node])
547
548     def _get_hostname(self):
549         hostname = self.get("hostname")
550         ip = self.get("ip")
551         if hostname:
552             return hostname
553         elif ip:
554             hostname = socket.gethostbyaddr(ip)[0]
555             self.set('hostname', hostname)
556             return hostname
557         else:
558             return None
559
560     def _set_hostname_attr(self, node):
561         """
562         Query PLCAPI for the hostname of a certain node id and sets the
563         attribute hostname, it will over write the previous value
564         """
565         hostname = self.plapi.get_nodes(node, ['hostname'])
566         self.set("hostname", hostname[0]['hostname'])
567
568     def _check_if_in_slice(self, nodes_id):
569         """
570         Query PLCAPI to find out if any node id from nodes_id is in the user's
571         slice
572         """
573         slicename = self.get("username")
574         slice_nodes = self.plapi.get_slice_nodes(slicename)
575         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
576         return nodes_inslice
577
578     def _do_ping(self, node_id):
579         """
580         Perform ping command on node's IP matching node id
581         """
582         ping_ok = False
583         ip = self._get_ip(node_id)
584         if not ip: return ping_ok
585
586         command = "ping -c4 %s" % ip
587
588         (out, err) = lexec(command)
589         if not out.find("2 received") or not out.find("3 received") or not \
590             out.find("4 received") < 0:
591             ping_ok = True
592         
593         return ping_ok 
594
595     def _blacklist_node(self, node):
596         """
597         Add node mal functioning node to blacklist
598         """
599         self.warn(" Blacklisting malfunctioning node ")
600         self.plapi.blacklist_host(node)
601         if not self._hostname:
602             self.set('hostname', None)
603
604     def _put_node_in_provision(self, node):
605         """
606         Add node to the list of nodes being provisioned, in order for other RMs
607         to not try to provision the same one again
608         """
609         self.plapi.reserve_host(node)
610
611     def _get_ip(self, node_id):
612         """
613         Query PLCAPI for the IP of a node with certain node id
614         """
615         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
616         try:
617             ip = sshfuncs.gethostbyname(hostname['hostname'])
618         except:
619             # Fail while trying to find the IP
620             return None
621         return ip
622
623     def fail_discovery(self):
624         msg = "Discovery failed. No candidates found for node"
625         self.error(msg)
626         raise RuntimeError, msg
627
628     def fail_node_not_alive(self, hostname=None):
629         msg = "Node %s not alive" % hostname
630         raise RuntimeError, msg
631     
632     def fail_node_not_available(self, hostname):
633         msg = "Node %s not available for provisioning" % hostname
634         raise RuntimeError, msg
635
636     def fail_not_enough_nodes(self):
637         msg = "Not enough nodes available for provisioning"
638         raise RuntimeError, msg
639
640     def fail_plapi(self):
641         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
642             " attributes pluser and plpassword."
643         raise RuntimeError, msg
644
645     def valid_connection(self, guid):
646         # TODO: Validate!
647         return True
648
649