reinstate import of print_function in py3 so as to shorten the distance between both...
[nepi.git] / src / nepi / resources / planetlab / sfa_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
18
19 from __future__ import print_function
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sfaapi import SFAAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import re
31 import os
32 import weakref
33 import time
34 import socket
35 import threading
36 import datetime
37
38 @clsinit_copy
39 class PlanetlabSfaNode(LinuxNode):
40     _rtype = "planetlab::sfa::Node"
41     _help = "Controls a PlanetLab host accessible using a SSH key " \
42             "and provisioned using SFA"
43     _platform = "planetlab"
44
45     @classmethod
46     def _register_attributes(cls):
47
48         sfa_user = Attribute("sfauser", "SFA user",
49                     flags = Flags.Credential)
50
51         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
52                             used to generate the user credential")
53
54         city = Attribute("city", "Constrain location (city) during resource \
55                 discovery. May use wildcards.",
56                 flags = Flags.Filter)
57
58         country = Attribute("country", "Constrain location (country) during \
59                     resource discovery. May use wildcards.",
60                     flags = Flags.Filter)
61
62         region = Attribute("region", "Constrain location (region) during \
63                     resource discovery. May use wildcards.",
64                     flags = Flags.Filter)
65
66         architecture = Attribute("architecture", "Constrain architecture \
67                         during resource discovery.",
68                         type = Types.Enumerate,
69                         allowed = ["x86_64", 
70                                     "i386"],
71                         flags = Flags.Filter)
72
73         operating_system = Attribute("operatingSystem", "Constrain operating \
74                             system during resource discovery.",
75                             type = Types.Enumerate,
76                             allowed =  ["f8",
77                                         "f12",
78                                         "f14",
79                                         "centos",
80                                         "other"],
81                             flags = Flags.Filter)
82
83         min_reliability = Attribute("minReliability", "Constrain reliability \
84                             while picking PlanetLab nodes. Specifies a lower \
85                             acceptable bound.",
86                             type = Types.Double,
87                             range = (1, 100),
88                             flags = Flags.Filter)
89
90         max_reliability = Attribute("maxReliability", "Constrain reliability \
91                             while picking PlanetLab nodes. Specifies an upper \
92                             acceptable bound.",
93                             type = Types.Double,
94                             range = (1, 100),
95                             flags = Flags.Filter)
96
97         min_bandwidth = Attribute("minBandwidth", "Constrain available \
98                             bandwidth while picking PlanetLab nodes. \
99                             Specifies a lower acceptable bound.",
100                             type = Types.Double,
101                             range = (0, 2**31),
102                             flags = Flags.Filter)
103
104         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
105                             bandwidth while picking PlanetLab nodes. \
106                             Specifies an upper acceptable bound.",
107                             type = Types.Double,
108                             range = (0, 2**31),
109                             flags = Flags.Filter)
110
111         min_load = Attribute("minLoad", "Constrain node load average while \
112                     picking PlanetLab nodes. Specifies a lower acceptable \
113                     bound.",
114                     type = Types.Double,
115                     range = (0, 2**31),
116                     flags = Flags.Filter)
117
118         max_load = Attribute("maxLoad", "Constrain node load average while \
119                     picking PlanetLab nodes. Specifies an upper acceptable \
120                     bound.",
121                     type = Types.Double,
122                     range = (0, 2**31),
123                     flags = Flags.Filter)
124
125         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
126                     picking PlanetLab nodes. Specifies a lower acceptable \
127                     bound.",
128                     type = Types.Double,
129                     range = (0, 100),
130                     flags = Flags.Filter)
131
132         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
133                     picking PlanetLab nodes. Specifies an upper acceptable \
134                     bound.",
135                     type = Types.Double,
136                     range = (0, 100),
137                     flags = Flags.Filter)
138
139         timeframe = Attribute("timeframe", "Past time period in which to check\
140                         information about the node. Values are year,month, \
141                         week, latest",
142                         default = "week",
143                         type = Types.Enumerate,
144                         allowed = ["latest",
145                                     "week",
146                                     "month",
147                                     "year"],
148                         flags = Flags.Filter)
149
150         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
151                         in the user's home directory under .nepi directory. This file \
152                         contains a list of PL nodes to blacklist, and at the end \
153                         of the experiment execution the new blacklisted nodes are added.",
154                     type = Types.Bool,
155                     default = False,
156                     flags = Flags.Global)
157
158         cls._register_attribute(sfa_user)
159         cls._register_attribute(sfa_private_key)
160         cls._register_attribute(city)
161         cls._register_attribute(country)
162         cls._register_attribute(region)
163         cls._register_attribute(architecture)
164         cls._register_attribute(operating_system)
165         cls._register_attribute(min_reliability)
166         cls._register_attribute(max_reliability)
167         cls._register_attribute(min_bandwidth)
168         cls._register_attribute(max_bandwidth)
169         cls._register_attribute(min_load)
170         cls._register_attribute(max_load)
171         cls._register_attribute(min_cpu)
172         cls._register_attribute(max_cpu)
173         cls._register_attribute(timeframe)
174         cls._register_attribute(plblacklist)
175
176     def __init__(self, ec, guid):
177         super(PlanetlabSfaNode, self).__init__(ec, guid)
178         
179         self._ecobj = weakref.ref(ec)
180         self._sfaapi = None
181         self._node_to_provision = None
182         self._slicenode = False
183         self._hostname = False
184
185         if self.get("gateway") or self.get("gatewayUser"):
186             self.set("gateway", None)
187             self.set("gatewayUser", None)
188
189         # Blacklist file for PL nodes
190         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
191         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
192         if not os.path.exists(plblacklist_file):
193             if os.path.isdir(nepi_home):
194                 with open(plblacklist_file, 'w') as clear:
195                     pass
196             else:
197                 os.makedirs(nepi_home)
198                 with open(plblacklist_file, 'w') as clear:
199                     pass
200
201     def _skip_provision(self):
202         sfa_user = self.get("sfauser")
203         if not sfa_user:
204             return True
205         else: return False
206     
207     @property
208     def sfaapi(self):
209         """
210         Property to instanciate the SFA API based in sfi client.
211         For each SFA method called this instance is used.
212         """
213         if not self._sfaapi:
214             sfa_user = self.get("sfauser")
215             sfa_sm = "http://sfa3.planet-lab.eu:12346/"
216             sfa_auth = '.'.join(sfa_user.split('.')[:2])
217             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
218             sfa_private_key = self.get("sfaPrivateKey")
219
220             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
221                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
222
223             if not _sfaapi:
224                 self.fail_sfaapi()
225     
226             self._sfaapi = weakref.ref(_sfaapi)
227
228         return self._sfaapi()
229
230     def do_discover(self):
231         """
232         Based on the attributes defined by the user, discover the suitable 
233         nodes for provision.
234         """
235         if self._skip_provision():
236             super(PlanetlabSfaNode, self).do_discover()
237             return
238
239         nodes = self.sfaapi.get_resources_hrn()
240
241         hostname = self._get_hostname()
242         if hostname:
243             # the user specified one particular node to be provisioned
244             self._hostname = True
245             host_hrn = nodes[hostname]
246
247             # check that the node is not blacklisted or being provisioned
248             # by other RM
249             if not self._blacklisted(host_hrn) and not self._reserved(host_hrn):
250                 # Node in reservation
251                 ping_ok = self._do_ping(hostname)
252                 if not ping_ok:
253                     self._blacklist_node(host_hrn)
254                     self.fail_node_not_alive(hostname)
255                 else:
256                     if self._check_if_in_slice([host_hrn]):
257                         self.debug("The node %s is already in the slice" % hostname)
258                         self._slicenode = True
259                     self._node_to_provision = host_hrn
260             else:
261                 self.fail_node_not_available(hostname)
262             super(PlanetlabSfaNode, self).do_discover()
263
264         else:
265             hosts_hrn = nodes.values()
266             nodes_inslice = self._check_if_in_slice(hosts_hrn)
267             nodes_not_inslice = list(set(hosts_hrn) - set(nodes_inslice))
268             host_hrn = None
269             if nodes_inslice:
270                 host_hrn = self._choose_random_node(nodes, nodes_inslice)
271                 self._slicenode = True          
272
273             if not host_hrn:
274                 # Either there were no matching nodes in the user's slice, or
275                 # the nodes in the slice  were blacklisted or being provisioned
276                 # by other RM. Note nodes_not_inslice is never empty
277                 host_hrn = self._choose_random_node(nodes, nodes_not_inslice)
278                 self._slicenode = False
279
280             if host_hrn:
281                 self._node_to_provision = host_hrn
282                 try:
283                     self._set_hostname_attr(host_hrn)
284                     self.info(" Selected node to provision ")
285                     super(PlanetlabSfaNode, self).do_discover()
286                 except:
287                     self._blacklist_node(host_hrn)
288                     self.do_discover()
289             else:
290                self.fail_not_enough_nodes() 
291     
292     def _blacklisted(self, host_hrn):
293         """
294         Check in the SFA API that the node is not in the blacklist.
295         """
296         if self.sfaapi.blacklisted(host_hrn):
297            return True
298         return False
299
300     def _reserved(self, host_hrn):
301         """
302         Check in the SFA API that the node is not in the reserved
303         list.
304         """
305         if self.sfaapi.reserved(host_hrn):
306             return True
307         return False
308             
309     def do_provision(self):
310         """
311         Add node to user's slice and verifing that the node is functioning
312         correctly. Check ssh, file system.
313         """
314         if self._skip_provision():
315             super(PlanetlabSfaNode, self).do_provision()
316             return
317
318         provision_ok = False
319         ssh_ok = False
320         proc_ok = False
321         timeout = 1800
322
323         while not provision_ok:
324             node = self._node_to_provision
325             if not self._slicenode:
326                 self._add_node_to_slice(node)
327             
328                 # check ssh connection
329                 t = 0 
330                 while t < timeout and not ssh_ok:
331
332                     cmd = 'echo \'GOOD NODE\''
333                     ((out, err), proc) = self.execute(cmd)
334                     if out.find("GOOD NODE") < 0:
335                         self.debug( "No SSH connection, waiting 60s" )
336                         t = t + 60
337                         time.sleep(60)
338                         continue
339                     else:
340                         self.debug( "SSH OK" )
341                         ssh_ok = True
342                         continue
343             else:
344                 cmd = 'echo \'GOOD NODE\''
345                 ((out, err), proc) = self.execute(cmd)
346                 if not out.find("GOOD NODE") < 0:
347                     ssh_ok = True
348
349             if not ssh_ok:
350                 # the timeout was reach without establishing ssh connection
351                 # the node is blacklisted, deleted from the slice, and a new
352                 # node to provision is discovered
353                 self.warning(" Could not SSH login ")
354                 self._blacklist_node(node)
355                 self.do_discover()
356                 continue
357             
358             # check /proc directory is mounted (ssh_ok = True)
359             # and file system is not read only
360             else:
361                 cmd = 'mount |grep proc'
362                 ((out1, err1), proc1) = self.execute(cmd)
363                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
364                 ((out2, err2), proc2) = self.execute(cmd)
365                 if out1.find("/proc type proc") < 0 or \
366                     "Read-only file system".lower() in err2.lower():
367                     self.warning(" Corrupted file system ")
368                     self._blacklist_node(node)
369                     self.do_discover()
370                     continue
371             
372                 else:
373                     provision_ok = True
374                     if not self.get('hostname'):
375                         self._set_hostname_attr(node)            
376                     self.info(" Node provisioned ")            
377             
378         super(PlanetlabSfaNode, self).do_provision()
379
380     def do_release(self):
381         super(PlanetlabSfaNode, self).do_release()
382         if self.state == ResourceState.RELEASED and not self._skip_provision():
383             self.debug(" Releasing SFA API ")
384             self.sfaapi.release()
385
386 #    def _filter_based_on_attributes(self):
387 #        """
388 #        Retrive the list of nodes hrn that match user's constraints 
389 #        """
390 #        # Map user's defined attributes with tagnames of PlanetLab
391 #        timeframe = self.get("timeframe")[0]
392 #        attr_to_tags = {
393 #            'city' : 'city',
394 #            'country' : 'country',
395 #            'region' : 'region',
396 #            'architecture' : 'arch',
397 #            'operatingSystem' : 'fcdistro',
398 #            'minReliability' : 'reliability%s' % timeframe,
399 #            'maxReliability' : 'reliability%s' % timeframe,
400 #            'minBandwidth' : 'bw%s' % timeframe,
401 #            'maxBandwidth' : 'bw%s' % timeframe,
402 #            'minLoad' : 'load%s' % timeframe,
403 #            'maxLoad' : 'load%s' % timeframe,
404 #            'minCpu' : 'cpu%s' % timeframe,
405 #            'maxCpu' : 'cpu%s' % timeframe,
406 #        }
407 #        
408 #        nodes_hrn = []
409 #        filters = {}
410 #
411 #        for attr_name, attr_obj in self._attrs.iteritems():
412 #            attr_value = self.get(attr_name)
413 #            
414 #            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
415 #                attr_name != 'timeframe':
416 #        
417 #                attr_tag = attr_to_tags[attr_name]
418 #                filters['tagname'] = attr_tag
419 #
420 #                # filter nodes by fixed constraints e.g. operating system
421 #                if not 'min' in attr_name and not 'max' in attr_name:
422 #                    filters['value'] = attr_value
423 #                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
424 #
425 #                # filter nodes by range constraints e.g. max bandwidth
426 #                elif ('min' or 'max') in attr_name:
427 #                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
428 #
429 #        if not filters:
430 #            nodes = self.sfaapi.get_resources_hrn()
431 #            for node in nodes:
432 #                nodes_hrn.append(node[node.key()])
433 #        return nodes_hrn
434 #                    
435 #    def _filter_by_fixed_attr(self, filters, nodes_hrn):
436 #        """
437 #        Query SFA API for nodes matching fixed attributes defined by the
438 #        user
439 #        """
440 #        pass
441 ##        node_tags = self.sfaapi.get_resources_tags(filters)
442 ##        if node_tags is not None:
443 ##
444 ##            if len(nodes_id) == 0:
445 ##                # first attribute being matched
446 ##                for node_tag in node_tags:
447 ##                    nodes_id.append(node_tag['node_id'])
448 ##            else:
449 ##                # remove the nodes ids that don't match the new attribute
450 ##                # that is being match
451 ##
452 ##                nodes_id_tmp = []
453 ##                for node_tag in node_tags:
454 ##                    if node_tag['node_id'] in nodes_id:
455 ##                        nodes_id_tmp.append(node_tag['node_id'])
456 ##
457 ##                if len(nodes_id_tmp):
458 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
459 ##                else:
460 ##                    # no node from before match the new constraint
461 ##                    self.fail_discovery()
462 ##        else:
463 ##            # no nodes match the filter applied
464 ##            self.fail_discovery()
465 ##
466 ##        return nodes_id
467 #
468 #    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
469 #        """
470 #        Query PLCAPI for nodes ids matching attributes defined in a certain
471 #        range, by the user
472 #        """
473 #        pass
474 ##        node_tags = self.plapi.get_node_tags(filters)
475 ##        if node_tags:
476 ##            
477 ##            if len(nodes_id) == 0:
478 ##                # first attribute being matched
479 ##                for node_tag in node_tags:
480 ## 
481 ##                   # check that matches the min or max restriction
482 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
483 ##                        float(node_tag['value']) > attr_value:
484 ##                        nodes_id.append(node_tag['node_id'])
485 ##
486 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
487 ##                        float(node_tag['value']) < attr_value:
488 ##                        nodes_id.append(node_tag['node_id'])
489 ##            else:
490 ##
491 ##                # remove the nodes ids that don't match the new attribute
492 ##                # that is being match
493 ##                nodes_id_tmp = []
494 ##                for node_tag in node_tags:
495 ##
496 ##                    # check that matches the min or max restriction and was a
497 ##                    # matching previous filters
498 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
499 ##                        float(node_tag['value']) > attr_value and \
500 ##                        node_tag['node_id'] in nodes_id:
501 ##                        nodes_id_tmp.append(node_tag['node_id'])
502 ##
503 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
504 ##                        float(node_tag['value']) < attr_value and \
505 ##                        node_tag['node_id'] in nodes_id:
506 ##                        nodes_id_tmp.append(node_tag['node_id'])
507 ##
508 ##                if len(nodes_id_tmp):
509 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
510 ##                else:
511 ##                    # no node from before match the new constraint
512 ##                    self.fail_discovery()
513 ##
514 ##        else: #TODO CHECK
515 ##            # no nodes match the filter applied
516 ##            self.fail_discovery()
517 ##
518 ##        return nodes_id
519         
520     def _choose_random_node(self, nodes, hosts_hrn):
521         """
522         From the possible nodes for provision, choose randomly to decrese the
523         probability of different RMs choosing the same node for provision
524         """
525         size = len(hosts_hrn)
526         while size:
527             size = size - 1
528             index = randint(0, size)
529             host_hrn = hosts_hrn[index]
530             hosts_hrn[index] = hosts_hrn[size]
531
532             # check the node is not blacklisted or being provision by other RM
533             # and perform ping to check that is really alive
534             if not self._blacklisted(host_hrn):
535                 if not self._reserved(host_hrn):
536                     print(self.sfaapi._reserved ,self.guid)
537                     for hostname, hrn in nodes.items():
538                         if host_hrn == hrn:
539                             print('hostname' ,hostname)
540                             ping_ok = self._do_ping(hostname)
541                 
542                     if not ping_ok:
543                         self._set_hostname_attr(hostname)
544                         self.warning(" Node not responding PING ")
545                         self._blacklist_node(host_hrn)
546                     else:
547                         # discovered node for provision, added to provision list
548                         self._node_to_provision = host_hrn
549                         return host_hrn
550
551 #    def _get_nodes_id(self, filters=None):
552 #        return self.plapi.get_nodes(filters, fields=['node_id'])
553 #
554     def _add_node_to_slice(self, host_hrn):
555         """
556         Add node to slice, using SFA API.
557         """
558         self.info(" Adding node to slice ")
559         slicename = self.get("username").replace('_', '.')
560         slicename = 'ple.' + slicename
561         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
562
563     def _delete_from_slice(self):
564         """
565         Delete every node from slice, using SFA API.
566         Sfi client doesn't work for particular node urns.
567         """
568         self.warning(" Deleting node from slice ")
569         slicename = self.get("username").replace('_', '.')
570         slicename = 'ple.' + slicename
571         self.sfaapi.remove_all_from_slice(slicename)
572
573     def _get_hostname(self):
574         """
575         Get the attribute hostname.
576         """
577         hostname = self.get("hostname")
578         if hostname:
579             return hostname
580         else:
581             return None
582
583     def _set_hostname_attr(self, node):
584         """
585         Query SFAAPI for the hostname of a certain host hrn and sets the
586         attribute hostname, it will over write the previous value.
587         """
588         hosts_hrn = self.sfaapi.get_resources_hrn()
589         for hostname, hrn  in hosts_hrn.items():
590             if hrn == node:
591                 self.set("hostname", hostname)
592
593     def _check_if_in_slice(self, hosts_hrn):
594         """
595         Check using SFA API if any host hrn from hosts_hrn is in the user's
596         slice.
597         """
598         slicename = self.get("username").replace('_', '.')
599         slicename = 'ple.' + slicename
600         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
601         if slice_nodes:
602             slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
603         else: slice_nodes_hrn = []
604         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
605         return nodes_inslice
606
607     def _do_ping(self, hostname):
608         """
609         Perform ping command on node's IP matching hostname.
610         """
611         ping_ok = False
612         ip = self._get_ip(hostname)
613         if ip:
614             command = "ping -c4 %s" % ip
615             (out, err) = lexec(command)
616
617             m = re.search("(\d+)% packet loss", str(out))
618             if m and int(m.groups()[0]) < 50:
619                 ping_ok = True
620
621         return ping_ok
622
623     def _blacklist_node(self, host_hrn):
624         """
625         Add mal functioning node to blacklist (in SFA API).
626         """
627         self.warning(" Blacklisting malfunctioning node ")
628         self.sfaapi.blacklist_resource(host_hrn)
629         if not self._hostname:
630             self.set('hostname', None)
631
632     def _reserve(self, host_hrn):
633         """
634         Add node to the list of nodes being provisioned, in order for other RMs
635         to not try to provision the same one again.
636         """
637         self.sfaapi.reserve_resource(host_hrn)
638
639     def _get_ip(self, hostname):
640         """
641         Query cache for the IP of a node with certain hostname
642         """
643         try:
644             ip = sshfuncs.gethostbyname(hostname)
645         except:
646             # Fail while trying to find the IP
647             return None
648         return ip
649
650     def fail_discovery(self):
651         msg = "Discovery failed. No candidates found for node"
652         self.error(msg)
653         raise RuntimeError(msg)
654
655     def fail_node_not_alive(self, hostname=None):
656         msg = "Node %s not alive" % hostname
657         raise RuntimeError(msg)
658     
659     def fail_node_not_available(self, hostname):
660         msg = "Node %s not available for provisioning" % hostname
661         raise RuntimeError(msg)
662
663     def fail_not_enough_nodes(self):
664         msg = "Not enough nodes available for provisioning"
665         raise RuntimeError(msg)
666
667     def fail_sfaapi(self):
668         msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
669             " attributes sfauser and sfaPrivateKey."
670         raise RuntimeError(msg)
671
672     def valid_connection(self, guid):
673         # TODO: Validate!
674         return True
675
676