use print() - import print_function - should be fine for both py2 and py3
[nepi.git] / src / nepi / resources / planetlab / sfa_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
18
19 from __future__ import print_function
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.util.sfaapi import SFAAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import re
31 import os
32 import weakref
33 import time
34 import socket
35 import threading
36 import datetime
37
38 @clsinit_copy
39 class PlanetlabSfaNode(LinuxNode):
40     _rtype = "planetlab::sfa::Node"
41     _help = "Controls a PlanetLab host accessible using a SSH key " \
42             "and provisioned using SFA"
43     _platform = "planetlab"
44
45     @classmethod
46     def _register_attributes(cls):
47
48         sfa_user = Attribute("sfauser", "SFA user",
49                     flags = Flags.Credential)
50
51         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
52                             used to generate the user credential")
53
54         city = Attribute("city", "Constrain location (city) during resource \
55                 discovery. May use wildcards.",
56                 flags = Flags.Filter)
57
58         country = Attribute("country", "Constrain location (country) during \
59                     resource discovery. May use wildcards.",
60                     flags = Flags.Filter)
61
62         region = Attribute("region", "Constrain location (region) during \
63                     resource discovery. May use wildcards.",
64                     flags = Flags.Filter)
65
66         architecture = Attribute("architecture", "Constrain architecture \
67                         during resource discovery.",
68                         type = Types.Enumerate,
69                         allowed = ["x86_64", 
70                                     "i386"],
71                         flags = Flags.Filter)
72
73         operating_system = Attribute("operatingSystem", "Constrain operating \
74                             system during resource discovery.",
75                             type = Types.Enumerate,
76                             allowed =  ["f8",
77                                         "f12",
78                                         "f14",
79                                         "centos",
80                                         "other"],
81                             flags = Flags.Filter)
82
83         min_reliability = Attribute("minReliability", "Constrain reliability \
84                             while picking PlanetLab nodes. Specifies a lower \
85                             acceptable bound.",
86                             type = Types.Double,
87                             range = (1, 100),
88                             flags = Flags.Filter)
89
90         max_reliability = Attribute("maxReliability", "Constrain reliability \
91                             while picking PlanetLab nodes. Specifies an upper \
92                             acceptable bound.",
93                             type = Types.Double,
94                             range = (1, 100),
95                             flags = Flags.Filter)
96
97         min_bandwidth = Attribute("minBandwidth", "Constrain available \
98                             bandwidth while picking PlanetLab nodes. \
99                             Specifies a lower acceptable bound.",
100                             type = Types.Double,
101                             range = (0, 2**31),
102                             flags = Flags.Filter)
103
104         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
105                             bandwidth while picking PlanetLab nodes. \
106                             Specifies an upper acceptable bound.",
107                             type = Types.Double,
108                             range = (0, 2**31),
109                             flags = Flags.Filter)
110
111         min_load = Attribute("minLoad", "Constrain node load average while \
112                     picking PlanetLab nodes. Specifies a lower acceptable \
113                     bound.",
114                     type = Types.Double,
115                     range = (0, 2**31),
116                     flags = Flags.Filter)
117
118         max_load = Attribute("maxLoad", "Constrain node load average while \
119                     picking PlanetLab nodes. Specifies an upper acceptable \
120                     bound.",
121                     type = Types.Double,
122                     range = (0, 2**31),
123                     flags = Flags.Filter)
124
125         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
126                     picking PlanetLab nodes. Specifies a lower acceptable \
127                     bound.",
128                     type = Types.Double,
129                     range = (0, 100),
130                     flags = Flags.Filter)
131
132         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
133                     picking PlanetLab nodes. Specifies an upper acceptable \
134                     bound.",
135                     type = Types.Double,
136                     range = (0, 100),
137                     flags = Flags.Filter)
138
139         timeframe = Attribute("timeframe", "Past time period in which to check\
140                         information about the node. Values are year,month, \
141                         week, latest",
142                         default = "week",
143                         type = Types.Enumerate,
144                         allowed = ["latest",
145                                     "week",
146                                     "month",
147                                     "year"],
148                         flags = Flags.Filter)
149
150         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
151                         in the user's home directory under .nepi directory. This file \
152                         contains a list of PL nodes to blacklist, and at the end \
153                         of the experiment execution the new blacklisted nodes are added.",
154                     type = Types.Bool,
155                     default = False,
156                     flags = Flags.Global)
157
158         cls._register_attribute(sfa_user)
159         cls._register_attribute(sfa_private_key)
160         cls._register_attribute(city)
161         cls._register_attribute(country)
162         cls._register_attribute(region)
163         cls._register_attribute(architecture)
164         cls._register_attribute(operating_system)
165         cls._register_attribute(min_reliability)
166         cls._register_attribute(max_reliability)
167         cls._register_attribute(min_bandwidth)
168         cls._register_attribute(max_bandwidth)
169         cls._register_attribute(min_load)
170         cls._register_attribute(max_load)
171         cls._register_attribute(min_cpu)
172         cls._register_attribute(max_cpu)
173         cls._register_attribute(timeframe)
174         cls._register_attribute(plblacklist)
175
176     def __init__(self, ec, guid):
177         super(PlanetlabSfaNode, self).__init__(ec, guid)
178         
179         self._ecobj = weakref.ref(ec)
180         self._sfaapi = None
181         self._node_to_provision = None
182         self._slicenode = False
183         self._hostname = False
184
185         if self.get("gateway") or self.get("gatewayUser"):
186             self.set("gateway", None)
187             self.set("gatewayUser", None)
188
189         # Blacklist file for PL nodes
190         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
191         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
192         if not os.path.exists(plblacklist_file):
193             if os.path.isdir(nepi_home):
194                 open(plblacklist_file, 'w').close()
195             else:
196                 os.makedirs(nepi_home)
197                 open(plblacklist_file, 'w').close()
198
199     def _skip_provision(self):
200         sfa_user = self.get("sfauser")
201         if not sfa_user:
202             return True
203         else: return False
204     
205     @property
206     def sfaapi(self):
207         """
208         Property to instanciate the SFA API based in sfi client.
209         For each SFA method called this instance is used.
210         """
211         if not self._sfaapi:
212             sfa_user = self.get("sfauser")
213             sfa_sm = "http://sfa3.planet-lab.eu:12346/"
214             sfa_auth = '.'.join(sfa_user.split('.')[:2])
215             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
216             sfa_private_key = self.get("sfaPrivateKey")
217
218             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
219                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
220
221             if not _sfaapi:
222                 self.fail_sfaapi()
223     
224             self._sfaapi = weakref.ref(_sfaapi)
225
226         return self._sfaapi()
227
228     def do_discover(self):
229         """
230         Based on the attributes defined by the user, discover the suitable 
231         nodes for provision.
232         """
233         if self._skip_provision():
234             super(PlanetlabSfaNode, self).do_discover()
235             return
236
237         nodes = self.sfaapi.get_resources_hrn()
238
239         hostname = self._get_hostname()
240         if hostname:
241             # the user specified one particular node to be provisioned
242             self._hostname = True
243             host_hrn = nodes[hostname]
244
245             # check that the node is not blacklisted or being provisioned
246             # by other RM
247             if not self._blacklisted(host_hrn) and not self._reserved(host_hrn):
248                 # Node in reservation
249                 ping_ok = self._do_ping(hostname)
250                 if not ping_ok:
251                     self._blacklist_node(host_hrn)
252                     self.fail_node_not_alive(hostname)
253                 else:
254                     if self._check_if_in_slice([host_hrn]):
255                         self.debug("The node %s is already in the slice" % hostname)
256                         self._slicenode = True
257                     self._node_to_provision = host_hrn
258             else:
259                 self.fail_node_not_available(hostname)
260             super(PlanetlabSfaNode, self).do_discover()
261
262         else:
263             hosts_hrn = nodes.values()
264             nodes_inslice = self._check_if_in_slice(hosts_hrn)
265             nodes_not_inslice = list(set(hosts_hrn) - set(nodes_inslice))
266             host_hrn = None
267             if nodes_inslice:
268                 host_hrn = self._choose_random_node(nodes, nodes_inslice)
269                 self._slicenode = True          
270
271             if not host_hrn:
272                 # Either there were no matching nodes in the user's slice, or
273                 # the nodes in the slice  were blacklisted or being provisioned
274                 # by other RM. Note nodes_not_inslice is never empty
275                 host_hrn = self._choose_random_node(nodes, nodes_not_inslice)
276                 self._slicenode = False
277
278             if host_hrn:
279                 self._node_to_provision = host_hrn
280                 try:
281                     self._set_hostname_attr(host_hrn)
282                     self.info(" Selected node to provision ")
283                     super(PlanetlabSfaNode, self).do_discover()
284                 except:
285                     self._blacklist_node(host_hrn)
286                     self.do_discover()
287             else:
288                self.fail_not_enough_nodes() 
289     
290     def _blacklisted(self, host_hrn):
291         """
292         Check in the SFA API that the node is not in the blacklist.
293         """
294         if self.sfaapi.blacklisted(host_hrn):
295            return True
296         return False
297
298     def _reserved(self, host_hrn):
299         """
300         Check in the SFA API that the node is not in the reserved
301         list.
302         """
303         if self.sfaapi.reserved(host_hrn):
304             return True
305         return False
306             
307     def do_provision(self):
308         """
309         Add node to user's slice and verifing that the node is functioning
310         correctly. Check ssh, file system.
311         """
312         if self._skip_provision():
313             super(PlanetlabSfaNode, self).do_provision()
314             return
315
316         provision_ok = False
317         ssh_ok = False
318         proc_ok = False
319         timeout = 1800
320
321         while not provision_ok:
322             node = self._node_to_provision
323             if not self._slicenode:
324                 self._add_node_to_slice(node)
325             
326                 # check ssh connection
327                 t = 0 
328                 while t < timeout and not ssh_ok:
329
330                     cmd = 'echo \'GOOD NODE\''
331                     ((out, err), proc) = self.execute(cmd)
332                     if out.find("GOOD NODE") < 0:
333                         self.debug( "No SSH connection, waiting 60s" )
334                         t = t + 60
335                         time.sleep(60)
336                         continue
337                     else:
338                         self.debug( "SSH OK" )
339                         ssh_ok = True
340                         continue
341             else:
342                 cmd = 'echo \'GOOD NODE\''
343                 ((out, err), proc) = self.execute(cmd)
344                 if not out.find("GOOD NODE") < 0:
345                     ssh_ok = True
346
347             if not ssh_ok:
348                 # the timeout was reach without establishing ssh connection
349                 # the node is blacklisted, deleted from the slice, and a new
350                 # node to provision is discovered
351                 self.warning(" Could not SSH login ")
352                 self._blacklist_node(node)
353                 self.do_discover()
354                 continue
355             
356             # check /proc directory is mounted (ssh_ok = True)
357             # and file system is not read only
358             else:
359                 cmd = 'mount |grep proc'
360                 ((out1, err1), proc1) = self.execute(cmd)
361                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
362                 ((out2, err2), proc2) = self.execute(cmd)
363                 if out1.find("/proc type proc") < 0 or \
364                     "Read-only file system".lower() in err2.lower():
365                     self.warning(" Corrupted file system ")
366                     self._blacklist_node(node)
367                     self.do_discover()
368                     continue
369             
370                 else:
371                     provision_ok = True
372                     if not self.get('hostname'):
373                         self._set_hostname_attr(node)            
374                     self.info(" Node provisioned ")            
375             
376         super(PlanetlabSfaNode, self).do_provision()
377
378     def do_release(self):
379         super(PlanetlabSfaNode, self).do_release()
380         if self.state == ResourceState.RELEASED and not self._skip_provision():
381             self.debug(" Releasing SFA API ")
382             self.sfaapi.release()
383
384 #    def _filter_based_on_attributes(self):
385 #        """
386 #        Retrive the list of nodes hrn that match user's constraints 
387 #        """
388 #        # Map user's defined attributes with tagnames of PlanetLab
389 #        timeframe = self.get("timeframe")[0]
390 #        attr_to_tags = {
391 #            'city' : 'city',
392 #            'country' : 'country',
393 #            'region' : 'region',
394 #            'architecture' : 'arch',
395 #            'operatingSystem' : 'fcdistro',
396 #            'minReliability' : 'reliability%s' % timeframe,
397 #            'maxReliability' : 'reliability%s' % timeframe,
398 #            'minBandwidth' : 'bw%s' % timeframe,
399 #            'maxBandwidth' : 'bw%s' % timeframe,
400 #            'minLoad' : 'load%s' % timeframe,
401 #            'maxLoad' : 'load%s' % timeframe,
402 #            'minCpu' : 'cpu%s' % timeframe,
403 #            'maxCpu' : 'cpu%s' % timeframe,
404 #        }
405 #        
406 #        nodes_hrn = []
407 #        filters = {}
408 #
409 #        for attr_name, attr_obj in self._attrs.iteritems():
410 #            attr_value = self.get(attr_name)
411 #            
412 #            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
413 #                attr_name != 'timeframe':
414 #        
415 #                attr_tag = attr_to_tags[attr_name]
416 #                filters['tagname'] = attr_tag
417 #
418 #                # filter nodes by fixed constraints e.g. operating system
419 #                if not 'min' in attr_name and not 'max' in attr_name:
420 #                    filters['value'] = attr_value
421 #                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
422 #
423 #                # filter nodes by range constraints e.g. max bandwidth
424 #                elif ('min' or 'max') in attr_name:
425 #                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
426 #
427 #        if not filters:
428 #            nodes = self.sfaapi.get_resources_hrn()
429 #            for node in nodes:
430 #                nodes_hrn.append(node[node.key()])
431 #        return nodes_hrn
432 #                    
433 #    def _filter_by_fixed_attr(self, filters, nodes_hrn):
434 #        """
435 #        Query SFA API for nodes matching fixed attributes defined by the
436 #        user
437 #        """
438 #        pass
439 ##        node_tags = self.sfaapi.get_resources_tags(filters)
440 ##        if node_tags is not None:
441 ##
442 ##            if len(nodes_id) == 0:
443 ##                # first attribute being matched
444 ##                for node_tag in node_tags:
445 ##                    nodes_id.append(node_tag['node_id'])
446 ##            else:
447 ##                # remove the nodes ids that don't match the new attribute
448 ##                # that is being match
449 ##
450 ##                nodes_id_tmp = []
451 ##                for node_tag in node_tags:
452 ##                    if node_tag['node_id'] in nodes_id:
453 ##                        nodes_id_tmp.append(node_tag['node_id'])
454 ##
455 ##                if len(nodes_id_tmp):
456 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
457 ##                else:
458 ##                    # no node from before match the new constraint
459 ##                    self.fail_discovery()
460 ##        else:
461 ##            # no nodes match the filter applied
462 ##            self.fail_discovery()
463 ##
464 ##        return nodes_id
465 #
466 #    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
467 #        """
468 #        Query PLCAPI for nodes ids matching attributes defined in a certain
469 #        range, by the user
470 #        """
471 #        pass
472 ##        node_tags = self.plapi.get_node_tags(filters)
473 ##        if node_tags:
474 ##            
475 ##            if len(nodes_id) == 0:
476 ##                # first attribute being matched
477 ##                for node_tag in node_tags:
478 ## 
479 ##                   # check that matches the min or max restriction
480 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
481 ##                        float(node_tag['value']) > attr_value:
482 ##                        nodes_id.append(node_tag['node_id'])
483 ##
484 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
485 ##                        float(node_tag['value']) < attr_value:
486 ##                        nodes_id.append(node_tag['node_id'])
487 ##            else:
488 ##
489 ##                # remove the nodes ids that don't match the new attribute
490 ##                # that is being match
491 ##                nodes_id_tmp = []
492 ##                for node_tag in node_tags:
493 ##
494 ##                    # check that matches the min or max restriction and was a
495 ##                    # matching previous filters
496 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
497 ##                        float(node_tag['value']) > attr_value and \
498 ##                        node_tag['node_id'] in nodes_id:
499 ##                        nodes_id_tmp.append(node_tag['node_id'])
500 ##
501 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
502 ##                        float(node_tag['value']) < attr_value and \
503 ##                        node_tag['node_id'] in nodes_id:
504 ##                        nodes_id_tmp.append(node_tag['node_id'])
505 ##
506 ##                if len(nodes_id_tmp):
507 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
508 ##                else:
509 ##                    # no node from before match the new constraint
510 ##                    self.fail_discovery()
511 ##
512 ##        else: #TODO CHECK
513 ##            # no nodes match the filter applied
514 ##            self.fail_discovery()
515 ##
516 ##        return nodes_id
517         
518     def _choose_random_node(self, nodes, hosts_hrn):
519         """
520         From the possible nodes for provision, choose randomly to decrese the
521         probability of different RMs choosing the same node for provision
522         """
523         size = len(hosts_hrn)
524         while size:
525             size = size - 1
526             index = randint(0, size)
527             host_hrn = hosts_hrn[index]
528             hosts_hrn[index] = hosts_hrn[size]
529
530             # check the node is not blacklisted or being provision by other RM
531             # and perform ping to check that is really alive
532             if not self._blacklisted(host_hrn):
533                 if not self._reserved(host_hrn):
534                     print(self.sfaapi._reserved ,self.guid)
535                     for hostname, hrn in nodes.iteritems():
536                         if host_hrn == hrn:
537                             print('hostname' ,hostname)
538                             ping_ok = self._do_ping(hostname)
539                 
540                     if not ping_ok:
541                         self._set_hostname_attr(hostname)
542                         self.warning(" Node not responding PING ")
543                         self._blacklist_node(host_hrn)
544                     else:
545                         # discovered node for provision, added to provision list
546                         self._node_to_provision = host_hrn
547                         return host_hrn
548
549 #    def _get_nodes_id(self, filters=None):
550 #        return self.plapi.get_nodes(filters, fields=['node_id'])
551 #
552     def _add_node_to_slice(self, host_hrn):
553         """
554         Add node to slice, using SFA API.
555         """
556         self.info(" Adding node to slice ")
557         slicename = self.get("username").replace('_', '.')
558         slicename = 'ple.' + slicename
559         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
560
561     def _delete_from_slice(self):
562         """
563         Delete every node from slice, using SFA API.
564         Sfi client doesn't work for particular node urns.
565         """
566         self.warning(" Deleting node from slice ")
567         slicename = self.get("username").replace('_', '.')
568         slicename = 'ple.' + slicename
569         self.sfaapi.remove_all_from_slice(slicename)
570
571     def _get_hostname(self):
572         """
573         Get the attribute hostname.
574         """
575         hostname = self.get("hostname")
576         if hostname:
577             return hostname
578         else:
579             return None
580
581     def _set_hostname_attr(self, node):
582         """
583         Query SFAAPI for the hostname of a certain host hrn and sets the
584         attribute hostname, it will over write the previous value.
585         """
586         hosts_hrn = self.sfaapi.get_resources_hrn()
587         for hostname, hrn  in hosts_hrn.iteritems():
588             if hrn == node:
589                 self.set("hostname", hostname)
590
591     def _check_if_in_slice(self, hosts_hrn):
592         """
593         Check using SFA API if any host hrn from hosts_hrn is in the user's
594         slice.
595         """
596         slicename = self.get("username").replace('_', '.')
597         slicename = 'ple.' + slicename
598         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
599         if slice_nodes:
600             slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
601         else: slice_nodes_hrn = []
602         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
603         return nodes_inslice
604
605     def _do_ping(self, hostname):
606         """
607         Perform ping command on node's IP matching hostname.
608         """
609         ping_ok = False
610         ip = self._get_ip(hostname)
611         if ip:
612             command = "ping -c4 %s" % ip
613             (out, err) = lexec(command)
614
615             m = re.search("(\d+)% packet loss", str(out))
616             if m and int(m.groups()[0]) < 50:
617                 ping_ok = True
618
619         return ping_ok
620
621     def _blacklist_node(self, host_hrn):
622         """
623         Add mal functioning node to blacklist (in SFA API).
624         """
625         self.warning(" Blacklisting malfunctioning node ")
626         self.sfaapi.blacklist_resource(host_hrn)
627         if not self._hostname:
628             self.set('hostname', None)
629
630     def _reserve(self, host_hrn):
631         """
632         Add node to the list of nodes being provisioned, in order for other RMs
633         to not try to provision the same one again.
634         """
635         self.sfaapi.reserve_resource(host_hrn)
636
637     def _get_ip(self, hostname):
638         """
639         Query cache for the IP of a node with certain hostname
640         """
641         try:
642             ip = sshfuncs.gethostbyname(hostname)
643         except:
644             # Fail while trying to find the IP
645             return None
646         return ip
647
648     def fail_discovery(self):
649         msg = "Discovery failed. No candidates found for node"
650         self.error(msg)
651         raise RuntimeError, msg
652
653     def fail_node_not_alive(self, hostname=None):
654         msg = "Node %s not alive" % hostname
655         raise RuntimeError, msg
656     
657     def fail_node_not_available(self, hostname):
658         msg = "Node %s not available for provisioning" % hostname
659         raise RuntimeError, msg
660
661     def fail_not_enough_nodes(self):
662         msg = "Not enough nodes available for provisioning"
663         raise RuntimeError, msg
664
665     def fail_sfaapi(self):
666         msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
667             " attributes sfauser and sfaPrivateKey."
668         raise RuntimeError, msg
669
670     def valid_connection(self, guid):
671         # TODO: Validate!
672         return True
673
674