e00fbcb3376376be301338d3554e039fb2a290e8
[nepi.git] / src / nepi / resources / planetlab / sfa_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
18
19 from nepi.execution.attribute import Attribute, Flags, Types
20 from nepi.execution.resource import ResourceManager, clsinit_copy, \
21         ResourceState
22 from nepi.resources.linux.node import LinuxNode
23 from nepi.util.sfaapi import SFAAPIFactory 
24 from nepi.util.execfuncs import lexec
25 from nepi.util import sshfuncs
26
27 from random import randint
28 import re
29 import os
30 import weakref
31 import time
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class PlanetlabSfaNode(LinuxNode):
38     _rtype = "planetlab::sfa::Node"
39     _help = "Controls a PlanetLab host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _platform = "planetlab"
42
43     @classmethod
44     def _register_attributes(cls):
45
46         sfa_user = Attribute("sfauser", "SFA user",
47                     flags = Flags.Credential)
48
49         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50                             used to generate the user credential")
51
52         city = Attribute("city", "Constrain location (city) during resource \
53                 discovery. May use wildcards.",
54                 flags = Flags.Filter)
55
56         country = Attribute("country", "Constrain location (country) during \
57                     resource discovery. May use wildcards.",
58                     flags = Flags.Filter)
59
60         region = Attribute("region", "Constrain location (region) during \
61                     resource discovery. May use wildcards.",
62                     flags = Flags.Filter)
63
64         architecture = Attribute("architecture", "Constrain architecture \
65                         during resource discovery.",
66                         type = Types.Enumerate,
67                         allowed = ["x86_64", 
68                                     "i386"],
69                         flags = Flags.Filter)
70
71         operating_system = Attribute("operatingSystem", "Constrain operating \
72                             system during resource discovery.",
73                             type = Types.Enumerate,
74                             allowed =  ["f8",
75                                         "f12",
76                                         "f14",
77                                         "centos",
78                                         "other"],
79                             flags = Flags.Filter)
80
81         min_reliability = Attribute("minReliability", "Constrain reliability \
82                             while picking PlanetLab nodes. Specifies a lower \
83                             acceptable bound.",
84                             type = Types.Double,
85                             range = (1, 100),
86                             flags = Flags.Filter)
87
88         max_reliability = Attribute("maxReliability", "Constrain reliability \
89                             while picking PlanetLab nodes. Specifies an upper \
90                             acceptable bound.",
91                             type = Types.Double,
92                             range = (1, 100),
93                             flags = Flags.Filter)
94
95         min_bandwidth = Attribute("minBandwidth", "Constrain available \
96                             bandwidth while picking PlanetLab nodes. \
97                             Specifies a lower acceptable bound.",
98                             type = Types.Double,
99                             range = (0, 2**31),
100                             flags = Flags.Filter)
101
102         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
103                             bandwidth while picking PlanetLab nodes. \
104                             Specifies an upper acceptable bound.",
105                             type = Types.Double,
106                             range = (0, 2**31),
107                             flags = Flags.Filter)
108
109         min_load = Attribute("minLoad", "Constrain node load average while \
110                     picking PlanetLab nodes. Specifies a lower acceptable \
111                     bound.",
112                     type = Types.Double,
113                     range = (0, 2**31),
114                     flags = Flags.Filter)
115
116         max_load = Attribute("maxLoad", "Constrain node load average while \
117                     picking PlanetLab nodes. Specifies an upper acceptable \
118                     bound.",
119                     type = Types.Double,
120                     range = (0, 2**31),
121                     flags = Flags.Filter)
122
123         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
124                     picking PlanetLab nodes. Specifies a lower acceptable \
125                     bound.",
126                     type = Types.Double,
127                     range = (0, 100),
128                     flags = Flags.Filter)
129
130         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
131                     picking PlanetLab nodes. Specifies an upper acceptable \
132                     bound.",
133                     type = Types.Double,
134                     range = (0, 100),
135                     flags = Flags.Filter)
136
137         timeframe = Attribute("timeframe", "Past time period in which to check\
138                         information about the node. Values are year,month, \
139                         week, latest",
140                         default = "week",
141                         type = Types.Enumerate,
142                         allowed = ["latest",
143                                     "week",
144                                     "month",
145                                     "year"],
146                         flags = Flags.Filter)
147
148         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
149                         in the user's home directory under .nepi directory. This file \
150                         contains a list of PL nodes to blacklist, and at the end \
151                         of the experiment execution the new blacklisted nodes are added.",
152                     type = Types.Bool,
153                     default = False,
154                     flags = Flags.Global)
155
156         cls._register_attribute(sfa_user)
157         cls._register_attribute(sfa_private_key)
158         cls._register_attribute(city)
159         cls._register_attribute(country)
160         cls._register_attribute(region)
161         cls._register_attribute(architecture)
162         cls._register_attribute(operating_system)
163         cls._register_attribute(min_reliability)
164         cls._register_attribute(max_reliability)
165         cls._register_attribute(min_bandwidth)
166         cls._register_attribute(max_bandwidth)
167         cls._register_attribute(min_load)
168         cls._register_attribute(max_load)
169         cls._register_attribute(min_cpu)
170         cls._register_attribute(max_cpu)
171         cls._register_attribute(timeframe)
172         cls._register_attribute(plblacklist)
173
174     def __init__(self, ec, guid):
175         super(PlanetlabSfaNode, self).__init__(ec, guid)
176         
177         self._ecobj = weakref.ref(ec)
178         self._sfaapi = None
179         self._node_to_provision = None
180         self._slicenode = False
181         self._hostname = False
182
183         if self.get("gateway") or self.get("gatewayUser"):
184             self.set("gateway", None)
185             self.set("gatewayUser", None)
186
187         # Blacklist file for PL nodes
188         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
189         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
190         if not os.path.exists(plblacklist_file):
191             if os.path.isdir(nepi_home):
192                 open(plblacklist_file, 'w').close()
193             else:
194                 os.makedirs(nepi_home)
195                 open(plblacklist_file, 'w').close()
196
197     def _skip_provision(self):
198         sfa_user = self.get("sfauser")
199         if not sfa_user:
200             return True
201         else: return False
202     
203     @property
204     def sfaapi(self):
205         """
206         Property to instanciate the SFA API based in sfi client.
207         For each SFA method called this instance is used.
208         """
209         if not self._sfaapi:
210             sfa_user = self.get("sfauser")
211             sfa_sm = "http://sfa3.planet-lab.eu:12346/"
212             sfa_auth = '.'.join(sfa_user.split('.')[:2])
213             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
214             sfa_private_key = self.get("sfaPrivateKey")
215
216             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
217                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
218
219             if not _sfaapi:
220                 self.fail_sfaapi()
221     
222             self._sfaapi = weakref.ref(_sfaapi)
223
224         return self._sfaapi()
225
226     def do_discover(self):
227         """
228         Based on the attributes defined by the user, discover the suitable 
229         nodes for provision.
230         """
231         if self._skip_provision():
232             super(PlanetlabSfaNode, self).do_discover()
233             return
234
235         nodes = self.sfaapi.get_resources_hrn()
236
237         hostname = self._get_hostname()
238         if hostname:
239             # the user specified one particular node to be provisioned
240             self._hostname = True
241             host_hrn = nodes[hostname]
242
243             # check that the node is not blacklisted or being provisioned
244             # by other RM
245             if not self._blacklisted(host_hrn) and not self._reserved(host_hrn):
246                 # Node in reservation
247                 ping_ok = self._do_ping(hostname)
248                 if not ping_ok:
249                     self._blacklist_node(host_hrn)
250                     self.fail_node_not_alive(hostname)
251                 else:
252                     if self._check_if_in_slice([host_hrn]):
253                         self.debug("The node %s is already in the slice" % hostname)
254                         self._slicenode = True
255                     self._node_to_provision = host_hrn
256             else:
257                 self.fail_node_not_available(hostname)
258             super(PlanetlabSfaNode, self).do_discover()
259
260         else:
261             hosts_hrn = nodes.values()
262             nodes_inslice = self._check_if_in_slice(hosts_hrn)
263             nodes_not_inslice = list(set(hosts_hrn) - set(nodes_inslice))
264             host_hrn = None
265             if nodes_inslice:
266                 host_hrn = self._choose_random_node(nodes, nodes_inslice)
267                 self._slicenode = True          
268
269             if not host_hrn:
270                 # Either there were no matching nodes in the user's slice, or
271                 # the nodes in the slice  were blacklisted or being provisioned
272                 # by other RM. Note nodes_not_inslice is never empty
273                 host_hrn = self._choose_random_node(nodes, nodes_not_inslice)
274                 self._slicenode = False
275
276             if host_hrn:
277                 self._node_to_provision = host_hrn
278                 try:
279                     self._set_hostname_attr(host_hrn)
280                     self.info(" Selected node to provision ")
281                     super(PlanetlabSfaNode, self).do_discover()
282                 except:
283                     self._blacklist_node(host_hrn)
284                     self.do_discover()
285             else:
286                self.fail_not_enough_nodes() 
287     
288     def _blacklisted(self, host_hrn):
289         """
290         Check in the SFA API that the node is not in the blacklist.
291         """
292         if self.sfaapi.blacklisted(host_hrn):
293            return True
294         return False
295
296     def _reserved(self, host_hrn):
297         """
298         Check in the SFA API that the node is not in the reserved
299         list.
300         """
301         if self.sfaapi.reserved(host_hrn):
302             return True
303         return False
304             
305     def do_provision(self):
306         """
307         Add node to user's slice and verifing that the node is functioning
308         correctly. Check ssh, file system.
309         """
310         if self._skip_provision():
311             super(PlanetlabSfaNode, self).do_provision()
312             return
313
314         provision_ok = False
315         ssh_ok = False
316         proc_ok = False
317         timeout = 1800
318
319         while not provision_ok:
320             node = self._node_to_provision
321             if not self._slicenode:
322                 self._add_node_to_slice(node)
323             
324                 # check ssh connection
325                 t = 0 
326                 while t < timeout and not ssh_ok:
327
328                     cmd = 'echo \'GOOD NODE\''
329                     ((out, err), proc) = self.execute(cmd)
330                     if out.find("GOOD NODE") < 0:
331                         self.debug( "No SSH connection, waiting 60s" )
332                         t = t + 60
333                         time.sleep(60)
334                         continue
335                     else:
336                         self.debug( "SSH OK" )
337                         ssh_ok = True
338                         continue
339             else:
340                 cmd = 'echo \'GOOD NODE\''
341                 ((out, err), proc) = self.execute(cmd)
342                 if not out.find("GOOD NODE") < 0:
343                     ssh_ok = True
344
345             if not ssh_ok:
346                 # the timeout was reach without establishing ssh connection
347                 # the node is blacklisted, deleted from the slice, and a new
348                 # node to provision is discovered
349                 self.warning(" Could not SSH login ")
350                 self._blacklist_node(node)
351                 self.do_discover()
352                 continue
353             
354             # check /proc directory is mounted (ssh_ok = True)
355             # and file system is not read only
356             else:
357                 cmd = 'mount |grep proc'
358                 ((out1, err1), proc1) = self.execute(cmd)
359                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
360                 ((out2, err2), proc2) = self.execute(cmd)
361                 if out1.find("/proc type proc") < 0 or \
362                     "Read-only file system".lower() in err2.lower():
363                     self.warning(" Corrupted file system ")
364                     self._blacklist_node(node)
365                     self.do_discover()
366                     continue
367             
368                 else:
369                     provision_ok = True
370                     if not self.get('hostname'):
371                         self._set_hostname_attr(node)            
372                     self.info(" Node provisioned ")            
373             
374         super(PlanetlabSfaNode, self).do_provision()
375
376     def do_release(self):
377         super(PlanetlabSfaNode, self).do_release()
378         if self.state == ResourceState.RELEASED and not self._skip_provision():
379             self.debug(" Releasing SFA API ")
380             self.sfaapi.release()
381
382 #    def _filter_based_on_attributes(self):
383 #        """
384 #        Retrive the list of nodes hrn that match user's constraints 
385 #        """
386 #        # Map user's defined attributes with tagnames of PlanetLab
387 #        timeframe = self.get("timeframe")[0]
388 #        attr_to_tags = {
389 #            'city' : 'city',
390 #            'country' : 'country',
391 #            'region' : 'region',
392 #            'architecture' : 'arch',
393 #            'operatingSystem' : 'fcdistro',
394 #            'minReliability' : 'reliability%s' % timeframe,
395 #            'maxReliability' : 'reliability%s' % timeframe,
396 #            'minBandwidth' : 'bw%s' % timeframe,
397 #            'maxBandwidth' : 'bw%s' % timeframe,
398 #            'minLoad' : 'load%s' % timeframe,
399 #            'maxLoad' : 'load%s' % timeframe,
400 #            'minCpu' : 'cpu%s' % timeframe,
401 #            'maxCpu' : 'cpu%s' % timeframe,
402 #        }
403 #        
404 #        nodes_hrn = []
405 #        filters = {}
406 #
407 #        for attr_name, attr_obj in self._attrs.iteritems():
408 #            attr_value = self.get(attr_name)
409 #            
410 #            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
411 #                attr_name != 'timeframe':
412 #        
413 #                attr_tag = attr_to_tags[attr_name]
414 #                filters['tagname'] = attr_tag
415 #
416 #                # filter nodes by fixed constraints e.g. operating system
417 #                if not 'min' in attr_name and not 'max' in attr_name:
418 #                    filters['value'] = attr_value
419 #                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
420 #
421 #                # filter nodes by range constraints e.g. max bandwidth
422 #                elif ('min' or 'max') in attr_name:
423 #                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
424 #
425 #        if not filters:
426 #            nodes = self.sfaapi.get_resources_hrn()
427 #            for node in nodes:
428 #                nodes_hrn.append(node[node.key()])
429 #        return nodes_hrn
430 #                    
431 #    def _filter_by_fixed_attr(self, filters, nodes_hrn):
432 #        """
433 #        Query SFA API for nodes matching fixed attributes defined by the
434 #        user
435 #        """
436 #        pass
437 ##        node_tags = self.sfaapi.get_resources_tags(filters)
438 ##        if node_tags is not None:
439 ##
440 ##            if len(nodes_id) == 0:
441 ##                # first attribute being matched
442 ##                for node_tag in node_tags:
443 ##                    nodes_id.append(node_tag['node_id'])
444 ##            else:
445 ##                # remove the nodes ids that don't match the new attribute
446 ##                # that is being match
447 ##
448 ##                nodes_id_tmp = []
449 ##                for node_tag in node_tags:
450 ##                    if node_tag['node_id'] in nodes_id:
451 ##                        nodes_id_tmp.append(node_tag['node_id'])
452 ##
453 ##                if len(nodes_id_tmp):
454 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
455 ##                else:
456 ##                    # no node from before match the new constraint
457 ##                    self.fail_discovery()
458 ##        else:
459 ##            # no nodes match the filter applied
460 ##            self.fail_discovery()
461 ##
462 ##        return nodes_id
463 #
464 #    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
465 #        """
466 #        Query PLCAPI for nodes ids matching attributes defined in a certain
467 #        range, by the user
468 #        """
469 #        pass
470 ##        node_tags = self.plapi.get_node_tags(filters)
471 ##        if node_tags:
472 ##            
473 ##            if len(nodes_id) == 0:
474 ##                # first attribute being matched
475 ##                for node_tag in node_tags:
476 ## 
477 ##                   # check that matches the min or max restriction
478 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
479 ##                        float(node_tag['value']) > attr_value:
480 ##                        nodes_id.append(node_tag['node_id'])
481 ##
482 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
483 ##                        float(node_tag['value']) < attr_value:
484 ##                        nodes_id.append(node_tag['node_id'])
485 ##            else:
486 ##
487 ##                # remove the nodes ids that don't match the new attribute
488 ##                # that is being match
489 ##                nodes_id_tmp = []
490 ##                for node_tag in node_tags:
491 ##
492 ##                    # check that matches the min or max restriction and was a
493 ##                    # matching previous filters
494 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
495 ##                        float(node_tag['value']) > attr_value and \
496 ##                        node_tag['node_id'] in nodes_id:
497 ##                        nodes_id_tmp.append(node_tag['node_id'])
498 ##
499 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
500 ##                        float(node_tag['value']) < attr_value and \
501 ##                        node_tag['node_id'] in nodes_id:
502 ##                        nodes_id_tmp.append(node_tag['node_id'])
503 ##
504 ##                if len(nodes_id_tmp):
505 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
506 ##                else:
507 ##                    # no node from before match the new constraint
508 ##                    self.fail_discovery()
509 ##
510 ##        else: #TODO CHECK
511 ##            # no nodes match the filter applied
512 ##            self.fail_discovery()
513 ##
514 ##        return nodes_id
515         
516     def _choose_random_node(self, nodes, hosts_hrn):
517         """
518         From the possible nodes for provision, choose randomly to decrese the
519         probability of different RMs choosing the same node for provision
520         """
521         size = len(hosts_hrn)
522         while size:
523             size = size - 1
524             index = randint(0, size)
525             host_hrn = hosts_hrn[index]
526             hosts_hrn[index] = hosts_hrn[size]
527
528             # check the node is not blacklisted or being provision by other RM
529             # and perform ping to check that is really alive
530             if not self._blacklisted(host_hrn):
531                 if not self._reserved(host_hrn):
532                     print self.sfaapi._reserved ,self.guid
533                     for hostname, hrn in nodes.iteritems():
534                         if host_hrn == hrn:
535                             print 'hostname' ,hostname
536                             ping_ok = self._do_ping(hostname)
537                 
538                     if not ping_ok:
539                         self._set_hostname_attr(hostname)
540                         self.warning(" Node not responding PING ")
541                         self._blacklist_node(host_hrn)
542                     else:
543                         # discovered node for provision, added to provision list
544                         self._node_to_provision = host_hrn
545                         return host_hrn
546
547 #    def _get_nodes_id(self, filters=None):
548 #        return self.plapi.get_nodes(filters, fields=['node_id'])
549 #
550     def _add_node_to_slice(self, host_hrn):
551         """
552         Add node to slice, using SFA API.
553         """
554         self.info(" Adding node to slice ")
555         slicename = self.get("username").replace('_', '.')
556         slicename = 'ple.' + slicename
557         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
558
559     def _delete_from_slice(self):
560         """
561         Delete every node from slice, using SFA API.
562         Sfi client doesn't work for particular node urns.
563         """
564         self.warning(" Deleting node from slice ")
565         slicename = self.get("username").replace('_', '.')
566         slicename = 'ple.' + slicename
567         self.sfaapi.remove_all_from_slice(slicename)
568
569     def _get_hostname(self):
570         """
571         Get the attribute hostname.
572         """
573         hostname = self.get("hostname")
574         if hostname:
575             return hostname
576         else:
577             return None
578
579     def _set_hostname_attr(self, node):
580         """
581         Query SFAAPI for the hostname of a certain host hrn and sets the
582         attribute hostname, it will over write the previous value.
583         """
584         hosts_hrn = self.sfaapi.get_resources_hrn()
585         for hostname, hrn  in hosts_hrn.iteritems():
586             if hrn == node:
587                 self.set("hostname", hostname)
588
589     def _check_if_in_slice(self, hosts_hrn):
590         """
591         Check using SFA API if any host hrn from hosts_hrn is in the user's
592         slice.
593         """
594         slicename = self.get("username").replace('_', '.')
595         slicename = 'ple.' + slicename
596         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
597         if slice_nodes:
598             slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
599         else: slice_nodes_hrn = []
600         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
601         return nodes_inslice
602
603     def _do_ping(self, hostname):
604         """
605         Perform ping command on node's IP matching hostname.
606         """
607         ping_ok = False
608         ip = self._get_ip(hostname)
609         if ip:
610             command = "ping -c4 %s" % ip
611             (out, err) = lexec(command)
612
613             m = re.search("(\d+)% packet loss", str(out))
614             if m and int(m.groups()[0]) < 50:
615                 ping_ok = True
616
617         return ping_ok
618
619     def _blacklist_node(self, host_hrn):
620         """
621         Add mal functioning node to blacklist (in SFA API).
622         """
623         self.warning(" Blacklisting malfunctioning node ")
624         self.sfaapi.blacklist_resource(host_hrn)
625         if not self._hostname:
626             self.set('hostname', None)
627
628     def _reserve(self, host_hrn):
629         """
630         Add node to the list of nodes being provisioned, in order for other RMs
631         to not try to provision the same one again.
632         """
633         self.sfaapi.reserve_resource(host_hrn)
634
635     def _get_ip(self, hostname):
636         """
637         Query cache for the IP of a node with certain hostname
638         """
639         try:
640             ip = sshfuncs.gethostbyname(hostname)
641         except:
642             # Fail while trying to find the IP
643             return None
644         return ip
645
646     def fail_discovery(self):
647         msg = "Discovery failed. No candidates found for node"
648         self.error(msg)
649         raise RuntimeError, msg
650
651     def fail_node_not_alive(self, hostname=None):
652         msg = "Node %s not alive" % hostname
653         raise RuntimeError, msg
654     
655     def fail_node_not_available(self, hostname):
656         msg = "Node %s not available for provisioning" % hostname
657         raise RuntimeError, msg
658
659     def fail_not_enough_nodes(self):
660         msg = "Not enough nodes available for provisioning"
661         raise RuntimeError, msg
662
663     def fail_sfaapi(self):
664         msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
665             " attributes sfauser and sfaPrivateKey."
666         raise RuntimeError, msg
667
668     def valid_connection(self, guid):
669         # TODO: Validate!
670         return True
671
672