d18d248a536c2122dd892a8d39800f97d53424fa
[nepi.git] / src / nepi / resources / planetlab / sfa_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay 
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import re
30 import weakref
31 import time
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class PlanetlabSfaNode(LinuxNode):
38     _rtype = "PlanetlabSfaNode"
39     _help = "Controls a PlanetLab host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _backend = "planetlab"
42
43     @classmethod
44     def _register_attributes(cls):
45
46         sfa_user = Attribute("sfauser", "SFA user",
47                     flags = Flags.Credential)
48
49         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50                             used to generate the user credential")
51
52         city = Attribute("city", "Constrain location (city) during resource \
53                 discovery. May use wildcards.",
54                 flags = Flags.Filter)
55
56         country = Attribute("country", "Constrain location (country) during \
57                     resource discovery. May use wildcards.",
58                     flags = Flags.Filter)
59
60         region = Attribute("region", "Constrain location (region) during \
61                     resource discovery. May use wildcards.",
62                     flags = Flags.Filter)
63
64         architecture = Attribute("architecture", "Constrain architecture \
65                         during resource discovery.",
66                         type = Types.Enumerate,
67                         allowed = ["x86_64", 
68                                     "i386"],
69                         flags = Flags.Filter)
70
71         operating_system = Attribute("operatingSystem", "Constrain operating \
72                             system during resource discovery.",
73                             type = Types.Enumerate,
74                             allowed =  ["f8",
75                                         "f12",
76                                         "f14",
77                                         "centos",
78                                         "other"],
79                             flags = Flags.Filter)
80
81         min_reliability = Attribute("minReliability", "Constrain reliability \
82                             while picking PlanetLab nodes. Specifies a lower \
83                             acceptable bound.",
84                             type = Types.Double,
85                             range = (1, 100),
86                             flags = Flags.Filter)
87
88         max_reliability = Attribute("maxReliability", "Constrain reliability \
89                             while picking PlanetLab nodes. Specifies an upper \
90                             acceptable bound.",
91                             type = Types.Double,
92                             range = (1, 100),
93                             flags = Flags.Filter)
94
95         min_bandwidth = Attribute("minBandwidth", "Constrain available \
96                             bandwidth while picking PlanetLab nodes. \
97                             Specifies a lower acceptable bound.",
98                             type = Types.Double,
99                             range = (0, 2**31),
100                             flags = Flags.Filter)
101
102         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
103                             bandwidth while picking PlanetLab nodes. \
104                             Specifies an upper acceptable bound.",
105                             type = Types.Double,
106                             range = (0, 2**31),
107                             flags = Flags.Filter)
108
109         min_load = Attribute("minLoad", "Constrain node load average while \
110                     picking PlanetLab nodes. Specifies a lower acceptable \
111                     bound.",
112                     type = Types.Double,
113                     range = (0, 2**31),
114                     flags = Flags.Filter)
115
116         max_load = Attribute("maxLoad", "Constrain node load average while \
117                     picking PlanetLab nodes. Specifies an upper acceptable \
118                     bound.",
119                     type = Types.Double,
120                     range = (0, 2**31),
121                     flags = Flags.Filter)
122
123         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
124                     picking PlanetLab nodes. Specifies a lower acceptable \
125                     bound.",
126                     type = Types.Double,
127                     range = (0, 100),
128                     flags = Flags.Filter)
129
130         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
131                     picking PlanetLab nodes. Specifies an upper acceptable \
132                     bound.",
133                     type = Types.Double,
134                     range = (0, 100),
135                     flags = Flags.Filter)
136
137         timeframe = Attribute("timeframe", "Past time period in which to check\
138                         information about the node. Values are year,month, \
139                         week, latest",
140                         default = "week",
141                         type = Types.Enumerate,
142                         allowed = ["latest",
143                                     "week",
144                                     "month",
145                                     "year"],
146                         flags = Flags.Filter)
147
148         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
149                         in the user's home directory under .nepi directory. This file \
150                         contains a list of PL nodes to blacklist, and at the end \
151                         of the experiment execution the new blacklisted nodes are added.",
152                     type = Types.Bool,
153                     default = False,
154                     flags = Flags.Global)
155
156         cls._register_attribute(sfa_user)
157         cls._register_attribute(sfa_private_key)
158         cls._register_attribute(city)
159         cls._register_attribute(country)
160         cls._register_attribute(region)
161         cls._register_attribute(architecture)
162         cls._register_attribute(operating_system)
163         cls._register_attribute(min_reliability)
164         cls._register_attribute(max_reliability)
165         cls._register_attribute(min_bandwidth)
166         cls._register_attribute(max_bandwidth)
167         cls._register_attribute(min_load)
168         cls._register_attribute(max_load)
169         cls._register_attribute(min_cpu)
170         cls._register_attribute(max_cpu)
171         cls._register_attribute(timeframe)
172         cls._register_attribute(plblacklist)
173
174     def __init__(self, ec, guid):
175         super(PlanetlabSfaNode, self).__init__(ec, guid)
176         
177         self._ecobj = weakref.ref(ec)
178         self._sfaapi = None
179         self._node_to_provision = None
180         self._slicenode = False
181         self._hostname = False
182
183         if self.get("gateway") or self.get("gatewayUser"):
184             self.set("gateway", None)
185             self.set("gatewayUser", None)
186
187     def _skip_provision(self):
188         sfa_user = self.get("sfauser")
189         if not sfa_user:
190             return True
191         else: return False
192     
193     @property
194     def sfaapi(self):
195         """
196         Property to instanciate the SFA API based in sfi client.
197         For each SFA method called this instance is used.
198         """
199         if not self._sfaapi:
200             sfa_user = self.get("sfauser")
201             sfa_sm = "http://sfa3.planet-lab.eu:12346/"
202             sfa_auth = '.'.join(sfa_user.split('.')[:2])
203             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
204             sfa_private_key = self.get("sfaPrivateKey")
205
206             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
207                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
208
209             if not _sfaapi:
210                 self.fail_sfaapi()
211     
212             self._sfaapi = weakref.ref(_sfaapi)
213
214         return self._sfaapi()
215
216     def do_discover(self):
217         """
218         Based on the attributes defined by the user, discover the suitable 
219         nodes for provision.
220         """
221         if self._skip_provision():
222             super(PlanetlabSfaNode, self).do_discover()
223             return
224
225         nodes = self.sfaapi.get_resources_hrn()
226
227         hostname = self._get_hostname()
228         if hostname:
229             # the user specified one particular node to be provisioned
230             self._hostname = True
231             host_hrn = nodes[hostname]
232
233             # check that the node is not blacklisted or being provisioned
234             # by other RM
235             if not self._blacklisted(host_hrn):
236                 if not self._reserved(host_hrn):
237                     # Node in reservation
238                     ping_ok = self._do_ping(hostname)
239                     if not ping_ok:
240                         self._blacklist_node(host_hrn)
241                         self.fail_node_not_alive(hostname)
242                     else:
243                         if self._check_if_in_slice([host_hrn]):
244                             self.debug("The node %s is already in the slice" % hostname)
245                             self._slicenode = True
246                         self._node_to_provision = host_hrn
247                         super(PlanetlabSfaNode, self).do_discover()
248         
249 #        else:
250 #            # the user specifies constraints based on attributes, zero, one or 
251 #            # more nodes can match these constraints 
252 #            nodes = self._filter_based_on_attributes()
253 #
254 #            # nodes that are already part of user's slice have the priority to
255 #            # provisioned
256 #            nodes_inslice = self._check_if_in_slice(nodes)
257 #            nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
258 #            
259 #            node_id = None
260 #            if nodes_inslice:
261 #                node_id = self._choose_random_node(nodes_inslice)
262 #                self._slicenode = True                
263 #                
264 #            if not node_id:
265 #                # Either there were no matching nodes in the user's slice, or
266 #                # the nodes in the slice  were blacklisted or being provisioned
267 #                # by other RM. Note nodes_not_inslice is never empty
268 #                node_id = self._choose_random_node(nodes_not_inslice)
269 #                self._slicenode = False
270 #
271 #            if node_id:
272 #                self._node_to_provision = node_id
273 #                try:
274 #                    self._set_hostname_attr(node_id)
275 #                    self.info(" Selected node to provision ")
276 #                    super(PlanetlabSfaNode, self).do_discover()
277 #                except:
278 #                    with PlanetlabSfaNode.lock:
279 #                        self._blacklist_node(node_id)
280 #                    self.do_discover()
281 #            else:
282 #               self.fail_not_enough_nodes() 
283 #    
284     def _blacklisted(self, host_hrn):
285         """
286         Check in the SFA API that the node is not in the blacklist.
287         """
288         if self.sfaapi.blacklisted(host_hrn):
289            self.fail_node_not_available(host_hrn)
290         return False
291
292     def _reserved(self, host_hrn):
293         """
294         Check in the SFA API that the node is not in the reserved
295         list.
296         """
297         if self.sfaapi.reserved(host_hrn):
298             self.fail_node_not_available(host_hrn)
299         return False
300             
301     def do_provision(self):
302         """
303         Add node to user's slice and verifing that the node is functioning
304         correctly. Check ssh, file system.
305         """
306         if self._skip_provision():
307             super(PlanetlabSfaNode, self).do_provision()
308             return
309
310         provision_ok = False
311         ssh_ok = False
312         proc_ok = False
313         timeout = 1800
314
315         while not provision_ok:
316             node = self._node_to_provision
317             if not self._slicenode:
318                 self._add_node_to_slice(node)
319             
320                 # check ssh connection
321                 t = 0 
322                 while t < timeout and not ssh_ok:
323
324                     cmd = 'echo \'GOOD NODE\''
325                     ((out, err), proc) = self.execute(cmd)
326                     if out.find("GOOD NODE") < 0:
327                         self.debug( "No SSH connection, waiting 60s" )
328                         t = t + 60
329                         time.sleep(60)
330                         continue
331                     else:
332                         self.debug( "SSH OK" )
333                         ssh_ok = True
334                         continue
335             else:
336                 cmd = 'echo \'GOOD NODE\''
337                 ((out, err), proc) = self.execute(cmd)
338                 if not out.find("GOOD NODE") < 0:
339                     ssh_ok = True
340
341             if not ssh_ok:
342                 # the timeout was reach without establishing ssh connection
343                 # the node is blacklisted, deleted from the slice, and a new
344                 # node to provision is discovered
345                 self.warning(" Could not SSH login ")
346                 self._blacklist_node(node)
347                 self.do_discover()
348                 continue
349             
350             # check /proc directory is mounted (ssh_ok = True)
351             # and file system is not read only
352             else:
353                 cmd = 'mount |grep proc'
354                 ((out1, err1), proc1) = self.execute(cmd)
355                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
356                 ((out2, err2), proc2) = self.execute(cmd)
357                 if out1.find("/proc type proc") < 0 or \
358                     "Read-only file system".lower() in err2.lower():
359                     self.warning(" Corrupted file system ")
360                     self._blacklist_node(node)
361                     self.do_discover()
362                     continue
363             
364                 else:
365                     provision_ok = True
366                     if not self.get('hostname'):
367                         self._set_hostname_attr(node)            
368                     self.info(" Node provisioned ")            
369             
370         super(PlanetlabSfaNode, self).do_provision()
371
372     def do_release(self):
373         super(PlanetlabSfaNode, self).do_release()
374         if self.state == ResourceState.RELEASED and not self._skip_provision():
375             self.debug(" Releasing SFA API ")
376             self.sfaapi.release()
377
378 #    def _filter_based_on_attributes(self):
379 #        """
380 #        Retrive the list of nodes hrn that match user's constraints 
381 #        """
382 #        # Map user's defined attributes with tagnames of PlanetLab
383 #        timeframe = self.get("timeframe")[0]
384 #        attr_to_tags = {
385 #            'city' : 'city',
386 #            'country' : 'country',
387 #            'region' : 'region',
388 #            'architecture' : 'arch',
389 #            'operatingSystem' : 'fcdistro',
390 #            'minReliability' : 'reliability%s' % timeframe,
391 #            'maxReliability' : 'reliability%s' % timeframe,
392 #            'minBandwidth' : 'bw%s' % timeframe,
393 #            'maxBandwidth' : 'bw%s' % timeframe,
394 #            'minLoad' : 'load%s' % timeframe,
395 #            'maxLoad' : 'load%s' % timeframe,
396 #            'minCpu' : 'cpu%s' % timeframe,
397 #            'maxCpu' : 'cpu%s' % timeframe,
398 #        }
399 #        
400 #        nodes_hrn = []
401 #        filters = {}
402 #
403 #        for attr_name, attr_obj in self._attrs.iteritems():
404 #            attr_value = self.get(attr_name)
405 #            
406 #            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
407 #                attr_name != 'timeframe':
408 #        
409 #                attr_tag = attr_to_tags[attr_name]
410 #                filters['tagname'] = attr_tag
411 #
412 #                # filter nodes by fixed constraints e.g. operating system
413 #                if not 'min' in attr_name and not 'max' in attr_name:
414 #                    filters['value'] = attr_value
415 #                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
416 #
417 #                # filter nodes by range constraints e.g. max bandwidth
418 #                elif ('min' or 'max') in attr_name:
419 #                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
420 #
421 #        if not filters:
422 #            nodes = self.sfaapi.get_resources_hrn()
423 #            for node in nodes:
424 #                nodes_hrn.append(node[node.key()])
425 #        return nodes_hrn
426 #                    
427 #    def _filter_by_fixed_attr(self, filters, nodes_hrn):
428 #        """
429 #        Query SFA API for nodes matching fixed attributes defined by the
430 #        user
431 #        """
432 #        pass
433 ##        node_tags = self.sfaapi.get_resources_tags(filters)
434 ##        if node_tags is not None:
435 ##
436 ##            if len(nodes_id) == 0:
437 ##                # first attribute being matched
438 ##                for node_tag in node_tags:
439 ##                    nodes_id.append(node_tag['node_id'])
440 ##            else:
441 ##                # remove the nodes ids that don't match the new attribute
442 ##                # that is being match
443 ##
444 ##                nodes_id_tmp = []
445 ##                for node_tag in node_tags:
446 ##                    if node_tag['node_id'] in nodes_id:
447 ##                        nodes_id_tmp.append(node_tag['node_id'])
448 ##
449 ##                if len(nodes_id_tmp):
450 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
451 ##                else:
452 ##                    # no node from before match the new constraint
453 ##                    self.fail_discovery()
454 ##        else:
455 ##            # no nodes match the filter applied
456 ##            self.fail_discovery()
457 ##
458 ##        return nodes_id
459 #
460 #    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
461 #        """
462 #        Query PLCAPI for nodes ids matching attributes defined in a certain
463 #        range, by the user
464 #        """
465 #        pass
466 ##        node_tags = self.plapi.get_node_tags(filters)
467 ##        if node_tags:
468 ##            
469 ##            if len(nodes_id) == 0:
470 ##                # first attribute being matched
471 ##                for node_tag in node_tags:
472 ## 
473 ##                   # check that matches the min or max restriction
474 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
475 ##                        float(node_tag['value']) > attr_value:
476 ##                        nodes_id.append(node_tag['node_id'])
477 ##
478 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
479 ##                        float(node_tag['value']) < attr_value:
480 ##                        nodes_id.append(node_tag['node_id'])
481 ##            else:
482 ##
483 ##                # remove the nodes ids that don't match the new attribute
484 ##                # that is being match
485 ##                nodes_id_tmp = []
486 ##                for node_tag in node_tags:
487 ##
488 ##                    # check that matches the min or max restriction and was a
489 ##                    # matching previous filters
490 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
491 ##                        float(node_tag['value']) > attr_value and \
492 ##                        node_tag['node_id'] in nodes_id:
493 ##                        nodes_id_tmp.append(node_tag['node_id'])
494 ##
495 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
496 ##                        float(node_tag['value']) < attr_value and \
497 ##                        node_tag['node_id'] in nodes_id:
498 ##                        nodes_id_tmp.append(node_tag['node_id'])
499 ##
500 ##                if len(nodes_id_tmp):
501 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
502 ##                else:
503 ##                    # no node from before match the new constraint
504 ##                    self.fail_discovery()
505 ##
506 ##        else: #TODO CHECK
507 ##            # no nodes match the filter applied
508 ##            self.fail_discovery()
509 ##
510 ##        return nodes_id
511 #        
512 #    def _choose_random_node(self, nodes):
513 #        """
514 #        From the possible nodes for provision, choose randomly to decrese the
515 #        probability of different RMs choosing the same node for provision
516 #        """
517 #        size = len(nodes)
518 #        while size:
519 #            size = size - 1
520 #            index = randint(0, size)
521 #            node_id = nodes[index]
522 #            nodes[index] = nodes[size]
523 #
524 #            # check the node is not blacklisted or being provision by other RM
525 #            # and perform ping to check that is really alive
526 #            with PlanetlabNode.lock:
527 #
528 #                blist = self.plapi.blacklisted()
529 #                plist = self.plapi.reserved()
530 #                if node_id not in blist and node_id not in plist:
531 #                    ping_ok = self._do_ping(node_id)
532 #                    if not ping_ok:
533 #                        self._set_hostname_attr(node_id)
534 #                        self.warn(" Node not responding PING ")
535 #                        self._blacklist_node(node_id)
536 #                    else:
537 #                        # discovered node for provision, added to provision list
538 #                        self._put_node_in_provision(node_id)
539 #                        return node_id
540 #
541 #    def _get_nodes_id(self, filters=None):
542 #        return self.plapi.get_nodes(filters, fields=['node_id'])
543 #
544     def _add_node_to_slice(self, host_hrn):
545         """
546         Add node to slice, using SFA API.
547         """
548         self.info(" Adding node to slice ")
549         slicename = self.get("username").replace('_', '.')
550         slicename = 'ple.' + slicename
551         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
552
553     def _delete_from_slice(self):
554         """
555         Delete every node from slice, using SFA API.
556         Sfi client doesn't work for particular node urns.
557         """
558         self.warning(" Deleting node from slice ")
559         slicename = self.get("username").replace('_', '.')
560         slicename = 'ple.' + slicename
561         self.sfaapi.remove_all_from_slice(slicename)
562
563     def _get_hostname(self):
564         """
565         Get the attribute hostname.
566         """
567         hostname = self.get("hostname")
568         if hostname:
569             return hostname
570         else:
571             return None
572
573     def _set_hostname_attr(self, node):
574         """
575         Query SFAAPI for the hostname of a certain host hrn and sets the
576         attribute hostname, it will over write the previous value.
577         """
578         hosts_hrn = self.sfaapi.get_resources_hrn()
579         for hostname, hrn  in hosts_hrn.iteritems():
580             if hrn == node:
581                 self.set("hostname", hostname)
582
583     def _check_if_in_slice(self, hosts_hrn):
584         """
585         Check using SFA API if any host hrn from hosts_hrn is in the user's
586         slice.
587         """
588         slicename = self.get("username").replace('_', '.')
589         slicename = 'ple.' + slicename
590         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
591         if slice_nodes:
592             slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
593         else: slice_nodes_hrn = []
594         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
595         return nodes_inslice
596
597     def _do_ping(self, hostname):
598         """
599         Perform ping command on node's IP matching hostname.
600         """
601         ping_ok = False
602         ip = self._get_ip(hostname)
603         if ip:
604             command = "ping -c4 %s" % ip
605             (out, err) = lexec(command)
606
607             m = re.search("(\d+)% packet loss", str(out))
608             if m and int(m.groups()[0]) < 50:
609                 ping_ok = True
610
611         return ping_ok
612
613     def _blacklist_node(self, host_hrn):
614         """
615         Add mal functioning node to blacklist (in SFA API).
616         """
617         self.warning(" Blacklisting malfunctioning node ")
618         self.sfaapi.blacklist_resource(host_hrn)
619         if not self._hostname:
620             self.set('hostname', None)
621
622     def _reserve(self, host_hrn):
623         """
624         Add node to the list of nodes being provisioned, in order for other RMs
625         to not try to provision the same one again.
626         """
627         self.sfaapi.reserve_resource(host_hrn)
628
629     def _get_ip(self, hostname):
630         """
631         Query cache for the IP of a node with certain hostname
632         """
633         try:
634             ip = sshfuncs.gethostbyname(hostname)
635         except:
636             # Fail while trying to find the IP
637             return None
638         return ip
639
640     def fail_discovery(self):
641         msg = "Discovery failed. No candidates found for node"
642         self.error(msg)
643         raise RuntimeError, msg
644
645     def fail_node_not_alive(self, hostname=None):
646         msg = "Node %s not alive" % hostname
647         raise RuntimeError, msg
648     
649     def fail_node_not_available(self, hostname):
650         msg = "Node %s not available for provisioning" % hostname
651         raise RuntimeError, msg
652
653     def fail_not_enough_nodes(self):
654         msg = "Not enough nodes available for provisioning"
655         raise RuntimeError, msg
656
657     def fail_sfaapi(self):
658         msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
659             " attributes sfauser and sfaPrivateKey."
660         raise RuntimeError, msg
661
662     def valid_connection(self, guid):
663         # TODO: Validate!
664         return True
665
666