fe7f058176d6fc8a7c843a5d903455e9ba602407
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
18 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState 
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import re
30 import os
31 import time
32 import socket
33 import threading
34 import datetime
35 import weakref
36
37 @clsinit_copy
38 class PlanetlabNode(LinuxNode):
39     _rtype = "planetlab::Node"
40     _help = "Controls a PlanetLab host accessible using a SSH key " \
41             "associated to a PlanetLab user account"
42     _platform = "planetlab"
43
44     lock = threading.Lock()
45
46     @classmethod
47     def _register_attributes(cls):
48         ip = Attribute("ip", "PlanetLab host public IP address",
49                     flags = Flags.Design)
50
51         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
52                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
53                     default = "www.planet-lab.eu",
54                     flags = Flags.Credential)
55
56         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
57                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
58                     default = "https://%(hostname)s:443/PLCAPI/",
59                     flags = Flags.Design)
60     
61         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
62                     authenticate in the website) ",
63                     flags = Flags.Credential)
64
65         pl_password = Attribute("plpassword", 
66                         "PlanetLab account password, as \
67                         the one to authenticate in the website) ",
68                         flags = Flags.Credential)
69
70         city = Attribute("city", "Constrain location (city) during resource \
71                 discovery. May use wildcards.",
72                 flags = Flags.Filter)
73
74         country = Attribute("country", "Constrain location (country) during \
75                     resource discovery. May use wildcards.",
76                     flags = Flags.Filter)
77
78         region = Attribute("region", "Constrain location (region) during \
79                     resource discovery. May use wildcards.",
80                     flags = Flags.Filter)
81
82         architecture = Attribute("architecture", "Constrain architecture \
83                         during resource discovery.",
84                         type = Types.Enumerate,
85                         allowed = ["x86_64", 
86                                     "i386"],
87                         flags = Flags.Filter)
88
89         operating_system = Attribute("operatingSystem", "Constrain operating \
90                             system during resource discovery.",
91                             type = Types.Enumerate,
92                             allowed =  ["f8",
93                                         "f12",
94                                         "f14",
95                                         "centos",
96                                         "other"],
97                             flags = Flags.Filter)
98
99         min_reliability = Attribute("minReliability", "Constrain reliability \
100                             while picking PlanetLab nodes. Specifies a lower \
101                             acceptable bound.",
102                             type = Types.Double,
103                             range = (1, 100),
104                             flags = Flags.Filter)
105
106         max_reliability = Attribute("maxReliability", "Constrain reliability \
107                             while picking PlanetLab nodes. Specifies an upper \
108                             acceptable bound.",
109                             type = Types.Double,
110                             range = (1, 100),
111                             flags = Flags.Filter)
112
113         min_bandwidth = Attribute("minBandwidth", "Constrain available \
114                             bandwidth while picking PlanetLab nodes. \
115                             Specifies a lower acceptable bound.",
116                             type = Types.Double,
117                             range = (0, 2**31),
118                             flags = Flags.Filter)
119
120         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
121                             bandwidth while picking PlanetLab nodes. \
122                             Specifies an upper acceptable bound.",
123                             type = Types.Double,
124                             range = (0, 2**31),
125                             flags = Flags.Filter)
126
127         min_load = Attribute("minLoad", "Constrain node load average while \
128                     picking PlanetLab nodes. Specifies a lower acceptable \
129                     bound.",
130                     type = Types.Double,
131                     range = (0, 2**31),
132                     flags = Flags.Filter)
133
134         max_load = Attribute("maxLoad", "Constrain node load average while \
135                     picking PlanetLab nodes. Specifies an upper acceptable \
136                     bound.",
137                     type = Types.Double,
138                     range = (0, 2**31),
139                     flags = Flags.Filter)
140
141         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
142                     picking PlanetLab nodes. Specifies a lower acceptable \
143                     bound.",
144                     type = Types.Double,
145                     range = (0, 100),
146                     flags = Flags.Filter)
147
148         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
149                     picking PlanetLab nodes. Specifies an upper acceptable \
150                     bound.",
151                     type = Types.Double,
152                     range = (0, 100),
153                     flags = Flags.Filter)
154
155         timeframe = Attribute("timeframe", "Past time period in which to check\
156                         information about the node. Values are year,month, \
157                         week, latest",
158                         default = "week",
159                         type = Types.Enumerate,
160                         allowed = ["latest",
161                                     "week",
162                                     "month",
163                                     "year"],
164                         flags = Flags.Filter)
165
166         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
167                         in the user's home directory under .nepi directory. This file \
168                         contains a list of PL nodes to blacklist, and at the end \
169                         of the experiment execution the new blacklisted nodes are added.",
170                     type = Types.Bool,
171                     default = False,
172                     flags = Flags.Global)
173
174         cls._register_attribute(ip)
175         cls._register_attribute(pl_url)
176         cls._register_attribute(pl_ptn)
177         cls._register_attribute(pl_user)
178         cls._register_attribute(pl_password)
179         cls._register_attribute(city)
180         cls._register_attribute(country)
181         cls._register_attribute(region)
182         cls._register_attribute(architecture)
183         cls._register_attribute(operating_system)
184         cls._register_attribute(min_reliability)
185         cls._register_attribute(max_reliability)
186         cls._register_attribute(min_bandwidth)
187         cls._register_attribute(max_bandwidth)
188         cls._register_attribute(min_load)
189         cls._register_attribute(max_load)
190         cls._register_attribute(min_cpu)
191         cls._register_attribute(max_cpu)
192         cls._register_attribute(timeframe)
193         cls._register_attribute(plblacklist)
194
195     def __init__(self, ec, guid):
196         super(PlanetlabNode, self).__init__(ec, guid)
197
198         self._ecobj = weakref.ref(ec)
199         self._plapi = None
200         self._node_to_provision = None
201         self._slicenode = False
202         self._hostname = False
203
204         if self.get("gateway") or self.get("gatewayUser"):
205             self.set("gateway", None)
206             self.set("gatewayUser", None)
207
208         # Blacklist file
209         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
210         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
211         if not os.path.exists(plblacklist_file):
212             if os.path.isdir(nepi_home):
213                 open(plblacklist_file, 'w').close()
214             else:
215                 os.makedirs(nepi_home)
216                 open(plblacklist_file, 'w').close()
217
218     def _skip_provision(self):
219         pl_user = self.get("pluser")
220         pl_pass = self.get("plpassword")
221         if not pl_user and not pl_pass:
222             return True
223         else: return False
224     
225     @property
226     def plapi(self):
227         if not self._plapi:
228             pl_user = self.get("pluser")
229             pl_pass = self.get("plpassword")
230             pl_url = self.get("plcApiUrl")
231             pl_ptn = self.get("plcApiPattern")
232             _plapi = PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
233                 pl_ptn, self._ecobj())
234             
235             if not _plapi:
236                 self.fail_plapi()
237         
238             self._plapi = weakref.ref(_plapi)
239
240         return self._plapi()
241
242     def do_discover(self):
243         """
244         Based on the attributes defined by the user, discover the suitable 
245         nodes for provision.
246         """
247         if self._skip_provision():
248             super(PlanetlabNode, self).do_discover()
249             return
250
251         hostname = self._get_hostname()
252         if hostname:
253             # the user specified one particular node to be provisioned
254             self._hostname = True
255             node_id = self._get_nodes_id({'hostname':hostname})
256             node_id = node_id.pop()['node_id']
257
258             # check that the node is not blacklisted or being provisioned
259             # by other RM
260             with PlanetlabNode.lock:
261                 plist = self.plapi.reserved()
262                 blist = self.plapi.blacklisted()
263                 if node_id not in blist and node_id not in plist:
264                 
265                     # check that is really alive, by performing ping
266                     ping_ok = self._do_ping(node_id)
267                     if not ping_ok:
268                         self._blacklist_node(node_id)
269                         self.fail_node_not_alive(hostname)
270                     else:
271                         if self._check_if_in_slice([node_id]):
272                             self._slicenode = True
273                         self._put_node_in_provision(node_id)
274                         self._node_to_provision = node_id
275                 else:
276                     self.fail_node_not_available(hostname)
277             super(PlanetlabNode, self).do_discover()
278         
279         else:
280             # the user specifies constraints based on attributes, zero, one or 
281             # more nodes can match these constraints 
282             nodes = self._filter_based_on_attributes()
283
284             # nodes that are already part of user's slice have the priority to
285             # provisioned
286             nodes_inslice = self._check_if_in_slice(nodes)
287             nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
288             
289             node_id = None
290             if nodes_inslice:
291                 node_id = self._choose_random_node(nodes_inslice)
292                 self._slicenode = True                
293                 
294             if not node_id:
295                 # Either there were no matching nodes in the user's slice, or
296                 # the nodes in the slice  were blacklisted or being provisioned
297                 # by other RM. Note nodes_not_inslice is never empty
298                 node_id = self._choose_random_node(nodes_not_inslice)
299                 self._slicenode = False
300
301             if node_id:
302                 self._node_to_provision = node_id
303                 try:
304                     self._set_hostname_attr(node_id)
305                     self.info(" Selected node to provision ")
306                     super(PlanetlabNode, self).do_discover()
307                 except:
308                     with PlanetlabNode.lock:
309                         self._blacklist_node(node_id)
310                     self.do_discover()
311             else:
312                self.fail_not_enough_nodes() 
313             
314     def do_provision(self):
315         """
316         Add node to user's slice after verifing that the node is functioning
317         correctly
318         """
319         if self._skip_provision():
320             super(PlanetlabNode, self).do_provision()
321             return
322
323         provision_ok = False
324         ssh_ok = False
325         proc_ok = False
326         timeout = 1800
327
328         while not provision_ok:
329             node = self._node_to_provision
330             if not self._slicenode:
331                 self._add_node_to_slice(node)
332                 if self._check_if_in_slice([node]):
333                     self.debug( "Node added to slice" )
334                 else:
335                     self.warning(" Could not add to slice ")
336                     with PlanetlabNode.lock:
337                         self._blacklist_node(node)
338                     self.do_discover()
339                     continue
340
341                 # check ssh connection
342                 t = 0 
343                 while t < timeout and not ssh_ok:
344
345                     cmd = 'echo \'GOOD NODE\''
346                     ((out, err), proc) = self.execute(cmd)
347                     if out.find("GOOD NODE") < 0:
348                         self.debug( "No SSH connection, waiting 60s" )
349                         t = t + 60
350                         time.sleep(60)
351                         continue
352                     else:
353                         self.debug( "SSH OK" )
354                         ssh_ok = True
355                         continue
356             else:
357                 cmd = 'echo \'GOOD NODE\''
358                 ((out, err), proc) = self.execute(cmd)
359                 if not out.find("GOOD NODE") < 0:
360                     ssh_ok = True
361
362             if not ssh_ok:
363                 # the timeout was reach without establishing ssh connection
364                 # the node is blacklisted, deleted from the slice, and a new
365                 # node to provision is discovered
366                 with PlanetlabNode.lock:
367                     self.warning(" Could not SSH login ")
368                     self._blacklist_node(node)
369                     #self._delete_node_from_slice(node)
370                 self.do_discover()
371                 continue
372             
373             # check /proc directory is mounted (ssh_ok = True)
374             # and file system is not read only
375             else:
376                 cmd = 'mount |grep proc'
377                 ((out1, err1), proc1) = self.execute(cmd)
378                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
379                 ((out2, err2), proc2) = self.execute(cmd)
380                 if out1.find("/proc type proc") < 0 or \
381                     "Read-only file system".lower() in err2.lower():
382                     with PlanetlabNode.lock:
383                         self.warning(" Corrupted file system ")
384                         self._blacklist_node(node)
385                         #self._delete_node_from_slice(node)
386                     self.do_discover()
387                     continue
388             
389                 else:
390                     provision_ok = True
391                     if not self.get('hostname'):
392                         self._set_hostname_attr(node)            
393                     # set IP attribute
394                     ip = self._get_ip(node)
395                     self.set("ip", ip)
396                     self.info(" Node provisioned ")            
397             
398         super(PlanetlabNode, self).do_provision()
399
400     def do_release(self):
401         super(PlanetlabNode, self).do_release()
402         if self.state == ResourceState.RELEASED and not self._skip_provision():
403             self.debug(" Releasing PLC API ")
404             self.plapi.release()
405
406     def _filter_based_on_attributes(self):
407         """
408         Retrive the list of nodes ids that match user's constraints 
409         """
410         # Map user's defined attributes with tagnames of PlanetLab
411         timeframe = self.get("timeframe")[0]
412         attr_to_tags = {
413             'city' : 'city',
414             'country' : 'country',
415             'region' : 'region',
416             'architecture' : 'arch',
417             'operatingSystem' : 'fcdistro',
418             'minReliability' : 'reliability%s' % timeframe,
419             'maxReliability' : 'reliability%s' % timeframe,
420             'minBandwidth' : 'bw%s' % timeframe,
421             'maxBandwidth' : 'bw%s' % timeframe,
422             'minLoad' : 'load%s' % timeframe,
423             'maxLoad' : 'load%s' % timeframe,
424             'minCpu' : 'cpu%s' % timeframe,
425             'maxCpu' : 'cpu%s' % timeframe,
426         }
427         
428         nodes_id = []
429         filters = {}
430
431         for attr_name, attr_obj in self._attrs.iteritems():
432             attr_value = self.get(attr_name)
433             
434             if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
435                 attr_name != 'timeframe':
436         
437                 attr_tag = attr_to_tags[attr_name]
438                 filters['tagname'] = attr_tag
439
440                 # filter nodes by fixed constraints e.g. operating system
441                 if not 'min' in attr_name and not 'max' in attr_name:
442                     filters['value'] = attr_value
443                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
444
445                 # filter nodes by range constraints e.g. max bandwidth
446                 elif ('min' or 'max') in attr_name:
447                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
448
449         if not filters:
450             nodes = self._get_nodes_id()
451             for node in nodes:
452                 nodes_id.append(node['node_id'])
453         return nodes_id
454                     
455     def _filter_by_fixed_attr(self, filters, nodes_id):
456         """
457         Query PLCAPI for nodes ids matching fixed attributes defined by the
458         user
459         """
460         node_tags = self.plapi.get_node_tags(filters)
461         if node_tags is not None:
462
463             if len(nodes_id) == 0:
464                 # first attribute being matched
465                 for node_tag in node_tags:
466                     nodes_id.append(node_tag['node_id'])
467             else:
468                 # remove the nodes ids that don't match the new attribute
469                 # that is being match
470
471                 nodes_id_tmp = []
472                 for node_tag in node_tags:
473                     if node_tag['node_id'] in nodes_id:
474                         nodes_id_tmp.append(node_tag['node_id'])
475
476                 if len(nodes_id_tmp):
477                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
478                 else:
479                     # no node from before match the new constraint
480                     self.fail_discovery()
481         else:
482             # no nodes match the filter applied
483             self.fail_discovery()
484
485         return nodes_id
486
487     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
488         """
489         Query PLCAPI for nodes ids matching attributes defined in a certain
490         range, by the user
491         """
492         node_tags = self.plapi.get_node_tags(filters)
493         if node_tags:
494             
495             if len(nodes_id) == 0:
496                 # first attribute being matched
497                 for node_tag in node_tags:
498  
499                    # check that matches the min or max restriction
500                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
501                         float(node_tag['value']) > attr_value:
502                         nodes_id.append(node_tag['node_id'])
503
504                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
505                         float(node_tag['value']) < attr_value:
506                         nodes_id.append(node_tag['node_id'])
507             else:
508
509                 # remove the nodes ids that don't match the new attribute
510                 # that is being match
511                 nodes_id_tmp = []
512                 for node_tag in node_tags:
513
514                     # check that matches the min or max restriction and was a
515                     # matching previous filters
516                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
517                         float(node_tag['value']) > attr_value and \
518                         node_tag['node_id'] in nodes_id:
519                         nodes_id_tmp.append(node_tag['node_id'])
520
521                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
522                         float(node_tag['value']) < attr_value and \
523                         node_tag['node_id'] in nodes_id:
524                         nodes_id_tmp.append(node_tag['node_id'])
525
526                 if len(nodes_id_tmp):
527                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
528                 else:
529                     # no node from before match the new constraint
530                     self.fail_discovery()
531
532         else: #TODO CHECK
533             # no nodes match the filter applied
534             self.fail_discovery()
535
536         return nodes_id
537         
538     def _choose_random_node(self, nodes):
539         """
540         From the possible nodes for provision, choose randomly to decrese the
541         probability of different RMs choosing the same node for provision
542         """
543         size = len(nodes)
544         while size:
545             size = size - 1
546             index = randint(0, size)
547             node_id = nodes[index]
548             nodes[index] = nodes[size]
549
550             # check the node is not blacklisted or being provision by other RM
551             # and perform ping to check that is really alive
552             with PlanetlabNode.lock:
553
554                 blist = self.plapi.blacklisted()
555                 plist = self.plapi.reserved()
556                 if node_id not in blist and node_id not in plist:
557                     ping_ok = self._do_ping(node_id)
558                     if not ping_ok:
559                         self._set_hostname_attr(node_id)
560                         self.warning(" Node not responding PING ")
561                         self._blacklist_node(node_id)
562                     else:
563                         # discovered node for provision, added to provision list
564                         self._put_node_in_provision(node_id)
565                         return node_id
566
567     def _get_nodes_id(self, filters=None):
568         return self.plapi.get_nodes(filters, fields=['node_id'])
569
570     def _add_node_to_slice(self, node_id):
571         self.info(" Adding node to slice ")
572         slicename = self.get("username")
573         with PlanetlabNode.lock:
574             slice_nodes = self.plapi.get_slice_nodes(slicename)
575             self.debug(" Previous slice nodes %s " % slice_nodes)
576             slice_nodes.append(node_id)
577             self.plapi.add_slice_nodes(slicename, slice_nodes)
578
579     def _delete_node_from_slice(self, node):
580         self.warning(" Deleting node from slice ")
581         slicename = self.get("username")
582         self.plapi.delete_slice_node(slicename, [node])
583
584     def _get_hostname(self):
585         hostname = self.get("hostname")
586         if hostname:
587             return hostname
588         ip = self.get("ip")
589         if ip:
590             hostname = socket.gethostbyaddr(ip)[0]
591             self.set('hostname', hostname)
592             return hostname
593         else:
594             return None
595
596     def _set_hostname_attr(self, node):
597         """
598         Query PLCAPI for the hostname of a certain node id and sets the
599         attribute hostname, it will over write the previous value
600         """
601         hostname = self.plapi.get_nodes(node, ['hostname'])
602         self.set("hostname", hostname[0]['hostname'])
603
604     def _check_if_in_slice(self, nodes_id):
605         """
606         Query PLCAPI to find out if any node id from nodes_id is in the user's
607         slice
608         """
609         slicename = self.get("username")
610         slice_nodes = self.plapi.get_slice_nodes(slicename)
611         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
612         return nodes_inslice
613
614     def _do_ping(self, node_id):
615         """
616         Perform ping command on node's IP matching node id
617         """
618         ping_ok = False
619         ip = self._get_ip(node_id)
620         if ip:
621             command = "ping -c4 %s" % ip
622             (out, err) = lexec(command)
623
624             m = re.search("(\d+)% packet loss", str(out))
625             if m and int(m.groups()[0]) < 50:
626                 ping_ok = True
627        
628         return ping_ok 
629
630     def _blacklist_node(self, node):
631         """
632         Add node mal functioning node to blacklist
633         """
634         self.warning(" Blacklisting malfunctioning node ")
635         self.plapi.blacklist_host(node)
636         if not self._hostname:
637             self.set('hostname', None)
638
639     def _put_node_in_provision(self, node):
640         """
641         Add node to the list of nodes being provisioned, in order for other RMs
642         to not try to provision the same one again
643         """
644         self.plapi.reserve_host(node)
645
646     def _get_ip(self, node_id):
647         """
648         Query PLCAPI for the IP of a node with certain node id
649         """
650         hostname = self.get("hostname") or \
651             self.plapi.get_nodes(node_id, ['hostname'])[0]['hostname']
652         try:
653             ip = sshfuncs.gethostbyname(hostname)
654         except:
655             # Fail while trying to find the IP
656             return None
657         return ip
658
659     def fail_discovery(self):
660         msg = "Discovery failed. No candidates found for node"
661         self.error(msg)
662         raise RuntimeError, msg
663
664     def fail_node_not_alive(self, hostname=None):
665         msg = "Node %s not alive" % hostname
666         raise RuntimeError, msg
667     
668     def fail_node_not_available(self, hostname):
669         msg = "Node %s not available for provisioning" % hostname
670         raise RuntimeError, msg
671
672     def fail_not_enough_nodes(self):
673         msg = "Not enough nodes available for provisioning"
674         raise RuntimeError, msg
675
676     def fail_plapi(self):
677         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
678             " attributes pluser and plpassword."
679         raise RuntimeError, msg
680
681     def valid_connection(self, guid):
682         # TODO: Validate!
683         return True
684
685