7a8d5a72bb604862de81aa690e0f852a85d85931
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import re
31 import time
32 import socket
33 import threading
34 import datetime
35 import weakref
36
37 @clsinit_copy
38 class PlanetlabNode(LinuxNode):
39     _rtype = "PlanetlabNode"
40     _help = "Controls a PlanetLab host accessible using a SSH key " \
41             "associated to a PlanetLab user account"
42     _backend = "planetlab"
43
44     lock = threading.Lock()
45
46     @classmethod
47     def _register_attributes(cls):
48         ip = Attribute("ip", "PlanetLab host public IP address",
49                     flags = Flags.Design)
50
51         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
52                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
53                     default = "www.planet-lab.eu",
54                     flags = Flags.Credential)
55
56         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
57                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
58                     default = "https://%(hostname)s:443/PLCAPI/",
59                     flags = Flags.Design)
60     
61         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
62                     authenticate in the website) ",
63                     flags = Flags.Credential)
64
65         pl_password = Attribute("plpassword", 
66                         "PlanetLab account password, as \
67                         the one to authenticate in the website) ",
68                         flags = Flags.Credential)
69
70         city = Attribute("city", "Constrain location (city) during resource \
71                 discovery. May use wildcards.",
72                 flags = Flags.Filter)
73
74         country = Attribute("country", "Constrain location (country) during \
75                     resource discovery. May use wildcards.",
76                     flags = Flags.Filter)
77
78         region = Attribute("region", "Constrain location (region) during \
79                     resource discovery. May use wildcards.",
80                     flags = Flags.Filter)
81
82         architecture = Attribute("architecture", "Constrain architecture \
83                         during resource discovery.",
84                         type = Types.Enumerate,
85                         allowed = ["x86_64", 
86                                     "i386"],
87                         flags = Flags.Filter)
88
89         operating_system = Attribute("operatingSystem", "Constrain operating \
90                             system during resource discovery.",
91                             type = Types.Enumerate,
92                             allowed =  ["f8",
93                                         "f12",
94                                         "f14",
95                                         "centos",
96                                         "other"],
97                             flags = Flags.Filter)
98
99         min_reliability = Attribute("minReliability", "Constrain reliability \
100                             while picking PlanetLab nodes. Specifies a lower \
101                             acceptable bound.",
102                             type = Types.Double,
103                             range = (1, 100),
104                             flags = Flags.Filter)
105
106         max_reliability = Attribute("maxReliability", "Constrain reliability \
107                             while picking PlanetLab nodes. Specifies an upper \
108                             acceptable bound.",
109                             type = Types.Double,
110                             range = (1, 100),
111                             flags = Flags.Filter)
112
113         min_bandwidth = Attribute("minBandwidth", "Constrain available \
114                             bandwidth while picking PlanetLab nodes. \
115                             Specifies a lower acceptable bound.",
116                             type = Types.Double,
117                             range = (0, 2**31),
118                             flags = Flags.Filter)
119
120         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
121                             bandwidth while picking PlanetLab nodes. \
122                             Specifies an upper acceptable bound.",
123                             type = Types.Double,
124                             range = (0, 2**31),
125                             flags = Flags.Filter)
126
127         min_load = Attribute("minLoad", "Constrain node load average while \
128                     picking PlanetLab nodes. Specifies a lower acceptable \
129                     bound.",
130                     type = Types.Double,
131                     range = (0, 2**31),
132                     flags = Flags.Filter)
133
134         max_load = Attribute("maxLoad", "Constrain node load average while \
135                     picking PlanetLab nodes. Specifies an upper acceptable \
136                     bound.",
137                     type = Types.Double,
138                     range = (0, 2**31),
139                     flags = Flags.Filter)
140
141         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
142                     picking PlanetLab nodes. Specifies a lower acceptable \
143                     bound.",
144                     type = Types.Double,
145                     range = (0, 100),
146                     flags = Flags.Filter)
147
148         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
149                     picking PlanetLab nodes. Specifies an upper acceptable \
150                     bound.",
151                     type = Types.Double,
152                     range = (0, 100),
153                     flags = Flags.Filter)
154
155         timeframe = Attribute("timeframe", "Past time period in which to check\
156                         information about the node. Values are year,month, \
157                         week, latest",
158                         default = "week",
159                         type = Types.Enumerate,
160                         allowed = ["latest",
161                                     "week",
162                                     "month",
163                                     "year"],
164                         flags = Flags.Filter)
165
166         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
167                         in the user's home directory under .nepi directory. This file \
168                         contains a list of PL nodes to blacklist, and at the end \
169                         of the experiment execution the new blacklisted nodes are added.",
170                     type = Types.Bool,
171                     default = False,
172                     flags = Flags.Global)
173
174
175         cls._register_attribute(ip)
176         cls._register_attribute(pl_url)
177         cls._register_attribute(pl_ptn)
178         cls._register_attribute(pl_user)
179         cls._register_attribute(pl_password)
180         cls._register_attribute(city)
181         cls._register_attribute(country)
182         cls._register_attribute(region)
183         cls._register_attribute(architecture)
184         cls._register_attribute(operating_system)
185         cls._register_attribute(min_reliability)
186         cls._register_attribute(max_reliability)
187         cls._register_attribute(min_bandwidth)
188         cls._register_attribute(max_bandwidth)
189         cls._register_attribute(min_load)
190         cls._register_attribute(max_load)
191         cls._register_attribute(min_cpu)
192         cls._register_attribute(max_cpu)
193         cls._register_attribute(timeframe)
194         cls._register_attribute(plblacklist)
195
196     def __init__(self, ec, guid):
197         super(PlanetlabNode, self).__init__(ec, guid)
198
199         self._ecobj = weakref.ref(ec)
200         self._plapi = None
201         self._node_to_provision = None
202         self._slicenode = False
203         self._hostname = False
204
205         if self.get("gateway") or self.get("gatewayUser"):
206             self.set("gateway", None)
207             self.set("gatewayUser", None)
208
209     def _skip_provision(self):
210         pl_user = self.get("pluser")
211         pl_pass = self.get("plpassword")
212         if not pl_user and not pl_pass:
213             return True
214         else: return False
215     
216     @property
217     def plapi(self):
218         if not self._plapi:
219             pl_user = self.get("pluser")
220             pl_pass = self.get("plpassword")
221             pl_url = self.get("plcApiUrl")
222             pl_ptn = self.get("plcApiPattern")
223             _plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
224                 pl_ptn, self._ecobj())
225             
226             if not _plapi:
227                 self.fail_plapi()
228         
229             self._plapi = weakref.ref(_plapi)
230
231         return self._plapi()
232
233     def do_discover(self):
234         """
235         Based on the attributes defined by the user, discover the suitable 
236         nodes for provision.
237         """
238         if self._skip_provision():
239             super(PlanetlabNode, self).do_discover()
240             return
241
242         hostname = self._get_hostname()
243         if hostname:
244             # the user specified one particular node to be provisioned
245             self._hostname = True
246             node_id = self._get_nodes_id({'hostname':hostname})
247             node_id = node_id.pop()['node_id']
248
249             # check that the node is not blacklisted or being provisioned
250             # by other RM
251             with PlanetlabNode.lock:
252                 plist = self.plapi.reserved()
253                 blist = self.plapi.blacklisted()
254                 if node_id not in blist and node_id not in plist:
255                 
256                     # check that is really alive, by performing ping
257                     ping_ok = self._do_ping(node_id)
258                     if not ping_ok:
259                         self._blacklist_node(node_id)
260                         self.fail_node_not_alive(hostname)
261                     else:
262                         if self._check_if_in_slice([node_id]):
263                             self._slicenode = True
264                         self._put_node_in_provision(node_id)
265                         self._node_to_provision = node_id
266                 else:
267                     self.fail_node_not_available(hostname)
268             super(PlanetlabNode, self).do_discover()
269         
270         else:
271             # the user specifies constraints based on attributes, zero, one or 
272             # more nodes can match these constraints 
273             nodes = self._filter_based_on_attributes()
274
275             # nodes that are already part of user's slice have the priority to
276             # provisioned
277             nodes_inslice = self._check_if_in_slice(nodes)
278             nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
279             
280             node_id = None
281             if nodes_inslice:
282                 node_id = self._choose_random_node(nodes_inslice)
283                 self._slicenode = True                
284                 
285             if not node_id:
286                 # Either there were no matching nodes in the user's slice, or
287                 # the nodes in the slice  were blacklisted or being provisioned
288                 # by other RM. Note nodes_not_inslice is never empty
289                 node_id = self._choose_random_node(nodes_not_inslice)
290                 self._slicenode = False
291
292             if node_id:
293                 self._node_to_provision = node_id
294                 try:
295                     self._set_hostname_attr(node_id)
296                     self.info(" Selected node to provision ")
297                     super(PlanetlabNode, self).do_discover()
298                 except:
299                     with PlanetlabNode.lock:
300                         self._blacklist_node(node_id)
301                     self.do_discover()
302             else:
303                self.fail_not_enough_nodes() 
304             
305     def do_provision(self):
306         """
307         Add node to user's slice after verifing that the node is functioning
308         correctly
309         """
310         if self._skip_provision():
311             super(PlanetlabNode, self).do_provision()
312             return
313
314         provision_ok = False
315         ssh_ok = False
316         proc_ok = False
317         timeout = 1800
318
319         while not provision_ok:
320             node = self._node_to_provision
321             if not self._slicenode:
322                 self._add_node_to_slice(node)
323                 if self._check_if_in_slice([node]):
324                     self.debug( "Node added to slice" )
325                 else:
326                     self.warning(" Could not add to slice ")
327                     with PlanetlabNode.lock:
328                         self._blacklist_node(node)
329                     self.do_discover()
330                     continue
331
332                 # check ssh connection
333                 t = 0 
334                 while t < timeout and not ssh_ok:
335
336                     cmd = 'echo \'GOOD NODE\''
337                     ((out, err), proc) = self.execute(cmd)
338                     if out.find("GOOD NODE") < 0:
339                         self.debug( "No SSH connection, waiting 60s" )
340                         t = t + 60
341                         time.sleep(60)
342                         continue
343                     else:
344                         self.debug( "SSH OK" )
345                         ssh_ok = True
346                         continue
347             else:
348                 cmd = 'echo \'GOOD NODE\''
349                 ((out, err), proc) = self.execute(cmd)
350                 if not out.find("GOOD NODE") < 0:
351                     ssh_ok = True
352
353             if not ssh_ok:
354                 # the timeout was reach without establishing ssh connection
355                 # the node is blacklisted, deleted from the slice, and a new
356                 # node to provision is discovered
357                 with PlanetlabNode.lock:
358                     self.warning(" Could not SSH login ")
359                     self._blacklist_node(node)
360                     #self._delete_node_from_slice(node)
361                 self.do_discover()
362                 continue
363             
364             # check /proc directory is mounted (ssh_ok = True)
365             # and file system is not read only
366             else:
367                 cmd = 'mount |grep proc'
368                 ((out1, err1), proc1) = self.execute(cmd)
369                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
370                 ((out2, err2), proc2) = self.execute(cmd)
371                 if out1.find("/proc type proc") < 0 or \
372                     "Read-only file system".lower() in err2.lower():
373                     with PlanetlabNode.lock:
374                         self.warning(" Corrupted file system ")
375                         self._blacklist_node(node)
376                         #self._delete_node_from_slice(node)
377                     self.do_discover()
378                     continue
379             
380                 else:
381                     provision_ok = True
382                     if not self.get('hostname'):
383                         self._set_hostname_attr(node)            
384                     # set IP attribute
385                     ip = self._get_ip(node)
386                     self.set("ip", ip)
387                     self.info(" Node provisioned ")            
388             
389         super(PlanetlabNode, self).do_provision()
390
391     def do_release(self):
392         super(PlanetlabNode, self).do_release()
393         if self.state == ResourceState.RELEASED and not self._skip_provision():
394             self.debug(" Releasing PLC API ")
395             self.plapi.release()
396
397     def _filter_based_on_attributes(self):
398         """
399         Retrive the list of nodes ids that match user's constraints 
400         """
401         # Map user's defined attributes with tagnames of PlanetLab
402         timeframe = self.get("timeframe")[0]
403         attr_to_tags = {
404             'city' : 'city',
405             'country' : 'country',
406             'region' : 'region',
407             'architecture' : 'arch',
408             'operatingSystem' : 'fcdistro',
409             'minReliability' : 'reliability%s' % timeframe,
410             'maxReliability' : 'reliability%s' % timeframe,
411             'minBandwidth' : 'bw%s' % timeframe,
412             'maxBandwidth' : 'bw%s' % timeframe,
413             'minLoad' : 'load%s' % timeframe,
414             'maxLoad' : 'load%s' % timeframe,
415             'minCpu' : 'cpu%s' % timeframe,
416             'maxCpu' : 'cpu%s' % timeframe,
417         }
418         
419         nodes_id = []
420         filters = {}
421
422         for attr_name, attr_obj in self._attrs.iteritems():
423             attr_value = self.get(attr_name)
424             
425             if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
426                 attr_name != 'timeframe':
427         
428                 attr_tag = attr_to_tags[attr_name]
429                 filters['tagname'] = attr_tag
430
431                 # filter nodes by fixed constraints e.g. operating system
432                 if not 'min' in attr_name and not 'max' in attr_name:
433                     filters['value'] = attr_value
434                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
435
436                 # filter nodes by range constraints e.g. max bandwidth
437                 elif ('min' or 'max') in attr_name:
438                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
439
440         if not filters:
441             nodes = self._get_nodes_id()
442             for node in nodes:
443                 nodes_id.append(node['node_id'])
444         return nodes_id
445                     
446     def _filter_by_fixed_attr(self, filters, nodes_id):
447         """
448         Query PLCAPI for nodes ids matching fixed attributes defined by the
449         user
450         """
451         node_tags = self.plapi.get_node_tags(filters)
452         if node_tags is not None:
453
454             if len(nodes_id) == 0:
455                 # first attribute being matched
456                 for node_tag in node_tags:
457                     nodes_id.append(node_tag['node_id'])
458             else:
459                 # remove the nodes ids that don't match the new attribute
460                 # that is being match
461
462                 nodes_id_tmp = []
463                 for node_tag in node_tags:
464                     if node_tag['node_id'] in nodes_id:
465                         nodes_id_tmp.append(node_tag['node_id'])
466
467                 if len(nodes_id_tmp):
468                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
469                 else:
470                     # no node from before match the new constraint
471                     self.fail_discovery()
472         else:
473             # no nodes match the filter applied
474             self.fail_discovery()
475
476         return nodes_id
477
478     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
479         """
480         Query PLCAPI for nodes ids matching attributes defined in a certain
481         range, by the user
482         """
483         node_tags = self.plapi.get_node_tags(filters)
484         if node_tags:
485             
486             if len(nodes_id) == 0:
487                 # first attribute being matched
488                 for node_tag in node_tags:
489  
490                    # check that matches the min or max restriction
491                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
492                         float(node_tag['value']) > attr_value:
493                         nodes_id.append(node_tag['node_id'])
494
495                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
496                         float(node_tag['value']) < attr_value:
497                         nodes_id.append(node_tag['node_id'])
498             else:
499
500                 # remove the nodes ids that don't match the new attribute
501                 # that is being match
502                 nodes_id_tmp = []
503                 for node_tag in node_tags:
504
505                     # check that matches the min or max restriction and was a
506                     # matching previous filters
507                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
508                         float(node_tag['value']) > attr_value and \
509                         node_tag['node_id'] in nodes_id:
510                         nodes_id_tmp.append(node_tag['node_id'])
511
512                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
513                         float(node_tag['value']) < attr_value and \
514                         node_tag['node_id'] in nodes_id:
515                         nodes_id_tmp.append(node_tag['node_id'])
516
517                 if len(nodes_id_tmp):
518                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
519                 else:
520                     # no node from before match the new constraint
521                     self.fail_discovery()
522
523         else: #TODO CHECK
524             # no nodes match the filter applied
525             self.fail_discovery()
526
527         return nodes_id
528         
529     def _choose_random_node(self, nodes):
530         """
531         From the possible nodes for provision, choose randomly to decrese the
532         probability of different RMs choosing the same node for provision
533         """
534         size = len(nodes)
535         while size:
536             size = size - 1
537             index = randint(0, size)
538             node_id = nodes[index]
539             nodes[index] = nodes[size]
540
541             # check the node is not blacklisted or being provision by other RM
542             # and perform ping to check that is really alive
543             with PlanetlabNode.lock:
544
545                 blist = self.plapi.blacklisted()
546                 plist = self.plapi.reserved()
547                 if node_id not in blist and node_id not in plist:
548                     ping_ok = self._do_ping(node_id)
549                     if not ping_ok:
550                         self._set_hostname_attr(node_id)
551                         self.warning(" Node not responding PING ")
552                         self._blacklist_node(node_id)
553                     else:
554                         # discovered node for provision, added to provision list
555                         self._put_node_in_provision(node_id)
556                         return node_id
557
558     def _get_nodes_id(self, filters=None):
559         return self.plapi.get_nodes(filters, fields=['node_id'])
560
561     def _add_node_to_slice(self, node_id):
562         self.info(" Adding node to slice ")
563         slicename = self.get("username")
564         with PlanetlabNode.lock:
565             slice_nodes = self.plapi.get_slice_nodes(slicename)
566             self.debug(" Previous slice nodes %s " % slice_nodes)
567             slice_nodes.append(node_id)
568             self.plapi.add_slice_nodes(slicename, slice_nodes)
569
570     def _delete_node_from_slice(self, node):
571         self.warning(" Deleting node from slice ")
572         slicename = self.get("username")
573         self.plapi.delete_slice_node(slicename, [node])
574
575     def _get_hostname(self):
576         hostname = self.get("hostname")
577         if hostname:
578             return hostname
579         ip = self.get("ip")
580         if ip:
581             hostname = socket.gethostbyaddr(ip)[0]
582             self.set('hostname', hostname)
583             return hostname
584         else:
585             return None
586
587     def _set_hostname_attr(self, node):
588         """
589         Query PLCAPI for the hostname of a certain node id and sets the
590         attribute hostname, it will over write the previous value
591         """
592         hostname = self.plapi.get_nodes(node, ['hostname'])
593         self.set("hostname", hostname[0]['hostname'])
594
595     def _check_if_in_slice(self, nodes_id):
596         """
597         Query PLCAPI to find out if any node id from nodes_id is in the user's
598         slice
599         """
600         slicename = self.get("username")
601         slice_nodes = self.plapi.get_slice_nodes(slicename)
602         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
603         return nodes_inslice
604
605     def _do_ping(self, node_id):
606         """
607         Perform ping command on node's IP matching node id
608         """
609         ping_ok = False
610         ip = self._get_ip(node_id)
611         if ip:
612             command = "ping -c4 %s" % ip
613             (out, err) = lexec(command)
614
615             m = re.search("(\d+)% packet loss", str(out))
616             if m and int(m.groups()[0]) < 50:
617                 ping_ok = True
618        
619         return ping_ok 
620
621     def _blacklist_node(self, node):
622         """
623         Add node mal functioning node to blacklist
624         """
625         self.warning(" Blacklisting malfunctioning node ")
626         self.plapi.blacklist_host(node)
627         if not self._hostname:
628             self.set('hostname', None)
629
630     def _put_node_in_provision(self, node):
631         """
632         Add node to the list of nodes being provisioned, in order for other RMs
633         to not try to provision the same one again
634         """
635         self.plapi.reserve_host(node)
636
637     def _get_ip(self, node_id):
638         """
639         Query PLCAPI for the IP of a node with certain node id
640         """
641         hostname = self.get("hostname") or \
642             self.plapi.get_nodes(node_id, ['hostname'])[0]['hostname']
643         try:
644             ip = sshfuncs.gethostbyname(hostname)
645         except:
646             # Fail while trying to find the IP
647             return None
648         return ip
649
650     def fail_discovery(self):
651         msg = "Discovery failed. No candidates found for node"
652         self.error(msg)
653         raise RuntimeError, msg
654
655     def fail_node_not_alive(self, hostname=None):
656         msg = "Node %s not alive" % hostname
657         raise RuntimeError, msg
658     
659     def fail_node_not_available(self, hostname):
660         msg = "Node %s not available for provisioning" % hostname
661         raise RuntimeError, msg
662
663     def fail_not_enough_nodes(self):
664         msg = "Not enough nodes available for provisioning"
665         raise RuntimeError, msg
666
667     def fail_plapi(self):
668         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
669             " attributes pluser and plpassword."
670         raise RuntimeError, msg
671
672     def valid_connection(self, guid):
673         # TODO: Validate!
674         return True
675
676