01b8a859efae38e72b96b2011b6b9f1f98913f7a
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import re
31 import os
32 import time
33 import socket
34 import threading
35 import datetime
36 import weakref
37
38 @clsinit_copy
39 class PlanetlabNode(LinuxNode):
40     _rtype = "planetlab::Node"
41     _help = "Controls a PlanetLab host accessible using a SSH key " \
42             "associated to a PlanetLab user account"
43     _backend = "planetlab"
44
45     lock = threading.Lock()
46
47     @classmethod
48     def _register_attributes(cls):
49         ip = Attribute("ip", "PlanetLab host public IP address",
50                     flags = Flags.Design)
51
52         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
53                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
54                     default = "www.planet-lab.eu",
55                     flags = Flags.Credential)
56
57         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
58                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
59                     default = "https://%(hostname)s:443/PLCAPI/",
60                     flags = Flags.Design)
61     
62         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
63                     authenticate in the website) ",
64                     flags = Flags.Credential)
65
66         pl_password = Attribute("plpassword", 
67                         "PlanetLab account password, as \
68                         the one to authenticate in the website) ",
69                         flags = Flags.Credential)
70
71         city = Attribute("city", "Constrain location (city) during resource \
72                 discovery. May use wildcards.",
73                 flags = Flags.Filter)
74
75         country = Attribute("country", "Constrain location (country) during \
76                     resource discovery. May use wildcards.",
77                     flags = Flags.Filter)
78
79         region = Attribute("region", "Constrain location (region) during \
80                     resource discovery. May use wildcards.",
81                     flags = Flags.Filter)
82
83         architecture = Attribute("architecture", "Constrain architecture \
84                         during resource discovery.",
85                         type = Types.Enumerate,
86                         allowed = ["x86_64", 
87                                     "i386"],
88                         flags = Flags.Filter)
89
90         operating_system = Attribute("operatingSystem", "Constrain operating \
91                             system during resource discovery.",
92                             type = Types.Enumerate,
93                             allowed =  ["f8",
94                                         "f12",
95                                         "f14",
96                                         "centos",
97                                         "other"],
98                             flags = Flags.Filter)
99
100         min_reliability = Attribute("minReliability", "Constrain reliability \
101                             while picking PlanetLab nodes. Specifies a lower \
102                             acceptable bound.",
103                             type = Types.Double,
104                             range = (1, 100),
105                             flags = Flags.Filter)
106
107         max_reliability = Attribute("maxReliability", "Constrain reliability \
108                             while picking PlanetLab nodes. Specifies an upper \
109                             acceptable bound.",
110                             type = Types.Double,
111                             range = (1, 100),
112                             flags = Flags.Filter)
113
114         min_bandwidth = Attribute("minBandwidth", "Constrain available \
115                             bandwidth while picking PlanetLab nodes. \
116                             Specifies a lower acceptable bound.",
117                             type = Types.Double,
118                             range = (0, 2**31),
119                             flags = Flags.Filter)
120
121         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
122                             bandwidth while picking PlanetLab nodes. \
123                             Specifies an upper acceptable bound.",
124                             type = Types.Double,
125                             range = (0, 2**31),
126                             flags = Flags.Filter)
127
128         min_load = Attribute("minLoad", "Constrain node load average while \
129                     picking PlanetLab nodes. Specifies a lower acceptable \
130                     bound.",
131                     type = Types.Double,
132                     range = (0, 2**31),
133                     flags = Flags.Filter)
134
135         max_load = Attribute("maxLoad", "Constrain node load average while \
136                     picking PlanetLab nodes. Specifies an upper acceptable \
137                     bound.",
138                     type = Types.Double,
139                     range = (0, 2**31),
140                     flags = Flags.Filter)
141
142         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
143                     picking PlanetLab nodes. Specifies a lower acceptable \
144                     bound.",
145                     type = Types.Double,
146                     range = (0, 100),
147                     flags = Flags.Filter)
148
149         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
150                     picking PlanetLab nodes. Specifies an upper acceptable \
151                     bound.",
152                     type = Types.Double,
153                     range = (0, 100),
154                     flags = Flags.Filter)
155
156         timeframe = Attribute("timeframe", "Past time period in which to check\
157                         information about the node. Values are year,month, \
158                         week, latest",
159                         default = "week",
160                         type = Types.Enumerate,
161                         allowed = ["latest",
162                                     "week",
163                                     "month",
164                                     "year"],
165                         flags = Flags.Filter)
166
167         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
168                         in the user's home directory under .nepi directory. This file \
169                         contains a list of PL nodes to blacklist, and at the end \
170                         of the experiment execution the new blacklisted nodes are added.",
171                     type = Types.Bool,
172                     default = False,
173                     flags = Flags.Global)
174
175         cls._register_attribute(ip)
176         cls._register_attribute(pl_url)
177         cls._register_attribute(pl_ptn)
178         cls._register_attribute(pl_user)
179         cls._register_attribute(pl_password)
180         cls._register_attribute(city)
181         cls._register_attribute(country)
182         cls._register_attribute(region)
183         cls._register_attribute(architecture)
184         cls._register_attribute(operating_system)
185         cls._register_attribute(min_reliability)
186         cls._register_attribute(max_reliability)
187         cls._register_attribute(min_bandwidth)
188         cls._register_attribute(max_bandwidth)
189         cls._register_attribute(min_load)
190         cls._register_attribute(max_load)
191         cls._register_attribute(min_cpu)
192         cls._register_attribute(max_cpu)
193         cls._register_attribute(timeframe)
194         cls._register_attribute(plblacklist)
195
196     def __init__(self, ec, guid):
197         super(PlanetlabNode, self).__init__(ec, guid)
198
199         self._ecobj = weakref.ref(ec)
200         self._plapi = None
201         self._node_to_provision = None
202         self._slicenode = False
203         self._hostname = False
204
205         if self.get("gateway") or self.get("gatewayUser"):
206             self.set("gateway", None)
207             self.set("gatewayUser", None)
208
209         # Blacklist file
210         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
211         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
212         if not os.path.exists(plblacklist_file):
213             if os.path.isdir(nepi_home):
214                 open(plblacklist_file, 'w').close()
215             else:
216                 os.makedirs(nepi_home)
217                 open(plblacklist_file, 'w').close()
218
219     def _skip_provision(self):
220         pl_user = self.get("pluser")
221         pl_pass = self.get("plpassword")
222         if not pl_user and not pl_pass:
223             return True
224         else: return False
225     
226     @property
227     def plapi(self):
228         if not self._plapi:
229             pl_user = self.get("pluser")
230             pl_pass = self.get("plpassword")
231             pl_url = self.get("plcApiUrl")
232             pl_ptn = self.get("plcApiPattern")
233             _plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
234                 pl_ptn, self._ecobj())
235             
236             if not _plapi:
237                 self.fail_plapi()
238         
239             self._plapi = weakref.ref(_plapi)
240
241         return self._plapi()
242
243     def do_discover(self):
244         """
245         Based on the attributes defined by the user, discover the suitable 
246         nodes for provision.
247         """
248         if self._skip_provision():
249             super(PlanetlabNode, self).do_discover()
250             return
251
252         hostname = self._get_hostname()
253         if hostname:
254             # the user specified one particular node to be provisioned
255             self._hostname = True
256             node_id = self._get_nodes_id({'hostname':hostname})
257             node_id = node_id.pop()['node_id']
258
259             # check that the node is not blacklisted or being provisioned
260             # by other RM
261             with PlanetlabNode.lock:
262                 plist = self.plapi.reserved()
263                 blist = self.plapi.blacklisted()
264                 if node_id not in blist and node_id not in plist:
265                 
266                     # check that is really alive, by performing ping
267                     ping_ok = self._do_ping(node_id)
268                     if not ping_ok:
269                         self._blacklist_node(node_id)
270                         self.fail_node_not_alive(hostname)
271                     else:
272                         if self._check_if_in_slice([node_id]):
273                             self._slicenode = True
274                         self._put_node_in_provision(node_id)
275                         self._node_to_provision = node_id
276                 else:
277                     self.fail_node_not_available(hostname)
278             super(PlanetlabNode, self).do_discover()
279         
280         else:
281             # the user specifies constraints based on attributes, zero, one or 
282             # more nodes can match these constraints 
283             nodes = self._filter_based_on_attributes()
284
285             # nodes that are already part of user's slice have the priority to
286             # provisioned
287             nodes_inslice = self._check_if_in_slice(nodes)
288             nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
289             
290             node_id = None
291             if nodes_inslice:
292                 node_id = self._choose_random_node(nodes_inslice)
293                 self._slicenode = True                
294                 
295             if not node_id:
296                 # Either there were no matching nodes in the user's slice, or
297                 # the nodes in the slice  were blacklisted or being provisioned
298                 # by other RM. Note nodes_not_inslice is never empty
299                 node_id = self._choose_random_node(nodes_not_inslice)
300                 self._slicenode = False
301
302             if node_id:
303                 self._node_to_provision = node_id
304                 try:
305                     self._set_hostname_attr(node_id)
306                     self.info(" Selected node to provision ")
307                     super(PlanetlabNode, self).do_discover()
308                 except:
309                     with PlanetlabNode.lock:
310                         self._blacklist_node(node_id)
311                     self.do_discover()
312             else:
313                self.fail_not_enough_nodes() 
314             
315     def do_provision(self):
316         """
317         Add node to user's slice after verifing that the node is functioning
318         correctly
319         """
320         if self._skip_provision():
321             super(PlanetlabNode, self).do_provision()
322             return
323
324         provision_ok = False
325         ssh_ok = False
326         proc_ok = False
327         timeout = 1800
328
329         while not provision_ok:
330             node = self._node_to_provision
331             if not self._slicenode:
332                 self._add_node_to_slice(node)
333                 if self._check_if_in_slice([node]):
334                     self.debug( "Node added to slice" )
335                 else:
336                     self.warning(" Could not add to slice ")
337                     with PlanetlabNode.lock:
338                         self._blacklist_node(node)
339                     self.do_discover()
340                     continue
341
342                 # check ssh connection
343                 t = 0 
344                 while t < timeout and not ssh_ok:
345
346                     cmd = 'echo \'GOOD NODE\''
347                     ((out, err), proc) = self.execute(cmd)
348                     if out.find("GOOD NODE") < 0:
349                         self.debug( "No SSH connection, waiting 60s" )
350                         t = t + 60
351                         time.sleep(60)
352                         continue
353                     else:
354                         self.debug( "SSH OK" )
355                         ssh_ok = True
356                         continue
357             else:
358                 cmd = 'echo \'GOOD NODE\''
359                 ((out, err), proc) = self.execute(cmd)
360                 if not out.find("GOOD NODE") < 0:
361                     ssh_ok = True
362
363             if not ssh_ok:
364                 # the timeout was reach without establishing ssh connection
365                 # the node is blacklisted, deleted from the slice, and a new
366                 # node to provision is discovered
367                 with PlanetlabNode.lock:
368                     self.warning(" Could not SSH login ")
369                     self._blacklist_node(node)
370                     #self._delete_node_from_slice(node)
371                 self.do_discover()
372                 continue
373             
374             # check /proc directory is mounted (ssh_ok = True)
375             # and file system is not read only
376             else:
377                 cmd = 'mount |grep proc'
378                 ((out1, err1), proc1) = self.execute(cmd)
379                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
380                 ((out2, err2), proc2) = self.execute(cmd)
381                 if out1.find("/proc type proc") < 0 or \
382                     "Read-only file system".lower() in err2.lower():
383                     with PlanetlabNode.lock:
384                         self.warning(" Corrupted file system ")
385                         self._blacklist_node(node)
386                         #self._delete_node_from_slice(node)
387                     self.do_discover()
388                     continue
389             
390                 else:
391                     provision_ok = True
392                     if not self.get('hostname'):
393                         self._set_hostname_attr(node)            
394                     # set IP attribute
395                     ip = self._get_ip(node)
396                     self.set("ip", ip)
397                     self.info(" Node provisioned ")            
398             
399         super(PlanetlabNode, self).do_provision()
400
401     def do_release(self):
402         super(PlanetlabNode, self).do_release()
403         if self.state == ResourceState.RELEASED and not self._skip_provision():
404             self.debug(" Releasing PLC API ")
405             self.plapi.release()
406
407     def _filter_based_on_attributes(self):
408         """
409         Retrive the list of nodes ids that match user's constraints 
410         """
411         # Map user's defined attributes with tagnames of PlanetLab
412         timeframe = self.get("timeframe")[0]
413         attr_to_tags = {
414             'city' : 'city',
415             'country' : 'country',
416             'region' : 'region',
417             'architecture' : 'arch',
418             'operatingSystem' : 'fcdistro',
419             'minReliability' : 'reliability%s' % timeframe,
420             'maxReliability' : 'reliability%s' % timeframe,
421             'minBandwidth' : 'bw%s' % timeframe,
422             'maxBandwidth' : 'bw%s' % timeframe,
423             'minLoad' : 'load%s' % timeframe,
424             'maxLoad' : 'load%s' % timeframe,
425             'minCpu' : 'cpu%s' % timeframe,
426             'maxCpu' : 'cpu%s' % timeframe,
427         }
428         
429         nodes_id = []
430         filters = {}
431
432         for attr_name, attr_obj in self._attrs.iteritems():
433             attr_value = self.get(attr_name)
434             
435             if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
436                 attr_name != 'timeframe':
437         
438                 attr_tag = attr_to_tags[attr_name]
439                 filters['tagname'] = attr_tag
440
441                 # filter nodes by fixed constraints e.g. operating system
442                 if not 'min' in attr_name and not 'max' in attr_name:
443                     filters['value'] = attr_value
444                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
445
446                 # filter nodes by range constraints e.g. max bandwidth
447                 elif ('min' or 'max') in attr_name:
448                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
449
450         if not filters:
451             nodes = self._get_nodes_id()
452             for node in nodes:
453                 nodes_id.append(node['node_id'])
454         return nodes_id
455                     
456     def _filter_by_fixed_attr(self, filters, nodes_id):
457         """
458         Query PLCAPI for nodes ids matching fixed attributes defined by the
459         user
460         """
461         node_tags = self.plapi.get_node_tags(filters)
462         if node_tags is not None:
463
464             if len(nodes_id) == 0:
465                 # first attribute being matched
466                 for node_tag in node_tags:
467                     nodes_id.append(node_tag['node_id'])
468             else:
469                 # remove the nodes ids that don't match the new attribute
470                 # that is being match
471
472                 nodes_id_tmp = []
473                 for node_tag in node_tags:
474                     if node_tag['node_id'] in nodes_id:
475                         nodes_id_tmp.append(node_tag['node_id'])
476
477                 if len(nodes_id_tmp):
478                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
479                 else:
480                     # no node from before match the new constraint
481                     self.fail_discovery()
482         else:
483             # no nodes match the filter applied
484             self.fail_discovery()
485
486         return nodes_id
487
488     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
489         """
490         Query PLCAPI for nodes ids matching attributes defined in a certain
491         range, by the user
492         """
493         node_tags = self.plapi.get_node_tags(filters)
494         if node_tags:
495             
496             if len(nodes_id) == 0:
497                 # first attribute being matched
498                 for node_tag in node_tags:
499  
500                    # check that matches the min or max restriction
501                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
502                         float(node_tag['value']) > attr_value:
503                         nodes_id.append(node_tag['node_id'])
504
505                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
506                         float(node_tag['value']) < attr_value:
507                         nodes_id.append(node_tag['node_id'])
508             else:
509
510                 # remove the nodes ids that don't match the new attribute
511                 # that is being match
512                 nodes_id_tmp = []
513                 for node_tag in node_tags:
514
515                     # check that matches the min or max restriction and was a
516                     # matching previous filters
517                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
518                         float(node_tag['value']) > attr_value and \
519                         node_tag['node_id'] in nodes_id:
520                         nodes_id_tmp.append(node_tag['node_id'])
521
522                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
523                         float(node_tag['value']) < attr_value and \
524                         node_tag['node_id'] in nodes_id:
525                         nodes_id_tmp.append(node_tag['node_id'])
526
527                 if len(nodes_id_tmp):
528                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
529                 else:
530                     # no node from before match the new constraint
531                     self.fail_discovery()
532
533         else: #TODO CHECK
534             # no nodes match the filter applied
535             self.fail_discovery()
536
537         return nodes_id
538         
539     def _choose_random_node(self, nodes):
540         """
541         From the possible nodes for provision, choose randomly to decrese the
542         probability of different RMs choosing the same node for provision
543         """
544         size = len(nodes)
545         while size:
546             size = size - 1
547             index = randint(0, size)
548             node_id = nodes[index]
549             nodes[index] = nodes[size]
550
551             # check the node is not blacklisted or being provision by other RM
552             # and perform ping to check that is really alive
553             with PlanetlabNode.lock:
554
555                 blist = self.plapi.blacklisted()
556                 plist = self.plapi.reserved()
557                 if node_id not in blist and node_id not in plist:
558                     ping_ok = self._do_ping(node_id)
559                     if not ping_ok:
560                         self._set_hostname_attr(node_id)
561                         self.warning(" Node not responding PING ")
562                         self._blacklist_node(node_id)
563                     else:
564                         # discovered node for provision, added to provision list
565                         self._put_node_in_provision(node_id)
566                         return node_id
567
568     def _get_nodes_id(self, filters=None):
569         return self.plapi.get_nodes(filters, fields=['node_id'])
570
571     def _add_node_to_slice(self, node_id):
572         self.info(" Adding node to slice ")
573         slicename = self.get("username")
574         with PlanetlabNode.lock:
575             slice_nodes = self.plapi.get_slice_nodes(slicename)
576             self.debug(" Previous slice nodes %s " % slice_nodes)
577             slice_nodes.append(node_id)
578             self.plapi.add_slice_nodes(slicename, slice_nodes)
579
580     def _delete_node_from_slice(self, node):
581         self.warning(" Deleting node from slice ")
582         slicename = self.get("username")
583         self.plapi.delete_slice_node(slicename, [node])
584
585     def _get_hostname(self):
586         hostname = self.get("hostname")
587         if hostname:
588             return hostname
589         ip = self.get("ip")
590         if ip:
591             hostname = socket.gethostbyaddr(ip)[0]
592             self.set('hostname', hostname)
593             return hostname
594         else:
595             return None
596
597     def _set_hostname_attr(self, node):
598         """
599         Query PLCAPI for the hostname of a certain node id and sets the
600         attribute hostname, it will over write the previous value
601         """
602         hostname = self.plapi.get_nodes(node, ['hostname'])
603         self.set("hostname", hostname[0]['hostname'])
604
605     def _check_if_in_slice(self, nodes_id):
606         """
607         Query PLCAPI to find out if any node id from nodes_id is in the user's
608         slice
609         """
610         slicename = self.get("username")
611         slice_nodes = self.plapi.get_slice_nodes(slicename)
612         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
613         return nodes_inslice
614
615     def _do_ping(self, node_id):
616         """
617         Perform ping command on node's IP matching node id
618         """
619         ping_ok = False
620         ip = self._get_ip(node_id)
621         if ip:
622             command = "ping -c4 %s" % ip
623             (out, err) = lexec(command)
624
625             m = re.search("(\d+)% packet loss", str(out))
626             if m and int(m.groups()[0]) < 50:
627                 ping_ok = True
628        
629         return ping_ok 
630
631     def _blacklist_node(self, node):
632         """
633         Add node mal functioning node to blacklist
634         """
635         self.warning(" Blacklisting malfunctioning node ")
636         self.plapi.blacklist_host(node)
637         if not self._hostname:
638             self.set('hostname', None)
639
640     def _put_node_in_provision(self, node):
641         """
642         Add node to the list of nodes being provisioned, in order for other RMs
643         to not try to provision the same one again
644         """
645         self.plapi.reserve_host(node)
646
647     def _get_ip(self, node_id):
648         """
649         Query PLCAPI for the IP of a node with certain node id
650         """
651         hostname = self.get("hostname") or \
652             self.plapi.get_nodes(node_id, ['hostname'])[0]['hostname']
653         try:
654             ip = sshfuncs.gethostbyname(hostname)
655         except:
656             # Fail while trying to find the IP
657             return None
658         return ip
659
660     def fail_discovery(self):
661         msg = "Discovery failed. No candidates found for node"
662         self.error(msg)
663         raise RuntimeError, msg
664
665     def fail_node_not_alive(self, hostname=None):
666         msg = "Node %s not alive" % hostname
667         raise RuntimeError, msg
668     
669     def fail_node_not_available(self, hostname):
670         msg = "Node %s not available for provisioning" % hostname
671         raise RuntimeError, msg
672
673     def fail_not_enough_nodes(self):
674         msg = "Not enough nodes available for provisioning"
675         raise RuntimeError, msg
676
677     def fail_plapi(self):
678         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
679             " attributes pluser and plpassword."
680         raise RuntimeError, msg
681
682     def valid_connection(self, guid):
683         # TODO: Validate!
684         return True
685
686