Fixing Planetlab Node, ping too agresive, no need for pl credentials
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import threading
32 import datetime
33
34 @clsinit_copy
35 class PlanetlabNode(LinuxNode):
36     _rtype = "PlanetlabNode"
37     _help = "Controls a PlanetLab host accessible using a SSH key " \
38             "associated to a PlanetLab user account"
39     _backend = "planetlab"
40
41     lock = threading.Lock()
42
43     @classmethod
44     def _register_attributes(cls):
45         ip = Attribute("ip", "PlanetLab host public IP address",
46                 flags = Flags.ReadOnly)
47
48         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
49                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
50                     default = "www.planet-lab.eu",
51                     flags = Flags.Credential)
52
53         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
54                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
55                     default = "https://%(hostname)s:443/PLCAPI/",
56                     flags = Flags.ExecReadOnly)
57     
58         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
59                     authenticate in the website) ",
60                     flags = Flags.Credential)
61
62         pl_password = Attribute("plpassword", 
63                         "PlanetLab account password, as \
64                         the one to authenticate in the website) ",
65                         flags = Flags.Credential)
66
67         city = Attribute("city", "Constrain location (city) during resource \
68                 discovery. May use wildcards.",
69                 flags = Flags.Filter)
70
71         country = Attribute("country", "Constrain location (country) during \
72                     resource discovery. May use wildcards.",
73                     flags = Flags.Filter)
74
75         region = Attribute("region", "Constrain location (region) during \
76                     resource discovery. May use wildcards.",
77                     flags = Flags.Filter)
78
79         architecture = Attribute("architecture", "Constrain architecture \
80                         during resource discovery.",
81                         type = Types.Enumerate,
82                         allowed = ["x86_64", 
83                                     "i386"],
84                         flags = Flags.Filter)
85
86         operating_system = Attribute("operatingSystem", "Constrain operating \
87                             system during resource discovery.",
88                             type = Types.Enumerate,
89                             allowed =  ["f8",
90                                         "f12",
91                                         "f14",
92                                         "centos",
93                                         "other"],
94                             flags = Flags.Filter)
95
96         #site = Attribute("site", "Constrain the PlanetLab site this node \
97         #        should reside on.",
98         #        type = Types.Enumerate,
99         #        allowed = ["PLE",
100         #                    "PLC",
101         #                    "PLJ"],
102         #        flags = Flags.Filter)
103
104         min_reliability = Attribute("minReliability", "Constrain reliability \
105                             while picking PlanetLab nodes. Specifies a lower \
106                             acceptable bound.",
107                             type = Types.Double,
108                             range = (1, 100),
109                             flags = Flags.Filter)
110
111         max_reliability = Attribute("maxReliability", "Constrain reliability \
112                             while picking PlanetLab nodes. Specifies an upper \
113                             acceptable bound.",
114                             type = Types.Double,
115                             range = (1, 100),
116                             flags = Flags.Filter)
117
118         min_bandwidth = Attribute("minBandwidth", "Constrain available \
119                             bandwidth while picking PlanetLab nodes. \
120                             Specifies a lower acceptable bound.",
121                             type = Types.Double,
122                             range = (0, 2**31),
123                             flags = Flags.Filter)
124
125         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
126                             bandwidth while picking PlanetLab nodes. \
127                             Specifies an upper acceptable bound.",
128                             type = Types.Double,
129                             range = (0, 2**31),
130                             flags = Flags.Filter)
131
132         min_load = Attribute("minLoad", "Constrain node load average while \
133                     picking PlanetLab nodes. Specifies a lower acceptable \
134                     bound.",
135                     type = Types.Double,
136                     range = (0, 2**31),
137                     flags = Flags.Filter)
138
139         max_load = Attribute("maxLoad", "Constrain node load average while \
140                     picking PlanetLab nodes. Specifies an upper acceptable \
141                     bound.",
142                     type = Types.Double,
143                     range = (0, 2**31),
144                     flags = Flags.Filter)
145
146         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
147                     picking PlanetLab nodes. Specifies a lower acceptable \
148                     bound.",
149                     type = Types.Double,
150                     range = (0, 100),
151                     flags = Flags.Filter)
152
153         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
154                     picking PlanetLab nodes. Specifies an upper acceptable \
155                     bound.",
156                     type = Types.Double,
157                     range = (0, 100),
158                     flags = Flags.Filter)
159
160         timeframe = Attribute("timeframe", "Past time period in which to check\
161                         information about the node. Values are year,month, \
162                         week, latest",
163                         default = "week",
164                         type = Types.Enumerate,
165                         allowed = ["latest",
166                                     "week",
167                                     "month",
168                                     "year"],
169                         flags = Flags.Filter)
170
171         cls._register_attribute(ip)
172         cls._register_attribute(pl_url)
173         cls._register_attribute(pl_ptn)
174         cls._register_attribute(pl_user)
175         cls._register_attribute(pl_password)
176         #cls._register_attribute(site)
177         cls._register_attribute(city)
178         cls._register_attribute(country)
179         cls._register_attribute(region)
180         cls._register_attribute(architecture)
181         cls._register_attribute(operating_system)
182         cls._register_attribute(min_reliability)
183         cls._register_attribute(max_reliability)
184         cls._register_attribute(min_bandwidth)
185         cls._register_attribute(max_bandwidth)
186         cls._register_attribute(min_load)
187         cls._register_attribute(max_load)
188         cls._register_attribute(min_cpu)
189         cls._register_attribute(max_cpu)
190         cls._register_attribute(timeframe)
191
192     def __init__(self, ec, guid):
193         super(PlanetlabNode, self).__init__(ec, guid)
194
195         self._plapi = None
196         self._node_to_provision = None
197         self._slicenode = False
198
199     def _skip_provision(self):
200         pl_user = self.get("pluser")
201         pl_pass = self.get("plpassword")
202         if not pl_user and not pl_pass:
203             return True
204         else: return False
205     
206     @property
207     def plapi(self):
208         if not self._plapi:
209             pl_user = self.get("pluser")
210             pl_pass = self.get("plpassword")
211             pl_url = self.get("plcApiUrl")
212             pl_ptn = self.get("plcApiPattern")
213
214             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
215                 pl_ptn)
216             
217             if not self._plapi:
218                 self.fail_plapi()
219
220         return self._plapi
221
222     def do_discover(self):
223         """
224         Based on the attributes defined by the user, discover the suitable 
225         nodes for provision.
226         """
227         if self._skip_provision():
228             super(PlanetlabNode, self).do_discover()
229             return
230
231         hostname = self._get_hostname()
232         if hostname:
233             # the user specified one particular node to be provisioned
234             # check with PLCAPI if it is alvive
235             node_id = self._query_if_alive(hostname=hostname)
236             node_id = node_id.pop()
237
238             # check that the node is not blacklisted or being provisioned
239             # by other RM
240             with PlanetlabNode.lock:
241                 plist = self.plapi.reserved()
242                 blist = self.plapi.blacklisted()
243                 if node_id not in blist and node_id not in plist:
244                 
245                     # check that is really alive, by performing ping
246                     ping_ok = self._do_ping(node_id)
247                     if not ping_ok:
248                         self._blacklist_node(node_id)
249                         self.fail_node_not_alive(hostname)
250                     else:
251                         if self._check_if_in_slice([node_id]):
252                             self._slicenode = True
253                         self._put_node_in_provision(node_id)
254                         self._node_to_provision = node_id
255                         super(PlanetlabNode, self).do_discover()
256                 
257                 else:
258                     self.fail_node_not_available(hostname)
259         
260         else:
261             # the user specifies constraints based on attributes, zero, one or 
262             # more nodes can match these constraints 
263             nodes = self._filter_based_on_attributes()
264             nodes_alive = self._query_if_alive(nodes)
265
266             # nodes that are already part of user's slice have the priority to
267             # provisioned
268             nodes_inslice = self._check_if_in_slice(nodes_alive)
269             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
270             
271             node_id = None
272             if nodes_inslice:
273                 node_id = self._choose_random_node(nodes_inslice)
274                 self._slicenode = True                
275                 
276             if not node_id:
277                 # Either there were no matching nodes in the user's slice, or
278                 # the nodes in the slice  were blacklisted or being provisioned
279                 # by other RM. Note nodes_not_inslice is never empty
280                 node_id = self._choose_random_node(nodes_not_inslice)
281                 self._slicenode = False
282
283             if node_id:
284                 self._node_to_provision = node_id
285                 try:
286                     self._set_hostname_attr(node_id)
287                     self.info(" Selected node to provision ")
288                 except:
289                     with PlanetlabNode.lock:
290                         self._blacklist_node(node_id)
291                         self.do_discover()
292  
293                 super(PlanetlabNode, self).do_discover()
294             else:
295                self.fail_not_enough_nodes() 
296             
297     def do_provision(self):
298         """
299         Add node to user's slice after verifing that the node is functioning
300         correctly
301         """
302         if self._skip_provision():
303             super(PlanetlabNode, self).do_provision()
304             return
305
306         provision_ok = False
307         ssh_ok = False
308         proc_ok = False
309         timeout = 1800
310
311         while not provision_ok:
312             node = self._node_to_provision
313             if not self._slicenode:
314                 self._add_node_to_slice(node)
315             
316                 # check ssh connection
317                 t = 0 
318                 while t < timeout and not ssh_ok:
319
320                     cmd = 'echo \'GOOD NODE\''
321                     ((out, err), proc) = self.execute(cmd)
322                     if out.find("GOOD NODE") < 0:
323                         t = t + 60
324                         time.sleep(60)
325                         continue
326                     else:
327                         ssh_ok = True
328                         continue
329             else:
330                 cmd = 'echo \'GOOD NODE\''
331                 ((out, err), proc) = self.execute(cmd)
332                 if not out.find("GOOD NODE") < 0:
333                     ssh_ok = True
334
335             if not ssh_ok:
336                 # the timeout was reach without establishing ssh connection
337                 # the node is blacklisted, deleted from the slice, and a new
338                 # node to provision is discovered
339                 with PlanetlabNode.lock:
340                     self.warn(" Could not SSH login ")
341                     self._blacklist_node(node)
342                     #self._delete_node_from_slice(node)
343                 self.set('hostname', None)
344                 self.do_discover()
345                 continue
346             
347             # check /proc directory is mounted (ssh_ok = True)
348             else:
349                 cmd = 'mount |grep proc'
350                 ((out, err), proc) = self.execute(cmd)
351                 if out.find("/proc type proc") < 0:
352                     with PlanetlabNode.lock:
353                         self.warn(" Could not find directory /proc ")
354                         self._blacklist_node(node)
355                         #self._delete_node_from_slice(node)
356                     self.set('hostname', None)
357                     self.do_discover()
358                     continue
359             
360                 else:
361                     provision_ok = True
362                     # set IP attribute
363                     ip = self._get_ip(node)
364                     self.set("ip", ip)
365             
366         super(PlanetlabNode, self).do_provision()
367
368     def _filter_based_on_attributes(self):
369         """
370         Retrive the list of nodes ids that match user's constraints 
371         """
372         # Map user's defined attributes with tagnames of PlanetLab
373         timeframe = self.get("timeframe")[0]
374         attr_to_tags = {
375             'city' : 'city',
376             'country' : 'country',
377             'region' : 'region',
378             'architecture' : 'arch',
379             'operatingSystem' : 'fcdistro',
380             #'site' : 'pldistro',
381             'minReliability' : 'reliability%s' % timeframe,
382             'maxReliability' : 'reliability%s' % timeframe,
383             'minBandwidth' : 'bw%s' % timeframe,
384             'maxBandwidth' : 'bw%s' % timeframe,
385             'minLoad' : 'load%s' % timeframe,
386             'maxLoad' : 'load%s' % timeframe,
387             'minCpu' : 'cpu%s' % timeframe,
388             'maxCpu' : 'cpu%s' % timeframe,
389         }
390         
391         nodes_id = []
392         filters = {}
393
394         for attr_name, attr_obj in self._attrs.iteritems():
395             attr_value = self.get(attr_name)
396             
397             if attr_value is not None and attr_obj.flags == 8 and \
398                 attr_name != 'timeframe':
399         
400                 attr_tag = attr_to_tags[attr_name]
401                 filters['tagname'] = attr_tag
402
403                 # filter nodes by fixed constraints e.g. operating system
404                 if not 'min' in attr_name and not 'max' in attr_name:
405                     filters['value'] = attr_value
406                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
407
408                 # filter nodes by range constraints e.g. max bandwidth
409                 elif ('min' or 'max') in attr_name:
410                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
411
412         if not filters:
413             nodes = self.plapi.get_nodes()
414             for node in nodes:
415                 nodes_id.append(node['node_id'])
416         
417         return nodes_id
418                     
419
420     def _filter_by_fixed_attr(self, filters, nodes_id):
421         """
422         Query PLCAPI for nodes ids matching fixed attributes defined by the
423         user
424         """
425         node_tags = self.plapi.get_node_tags(filters)
426         if node_tags is not None:
427
428             if len(nodes_id) == 0:
429                 # first attribute being matched
430                 for node_tag in node_tags:
431                     nodes_id.append(node_tag['node_id'])
432             else:
433                 # remove the nodes ids that don't match the new attribute
434                 # that is being match
435
436                 nodes_id_tmp = []
437                 for node_tag in node_tags:
438                     if node_tag['node_id'] in nodes_id:
439                         nodes_id_tmp.append(node_tag['node_id'])
440
441                 if len(nodes_id_tmp):
442                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
443                 else:
444                     # no node from before match the new constraint
445                     self.fail_discovery()
446         else:
447             # no nodes match the filter applied
448             self.fail_discovery()
449
450         return nodes_id
451
452     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
453         """
454         Query PLCAPI for nodes ids matching attributes defined in a certain
455         range, by the user
456         """
457         node_tags = self.plapi.get_node_tags(filters)
458         if node_tags:
459             
460             if len(nodes_id) == 0:
461                 # first attribute being matched
462                 for node_tag in node_tags:
463  
464                    # check that matches the min or max restriction
465                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
466                         float(node_tag['value']) > attr_value:
467                         nodes_id.append(node_tag['node_id'])
468
469                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
470                         float(node_tag['value']) < attr_value:
471                         nodes_id.append(node_tag['node_id'])
472             else:
473
474                 # remove the nodes ids that don't match the new attribute
475                 # that is being match
476                 nodes_id_tmp = []
477                 for node_tag in node_tags:
478
479                     # check that matches the min or max restriction and was a
480                     # matching previous filters
481                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
482                         float(node_tag['value']) > attr_value and \
483                         node_tag['node_id'] in nodes_id:
484                         nodes_id_tmp.append(node_tag['node_id'])
485
486                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
487                         float(node_tag['value']) < attr_value and \
488                         node_tag['node_id'] in nodes_id:
489                         nodes_id_tmp.append(node_tag['node_id'])
490
491                 if len(nodes_id_tmp):
492                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
493                 else:
494                     # no node from before match the new constraint
495                     self.fail_discovery()
496
497         else: #TODO CHECK
498             # no nodes match the filter applied
499             self.fail_discovery()
500
501         return nodes_id
502         
503     def _query_if_alive(self, nodes_id=None, hostname=None):
504         """
505         Query PLCAPI for nodes that register activity recently, using filters 
506         related to the state of the node, e.g. last time it was contacted
507         """
508         if nodes_id is None and hostname is None:
509             msg = "Specify nodes_id or hostname"
510             raise RuntimeError, msg
511
512         if nodes_id is not None and hostname is not None:
513             msg = "Specify either nodes_id or hostname"
514             raise RuntimeError, msg
515
516         # define PL filters to check the node is alive
517         filters = dict()
518         filters['run_level'] = 'boot'
519         filters['boot_state'] = 'boot'
520         filters['node_type'] = 'regular' 
521         #filters['>last_contact'] =  int(time.time()) - 2*3600
522
523         # adding node_id or hostname to the filters to check for the particular
524         # node
525         if nodes_id:
526             filters['node_id'] = list(nodes_id)
527             alive_nodes_id = self._get_nodes_id(filters)
528         elif hostname:
529             filters['hostname'] = hostname
530             alive_nodes_id = self._get_nodes_id(filters)
531
532         if len(alive_nodes_id) == 0:
533             self.fail_node_not_alive(hostname)
534         else:
535             nodes_id = list()
536             for node_id in alive_nodes_id:
537                 nid = node_id['node_id']
538                 nodes_id.append(nid)
539
540             return nodes_id
541
542     def _choose_random_node(self, nodes):
543         """
544         From the possible nodes for provision, choose randomly to decrese the
545         probability of different RMs choosing the same node for provision
546         """
547         size = len(nodes)
548         while size:
549             size = size - 1
550             index = randint(0, size)
551             node_id = nodes[index]
552             nodes[index] = nodes[size]
553
554             # check the node is not blacklisted or being provision by other RM
555             # and perform ping to check that is really alive
556             with PlanetlabNode.lock:
557
558                 blist = self.plapi.blacklisted()
559                 plist = self.plapi.reserved()
560                 if node_id not in blist and node_id not in plist:
561                     ping_ok = self._do_ping(node_id)
562                     if not ping_ok:
563                         self._set_hostname_attr(node_id)
564                         self.warn(" Node not responding PING ")
565                         self._blacklist_node(node_id)
566                         self.set('hostname', None)
567                     else:
568                         # discovered node for provision, added to provision list
569                         self._put_node_in_provision(node_id)
570                         return node_id
571
572     def _get_nodes_id(self, filters):
573         return self.plapi.get_nodes(filters, fields=['node_id'])
574
575     def _add_node_to_slice(self, node_id):
576         self.info(" Adding node to slice ")
577         slicename = self.get("username")
578         with PlanetlabNode.lock:
579             slice_nodes = self.plapi.get_slice_nodes(slicename)
580             slice_nodes.append(node_id)
581             self.plapi.add_slice_nodes(slicename, slice_nodes)
582
583     def _delete_node_from_slice(self, node):
584         self.warn(" Deleting node from slice ")
585         slicename = self.get("username")
586         self.plapi.delete_slice_node(slicename, [node])
587
588     def _get_hostname(self):
589         hostname = self.get("hostname")
590         ip = self.get("ip")
591         if hostname:
592             return hostname
593         elif ip:
594             hostname = sshfuncs.gethostbyname(ip)
595             return hostname
596         else:
597             return None
598
599     def _set_hostname_attr(self, node):
600         """
601         Query PLCAPI for the hostname of a certain node id and sets the
602         attribute hostname, it will over write the previous value
603         """
604         hostname = self.plapi.get_nodes(node, ['hostname'])
605         self.set("hostname", hostname[0]['hostname'])
606
607     def _check_if_in_slice(self, nodes_id):
608         """
609         Query PLCAPI to find out if any node id from nodes_id is in the user's
610         slice
611         """
612         slicename = self.get("username")
613         slice_nodes = self.plapi.get_slice_nodes(slicename)
614         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
615         return nodes_inslice
616
617     def _do_ping(self, node_id):
618         """
619         Perform ping command on node's IP matching node id
620         """
621         ping_ok = False
622         ip = self._get_ip(node_id)
623         if not ip: return ping_ok
624
625         command = "ping -c4 %s" % ip
626
627         (out, err) = lexec(command)
628         if not out.find("2 received") or not out.find("3 received") or not \
629             out.find("4 received") < 0:
630             ping_ok = True
631         
632         return ping_ok 
633
634     def _blacklist_node(self, node):
635         """
636         Add node mal functioning node to blacklist
637         """
638         self.warn(" Blacklisting malfunctioning node ")
639         self._plapi.blacklist_host(node)
640
641     def _put_node_in_provision(self, node):
642         """
643         Add node to the list of nodes being provisioned, in order for other RMs
644         to not try to provision the same one again
645         """
646         self._plapi.reserve_host(node)
647
648     def _get_ip(self, node_id):
649         """
650         Query PLCAPI for the IP of a node with certain node id
651         """
652         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
653         try:
654             ip = sshfuncs.gethostbyname(hostname['hostname'])
655         except:
656             # Fail while trying to find the IP
657             return None
658         return ip
659
660     def fail_discovery(self):
661         msg = "Discovery failed. No candidates found for node"
662         self.error(msg)
663         raise RuntimeError, msg
664
665     def fail_node_not_alive(self, hostname=None):
666         msg = "Node %s not alive" % hostname
667         raise RuntimeError, msg
668     
669     def fail_node_not_available(self, hostname):
670         msg = "Node %s not available for provisioning" % hostname
671         raise RuntimeError, msg
672
673     def fail_not_enough_nodes(self):
674         msg = "Not enough nodes available for provisioning"
675         raise RuntimeError, msg
676
677     def fail_plapi(self):
678         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
679             " attributes pluser and plpassword."
680         raise RuntimeError, msg
681
682     def valid_connection(self, guid):
683         # TODO: Validate!
684         return True
685
686