Planetlab + sfa node changes
[nepi.git] / src / nepi / resources / planetlab / sfa_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay 
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import re
30 import weakref
31 import time
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class PlanetlabSfaNode(LinuxNode):
38     _rtype = "PlanetlabSfaNode"
39     _help = "Controls a PlanetLab host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _backend = "planetlab"
42
43     @classmethod
44     def _register_attributes(cls):
45
46         sfa_user = Attribute("sfauser", "SFA user",
47                     flags = Flags.Credential)
48
49         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50                             used to generate the user credential")
51
52         city = Attribute("city", "Constrain location (city) during resource \
53                 discovery. May use wildcards.",
54                 flags = Flags.Filter)
55
56         country = Attribute("country", "Constrain location (country) during \
57                     resource discovery. May use wildcards.",
58                     flags = Flags.Filter)
59
60         region = Attribute("region", "Constrain location (region) during \
61                     resource discovery. May use wildcards.",
62                     flags = Flags.Filter)
63
64         architecture = Attribute("architecture", "Constrain architecture \
65                         during resource discovery.",
66                         type = Types.Enumerate,
67                         allowed = ["x86_64", 
68                                     "i386"],
69                         flags = Flags.Filter)
70
71         operating_system = Attribute("operatingSystem", "Constrain operating \
72                             system during resource discovery.",
73                             type = Types.Enumerate,
74                             allowed =  ["f8",
75                                         "f12",
76                                         "f14",
77                                         "centos",
78                                         "other"],
79                             flags = Flags.Filter)
80
81         min_reliability = Attribute("minReliability", "Constrain reliability \
82                             while picking PlanetLab nodes. Specifies a lower \
83                             acceptable bound.",
84                             type = Types.Double,
85                             range = (1, 100),
86                             flags = Flags.Filter)
87
88         max_reliability = Attribute("maxReliability", "Constrain reliability \
89                             while picking PlanetLab nodes. Specifies an upper \
90                             acceptable bound.",
91                             type = Types.Double,
92                             range = (1, 100),
93                             flags = Flags.Filter)
94
95         min_bandwidth = Attribute("minBandwidth", "Constrain available \
96                             bandwidth while picking PlanetLab nodes. \
97                             Specifies a lower acceptable bound.",
98                             type = Types.Double,
99                             range = (0, 2**31),
100                             flags = Flags.Filter)
101
102         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
103                             bandwidth while picking PlanetLab nodes. \
104                             Specifies an upper acceptable bound.",
105                             type = Types.Double,
106                             range = (0, 2**31),
107                             flags = Flags.Filter)
108
109         min_load = Attribute("minLoad", "Constrain node load average while \
110                     picking PlanetLab nodes. Specifies a lower acceptable \
111                     bound.",
112                     type = Types.Double,
113                     range = (0, 2**31),
114                     flags = Flags.Filter)
115
116         max_load = Attribute("maxLoad", "Constrain node load average while \
117                     picking PlanetLab nodes. Specifies an upper acceptable \
118                     bound.",
119                     type = Types.Double,
120                     range = (0, 2**31),
121                     flags = Flags.Filter)
122
123         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
124                     picking PlanetLab nodes. Specifies a lower acceptable \
125                     bound.",
126                     type = Types.Double,
127                     range = (0, 100),
128                     flags = Flags.Filter)
129
130         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
131                     picking PlanetLab nodes. Specifies an upper acceptable \
132                     bound.",
133                     type = Types.Double,
134                     range = (0, 100),
135                     flags = Flags.Filter)
136
137         timeframe = Attribute("timeframe", "Past time period in which to check\
138                         information about the node. Values are year,month, \
139                         week, latest",
140                         default = "week",
141                         type = Types.Enumerate,
142                         allowed = ["latest",
143                                     "week",
144                                     "month",
145                                     "year"],
146                         flags = Flags.Filter)
147
148         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
149                         in the user's home directory under .nepi directory. This file \
150                         contains a list of PL nodes to blacklist, and at the end \
151                         of the experiment execution the new blacklisted nodes are added.",
152                     type = Types.Bool,
153                     default = False,
154                     flags = Flags.Global)
155
156         cls._register_attribute(sfa_user)
157         cls._register_attribute(sfa_private_key)
158         cls._register_attribute(city)
159         cls._register_attribute(country)
160         cls._register_attribute(region)
161         cls._register_attribute(architecture)
162         cls._register_attribute(operating_system)
163         cls._register_attribute(min_reliability)
164         cls._register_attribute(max_reliability)
165         cls._register_attribute(min_bandwidth)
166         cls._register_attribute(max_bandwidth)
167         cls._register_attribute(min_load)
168         cls._register_attribute(max_load)
169         cls._register_attribute(min_cpu)
170         cls._register_attribute(max_cpu)
171         cls._register_attribute(timeframe)
172         cls._register_attribute(plblacklist)
173
174     def __init__(self, ec, guid):
175         super(PlanetlabSfaNode, self).__init__(ec, guid)
176         
177         self._ecobj = weakref.ref(ec)
178         self._sfaapi = None
179         self._node_to_provision = None
180         self._slicenode = False
181         self._hostname = False
182
183         if self.get("gateway") or self.get("gatewayUser"):
184             self.set("gateway", None)
185             self.set("gatewayUser", None)
186
187     def _skip_provision(self):
188         sfa_user = self.get("sfauser")
189         if not sfa_user:
190             return True
191         else: return False
192     
193     @property
194     def sfaapi(self):
195         if not self._sfaapi:
196             sfa_user = self.get("sfauser")
197             sfa_sm = "http://sfa3.planet-lab.eu:12346/"
198             sfa_auth = '.'.join(sfa_user.split('.')[:2])
199             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
200             sfa_private_key = self.get("sfaPrivateKey")
201
202             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
203                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
204
205             if not _sfaapi:
206                 self.fail_sfaapi()
207     
208             self._sfaapi = weakref.ref(_sfaapi)
209
210         return self._sfaapi()
211
212     def do_discover(self):
213         """
214         Based on the attributes defined by the user, discover the suitable 
215         nodes for provision.
216         """
217         if self._skip_provision():
218             super(PlanetlabSfaNode, self).do_discover()
219             return
220
221         nodes = self.sfaapi.get_resources_hrn()
222
223         hostname = self._get_hostname()
224         if hostname:
225             # the user specified one particular node to be provisioned
226             self._hostname = True
227             host_hrn = nodes[hostname]
228
229             # check that the node is not blacklisted or being provisioned
230             # by other RM
231             if not self._blacklisted(host_hrn):
232                 if not self._reserved(host_hrn):
233                     # Node in reservation
234                     ping_ok = self._do_ping(hostname)
235                     if not ping_ok:
236                         self._blacklist_node(host_hrn)
237                         self.fail_node_not_alive(hostname)
238                     else:
239                         if self._check_if_in_slice([host_hrn]):
240                             self.debug("The node %s is already in the slice" % hostname)
241                             self._slicenode = True
242                         self._node_to_provision = host_hrn
243                         super(PlanetlabSfaNode, self).do_discover()
244         
245 #        else:
246 #            # the user specifies constraints based on attributes, zero, one or 
247 #            # more nodes can match these constraints 
248 #            nodes = self._filter_based_on_attributes()
249 #
250 #            # nodes that are already part of user's slice have the priority to
251 #            # provisioned
252 #            nodes_inslice = self._check_if_in_slice(nodes)
253 #            nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
254 #            
255 #            node_id = None
256 #            if nodes_inslice:
257 #                node_id = self._choose_random_node(nodes_inslice)
258 #                self._slicenode = True                
259 #                
260 #            if not node_id:
261 #                # Either there were no matching nodes in the user's slice, or
262 #                # the nodes in the slice  were blacklisted or being provisioned
263 #                # by other RM. Note nodes_not_inslice is never empty
264 #                node_id = self._choose_random_node(nodes_not_inslice)
265 #                self._slicenode = False
266 #
267 #            if node_id:
268 #                self._node_to_provision = node_id
269 #                try:
270 #                    self._set_hostname_attr(node_id)
271 #                    self.info(" Selected node to provision ")
272 #                    super(PlanetlabSfaNode, self).do_discover()
273 #                except:
274 #                    with PlanetlabSfaNode.lock:
275 #                        self._blacklist_node(node_id)
276 #                    self.do_discover()
277 #            else:
278 #               self.fail_not_enough_nodes() 
279 #    
280     def _blacklisted(self, host_hrn):
281         if self.sfaapi.blacklisted(host_hrn):
282            self.fail_node_not_available(host_hrn)
283         return False
284
285     def _reserved(self, host_hrn):
286         if self.sfaapi.reserved(host_hrn):
287             self.fail_node_not_available(host_hrn)
288         return False
289             
290     def do_provision(self):
291         """
292         Add node to user's slice after verifing that the node is functioning
293         correctly.
294         """
295         if self._skip_provision():
296             super(PlanetlabSfaNode, self).do_provision()
297             return
298
299         provision_ok = False
300         ssh_ok = False
301         proc_ok = False
302         timeout = 1800
303
304         while not provision_ok:
305             node = self._node_to_provision
306             if not self._slicenode:
307                 self._add_node_to_slice(node)
308             
309                 # check ssh connection
310                 t = 0 
311                 while t < timeout and not ssh_ok:
312
313                     cmd = 'echo \'GOOD NODE\''
314                     ((out, err), proc) = self.execute(cmd)
315                     if out.find("GOOD NODE") < 0:
316                         self.debug( "No SSH connection, waiting 60s" )
317                         t = t + 60
318                         time.sleep(60)
319                         continue
320                     else:
321                         self.debug( "SSH OK" )
322                         ssh_ok = True
323                         continue
324             else:
325                 cmd = 'echo \'GOOD NODE\''
326                 ((out, err), proc) = self.execute(cmd)
327                 if not out.find("GOOD NODE") < 0:
328                     ssh_ok = True
329
330             if not ssh_ok:
331                 # the timeout was reach without establishing ssh connection
332                 # the node is blacklisted, deleted from the slice, and a new
333                 # node to provision is discovered
334                 self.warning(" Could not SSH login ")
335                 self._blacklist_node(node)
336                 self.do_discover()
337                 continue
338             
339             # check /proc directory is mounted (ssh_ok = True)
340             # and file system is not read only
341             else:
342                 cmd = 'mount |grep proc'
343                 ((out1, err1), proc1) = self.execute(cmd)
344                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
345                 ((out2, err2), proc2) = self.execute(cmd)
346                 if out1.find("/proc type proc") < 0 or \
347                     "Read-only file system".lower() in err2.lower():
348                     self.warning(" Corrupted file system ")
349                     self._blacklist_node(node)
350                     self.do_discover()
351                     continue
352             
353                 else:
354                     provision_ok = True
355                     if not self.get('hostname'):
356                         self._set_hostname_attr(node)            
357                     self.info(" Node provisioned ")            
358             
359         super(PlanetlabSfaNode, self).do_provision()
360
361 #    def _filter_based_on_attributes(self):
362 #        """
363 #        Retrive the list of nodes hrn that match user's constraints 
364 #        """
365 #        # Map user's defined attributes with tagnames of PlanetLab
366 #        timeframe = self.get("timeframe")[0]
367 #        attr_to_tags = {
368 #            'city' : 'city',
369 #            'country' : 'country',
370 #            'region' : 'region',
371 #            'architecture' : 'arch',
372 #            'operatingSystem' : 'fcdistro',
373 #            'minReliability' : 'reliability%s' % timeframe,
374 #            'maxReliability' : 'reliability%s' % timeframe,
375 #            'minBandwidth' : 'bw%s' % timeframe,
376 #            'maxBandwidth' : 'bw%s' % timeframe,
377 #            'minLoad' : 'load%s' % timeframe,
378 #            'maxLoad' : 'load%s' % timeframe,
379 #            'minCpu' : 'cpu%s' % timeframe,
380 #            'maxCpu' : 'cpu%s' % timeframe,
381 #        }
382 #        
383 #        nodes_hrn = []
384 #        filters = {}
385 #
386 #        for attr_name, attr_obj in self._attrs.iteritems():
387 #            attr_value = self.get(attr_name)
388 #            
389 #            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
390 #                attr_name != 'timeframe':
391 #        
392 #                attr_tag = attr_to_tags[attr_name]
393 #                filters['tagname'] = attr_tag
394 #
395 #                # filter nodes by fixed constraints e.g. operating system
396 #                if not 'min' in attr_name and not 'max' in attr_name:
397 #                    filters['value'] = attr_value
398 #                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
399 #
400 #                # filter nodes by range constraints e.g. max bandwidth
401 #                elif ('min' or 'max') in attr_name:
402 #                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
403 #
404 #        if not filters:
405 #            nodes = self.sfaapi.get_resources_hrn()
406 #            for node in nodes:
407 #                nodes_hrn.append(node[node.key()])
408 #        return nodes_hrn
409 #                    
410 #    def _filter_by_fixed_attr(self, filters, nodes_hrn):
411 #        """
412 #        Query SFA API for nodes matching fixed attributes defined by the
413 #        user
414 #        """
415 #        pass
416 ##        node_tags = self.sfaapi.get_resources_tags(filters)
417 ##        if node_tags is not None:
418 ##
419 ##            if len(nodes_id) == 0:
420 ##                # first attribute being matched
421 ##                for node_tag in node_tags:
422 ##                    nodes_id.append(node_tag['node_id'])
423 ##            else:
424 ##                # remove the nodes ids that don't match the new attribute
425 ##                # that is being match
426 ##
427 ##                nodes_id_tmp = []
428 ##                for node_tag in node_tags:
429 ##                    if node_tag['node_id'] in nodes_id:
430 ##                        nodes_id_tmp.append(node_tag['node_id'])
431 ##
432 ##                if len(nodes_id_tmp):
433 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
434 ##                else:
435 ##                    # no node from before match the new constraint
436 ##                    self.fail_discovery()
437 ##        else:
438 ##            # no nodes match the filter applied
439 ##            self.fail_discovery()
440 ##
441 ##        return nodes_id
442 #
443 #    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
444 #        """
445 #        Query PLCAPI for nodes ids matching attributes defined in a certain
446 #        range, by the user
447 #        """
448 #        pass
449 ##        node_tags = self.plapi.get_node_tags(filters)
450 ##        if node_tags:
451 ##            
452 ##            if len(nodes_id) == 0:
453 ##                # first attribute being matched
454 ##                for node_tag in node_tags:
455 ## 
456 ##                   # check that matches the min or max restriction
457 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
458 ##                        float(node_tag['value']) > attr_value:
459 ##                        nodes_id.append(node_tag['node_id'])
460 ##
461 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
462 ##                        float(node_tag['value']) < attr_value:
463 ##                        nodes_id.append(node_tag['node_id'])
464 ##            else:
465 ##
466 ##                # remove the nodes ids that don't match the new attribute
467 ##                # that is being match
468 ##                nodes_id_tmp = []
469 ##                for node_tag in node_tags:
470 ##
471 ##                    # check that matches the min or max restriction and was a
472 ##                    # matching previous filters
473 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
474 ##                        float(node_tag['value']) > attr_value and \
475 ##                        node_tag['node_id'] in nodes_id:
476 ##                        nodes_id_tmp.append(node_tag['node_id'])
477 ##
478 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
479 ##                        float(node_tag['value']) < attr_value and \
480 ##                        node_tag['node_id'] in nodes_id:
481 ##                        nodes_id_tmp.append(node_tag['node_id'])
482 ##
483 ##                if len(nodes_id_tmp):
484 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
485 ##                else:
486 ##                    # no node from before match the new constraint
487 ##                    self.fail_discovery()
488 ##
489 ##        else: #TODO CHECK
490 ##            # no nodes match the filter applied
491 ##            self.fail_discovery()
492 ##
493 ##        return nodes_id
494 #        
495 #    def _choose_random_node(self, nodes):
496 #        """
497 #        From the possible nodes for provision, choose randomly to decrese the
498 #        probability of different RMs choosing the same node for provision
499 #        """
500 #        size = len(nodes)
501 #        while size:
502 #            size = size - 1
503 #            index = randint(0, size)
504 #            node_id = nodes[index]
505 #            nodes[index] = nodes[size]
506 #
507 #            # check the node is not blacklisted or being provision by other RM
508 #            # and perform ping to check that is really alive
509 #            with PlanetlabNode.lock:
510 #
511 #                blist = self.plapi.blacklisted()
512 #                plist = self.plapi.reserved()
513 #                if node_id not in blist and node_id not in plist:
514 #                    ping_ok = self._do_ping(node_id)
515 #                    if not ping_ok:
516 #                        self._set_hostname_attr(node_id)
517 #                        self.warn(" Node not responding PING ")
518 #                        self._blacklist_node(node_id)
519 #                    else:
520 #                        # discovered node for provision, added to provision list
521 #                        self._put_node_in_provision(node_id)
522 #                        return node_id
523 #
524 #    def _get_nodes_id(self, filters=None):
525 #        return self.plapi.get_nodes(filters, fields=['node_id'])
526 #
527     def _add_node_to_slice(self, host_hrn):
528         self.info(" Adding node to slice ")
529         slicename = self.get("username").replace('_', '.')
530         slicename = 'ple.' + slicename
531         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
532
533     def _delete_from_slice(self):
534         self.warning(" Deleting node from slice ")
535         slicename = self.get("username").replace('_', '.')
536         slicename = 'ple.' + slicename
537         self.sfaapi.remove_all_from_slice(slicename)
538
539     def _get_hostname(self):
540         hostname = self.get("hostname")
541         if hostname:
542             return hostname
543         else:
544             return None
545
546     def _set_hostname_attr(self, node):
547         """
548         Query SFAAPI for the hostname of a certain host hrn and sets the
549         attribute hostname, it will over write the previous value
550         """
551         hosts_hrn = self.sfaapi.get_resources_hrn()
552         for hostname, hrn  in hosts_hrn.iteritems():
553             if hrn == node:
554                 self.set("hostname", hostname)
555
556     def _check_if_in_slice(self, hosts_hrn):
557         """
558         Check using SFA API if any host hrn from hosts_hrn is in the user's
559         slice
560         """
561         slicename = self.get("username").replace('_', '.')
562         slicename = 'ple.' + slicename
563         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
564         if slice_nodes:
565             slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes).values()
566         else: slice_nodes_hrn = []
567         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
568         return nodes_inslice
569
570     def _do_ping(self, hostname):
571         """
572         Perform ping command on node's IP matching hostname
573         """
574         ping_ok = False
575         ip = self._get_ip(hostname)
576         if ip:
577             command = "ping -c4 %s" % ip
578             (out, err) = lexec(command)
579
580             m = re.search("(\d+)% packet loss", str(out))
581             if m and int(m.groups()[0]) < 50:
582                 ping_ok = True
583
584         return ping_ok
585
586     def _blacklist_node(self, host_hrn):
587         """
588         Add node mal functioning node to blacklist
589         """
590         self.warning(" Blacklisting malfunctioning node ")
591         self.sfaapi.blacklist_resource(host_hrn)
592         if not self._hostname:
593             self.set('hostname', None)
594
595     def _reserve(self, host_hrn):
596         """
597         Add node to the list of nodes being provisioned, in order for other RMs
598         to not try to provision the same one again.
599         """
600         self.sfaapi.reserve_resource(host_hrn)
601
602     def _get_ip(self, hostname):
603         """
604         Query cache for the IP of a node with certain hostname
605         """
606         try:
607             ip = sshfuncs.gethostbyname(hostname)
608         except:
609             # Fail while trying to find the IP
610             return None
611         return ip
612
613     def fail_discovery(self):
614         msg = "Discovery failed. No candidates found for node"
615         self.error(msg)
616         raise RuntimeError, msg
617
618     def fail_node_not_alive(self, hostname=None):
619         msg = "Node %s not alive" % hostname
620         raise RuntimeError, msg
621     
622     def fail_node_not_available(self, hostname):
623         msg = "Node %s not available for provisioning" % hostname
624         raise RuntimeError, msg
625
626     def fail_not_enough_nodes(self):
627         msg = "Not enough nodes available for provisioning"
628         raise RuntimeError, msg
629
630     def fail_sfaapi(self):
631         msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
632             " attributes sfauser and sfaPrivateKey."
633         raise RuntimeError, msg
634
635     def valid_connection(self, guid):
636         # TODO: Validate!
637         return True
638
639