Replacing _backend for _platform class attribute in ResourceManager
[nepi.git] / src / nepi / resources / planetlab / sfa_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import re
30 import os
31 import weakref
32 import time
33 import socket
34 import threading
35 import datetime
36
37 @clsinit_copy
38 class PlanetlabSfaNode(LinuxNode):
39     _rtype = "planetlab::sfa::Node"
40     _help = "Controls a PlanetLab host accessible using a SSH key " \
41             "and provisioned using SFA"
42     _platform = "planetlab"
43
44     @classmethod
45     def _register_attributes(cls):
46
47         sfa_user = Attribute("sfauser", "SFA user",
48                     flags = Flags.Credential)
49
50         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
51                             used to generate the user credential")
52
53         city = Attribute("city", "Constrain location (city) during resource \
54                 discovery. May use wildcards.",
55                 flags = Flags.Filter)
56
57         country = Attribute("country", "Constrain location (country) during \
58                     resource discovery. May use wildcards.",
59                     flags = Flags.Filter)
60
61         region = Attribute("region", "Constrain location (region) during \
62                     resource discovery. May use wildcards.",
63                     flags = Flags.Filter)
64
65         architecture = Attribute("architecture", "Constrain architecture \
66                         during resource discovery.",
67                         type = Types.Enumerate,
68                         allowed = ["x86_64", 
69                                     "i386"],
70                         flags = Flags.Filter)
71
72         operating_system = Attribute("operatingSystem", "Constrain operating \
73                             system during resource discovery.",
74                             type = Types.Enumerate,
75                             allowed =  ["f8",
76                                         "f12",
77                                         "f14",
78                                         "centos",
79                                         "other"],
80                             flags = Flags.Filter)
81
82         min_reliability = Attribute("minReliability", "Constrain reliability \
83                             while picking PlanetLab nodes. Specifies a lower \
84                             acceptable bound.",
85                             type = Types.Double,
86                             range = (1, 100),
87                             flags = Flags.Filter)
88
89         max_reliability = Attribute("maxReliability", "Constrain reliability \
90                             while picking PlanetLab nodes. Specifies an upper \
91                             acceptable bound.",
92                             type = Types.Double,
93                             range = (1, 100),
94                             flags = Flags.Filter)
95
96         min_bandwidth = Attribute("minBandwidth", "Constrain available \
97                             bandwidth while picking PlanetLab nodes. \
98                             Specifies a lower acceptable bound.",
99                             type = Types.Double,
100                             range = (0, 2**31),
101                             flags = Flags.Filter)
102
103         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
104                             bandwidth while picking PlanetLab nodes. \
105                             Specifies an upper acceptable bound.",
106                             type = Types.Double,
107                             range = (0, 2**31),
108                             flags = Flags.Filter)
109
110         min_load = Attribute("minLoad", "Constrain node load average while \
111                     picking PlanetLab nodes. Specifies a lower acceptable \
112                     bound.",
113                     type = Types.Double,
114                     range = (0, 2**31),
115                     flags = Flags.Filter)
116
117         max_load = Attribute("maxLoad", "Constrain node load average while \
118                     picking PlanetLab nodes. Specifies an upper acceptable \
119                     bound.",
120                     type = Types.Double,
121                     range = (0, 2**31),
122                     flags = Flags.Filter)
123
124         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
125                     picking PlanetLab nodes. Specifies a lower acceptable \
126                     bound.",
127                     type = Types.Double,
128                     range = (0, 100),
129                     flags = Flags.Filter)
130
131         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
132                     picking PlanetLab nodes. Specifies an upper acceptable \
133                     bound.",
134                     type = Types.Double,
135                     range = (0, 100),
136                     flags = Flags.Filter)
137
138         timeframe = Attribute("timeframe", "Past time period in which to check\
139                         information about the node. Values are year,month, \
140                         week, latest",
141                         default = "week",
142                         type = Types.Enumerate,
143                         allowed = ["latest",
144                                     "week",
145                                     "month",
146                                     "year"],
147                         flags = Flags.Filter)
148
149         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
150                         in the user's home directory under .nepi directory. This file \
151                         contains a list of PL nodes to blacklist, and at the end \
152                         of the experiment execution the new blacklisted nodes are added.",
153                     type = Types.Bool,
154                     default = False,
155                     flags = Flags.Global)
156
157         cls._register_attribute(sfa_user)
158         cls._register_attribute(sfa_private_key)
159         cls._register_attribute(city)
160         cls._register_attribute(country)
161         cls._register_attribute(region)
162         cls._register_attribute(architecture)
163         cls._register_attribute(operating_system)
164         cls._register_attribute(min_reliability)
165         cls._register_attribute(max_reliability)
166         cls._register_attribute(min_bandwidth)
167         cls._register_attribute(max_bandwidth)
168         cls._register_attribute(min_load)
169         cls._register_attribute(max_load)
170         cls._register_attribute(min_cpu)
171         cls._register_attribute(max_cpu)
172         cls._register_attribute(timeframe)
173         cls._register_attribute(plblacklist)
174
175     def __init__(self, ec, guid):
176         super(PlanetlabSfaNode, self).__init__(ec, guid)
177         
178         self._ecobj = weakref.ref(ec)
179         self._sfaapi = None
180         self._node_to_provision = None
181         self._slicenode = False
182         self._hostname = False
183
184         if self.get("gateway") or self.get("gatewayUser"):
185             self.set("gateway", None)
186             self.set("gatewayUser", None)
187
188         # Blacklist file for PL nodes
189         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
190         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
191         if not os.path.exists(plblacklist_file):
192             if os.path.isdir(nepi_home):
193                 open(plblacklist_file, 'w').close()
194             else:
195                 os.makedirs(nepi_home)
196                 open(plblacklist_file, 'w').close()
197
198     def _skip_provision(self):
199         sfa_user = self.get("sfauser")
200         if not sfa_user:
201             return True
202         else: return False
203     
204     @property
205     def sfaapi(self):
206         """
207         Property to instanciate the SFA API based in sfi client.
208         For each SFA method called this instance is used.
209         """
210         if not self._sfaapi:
211             sfa_user = self.get("sfauser")
212             sfa_sm = "http://sfa3.planet-lab.eu:12346/"
213             sfa_auth = '.'.join(sfa_user.split('.')[:2])
214             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
215             sfa_private_key = self.get("sfaPrivateKey")
216
217             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
218                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
219
220             if not _sfaapi:
221                 self.fail_sfaapi()
222     
223             self._sfaapi = weakref.ref(_sfaapi)
224
225         return self._sfaapi()
226
227     def do_discover(self):
228         """
229         Based on the attributes defined by the user, discover the suitable 
230         nodes for provision.
231         """
232         if self._skip_provision():
233             super(PlanetlabSfaNode, self).do_discover()
234             return
235
236         nodes = self.sfaapi.get_resources_hrn()
237
238         hostname = self._get_hostname()
239         if hostname:
240             # the user specified one particular node to be provisioned
241             self._hostname = True
242             host_hrn = nodes[hostname]
243
244             # check that the node is not blacklisted or being provisioned
245             # by other RM
246             if not self._blacklisted(host_hrn) and not self._reserved(host_hrn):
247                 # Node in reservation
248                 ping_ok = self._do_ping(hostname)
249                 if not ping_ok:
250                     self._blacklist_node(host_hrn)
251                     self.fail_node_not_alive(hostname)
252                 else:
253                     if self._check_if_in_slice([host_hrn]):
254                         self.debug("The node %s is already in the slice" % hostname)
255                         self._slicenode = True
256                     self._node_to_provision = host_hrn
257             else:
258                 self.fail_node_not_available(hostname)
259             super(PlanetlabSfaNode, self).do_discover()
260
261         else:
262             hosts_hrn = nodes.values()
263             nodes_inslice = self._check_if_in_slice(hosts_hrn)
264             nodes_not_inslice = list(set(hosts_hrn) - set(nodes_inslice))
265             host_hrn = None
266             if nodes_inslice:
267                 host_hrn = self._choose_random_node(nodes, nodes_inslice)
268                 self._slicenode = True          
269
270             if not host_hrn:
271                 # Either there were no matching nodes in the user's slice, or
272                 # the nodes in the slice  were blacklisted or being provisioned
273                 # by other RM. Note nodes_not_inslice is never empty
274                 host_hrn = self._choose_random_node(nodes, nodes_not_inslice)
275                 self._slicenode = False
276
277             if host_hrn:
278                 self._node_to_provision = host_hrn
279                 try:
280                     self._set_hostname_attr(host_hrn)
281                     self.info(" Selected node to provision ")
282                     super(PlanetlabSfaNode, self).do_discover()
283                 except:
284                     self._blacklist_node(host_hrn)
285                     self.do_discover()
286             else:
287                self.fail_not_enough_nodes() 
288     
289     def _blacklisted(self, host_hrn):
290         """
291         Check in the SFA API that the node is not in the blacklist.
292         """
293         if self.sfaapi.blacklisted(host_hrn):
294            return True
295         return False
296
297     def _reserved(self, host_hrn):
298         """
299         Check in the SFA API that the node is not in the reserved
300         list.
301         """
302         if self.sfaapi.reserved(host_hrn):
303             return True
304         return False
305             
306     def do_provision(self):
307         """
308         Add node to user's slice and verifing that the node is functioning
309         correctly. Check ssh, file system.
310         """
311         if self._skip_provision():
312             super(PlanetlabSfaNode, self).do_provision()
313             return
314
315         provision_ok = False
316         ssh_ok = False
317         proc_ok = False
318         timeout = 1800
319
320         while not provision_ok:
321             node = self._node_to_provision
322             if not self._slicenode:
323                 self._add_node_to_slice(node)
324             
325                 # check ssh connection
326                 t = 0 
327                 while t < timeout and not ssh_ok:
328
329                     cmd = 'echo \'GOOD NODE\''
330                     ((out, err), proc) = self.execute(cmd)
331                     if out.find("GOOD NODE") < 0:
332                         self.debug( "No SSH connection, waiting 60s" )
333                         t = t + 60
334                         time.sleep(60)
335                         continue
336                     else:
337                         self.debug( "SSH OK" )
338                         ssh_ok = True
339                         continue
340             else:
341                 cmd = 'echo \'GOOD NODE\''
342                 ((out, err), proc) = self.execute(cmd)
343                 if not out.find("GOOD NODE") < 0:
344                     ssh_ok = True
345
346             if not ssh_ok:
347                 # the timeout was reach without establishing ssh connection
348                 # the node is blacklisted, deleted from the slice, and a new
349                 # node to provision is discovered
350                 self.warning(" Could not SSH login ")
351                 self._blacklist_node(node)
352                 self.do_discover()
353                 continue
354             
355             # check /proc directory is mounted (ssh_ok = True)
356             # and file system is not read only
357             else:
358                 cmd = 'mount |grep proc'
359                 ((out1, err1), proc1) = self.execute(cmd)
360                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
361                 ((out2, err2), proc2) = self.execute(cmd)
362                 if out1.find("/proc type proc") < 0 or \
363                     "Read-only file system".lower() in err2.lower():
364                     self.warning(" Corrupted file system ")
365                     self._blacklist_node(node)
366                     self.do_discover()
367                     continue
368             
369                 else:
370                     provision_ok = True
371                     if not self.get('hostname'):
372                         self._set_hostname_attr(node)            
373                     self.info(" Node provisioned ")            
374             
375         super(PlanetlabSfaNode, self).do_provision()
376
377     def do_release(self):
378         super(PlanetlabSfaNode, self).do_release()
379         if self.state == ResourceState.RELEASED and not self._skip_provision():
380             self.debug(" Releasing SFA API ")
381             self.sfaapi.release()
382
383 #    def _filter_based_on_attributes(self):
384 #        """
385 #        Retrive the list of nodes hrn that match user's constraints 
386 #        """
387 #        # Map user's defined attributes with tagnames of PlanetLab
388 #        timeframe = self.get("timeframe")[0]
389 #        attr_to_tags = {
390 #            'city' : 'city',
391 #            'country' : 'country',
392 #            'region' : 'region',
393 #            'architecture' : 'arch',
394 #            'operatingSystem' : 'fcdistro',
395 #            'minReliability' : 'reliability%s' % timeframe,
396 #            'maxReliability' : 'reliability%s' % timeframe,
397 #            'minBandwidth' : 'bw%s' % timeframe,
398 #            'maxBandwidth' : 'bw%s' % timeframe,
399 #            'minLoad' : 'load%s' % timeframe,
400 #            'maxLoad' : 'load%s' % timeframe,
401 #            'minCpu' : 'cpu%s' % timeframe,
402 #            'maxCpu' : 'cpu%s' % timeframe,
403 #        }
404 #        
405 #        nodes_hrn = []
406 #        filters = {}
407 #
408 #        for attr_name, attr_obj in self._attrs.iteritems():
409 #            attr_value = self.get(attr_name)
410 #            
411 #            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
412 #                attr_name != 'timeframe':
413 #        
414 #                attr_tag = attr_to_tags[attr_name]
415 #                filters['tagname'] = attr_tag
416 #
417 #                # filter nodes by fixed constraints e.g. operating system
418 #                if not 'min' in attr_name and not 'max' in attr_name:
419 #                    filters['value'] = attr_value
420 #                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
421 #
422 #                # filter nodes by range constraints e.g. max bandwidth
423 #                elif ('min' or 'max') in attr_name:
424 #                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
425 #
426 #        if not filters:
427 #            nodes = self.sfaapi.get_resources_hrn()
428 #            for node in nodes:
429 #                nodes_hrn.append(node[node.key()])
430 #        return nodes_hrn
431 #                    
432 #    def _filter_by_fixed_attr(self, filters, nodes_hrn):
433 #        """
434 #        Query SFA API for nodes matching fixed attributes defined by the
435 #        user
436 #        """
437 #        pass
438 ##        node_tags = self.sfaapi.get_resources_tags(filters)
439 ##        if node_tags is not None:
440 ##
441 ##            if len(nodes_id) == 0:
442 ##                # first attribute being matched
443 ##                for node_tag in node_tags:
444 ##                    nodes_id.append(node_tag['node_id'])
445 ##            else:
446 ##                # remove the nodes ids that don't match the new attribute
447 ##                # that is being match
448 ##
449 ##                nodes_id_tmp = []
450 ##                for node_tag in node_tags:
451 ##                    if node_tag['node_id'] in nodes_id:
452 ##                        nodes_id_tmp.append(node_tag['node_id'])
453 ##
454 ##                if len(nodes_id_tmp):
455 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
456 ##                else:
457 ##                    # no node from before match the new constraint
458 ##                    self.fail_discovery()
459 ##        else:
460 ##            # no nodes match the filter applied
461 ##            self.fail_discovery()
462 ##
463 ##        return nodes_id
464 #
465 #    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
466 #        """
467 #        Query PLCAPI for nodes ids matching attributes defined in a certain
468 #        range, by the user
469 #        """
470 #        pass
471 ##        node_tags = self.plapi.get_node_tags(filters)
472 ##        if node_tags:
473 ##            
474 ##            if len(nodes_id) == 0:
475 ##                # first attribute being matched
476 ##                for node_tag in node_tags:
477 ## 
478 ##                   # check that matches the min or max restriction
479 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
480 ##                        float(node_tag['value']) > attr_value:
481 ##                        nodes_id.append(node_tag['node_id'])
482 ##
483 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
484 ##                        float(node_tag['value']) < attr_value:
485 ##                        nodes_id.append(node_tag['node_id'])
486 ##            else:
487 ##
488 ##                # remove the nodes ids that don't match the new attribute
489 ##                # that is being match
490 ##                nodes_id_tmp = []
491 ##                for node_tag in node_tags:
492 ##
493 ##                    # check that matches the min or max restriction and was a
494 ##                    # matching previous filters
495 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
496 ##                        float(node_tag['value']) > attr_value and \
497 ##                        node_tag['node_id'] in nodes_id:
498 ##                        nodes_id_tmp.append(node_tag['node_id'])
499 ##
500 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
501 ##                        float(node_tag['value']) < attr_value and \
502 ##                        node_tag['node_id'] in nodes_id:
503 ##                        nodes_id_tmp.append(node_tag['node_id'])
504 ##
505 ##                if len(nodes_id_tmp):
506 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
507 ##                else:
508 ##                    # no node from before match the new constraint
509 ##                    self.fail_discovery()
510 ##
511 ##        else: #TODO CHECK
512 ##            # no nodes match the filter applied
513 ##            self.fail_discovery()
514 ##
515 ##        return nodes_id
516         
517     def _choose_random_node(self, nodes, hosts_hrn):
518         """
519         From the possible nodes for provision, choose randomly to decrese the
520         probability of different RMs choosing the same node for provision
521         """
522         size = len(hosts_hrn)
523         while size:
524             size = size - 1
525             index = randint(0, size)
526             host_hrn = hosts_hrn[index]
527             hosts_hrn[index] = hosts_hrn[size]
528
529             # check the node is not blacklisted or being provision by other RM
530             # and perform ping to check that is really alive
531             if not self._blacklisted(host_hrn):
532                 if not self._reserved(host_hrn):
533                     print self.sfaapi._reserved ,self.guid
534                     for hostname, hrn in nodes.iteritems():
535                         if host_hrn == hrn:
536                             print 'hostname' ,hostname
537                             ping_ok = self._do_ping(hostname)
538                 
539                     if not ping_ok:
540                         self._set_hostname_attr(hostname)
541                         self.warning(" Node not responding PING ")
542                         self._blacklist_node(host_hrn)
543                     else:
544                         # discovered node for provision, added to provision list
545                         self._node_to_provision = host_hrn
546                         return host_hrn
547
548 #    def _get_nodes_id(self, filters=None):
549 #        return self.plapi.get_nodes(filters, fields=['node_id'])
550 #
551     def _add_node_to_slice(self, host_hrn):
552         """
553         Add node to slice, using SFA API.
554         """
555         self.info(" Adding node to slice ")
556         slicename = self.get("username").replace('_', '.')
557         slicename = 'ple.' + slicename
558         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
559
560     def _delete_from_slice(self):
561         """
562         Delete every node from slice, using SFA API.
563         Sfi client doesn't work for particular node urns.
564         """
565         self.warning(" Deleting node from slice ")
566         slicename = self.get("username").replace('_', '.')
567         slicename = 'ple.' + slicename
568         self.sfaapi.remove_all_from_slice(slicename)
569
570     def _get_hostname(self):
571         """
572         Get the attribute hostname.
573         """
574         hostname = self.get("hostname")
575         if hostname:
576             return hostname
577         else:
578             return None
579
580     def _set_hostname_attr(self, node):
581         """
582         Query SFAAPI for the hostname of a certain host hrn and sets the
583         attribute hostname, it will over write the previous value.
584         """
585         hosts_hrn = self.sfaapi.get_resources_hrn()
586         for hostname, hrn  in hosts_hrn.iteritems():
587             if hrn == node:
588                 self.set("hostname", hostname)
589
590     def _check_if_in_slice(self, hosts_hrn):
591         """
592         Check using SFA API if any host hrn from hosts_hrn is in the user's
593         slice.
594         """
595         slicename = self.get("username").replace('_', '.')
596         slicename = 'ple.' + slicename
597         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
598         if slice_nodes:
599             slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
600         else: slice_nodes_hrn = []
601         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
602         return nodes_inslice
603
604     def _do_ping(self, hostname):
605         """
606         Perform ping command on node's IP matching hostname.
607         """
608         ping_ok = False
609         ip = self._get_ip(hostname)
610         if ip:
611             command = "ping -c4 %s" % ip
612             (out, err) = lexec(command)
613
614             m = re.search("(\d+)% packet loss", str(out))
615             if m and int(m.groups()[0]) < 50:
616                 ping_ok = True
617
618         return ping_ok
619
620     def _blacklist_node(self, host_hrn):
621         """
622         Add mal functioning node to blacklist (in SFA API).
623         """
624         self.warning(" Blacklisting malfunctioning node ")
625         self.sfaapi.blacklist_resource(host_hrn)
626         if not self._hostname:
627             self.set('hostname', None)
628
629     def _reserve(self, host_hrn):
630         """
631         Add node to the list of nodes being provisioned, in order for other RMs
632         to not try to provision the same one again.
633         """
634         self.sfaapi.reserve_resource(host_hrn)
635
636     def _get_ip(self, hostname):
637         """
638         Query cache for the IP of a node with certain hostname
639         """
640         try:
641             ip = sshfuncs.gethostbyname(hostname)
642         except:
643             # Fail while trying to find the IP
644             return None
645         return ip
646
647     def fail_discovery(self):
648         msg = "Discovery failed. No candidates found for node"
649         self.error(msg)
650         raise RuntimeError, msg
651
652     def fail_node_not_alive(self, hostname=None):
653         msg = "Node %s not alive" % hostname
654         raise RuntimeError, msg
655     
656     def fail_node_not_available(self, hostname):
657         msg = "Node %s not available for provisioning" % hostname
658         raise RuntimeError, msg
659
660     def fail_not_enough_nodes(self):
661         msg = "Not enough nodes available for provisioning"
662         raise RuntimeError, msg
663
664     def fail_sfaapi(self):
665         msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
666             " attributes sfauser and sfaPrivateKey."
667         raise RuntimeError, msg
668
669     def valid_connection(self, guid):
670         # TODO: Validate!
671         return True
672
673