Merge the OMF 6 branch
[nepi.git] / src / nepi / resources / planetlab / sfa_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay 
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import re
30 import weakref
31 import time
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class PlanetlabSfaNode(LinuxNode):
38     _rtype = "PlanetlabSfaNode"
39     _help = "Controls a PlanetLab host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _backend = "planetlab"
42
43     @classmethod
44     def _register_attributes(cls):
45
46         sfa_user = Attribute("sfauser", "SFA user",
47                     flags = Flags.Credential)
48
49         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50                             used to generate the user credential")
51
52         city = Attribute("city", "Constrain location (city) during resource \
53                 discovery. May use wildcards.",
54                 flags = Flags.Filter)
55
56         country = Attribute("country", "Constrain location (country) during \
57                     resource discovery. May use wildcards.",
58                     flags = Flags.Filter)
59
60         region = Attribute("region", "Constrain location (region) during \
61                     resource discovery. May use wildcards.",
62                     flags = Flags.Filter)
63
64         architecture = Attribute("architecture", "Constrain architecture \
65                         during resource discovery.",
66                         type = Types.Enumerate,
67                         allowed = ["x86_64", 
68                                     "i386"],
69                         flags = Flags.Filter)
70
71         operating_system = Attribute("operatingSystem", "Constrain operating \
72                             system during resource discovery.",
73                             type = Types.Enumerate,
74                             allowed =  ["f8",
75                                         "f12",
76                                         "f14",
77                                         "centos",
78                                         "other"],
79                             flags = Flags.Filter)
80
81         min_reliability = Attribute("minReliability", "Constrain reliability \
82                             while picking PlanetLab nodes. Specifies a lower \
83                             acceptable bound.",
84                             type = Types.Double,
85                             range = (1, 100),
86                             flags = Flags.Filter)
87
88         max_reliability = Attribute("maxReliability", "Constrain reliability \
89                             while picking PlanetLab nodes. Specifies an upper \
90                             acceptable bound.",
91                             type = Types.Double,
92                             range = (1, 100),
93                             flags = Flags.Filter)
94
95         min_bandwidth = Attribute("minBandwidth", "Constrain available \
96                             bandwidth while picking PlanetLab nodes. \
97                             Specifies a lower acceptable bound.",
98                             type = Types.Double,
99                             range = (0, 2**31),
100                             flags = Flags.Filter)
101
102         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
103                             bandwidth while picking PlanetLab nodes. \
104                             Specifies an upper acceptable bound.",
105                             type = Types.Double,
106                             range = (0, 2**31),
107                             flags = Flags.Filter)
108
109         min_load = Attribute("minLoad", "Constrain node load average while \
110                     picking PlanetLab nodes. Specifies a lower acceptable \
111                     bound.",
112                     type = Types.Double,
113                     range = (0, 2**31),
114                     flags = Flags.Filter)
115
116         max_load = Attribute("maxLoad", "Constrain node load average while \
117                     picking PlanetLab nodes. Specifies an upper acceptable \
118                     bound.",
119                     type = Types.Double,
120                     range = (0, 2**31),
121                     flags = Flags.Filter)
122
123         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
124                     picking PlanetLab nodes. Specifies a lower acceptable \
125                     bound.",
126                     type = Types.Double,
127                     range = (0, 100),
128                     flags = Flags.Filter)
129
130         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
131                     picking PlanetLab nodes. Specifies an upper acceptable \
132                     bound.",
133                     type = Types.Double,
134                     range = (0, 100),
135                     flags = Flags.Filter)
136
137         timeframe = Attribute("timeframe", "Past time period in which to check\
138                         information about the node. Values are year,month, \
139                         week, latest",
140                         default = "week",
141                         type = Types.Enumerate,
142                         allowed = ["latest",
143                                     "week",
144                                     "month",
145                                     "year"],
146                         flags = Flags.Filter)
147
148         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
149                         in the user's home directory under .nepi directory. This file \
150                         contains a list of PL nodes to blacklist, and at the end \
151                         of the experiment execution the new blacklisted nodes are added.",
152                     type = Types.Bool,
153                     default = False,
154                     flags = Flags.Global)
155
156         cls._register_attribute(sfa_user)
157         cls._register_attribute(sfa_private_key)
158         cls._register_attribute(city)
159         cls._register_attribute(country)
160         cls._register_attribute(region)
161         cls._register_attribute(architecture)
162         cls._register_attribute(operating_system)
163         cls._register_attribute(min_reliability)
164         cls._register_attribute(max_reliability)
165         cls._register_attribute(min_bandwidth)
166         cls._register_attribute(max_bandwidth)
167         cls._register_attribute(min_load)
168         cls._register_attribute(max_load)
169         cls._register_attribute(min_cpu)
170         cls._register_attribute(max_cpu)
171         cls._register_attribute(timeframe)
172         cls._register_attribute(plblacklist)
173
174     def __init__(self, ec, guid):
175         super(PlanetlabSfaNode, self).__init__(ec, guid)
176         
177         self._ecobj = weakref.ref(ec)
178         self._sfaapi = None
179         self._node_to_provision = None
180         self._slicenode = False
181         self._hostname = False
182
183         if self.get("gateway") or self.get("gatewayUser"):
184             self.set("gateway", None)
185             self.set("gatewayUser", None)
186
187     def _skip_provision(self):
188         sfa_user = self.get("sfauser")
189         if not sfa_user:
190             return True
191         else: return False
192     
193     @property
194     def sfaapi(self):
195         """
196         Property to instanciate the SFA API based in sfi client.
197         For each SFA method called this instance is used.
198         """
199         if not self._sfaapi:
200             sfa_user = self.get("sfauser")
201             sfa_sm = "http://sfa3.planet-lab.eu:12346/"
202             sfa_auth = '.'.join(sfa_user.split('.')[:2])
203             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
204             sfa_private_key = self.get("sfaPrivateKey")
205
206             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
207                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
208
209             if not _sfaapi:
210                 self.fail_sfaapi()
211     
212             self._sfaapi = weakref.ref(_sfaapi)
213
214         return self._sfaapi()
215
216     def do_discover(self):
217         """
218         Based on the attributes defined by the user, discover the suitable 
219         nodes for provision.
220         """
221         if self._skip_provision():
222             super(PlanetlabSfaNode, self).do_discover()
223             return
224
225         nodes = self.sfaapi.get_resources_hrn()
226
227         hostname = self._get_hostname()
228         if hostname:
229             # the user specified one particular node to be provisioned
230             self._hostname = True
231             host_hrn = nodes[hostname]
232
233             # check that the node is not blacklisted or being provisioned
234             # by other RM
235             if not self._blacklisted(host_hrn):
236                 if not self._reserved(host_hrn):
237                     # Node in reservation
238                     ping_ok = self._do_ping(hostname)
239                     if not ping_ok:
240                         self._blacklist_node(host_hrn)
241                         self.fail_node_not_alive(hostname)
242                     else:
243                         if self._check_if_in_slice([host_hrn]):
244                             self.debug("The node %s is already in the slice" % hostname)
245                             self._slicenode = True
246                         self._node_to_provision = host_hrn
247                         super(PlanetlabSfaNode, self).do_discover()
248         
249 #        else:
250 #            # the user specifies constraints based on attributes, zero, one or 
251 #            # more nodes can match these constraints 
252 #            nodes = self._filter_based_on_attributes()
253 #
254 #            # nodes that are already part of user's slice have the priority to
255 #            # provisioned
256 #            nodes_inslice = self._check_if_in_slice(nodes)
257 #            nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
258 #            
259 #            node_id = None
260 #            if nodes_inslice:
261 #                node_id = self._choose_random_node(nodes_inslice)
262 #                self._slicenode = True                
263 #                
264 #            if not node_id:
265 #                # Either there were no matching nodes in the user's slice, or
266 #                # the nodes in the slice  were blacklisted or being provisioned
267 #                # by other RM. Note nodes_not_inslice is never empty
268 #                node_id = self._choose_random_node(nodes_not_inslice)
269 #                self._slicenode = False
270 #
271 #            if node_id:
272 #                self._node_to_provision = node_id
273 #                try:
274 #                    self._set_hostname_attr(node_id)
275 #                    self.info(" Selected node to provision ")
276 #                    super(PlanetlabSfaNode, self).do_discover()
277 #                except:
278 #                    with PlanetlabSfaNode.lock:
279 #                        self._blacklist_node(node_id)
280 #                    self.do_discover()
281 #            else:
282 #               self.fail_not_enough_nodes() 
283 #    
284     def _blacklisted(self, host_hrn):
285         """
286         Check in the SFA API that the node is not in the blacklist.
287         """
288         if self.sfaapi.blacklisted(host_hrn):
289            self.fail_node_not_available(host_hrn)
290         return False
291
292     def _reserved(self, host_hrn):
293         """
294         Check in the SFA API that the node is not in the reserved
295         list.
296         """
297         if self.sfaapi.reserved(host_hrn):
298             self.fail_node_not_available(host_hrn)
299         return False
300             
301     def do_provision(self):
302         """
303         Add node to user's slice and verifing that the node is functioning
304         correctly. Check ssh, file system.
305         """
306         if self._skip_provision():
307             super(PlanetlabSfaNode, self).do_provision()
308             return
309
310         provision_ok = False
311         ssh_ok = False
312         proc_ok = False
313         timeout = 1800
314
315         while not provision_ok:
316             node = self._node_to_provision
317             if not self._slicenode:
318                 self._add_node_to_slice(node)
319             
320                 # check ssh connection
321                 t = 0 
322                 while t < timeout and not ssh_ok:
323
324                     cmd = 'echo \'GOOD NODE\''
325                     ((out, err), proc) = self.execute(cmd)
326                     if out.find("GOOD NODE") < 0:
327                         self.debug( "No SSH connection, waiting 60s" )
328                         t = t + 60
329                         time.sleep(60)
330                         continue
331                     else:
332                         self.debug( "SSH OK" )
333                         ssh_ok = True
334                         continue
335             else:
336                 cmd = 'echo \'GOOD NODE\''
337                 ((out, err), proc) = self.execute(cmd)
338                 if not out.find("GOOD NODE") < 0:
339                     ssh_ok = True
340
341             if not ssh_ok:
342                 # the timeout was reach without establishing ssh connection
343                 # the node is blacklisted, deleted from the slice, and a new
344                 # node to provision is discovered
345                 self.warning(" Could not SSH login ")
346                 self._blacklist_node(node)
347                 self.do_discover()
348                 continue
349             
350             # check /proc directory is mounted (ssh_ok = True)
351             # and file system is not read only
352             else:
353                 cmd = 'mount |grep proc'
354                 ((out1, err1), proc1) = self.execute(cmd)
355                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
356                 ((out2, err2), proc2) = self.execute(cmd)
357                 if out1.find("/proc type proc") < 0 or \
358                     "Read-only file system".lower() in err2.lower():
359                     self.warning(" Corrupted file system ")
360                     self._blacklist_node(node)
361                     self.do_discover()
362                     continue
363             
364                 else:
365                     provision_ok = True
366                     if not self.get('hostname'):
367                         self._set_hostname_attr(node)            
368                     self.info(" Node provisioned ")            
369             
370         super(PlanetlabSfaNode, self).do_provision()
371
372 #    def _filter_based_on_attributes(self):
373 #        """
374 #        Retrive the list of nodes hrn that match user's constraints 
375 #        """
376 #        # Map user's defined attributes with tagnames of PlanetLab
377 #        timeframe = self.get("timeframe")[0]
378 #        attr_to_tags = {
379 #            'city' : 'city',
380 #            'country' : 'country',
381 #            'region' : 'region',
382 #            'architecture' : 'arch',
383 #            'operatingSystem' : 'fcdistro',
384 #            'minReliability' : 'reliability%s' % timeframe,
385 #            'maxReliability' : 'reliability%s' % timeframe,
386 #            'minBandwidth' : 'bw%s' % timeframe,
387 #            'maxBandwidth' : 'bw%s' % timeframe,
388 #            'minLoad' : 'load%s' % timeframe,
389 #            'maxLoad' : 'load%s' % timeframe,
390 #            'minCpu' : 'cpu%s' % timeframe,
391 #            'maxCpu' : 'cpu%s' % timeframe,
392 #        }
393 #        
394 #        nodes_hrn = []
395 #        filters = {}
396 #
397 #        for attr_name, attr_obj in self._attrs.iteritems():
398 #            attr_value = self.get(attr_name)
399 #            
400 #            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
401 #                attr_name != 'timeframe':
402 #        
403 #                attr_tag = attr_to_tags[attr_name]
404 #                filters['tagname'] = attr_tag
405 #
406 #                # filter nodes by fixed constraints e.g. operating system
407 #                if not 'min' in attr_name and not 'max' in attr_name:
408 #                    filters['value'] = attr_value
409 #                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
410 #
411 #                # filter nodes by range constraints e.g. max bandwidth
412 #                elif ('min' or 'max') in attr_name:
413 #                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
414 #
415 #        if not filters:
416 #            nodes = self.sfaapi.get_resources_hrn()
417 #            for node in nodes:
418 #                nodes_hrn.append(node[node.key()])
419 #        return nodes_hrn
420 #                    
421 #    def _filter_by_fixed_attr(self, filters, nodes_hrn):
422 #        """
423 #        Query SFA API for nodes matching fixed attributes defined by the
424 #        user
425 #        """
426 #        pass
427 ##        node_tags = self.sfaapi.get_resources_tags(filters)
428 ##        if node_tags is not None:
429 ##
430 ##            if len(nodes_id) == 0:
431 ##                # first attribute being matched
432 ##                for node_tag in node_tags:
433 ##                    nodes_id.append(node_tag['node_id'])
434 ##            else:
435 ##                # remove the nodes ids that don't match the new attribute
436 ##                # that is being match
437 ##
438 ##                nodes_id_tmp = []
439 ##                for node_tag in node_tags:
440 ##                    if node_tag['node_id'] in nodes_id:
441 ##                        nodes_id_tmp.append(node_tag['node_id'])
442 ##
443 ##                if len(nodes_id_tmp):
444 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
445 ##                else:
446 ##                    # no node from before match the new constraint
447 ##                    self.fail_discovery()
448 ##        else:
449 ##            # no nodes match the filter applied
450 ##            self.fail_discovery()
451 ##
452 ##        return nodes_id
453 #
454 #    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
455 #        """
456 #        Query PLCAPI for nodes ids matching attributes defined in a certain
457 #        range, by the user
458 #        """
459 #        pass
460 ##        node_tags = self.plapi.get_node_tags(filters)
461 ##        if node_tags:
462 ##            
463 ##            if len(nodes_id) == 0:
464 ##                # first attribute being matched
465 ##                for node_tag in node_tags:
466 ## 
467 ##                   # check that matches the min or max restriction
468 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
469 ##                        float(node_tag['value']) > attr_value:
470 ##                        nodes_id.append(node_tag['node_id'])
471 ##
472 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
473 ##                        float(node_tag['value']) < attr_value:
474 ##                        nodes_id.append(node_tag['node_id'])
475 ##            else:
476 ##
477 ##                # remove the nodes ids that don't match the new attribute
478 ##                # that is being match
479 ##                nodes_id_tmp = []
480 ##                for node_tag in node_tags:
481 ##
482 ##                    # check that matches the min or max restriction and was a
483 ##                    # matching previous filters
484 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
485 ##                        float(node_tag['value']) > attr_value and \
486 ##                        node_tag['node_id'] in nodes_id:
487 ##                        nodes_id_tmp.append(node_tag['node_id'])
488 ##
489 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
490 ##                        float(node_tag['value']) < attr_value and \
491 ##                        node_tag['node_id'] in nodes_id:
492 ##                        nodes_id_tmp.append(node_tag['node_id'])
493 ##
494 ##                if len(nodes_id_tmp):
495 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
496 ##                else:
497 ##                    # no node from before match the new constraint
498 ##                    self.fail_discovery()
499 ##
500 ##        else: #TODO CHECK
501 ##            # no nodes match the filter applied
502 ##            self.fail_discovery()
503 ##
504 ##        return nodes_id
505 #        
506 #    def _choose_random_node(self, nodes):
507 #        """
508 #        From the possible nodes for provision, choose randomly to decrese the
509 #        probability of different RMs choosing the same node for provision
510 #        """
511 #        size = len(nodes)
512 #        while size:
513 #            size = size - 1
514 #            index = randint(0, size)
515 #            node_id = nodes[index]
516 #            nodes[index] = nodes[size]
517 #
518 #            # check the node is not blacklisted or being provision by other RM
519 #            # and perform ping to check that is really alive
520 #            with PlanetlabNode.lock:
521 #
522 #                blist = self.plapi.blacklisted()
523 #                plist = self.plapi.reserved()
524 #                if node_id not in blist and node_id not in plist:
525 #                    ping_ok = self._do_ping(node_id)
526 #                    if not ping_ok:
527 #                        self._set_hostname_attr(node_id)
528 #                        self.warn(" Node not responding PING ")
529 #                        self._blacklist_node(node_id)
530 #                    else:
531 #                        # discovered node for provision, added to provision list
532 #                        self._put_node_in_provision(node_id)
533 #                        return node_id
534 #
535 #    def _get_nodes_id(self, filters=None):
536 #        return self.plapi.get_nodes(filters, fields=['node_id'])
537 #
538     def _add_node_to_slice(self, host_hrn):
539         """
540         Add node to slice, using SFA API.
541         """
542         self.info(" Adding node to slice ")
543         slicename = self.get("username").replace('_', '.')
544         slicename = 'ple.' + slicename
545         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
546
547     def _delete_from_slice(self):
548         """
549         Delete every node from slice, using SFA API.
550         Sfi client doesn't work for particular node urns.
551         """
552         self.warning(" Deleting node from slice ")
553         slicename = self.get("username").replace('_', '.')
554         slicename = 'ple.' + slicename
555         self.sfaapi.remove_all_from_slice(slicename)
556
557     def _get_hostname(self):
558         """
559         Get the attribute hostname.
560         """
561         hostname = self.get("hostname")
562         if hostname:
563             return hostname
564         else:
565             return None
566
567     def _set_hostname_attr(self, node):
568         """
569         Query SFAAPI for the hostname of a certain host hrn and sets the
570         attribute hostname, it will over write the previous value.
571         """
572         hosts_hrn = self.sfaapi.get_resources_hrn()
573         for hostname, hrn  in hosts_hrn.iteritems():
574             if hrn == node:
575                 self.set("hostname", hostname)
576
577     def _check_if_in_slice(self, hosts_hrn):
578         """
579         Check using SFA API if any host hrn from hosts_hrn is in the user's
580         slice.
581         """
582         slicename = self.get("username").replace('_', '.')
583         slicename = 'ple.' + slicename
584         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
585         if slice_nodes:
586             slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
587         else: slice_nodes_hrn = []
588         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
589         return nodes_inslice
590
591     def _do_ping(self, hostname):
592         """
593         Perform ping command on node's IP matching hostname.
594         """
595         ping_ok = False
596         ip = self._get_ip(hostname)
597         if ip:
598             command = "ping -c4 %s" % ip
599             (out, err) = lexec(command)
600
601             m = re.search("(\d+)% packet loss", str(out))
602             if m and int(m.groups()[0]) < 50:
603                 ping_ok = True
604
605         return ping_ok
606
607     def _blacklist_node(self, host_hrn):
608         """
609         Add mal functioning node to blacklist (in SFA API).
610         """
611         self.warning(" Blacklisting malfunctioning node ")
612         self.sfaapi.blacklist_resource(host_hrn)
613         if not self._hostname:
614             self.set('hostname', None)
615
616     def _reserve(self, host_hrn):
617         """
618         Add node to the list of nodes being provisioned, in order for other RMs
619         to not try to provision the same one again.
620         """
621         self.sfaapi.reserve_resource(host_hrn)
622
623     def _get_ip(self, hostname):
624         """
625         Query cache for the IP of a node with certain hostname
626         """
627         try:
628             ip = sshfuncs.gethostbyname(hostname)
629         except:
630             # Fail while trying to find the IP
631             return None
632         return ip
633
634     def fail_discovery(self):
635         msg = "Discovery failed. No candidates found for node"
636         self.error(msg)
637         raise RuntimeError, msg
638
639     def fail_node_not_alive(self, hostname=None):
640         msg = "Node %s not alive" % hostname
641         raise RuntimeError, msg
642     
643     def fail_node_not_available(self, hostname):
644         msg = "Node %s not available for provisioning" % hostname
645         raise RuntimeError, msg
646
647     def fail_not_enough_nodes(self):
648         msg = "Not enough nodes available for provisioning"
649         raise RuntimeError, msg
650
651     def fail_sfaapi(self):
652         msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
653             " attributes sfauser and sfaPrivateKey."
654         raise RuntimeError, msg
655
656     def valid_connection(self, guid):
657         # TODO: Validate!
658         return True
659
660