Adding warning method to utils Logger
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import socket
32 import threading
33 import datetime
34
35 @clsinit_copy
36 class PlanetlabNode(LinuxNode):
37     _rtype = "PlanetlabNode"
38     _help = "Controls a PlanetLab host accessible using a SSH key " \
39             "associated to a PlanetLab user account"
40     _backend = "planetlab"
41
42     lock = threading.Lock()
43
44     @classmethod
45     def _register_attributes(cls):
46         ip = Attribute("ip", "PlanetLab host public IP address",
47                     flags = Flags.Design)
48
49         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
50                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
51                     default = "www.planet-lab.eu",
52                     flags = Flags.Credential)
53
54         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
55                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
56                     default = "https://%(hostname)s:443/PLCAPI/",
57                     flags = Flags.Design)
58     
59         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
60                     authenticate in the website) ",
61                     flags = Flags.Credential)
62
63         pl_password = Attribute("plpassword", 
64                         "PlanetLab account password, as \
65                         the one to authenticate in the website) ",
66                         flags = Flags.Credential)
67
68         city = Attribute("city", "Constrain location (city) during resource \
69                 discovery. May use wildcards.",
70                 flags = Flags.Filter)
71
72         country = Attribute("country", "Constrain location (country) during \
73                     resource discovery. May use wildcards.",
74                     flags = Flags.Filter)
75
76         region = Attribute("region", "Constrain location (region) during \
77                     resource discovery. May use wildcards.",
78                     flags = Flags.Filter)
79
80         architecture = Attribute("architecture", "Constrain architecture \
81                         during resource discovery.",
82                         type = Types.Enumerate,
83                         allowed = ["x86_64", 
84                                     "i386"],
85                         flags = Flags.Filter)
86
87         operating_system = Attribute("operatingSystem", "Constrain operating \
88                             system during resource discovery.",
89                             type = Types.Enumerate,
90                             allowed =  ["f8",
91                                         "f12",
92                                         "f14",
93                                         "centos",
94                                         "other"],
95                             flags = Flags.Filter)
96
97         #site = Attribute("site", "Constrain the PlanetLab site this node \
98         #        should reside on.",
99         #        type = Types.Enumerate,
100         #        allowed = ["PLE",
101         #                    "PLC",
102         #                    "PLJ"],
103         #        flags = Flags.Filter)
104
105         min_reliability = Attribute("minReliability", "Constrain reliability \
106                             while picking PlanetLab nodes. Specifies a lower \
107                             acceptable bound.",
108                             type = Types.Double,
109                             range = (1, 100),
110                             flags = Flags.Filter)
111
112         max_reliability = Attribute("maxReliability", "Constrain reliability \
113                             while picking PlanetLab nodes. Specifies an upper \
114                             acceptable bound.",
115                             type = Types.Double,
116                             range = (1, 100),
117                             flags = Flags.Filter)
118
119         min_bandwidth = Attribute("minBandwidth", "Constrain available \
120                             bandwidth while picking PlanetLab nodes. \
121                             Specifies a lower acceptable bound.",
122                             type = Types.Double,
123                             range = (0, 2**31),
124                             flags = Flags.Filter)
125
126         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
127                             bandwidth while picking PlanetLab nodes. \
128                             Specifies an upper acceptable bound.",
129                             type = Types.Double,
130                             range = (0, 2**31),
131                             flags = Flags.Filter)
132
133         min_load = Attribute("minLoad", "Constrain node load average while \
134                     picking PlanetLab nodes. Specifies a lower acceptable \
135                     bound.",
136                     type = Types.Double,
137                     range = (0, 2**31),
138                     flags = Flags.Filter)
139
140         max_load = Attribute("maxLoad", "Constrain node load average while \
141                     picking PlanetLab nodes. Specifies an upper acceptable \
142                     bound.",
143                     type = Types.Double,
144                     range = (0, 2**31),
145                     flags = Flags.Filter)
146
147         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
148                     picking PlanetLab nodes. Specifies a lower acceptable \
149                     bound.",
150                     type = Types.Double,
151                     range = (0, 100),
152                     flags = Flags.Filter)
153
154         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
155                     picking PlanetLab nodes. Specifies an upper acceptable \
156                     bound.",
157                     type = Types.Double,
158                     range = (0, 100),
159                     flags = Flags.Filter)
160
161         timeframe = Attribute("timeframe", "Past time period in which to check\
162                         information about the node. Values are year,month, \
163                         week, latest",
164                         default = "week",
165                         type = Types.Enumerate,
166                         allowed = ["latest",
167                                     "week",
168                                     "month",
169                                     "year"],
170                         flags = Flags.Filter)
171
172 #        plblacklist = Attribute("blacklist", "Take into account the file plblacklist \
173 #                        in the user's home directory under .nepi directory. This file \
174 #                        contains a list of PL nodes to blacklist, and at the end \
175 #                        of the experiment execution the new blacklisted nodes are added.",
176 #                    type = Types.Bool,
177 #                    default = True,
178 #                    flags = Flags.ReadOnly)
179 #
180
181         cls._register_attribute(ip)
182         cls._register_attribute(pl_url)
183         cls._register_attribute(pl_ptn)
184         cls._register_attribute(pl_user)
185         cls._register_attribute(pl_password)
186         #cls._register_attribute(site)
187         cls._register_attribute(city)
188         cls._register_attribute(country)
189         cls._register_attribute(region)
190         cls._register_attribute(architecture)
191         cls._register_attribute(operating_system)
192         cls._register_attribute(min_reliability)
193         cls._register_attribute(max_reliability)
194         cls._register_attribute(min_bandwidth)
195         cls._register_attribute(max_bandwidth)
196         cls._register_attribute(min_load)
197         cls._register_attribute(max_load)
198         cls._register_attribute(min_cpu)
199         cls._register_attribute(max_cpu)
200         cls._register_attribute(timeframe)
201
202     def __init__(self, ec, guid):
203         super(PlanetlabNode, self).__init__(ec, guid)
204
205         self._plapi = None
206         self._node_to_provision = None
207         self._slicenode = False
208         self._hostname = False
209
210         if self.get("gateway") or self.get("gatewayUser"):
211             self.set("gateway", None)
212             self.set("gatewayUser", None)
213
214     def _skip_provision(self):
215         pl_user = self.get("pluser")
216         pl_pass = self.get("plpassword")
217         if not pl_user and not pl_pass:
218             return True
219         else: return False
220     
221     @property
222     def plapi(self):
223         if not self._plapi:
224             pl_user = self.get("pluser")
225             pl_pass = self.get("plpassword")
226             pl_url = self.get("plcApiUrl")
227             pl_ptn = self.get("plcApiPattern")
228
229             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
230                 pl_ptn)
231             
232             if not self._plapi:
233                 self.fail_plapi()
234
235         return self._plapi
236
237     def do_discover(self):
238         """
239         Based on the attributes defined by the user, discover the suitable 
240         nodes for provision.
241         """
242         if self._skip_provision():
243             super(PlanetlabNode, self).do_discover()
244             return
245
246         hostname = self._get_hostname()
247         if hostname:
248             # the user specified one particular node to be provisioned
249             self._hostname = True
250             node_id = self._get_nodes_id({'hostname':hostname})
251             node_id = node_id.pop()['node_id']
252
253             # check that the node is not blacklisted or being provisioned
254             # by other RM
255             with PlanetlabNode.lock:
256                 plist = self.plapi.reserved()
257                 blist = self.plapi.blacklisted()
258                 if node_id not in blist and node_id not in plist:
259                 
260                     # check that is really alive, by performing ping
261                     ping_ok = self._do_ping(node_id)
262                     if not ping_ok:
263                         self._blacklist_node(node_id)
264                         self.fail_node_not_alive(hostname)
265                     else:
266                         if self._check_if_in_slice([node_id]):
267                             self._slicenode = True
268                         self._put_node_in_provision(node_id)
269                         self._node_to_provision = node_id
270                 else:
271                     self.fail_node_not_available(hostname)
272             super(PlanetlabNode, self).do_discover()
273         
274         else:
275             # the user specifies constraints based on attributes, zero, one or 
276             # more nodes can match these constraints 
277             nodes = self._filter_based_on_attributes()
278
279             # nodes that are already part of user's slice have the priority to
280             # provisioned
281             nodes_inslice = self._check_if_in_slice(nodes)
282             nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
283             
284             node_id = None
285             if nodes_inslice:
286                 node_id = self._choose_random_node(nodes_inslice)
287                 self._slicenode = True                
288                 
289             if not node_id:
290                 # Either there were no matching nodes in the user's slice, or
291                 # the nodes in the slice  were blacklisted or being provisioned
292                 # by other RM. Note nodes_not_inslice is never empty
293                 node_id = self._choose_random_node(nodes_not_inslice)
294                 self._slicenode = False
295
296             if node_id:
297                 self._node_to_provision = node_id
298                 try:
299                     self._set_hostname_attr(node_id)
300                     self.info(" Selected node to provision ")
301                     super(PlanetlabNode, self).do_discover()
302                 except:
303                     with PlanetlabNode.lock:
304                         self._blacklist_node(node_id)
305                     self.do_discover()
306             else:
307                self.fail_not_enough_nodes() 
308             
309     def do_provision(self):
310         """
311         Add node to user's slice after verifing that the node is functioning
312         correctly
313         """
314         if self._skip_provision():
315             super(PlanetlabNode, self).do_provision()
316             return
317
318         provision_ok = False
319         ssh_ok = False
320         proc_ok = False
321         timeout = 1800
322
323         while not provision_ok:
324             node = self._node_to_provision
325             if not self._slicenode:
326                 self._add_node_to_slice(node)
327             
328                 # check ssh connection
329                 t = 0 
330                 while t < timeout and not ssh_ok:
331
332                     cmd = 'echo \'GOOD NODE\''
333                     ((out, err), proc) = self.execute(cmd)
334                     if out.find("GOOD NODE") < 0:
335                         t = t + 60
336                         time.sleep(60)
337                         continue
338                     else:
339                         ssh_ok = True
340                         continue
341             else:
342                 cmd = 'echo \'GOOD NODE\''
343                 ((out, err), proc) = self.execute(cmd)
344                 if not out.find("GOOD NODE") < 0:
345                     ssh_ok = True
346
347             if not ssh_ok:
348                 # the timeout was reach without establishing ssh connection
349                 # the node is blacklisted, deleted from the slice, and a new
350                 # node to provision is discovered
351                 with PlanetlabNode.lock:
352                     self.warning(" Could not SSH login ")
353                     self._blacklist_node(node)
354                     #self._delete_node_from_slice(node)
355                 self.do_discover()
356                 continue
357             
358             # check /proc directory is mounted (ssh_ok = True)
359             # and file system is not read only
360             else:
361                 cmd = 'mount |grep proc'
362                 ((out1, err1), proc1) = self.execute(cmd)
363                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
364                 ((out2, err2), proc2) = self.execute(cmd)
365                 if out1.find("/proc type proc") < 0 or \
366                     "Read-only file system".lower() in err2.lower():
367                     with PlanetlabNode.lock:
368                         self.warning(" Corrupted file system ")
369                         self._blacklist_node(node)
370                         #self._delete_node_from_slice(node)
371                     self.do_discover()
372                     continue
373             
374                 else:
375                     provision_ok = True
376                     if not self.get('hostname'):
377                         self._set_hostname_attr(node)            
378                     # set IP attribute
379                     ip = self._get_ip(node)
380                     self.set("ip", ip)
381                     self.info(" Node provisioned ")            
382             
383         super(PlanetlabNode, self).do_provision()
384
385     def _filter_based_on_attributes(self):
386         """
387         Retrive the list of nodes ids that match user's constraints 
388         """
389         # Map user's defined attributes with tagnames of PlanetLab
390         timeframe = self.get("timeframe")[0]
391         attr_to_tags = {
392             'city' : 'city',
393             'country' : 'country',
394             'region' : 'region',
395             'architecture' : 'arch',
396             'operatingSystem' : 'fcdistro',
397             #'site' : 'pldistro',
398             'minReliability' : 'reliability%s' % timeframe,
399             'maxReliability' : 'reliability%s' % timeframe,
400             'minBandwidth' : 'bw%s' % timeframe,
401             'maxBandwidth' : 'bw%s' % timeframe,
402             'minLoad' : 'load%s' % timeframe,
403             'maxLoad' : 'load%s' % timeframe,
404             'minCpu' : 'cpu%s' % timeframe,
405             'maxCpu' : 'cpu%s' % timeframe,
406         }
407         
408         nodes_id = []
409         filters = {}
410
411         for attr_name, attr_obj in self._attrs.iteritems():
412             attr_value = self.get(attr_name)
413             
414             if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
415                 attr_name != 'timeframe':
416         
417                 attr_tag = attr_to_tags[attr_name]
418                 filters['tagname'] = attr_tag
419
420                 # filter nodes by fixed constraints e.g. operating system
421                 if not 'min' in attr_name and not 'max' in attr_name:
422                     filters['value'] = attr_value
423                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
424
425                 # filter nodes by range constraints e.g. max bandwidth
426                 elif ('min' or 'max') in attr_name:
427                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
428
429         if not filters:
430             nodes = self._get_nodes_id()
431             for node in nodes:
432                 nodes_id.append(node['node_id'])
433         return nodes_id
434                     
435     def _filter_by_fixed_attr(self, filters, nodes_id):
436         """
437         Query PLCAPI for nodes ids matching fixed attributes defined by the
438         user
439         """
440         node_tags = self.plapi.get_node_tags(filters)
441         if node_tags is not None:
442
443             if len(nodes_id) == 0:
444                 # first attribute being matched
445                 for node_tag in node_tags:
446                     nodes_id.append(node_tag['node_id'])
447             else:
448                 # remove the nodes ids that don't match the new attribute
449                 # that is being match
450
451                 nodes_id_tmp = []
452                 for node_tag in node_tags:
453                     if node_tag['node_id'] in nodes_id:
454                         nodes_id_tmp.append(node_tag['node_id'])
455
456                 if len(nodes_id_tmp):
457                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
458                 else:
459                     # no node from before match the new constraint
460                     self.fail_discovery()
461         else:
462             # no nodes match the filter applied
463             self.fail_discovery()
464
465         return nodes_id
466
467     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
468         """
469         Query PLCAPI for nodes ids matching attributes defined in a certain
470         range, by the user
471         """
472         node_tags = self.plapi.get_node_tags(filters)
473         if node_tags:
474             
475             if len(nodes_id) == 0:
476                 # first attribute being matched
477                 for node_tag in node_tags:
478  
479                    # check that matches the min or max restriction
480                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
481                         float(node_tag['value']) > attr_value:
482                         nodes_id.append(node_tag['node_id'])
483
484                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
485                         float(node_tag['value']) < attr_value:
486                         nodes_id.append(node_tag['node_id'])
487             else:
488
489                 # remove the nodes ids that don't match the new attribute
490                 # that is being match
491                 nodes_id_tmp = []
492                 for node_tag in node_tags:
493
494                     # check that matches the min or max restriction and was a
495                     # matching previous filters
496                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
497                         float(node_tag['value']) > attr_value and \
498                         node_tag['node_id'] in nodes_id:
499                         nodes_id_tmp.append(node_tag['node_id'])
500
501                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
502                         float(node_tag['value']) < attr_value and \
503                         node_tag['node_id'] in nodes_id:
504                         nodes_id_tmp.append(node_tag['node_id'])
505
506                 if len(nodes_id_tmp):
507                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
508                 else:
509                     # no node from before match the new constraint
510                     self.fail_discovery()
511
512         else: #TODO CHECK
513             # no nodes match the filter applied
514             self.fail_discovery()
515
516         return nodes_id
517         
518     def _choose_random_node(self, nodes):
519         """
520         From the possible nodes for provision, choose randomly to decrese the
521         probability of different RMs choosing the same node for provision
522         """
523         size = len(nodes)
524         while size:
525             size = size - 1
526             index = randint(0, size)
527             node_id = nodes[index]
528             nodes[index] = nodes[size]
529
530             # check the node is not blacklisted or being provision by other RM
531             # and perform ping to check that is really alive
532             with PlanetlabNode.lock:
533
534                 blist = self.plapi.blacklisted()
535                 plist = self.plapi.reserved()
536                 if node_id not in blist and node_id not in plist:
537                     ping_ok = self._do_ping(node_id)
538                     if not ping_ok:
539                         self._set_hostname_attr(node_id)
540                         self.warning(" Node not responding PING ")
541                         self._blacklist_node(node_id)
542                     else:
543                         # discovered node for provision, added to provision list
544                         self._put_node_in_provision(node_id)
545                         return node_id
546
547     def _get_nodes_id(self, filters=None):
548         return self.plapi.get_nodes(filters, fields=['node_id'])
549
550     def _add_node_to_slice(self, node_id):
551         self.info(" Adding node to slice ")
552         slicename = self.get("username")
553         with PlanetlabNode.lock:
554             slice_nodes = self.plapi.get_slice_nodes(slicename)
555             slice_nodes.append(node_id)
556             self.plapi.add_slice_nodes(slicename, slice_nodes)
557
558     def _delete_node_from_slice(self, node):
559         self.warning(" Deleting node from slice ")
560         slicename = self.get("username")
561         self.plapi.delete_slice_node(slicename, [node])
562
563     def _get_hostname(self):
564         hostname = self.get("hostname")
565         if hostname:
566             return hostname
567         ip = self.get("ip")
568         if ip:
569             hostname = socket.gethostbyaddr(ip)[0]
570             self.set('hostname', hostname)
571             return hostname
572         else:
573             return None
574
575     def _set_hostname_attr(self, node):
576         """
577         Query PLCAPI for the hostname of a certain node id and sets the
578         attribute hostname, it will over write the previous value
579         """
580         hostname = self.plapi.get_nodes(node, ['hostname'])
581         self.set("hostname", hostname[0]['hostname'])
582
583     def _check_if_in_slice(self, nodes_id):
584         """
585         Query PLCAPI to find out if any node id from nodes_id is in the user's
586         slice
587         """
588         slicename = self.get("username")
589         slice_nodes = self.plapi.get_slice_nodes(slicename)
590         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
591         return nodes_inslice
592
593     def _do_ping(self, node_id):
594         """
595         Perform ping command on node's IP matching node id
596         """
597         ping_ok = False
598         ip = self._get_ip(node_id)
599         if not ip: return ping_ok
600
601         command = "ping -c4 %s" % ip
602
603         (out, err) = lexec(command)
604         if not str(out).find("2 received") < 0 or not str(out).find("3 received") < 0 or not \
605             str(out).find("4 received") < 0:
606             ping_ok = True
607        
608         return ping_ok 
609
610     def _blacklist_node(self, node):
611         """
612         Add node mal functioning node to blacklist
613         """
614         self.warning(" Blacklisting malfunctioning node ")
615         self.plapi.blacklist_host(node)
616         if not self._hostname:
617             self.set('hostname', None)
618
619     def _put_node_in_provision(self, node):
620         """
621         Add node to the list of nodes being provisioned, in order for other RMs
622         to not try to provision the same one again
623         """
624         self.plapi.reserve_host(node)
625
626     def _get_ip(self, node_id):
627         """
628         Query PLCAPI for the IP of a node with certain node id
629         """
630         hostname = self.get("hostname") or \
631             self.plapi.get_nodes(node_id, ['hostname'])[0]['hostname']
632         try:
633             ip = sshfuncs.gethostbyname(hostname)
634         except:
635             # Fail while trying to find the IP
636             return None
637         return ip
638
639     def fail_discovery(self):
640         msg = "Discovery failed. No candidates found for node"
641         self.error(msg)
642         raise RuntimeError, msg
643
644     def fail_node_not_alive(self, hostname=None):
645         msg = "Node %s not alive" % hostname
646         raise RuntimeError, msg
647     
648     def fail_node_not_available(self, hostname):
649         msg = "Node %s not available for provisioning" % hostname
650         raise RuntimeError, msg
651
652     def fail_not_enough_nodes(self):
653         msg = "Not enough nodes available for provisioning"
654         raise RuntimeError, msg
655
656     def fail_plapi(self):
657         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
658             " attributes pluser and plpassword."
659         raise RuntimeError, msg
660
661     def valid_connection(self, guid):
662         # TODO: Validate!
663         return True
664
665