Adding logs for PL node for Phung
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import socket
32 import threading
33 import datetime
34
35 @clsinit_copy
36 class PlanetlabNode(LinuxNode):
37     _rtype = "PlanetlabNode"
38     _help = "Controls a PlanetLab host accessible using a SSH key " \
39             "associated to a PlanetLab user account"
40     _backend = "planetlab"
41
42     lock = threading.Lock()
43
44     @classmethod
45     def _register_attributes(cls):
46         ip = Attribute("ip", "PlanetLab host public IP address",
47                     flags = Flags.Design)
48
49         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
50                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
51                     default = "www.planet-lab.eu",
52                     flags = Flags.Credential)
53
54         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
55                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
56                     default = "https://%(hostname)s:443/PLCAPI/",
57                     flags = Flags.Design)
58     
59         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
60                     authenticate in the website) ",
61                     flags = Flags.Credential)
62
63         pl_password = Attribute("plpassword", 
64                         "PlanetLab account password, as \
65                         the one to authenticate in the website) ",
66                         flags = Flags.Credential)
67
68         city = Attribute("city", "Constrain location (city) during resource \
69                 discovery. May use wildcards.",
70                 flags = Flags.Filter)
71
72         country = Attribute("country", "Constrain location (country) during \
73                     resource discovery. May use wildcards.",
74                     flags = Flags.Filter)
75
76         region = Attribute("region", "Constrain location (region) during \
77                     resource discovery. May use wildcards.",
78                     flags = Flags.Filter)
79
80         architecture = Attribute("architecture", "Constrain architecture \
81                         during resource discovery.",
82                         type = Types.Enumerate,
83                         allowed = ["x86_64", 
84                                     "i386"],
85                         flags = Flags.Filter)
86
87         operating_system = Attribute("operatingSystem", "Constrain operating \
88                             system during resource discovery.",
89                             type = Types.Enumerate,
90                             allowed =  ["f8",
91                                         "f12",
92                                         "f14",
93                                         "centos",
94                                         "other"],
95                             flags = Flags.Filter)
96
97         #site = Attribute("site", "Constrain the PlanetLab site this node \
98         #        should reside on.",
99         #        type = Types.Enumerate,
100         #        allowed = ["PLE",
101         #                    "PLC",
102         #                    "PLJ"],
103         #        flags = Flags.Filter)
104
105         min_reliability = Attribute("minReliability", "Constrain reliability \
106                             while picking PlanetLab nodes. Specifies a lower \
107                             acceptable bound.",
108                             type = Types.Double,
109                             range = (1, 100),
110                             flags = Flags.Filter)
111
112         max_reliability = Attribute("maxReliability", "Constrain reliability \
113                             while picking PlanetLab nodes. Specifies an upper \
114                             acceptable bound.",
115                             type = Types.Double,
116                             range = (1, 100),
117                             flags = Flags.Filter)
118
119         min_bandwidth = Attribute("minBandwidth", "Constrain available \
120                             bandwidth while picking PlanetLab nodes. \
121                             Specifies a lower acceptable bound.",
122                             type = Types.Double,
123                             range = (0, 2**31),
124                             flags = Flags.Filter)
125
126         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
127                             bandwidth while picking PlanetLab nodes. \
128                             Specifies an upper acceptable bound.",
129                             type = Types.Double,
130                             range = (0, 2**31),
131                             flags = Flags.Filter)
132
133         min_load = Attribute("minLoad", "Constrain node load average while \
134                     picking PlanetLab nodes. Specifies a lower acceptable \
135                     bound.",
136                     type = Types.Double,
137                     range = (0, 2**31),
138                     flags = Flags.Filter)
139
140         max_load = Attribute("maxLoad", "Constrain node load average while \
141                     picking PlanetLab nodes. Specifies an upper acceptable \
142                     bound.",
143                     type = Types.Double,
144                     range = (0, 2**31),
145                     flags = Flags.Filter)
146
147         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
148                     picking PlanetLab nodes. Specifies a lower acceptable \
149                     bound.",
150                     type = Types.Double,
151                     range = (0, 100),
152                     flags = Flags.Filter)
153
154         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
155                     picking PlanetLab nodes. Specifies an upper acceptable \
156                     bound.",
157                     type = Types.Double,
158                     range = (0, 100),
159                     flags = Flags.Filter)
160
161         timeframe = Attribute("timeframe", "Past time period in which to check\
162                         information about the node. Values are year,month, \
163                         week, latest",
164                         default = "week",
165                         type = Types.Enumerate,
166                         allowed = ["latest",
167                                     "week",
168                                     "month",
169                                     "year"],
170                         flags = Flags.Filter)
171
172 #        plblacklist = Attribute("blacklist", "Take into account the file plblacklist \
173 #                        in the user's home directory under .nepi directory. This file \
174 #                        contains a list of PL nodes to blacklist, and at the end \
175 #                        of the experiment execution the new blacklisted nodes are added.",
176 #                    type = Types.Bool,
177 #                    default = True,
178 #                    flags = Flags.ReadOnly)
179 #
180
181         cls._register_attribute(ip)
182         cls._register_attribute(pl_url)
183         cls._register_attribute(pl_ptn)
184         cls._register_attribute(pl_user)
185         cls._register_attribute(pl_password)
186         #cls._register_attribute(site)
187         cls._register_attribute(city)
188         cls._register_attribute(country)
189         cls._register_attribute(region)
190         cls._register_attribute(architecture)
191         cls._register_attribute(operating_system)
192         cls._register_attribute(min_reliability)
193         cls._register_attribute(max_reliability)
194         cls._register_attribute(min_bandwidth)
195         cls._register_attribute(max_bandwidth)
196         cls._register_attribute(min_load)
197         cls._register_attribute(max_load)
198         cls._register_attribute(min_cpu)
199         cls._register_attribute(max_cpu)
200         cls._register_attribute(timeframe)
201
202     def __init__(self, ec, guid):
203         super(PlanetlabNode, self).__init__(ec, guid)
204
205         self._plapi = None
206         self._node_to_provision = None
207         self._slicenode = False
208         self._hostname = False
209
210         if self.get("gateway") or self.get("gatewayUser"):
211             self.set("gateway", None)
212             self.set("gatewayUser", None)
213
214     def _skip_provision(self):
215         pl_user = self.get("pluser")
216         pl_pass = self.get("plpassword")
217         if not pl_user and not pl_pass:
218             return True
219         else: return False
220     
221     @property
222     def plapi(self):
223         if not self._plapi:
224             pl_user = self.get("pluser")
225             pl_pass = self.get("plpassword")
226             pl_url = self.get("plcApiUrl")
227             pl_ptn = self.get("plcApiPattern")
228
229             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
230                 pl_ptn)
231             
232             if not self._plapi:
233                 self.fail_plapi()
234
235         return self._plapi
236
237     def do_discover(self):
238         """
239         Based on the attributes defined by the user, discover the suitable 
240         nodes for provision.
241         """
242         if self._skip_provision():
243             super(PlanetlabNode, self).do_discover()
244             return
245
246         hostname = self._get_hostname()
247         if hostname:
248             # the user specified one particular node to be provisioned
249             self._hostname = True
250             node_id = self._get_nodes_id({'hostname':hostname})
251             node_id = node_id.pop()['node_id']
252
253             # check that the node is not blacklisted or being provisioned
254             # by other RM
255             with PlanetlabNode.lock:
256                 plist = self.plapi.reserved()
257                 blist = self.plapi.blacklisted()
258                 if node_id not in blist and node_id not in plist:
259                 
260                     # check that is really alive, by performing ping
261                     ping_ok = self._do_ping(node_id)
262                     if not ping_ok:
263                         self._blacklist_node(node_id)
264                         self.fail_node_not_alive(hostname)
265                     else:
266                         if self._check_if_in_slice([node_id]):
267                             self._slicenode = True
268                         self._put_node_in_provision(node_id)
269                         self._node_to_provision = node_id
270                 else:
271                     self.fail_node_not_available(hostname)
272             super(PlanetlabNode, self).do_discover()
273         
274         else:
275             # the user specifies constraints based on attributes, zero, one or 
276             # more nodes can match these constraints 
277             nodes = self._filter_based_on_attributes()
278
279             # nodes that are already part of user's slice have the priority to
280             # provisioned
281             nodes_inslice = self._check_if_in_slice(nodes)
282             nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
283             
284             node_id = None
285             if nodes_inslice:
286                 node_id = self._choose_random_node(nodes_inslice)
287                 self._slicenode = True                
288                 
289             if not node_id:
290                 # Either there were no matching nodes in the user's slice, or
291                 # the nodes in the slice  were blacklisted or being provisioned
292                 # by other RM. Note nodes_not_inslice is never empty
293                 node_id = self._choose_random_node(nodes_not_inslice)
294                 self._slicenode = False
295
296             if node_id:
297                 self._node_to_provision = node_id
298                 try:
299                     self._set_hostname_attr(node_id)
300                     self.info(" Selected node to provision ")
301                     super(PlanetlabNode, self).do_discover()
302                 except:
303                     with PlanetlabNode.lock:
304                         self._blacklist_node(node_id)
305                     self.do_discover()
306             else:
307                self.fail_not_enough_nodes() 
308             
309     def do_provision(self):
310         """
311         Add node to user's slice after verifing that the node is functioning
312         correctly
313         """
314         if self._skip_provision():
315             super(PlanetlabNode, self).do_provision()
316             return
317
318         provision_ok = False
319         ssh_ok = False
320         proc_ok = False
321         timeout = 3600
322
323         while not provision_ok:
324             node = self._node_to_provision
325             if not self._slicenode:
326                 self._add_node_to_slice(node)
327                 self.info( " Node added to slice ")
328             
329                 # check ssh connection
330                 t = 0 
331                 while t < timeout and not ssh_ok:
332
333                     cmd = 'echo \'GOOD NODE\''
334                     ((out, err), proc) = self.execute(cmd)
335                     if out.find("GOOD NODE") < 0:
336                         self.info(" No SSH login, sleeping for 60 seconds ")
337                         t = t + 60
338                         time.sleep(60)
339                         continue
340                     else:
341                         self.info(" SSH login OK ")
342                         ssh_ok = True
343                         continue
344             else:
345                 cmd = 'echo \'GOOD NODE\''
346                 ((out, err), proc) = self.execute(cmd)
347                 if not out.find("GOOD NODE") < 0:
348                     ssh_ok = True
349
350             if not ssh_ok:
351                 # the timeout was reach without establishing ssh connection
352                 # the node is blacklisted, deleted from the slice, and a new
353                 # node to provision is discovered
354                 with PlanetlabNode.lock:
355                     self.warning(" Could not SSH login ")
356                     self._blacklist_node(node)
357                     #self._delete_node_from_slice(node)
358                 self.do_discover()
359                 continue
360             
361             # check /proc directory is mounted (ssh_ok = True)
362             # and file system is not read only
363             else:
364                 cmd = 'mount |grep proc'
365                 ((out1, err1), proc1) = self.execute(cmd)
366                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
367                 ((out2, err2), proc2) = self.execute(cmd)
368                 if out1.find("/proc type proc") < 0 or \
369                     "Read-only file system".lower() in err2.lower():
370                     with PlanetlabNode.lock:
371                         self.warning(" Corrupted file system ")
372                         self._blacklist_node(node)
373                         #self._delete_node_from_slice(node)
374                     self.do_discover()
375                     continue
376             
377                 else:
378                     provision_ok = True
379                     if not self.get('hostname'):
380                         self._set_hostname_attr(node)            
381                     # set IP attribute
382                     ip = self._get_ip(node)
383                     self.set("ip", ip)
384                     self.info(" Node provisioned ")            
385             
386         super(PlanetlabNode, self).do_provision()
387
388     def _filter_based_on_attributes(self):
389         """
390         Retrive the list of nodes ids that match user's constraints 
391         """
392         # Map user's defined attributes with tagnames of PlanetLab
393         timeframe = self.get("timeframe")[0]
394         attr_to_tags = {
395             'city' : 'city',
396             'country' : 'country',
397             'region' : 'region',
398             'architecture' : 'arch',
399             'operatingSystem' : 'fcdistro',
400             #'site' : 'pldistro',
401             'minReliability' : 'reliability%s' % timeframe,
402             'maxReliability' : 'reliability%s' % timeframe,
403             'minBandwidth' : 'bw%s' % timeframe,
404             'maxBandwidth' : 'bw%s' % timeframe,
405             'minLoad' : 'load%s' % timeframe,
406             'maxLoad' : 'load%s' % timeframe,
407             'minCpu' : 'cpu%s' % timeframe,
408             'maxCpu' : 'cpu%s' % timeframe,
409         }
410         
411         nodes_id = []
412         filters = {}
413
414         for attr_name, attr_obj in self._attrs.iteritems():
415             attr_value = self.get(attr_name)
416             
417             if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
418                 attr_name != 'timeframe':
419         
420                 attr_tag = attr_to_tags[attr_name]
421                 filters['tagname'] = attr_tag
422
423                 # filter nodes by fixed constraints e.g. operating system
424                 if not 'min' in attr_name and not 'max' in attr_name:
425                     filters['value'] = attr_value
426                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
427
428                 # filter nodes by range constraints e.g. max bandwidth
429                 elif ('min' or 'max') in attr_name:
430                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
431
432         if not filters:
433             nodes = self._get_nodes_id()
434             for node in nodes:
435                 nodes_id.append(node['node_id'])
436         return nodes_id
437                     
438     def _filter_by_fixed_attr(self, filters, nodes_id):
439         """
440         Query PLCAPI for nodes ids matching fixed attributes defined by the
441         user
442         """
443         node_tags = self.plapi.get_node_tags(filters)
444         if node_tags is not None:
445
446             if len(nodes_id) == 0:
447                 # first attribute being matched
448                 for node_tag in node_tags:
449                     nodes_id.append(node_tag['node_id'])
450             else:
451                 # remove the nodes ids that don't match the new attribute
452                 # that is being match
453
454                 nodes_id_tmp = []
455                 for node_tag in node_tags:
456                     if node_tag['node_id'] in nodes_id:
457                         nodes_id_tmp.append(node_tag['node_id'])
458
459                 if len(nodes_id_tmp):
460                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
461                 else:
462                     # no node from before match the new constraint
463                     self.fail_discovery()
464         else:
465             # no nodes match the filter applied
466             self.fail_discovery()
467
468         return nodes_id
469
470     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
471         """
472         Query PLCAPI for nodes ids matching attributes defined in a certain
473         range, by the user
474         """
475         node_tags = self.plapi.get_node_tags(filters)
476         if node_tags:
477             
478             if len(nodes_id) == 0:
479                 # first attribute being matched
480                 for node_tag in node_tags:
481  
482                    # check that matches the min or max restriction
483                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
484                         float(node_tag['value']) > attr_value:
485                         nodes_id.append(node_tag['node_id'])
486
487                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
488                         float(node_tag['value']) < attr_value:
489                         nodes_id.append(node_tag['node_id'])
490             else:
491
492                 # remove the nodes ids that don't match the new attribute
493                 # that is being match
494                 nodes_id_tmp = []
495                 for node_tag in node_tags:
496
497                     # check that matches the min or max restriction and was a
498                     # matching previous filters
499                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
500                         float(node_tag['value']) > attr_value and \
501                         node_tag['node_id'] in nodes_id:
502                         nodes_id_tmp.append(node_tag['node_id'])
503
504                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
505                         float(node_tag['value']) < attr_value and \
506                         node_tag['node_id'] in nodes_id:
507                         nodes_id_tmp.append(node_tag['node_id'])
508
509                 if len(nodes_id_tmp):
510                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
511                 else:
512                     # no node from before match the new constraint
513                     self.fail_discovery()
514
515         else: #TODO CHECK
516             # no nodes match the filter applied
517             self.fail_discovery()
518
519         return nodes_id
520         
521     def _choose_random_node(self, nodes):
522         """
523         From the possible nodes for provision, choose randomly to decrese the
524         probability of different RMs choosing the same node for provision
525         """
526         size = len(nodes)
527         while size:
528             size = size - 1
529             index = randint(0, size)
530             node_id = nodes[index]
531             nodes[index] = nodes[size]
532
533             # check the node is not blacklisted or being provision by other RM
534             # and perform ping to check that is really alive
535             with PlanetlabNode.lock:
536
537                 blist = self.plapi.blacklisted()
538                 plist = self.plapi.reserved()
539                 if node_id not in blist and node_id not in plist:
540                     ping_ok = self._do_ping(node_id)
541                     if not ping_ok:
542                         self._set_hostname_attr(node_id)
543                         self.warning(" Node not responding PING ")
544                         self._blacklist_node(node_id)
545                     else:
546                         # discovered node for provision, added to provision list
547                         self._put_node_in_provision(node_id)
548                         return node_id
549
550     def _get_nodes_id(self, filters=None):
551         return self.plapi.get_nodes(filters, fields=['node_id'])
552
553     def _add_node_to_slice(self, node_id):
554         self.info(" Adding node to slice ")
555         slicename = self.get("username")
556         with PlanetlabNode.lock:
557             slice_nodes = self.plapi.get_slice_nodes(slicename)
558             slice_nodes.append(node_id)
559             self.plapi.add_slice_nodes(slicename, slice_nodes)
560
561     def _delete_node_from_slice(self, node):
562         self.warning(" Deleting node from slice ")
563         slicename = self.get("username")
564         self.plapi.delete_slice_node(slicename, [node])
565
566     def _get_hostname(self):
567         hostname = self.get("hostname")
568         if hostname:
569             return hostname
570         ip = self.get("ip")
571         if ip:
572             hostname = socket.gethostbyaddr(ip)[0]
573             self.set('hostname', hostname)
574             return hostname
575         else:
576             return None
577
578     def _set_hostname_attr(self, node):
579         """
580         Query PLCAPI for the hostname of a certain node id and sets the
581         attribute hostname, it will over write the previous value
582         """
583         hostname = self.plapi.get_nodes(node, ['hostname'])
584         self.set("hostname", hostname[0]['hostname'])
585
586     def _check_if_in_slice(self, nodes_id):
587         """
588         Query PLCAPI to find out if any node id from nodes_id is in the user's
589         slice
590         """
591         slicename = self.get("username")
592         slice_nodes = self.plapi.get_slice_nodes(slicename)
593         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
594         return nodes_inslice
595
596     def _do_ping(self, node_id):
597         """
598         Perform ping command on node's IP matching node id
599         """
600         ping_ok = False
601         ip = self._get_ip(node_id)
602         if not ip: return ping_ok
603
604         command = "ping -c4 %s" % ip
605
606         (out, err) = lexec(command)
607         if not str(out).find("2 received") < 0 or not str(out).find("3 received") < 0 or not \
608             str(out).find("4 received") < 0:
609             ping_ok = True
610        
611         return ping_ok 
612
613     def _blacklist_node(self, node):
614         """
615         Add node mal functioning node to blacklist
616         """
617         self.warning(" Blacklisting malfunctioning node ")
618         self.plapi.blacklist_host(node)
619         if not self._hostname:
620             self.set('hostname', None)
621
622     def _put_node_in_provision(self, node):
623         """
624         Add node to the list of nodes being provisioned, in order for other RMs
625         to not try to provision the same one again
626         """
627         self.plapi.reserve_host(node)
628
629     def _get_ip(self, node_id):
630         """
631         Query PLCAPI for the IP of a node with certain node id
632         """
633         hostname = self.get("hostname") or \
634             self.plapi.get_nodes(node_id, ['hostname'])[0]['hostname']
635         try:
636             ip = sshfuncs.gethostbyname(hostname)
637         except:
638             # Fail while trying to find the IP
639             return None
640         return ip
641
642     def fail_discovery(self):
643         msg = "Discovery failed. No candidates found for node"
644         self.error(msg)
645         raise RuntimeError, msg
646
647     def fail_node_not_alive(self, hostname=None):
648         msg = "Node %s not alive" % hostname
649         raise RuntimeError, msg
650     
651     def fail_node_not_available(self, hostname):
652         msg = "Node %s not available for provisioning" % hostname
653         raise RuntimeError, msg
654
655     def fail_not_enough_nodes(self):
656         msg = "Not enough nodes available for provisioning"
657         raise RuntimeError, msg
658
659     def fail_plapi(self):
660         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
661             " attributes pluser and plpassword."
662         raise RuntimeError, msg
663
664     def valid_connection(self, guid):
665         # TODO: Validate!
666         return True
667
668