Merging the openflow part to the nepi-3-dev branch
[nepi.git] / src / nepi / resources / planetlab / sfa_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay 
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import re
30 import weakref
31 import time
32 import socket
33 import threading
34 import datetime
35
36 @clsinit_copy
37 class PlanetlabSfaNode(LinuxNode):
38     _rtype = "PlanetlabSfaNode"
39     _help = "Controls a PlanetLab host accessible using a SSH key " \
40             "and provisioned using SFA"
41     _backend = "planetlab"
42
43     @classmethod
44     def _register_attributes(cls):
45
46         sfa_user = Attribute("sfauser", "SFA user",
47                     flags = Flags.Credential)
48
49         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50                             used to generate the user credential")
51
52         city = Attribute("city", "Constrain location (city) during resource \
53                 discovery. May use wildcards.",
54                 flags = Flags.Filter)
55
56         country = Attribute("country", "Constrain location (country) during \
57                     resource discovery. May use wildcards.",
58                     flags = Flags.Filter)
59
60         region = Attribute("region", "Constrain location (region) during \
61                     resource discovery. May use wildcards.",
62                     flags = Flags.Filter)
63
64         architecture = Attribute("architecture", "Constrain architecture \
65                         during resource discovery.",
66                         type = Types.Enumerate,
67                         allowed = ["x86_64", 
68                                     "i386"],
69                         flags = Flags.Filter)
70
71         operating_system = Attribute("operatingSystem", "Constrain operating \
72                             system during resource discovery.",
73                             type = Types.Enumerate,
74                             allowed =  ["f8",
75                                         "f12",
76                                         "f14",
77                                         "centos",
78                                         "other"],
79                             flags = Flags.Filter)
80
81         min_reliability = Attribute("minReliability", "Constrain reliability \
82                             while picking PlanetLab nodes. Specifies a lower \
83                             acceptable bound.",
84                             type = Types.Double,
85                             range = (1, 100),
86                             flags = Flags.Filter)
87
88         max_reliability = Attribute("maxReliability", "Constrain reliability \
89                             while picking PlanetLab nodes. Specifies an upper \
90                             acceptable bound.",
91                             type = Types.Double,
92                             range = (1, 100),
93                             flags = Flags.Filter)
94
95         min_bandwidth = Attribute("minBandwidth", "Constrain available \
96                             bandwidth while picking PlanetLab nodes. \
97                             Specifies a lower acceptable bound.",
98                             type = Types.Double,
99                             range = (0, 2**31),
100                             flags = Flags.Filter)
101
102         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
103                             bandwidth while picking PlanetLab nodes. \
104                             Specifies an upper acceptable bound.",
105                             type = Types.Double,
106                             range = (0, 2**31),
107                             flags = Flags.Filter)
108
109         min_load = Attribute("minLoad", "Constrain node load average while \
110                     picking PlanetLab nodes. Specifies a lower acceptable \
111                     bound.",
112                     type = Types.Double,
113                     range = (0, 2**31),
114                     flags = Flags.Filter)
115
116         max_load = Attribute("maxLoad", "Constrain node load average while \
117                     picking PlanetLab nodes. Specifies an upper acceptable \
118                     bound.",
119                     type = Types.Double,
120                     range = (0, 2**31),
121                     flags = Flags.Filter)
122
123         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
124                     picking PlanetLab nodes. Specifies a lower acceptable \
125                     bound.",
126                     type = Types.Double,
127                     range = (0, 100),
128                     flags = Flags.Filter)
129
130         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
131                     picking PlanetLab nodes. Specifies an upper acceptable \
132                     bound.",
133                     type = Types.Double,
134                     range = (0, 100),
135                     flags = Flags.Filter)
136
137         timeframe = Attribute("timeframe", "Past time period in which to check\
138                         information about the node. Values are year,month, \
139                         week, latest",
140                         default = "week",
141                         type = Types.Enumerate,
142                         allowed = ["latest",
143                                     "week",
144                                     "month",
145                                     "year"],
146                         flags = Flags.Filter)
147
148         plblacklist = Attribute("persist_blacklist", "Take into account the file plblacklist \
149                         in the user's home directory under .nepi directory. This file \
150                         contains a list of PL nodes to blacklist, and at the end \
151                         of the experiment execution the new blacklisted nodes are added.",
152                     type = Types.Bool,
153                     default = False,
154                     flags = Flags.Global)
155
156         cls._register_attribute(sfa_user)
157         cls._register_attribute(sfa_private_key)
158         cls._register_attribute(city)
159         cls._register_attribute(country)
160         cls._register_attribute(region)
161         cls._register_attribute(architecture)
162         cls._register_attribute(operating_system)
163         cls._register_attribute(min_reliability)
164         cls._register_attribute(max_reliability)
165         cls._register_attribute(min_bandwidth)
166         cls._register_attribute(max_bandwidth)
167         cls._register_attribute(min_load)
168         cls._register_attribute(max_load)
169         cls._register_attribute(min_cpu)
170         cls._register_attribute(max_cpu)
171         cls._register_attribute(timeframe)
172         cls._register_attribute(plblacklist)
173
174     def __init__(self, ec, guid):
175         super(PlanetlabSfaNode, self).__init__(ec, guid)
176         
177         self._ecobj = weakref.ref(ec)
178         self._sfaapi = None
179         self._node_to_provision = None
180         self._slicenode = False
181         self._hostname = False
182
183         if self.get("gateway") or self.get("gatewayUser"):
184             self.set("gateway", None)
185             self.set("gatewayUser", None)
186
187     def _skip_provision(self):
188         sfa_user = self.get("sfauser")
189         if not sfa_user:
190             return True
191         else: return False
192     
193     @property
194     def sfaapi(self):
195         if not self._sfaapi:
196             sfa_user = self.get("sfauser")
197             sfa_sm = "http://sfa3.planet-lab.eu:12346/"
198             sfa_auth = '.'.join(sfa_user.split('.')[:2])
199             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
200             sfa_private_key = self.get("sfaPrivateKey")
201
202             _sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
203                 sfa_registry, sfa_sm, sfa_private_key, self._ecobj())
204
205             if not _sfaapi:
206                 self.fail_sfaapi()
207     
208             self._sfaapi = weakref.ref(_sfaapi)
209
210         return self._sfaapi()
211
212     def do_discover(self):
213         """
214         Based on the attributes defined by the user, discover the suitable 
215         nodes for provision.
216         """
217         if self._skip_provision():
218             super(PlanetlabSfaNode, self).do_discover()
219             return
220
221         nodes = self.sfaapi.get_resources_hrn()
222
223         hostname = self._get_hostname()
224         if hostname:
225             # the user specified one particular node to be provisioned
226             self._hostname = True
227             host_hrn = nodes[hostname]
228
229             # check that the node is not blacklisted or being provisioned
230             # by other RM
231             if not self._blacklisted(host_hrn):
232                 if not self._reserved(host_hrn):
233                     # Node in reservation
234                     ping_ok = self._do_ping(hostname)
235                     if not ping_ok:
236                         self._blacklist_node(host_hrn)
237                         self.fail_node_not_alive(hostname)
238                     else:
239                         if self._check_if_in_slice([host_hrn]):
240                             self._slicenode = True
241                         self._node_to_provision = host_hrn
242                         super(PlanetlabSfaNode, self).do_discover()
243         
244 #        else:
245 #            # the user specifies constraints based on attributes, zero, one or 
246 #            # more nodes can match these constraints 
247 #            nodes = self._filter_based_on_attributes()
248 #
249 #            # nodes that are already part of user's slice have the priority to
250 #            # provisioned
251 #            nodes_inslice = self._check_if_in_slice(nodes)
252 #            nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
253 #            
254 #            node_id = None
255 #            if nodes_inslice:
256 #                node_id = self._choose_random_node(nodes_inslice)
257 #                self._slicenode = True                
258 #                
259 #            if not node_id:
260 #                # Either there were no matching nodes in the user's slice, or
261 #                # the nodes in the slice  were blacklisted or being provisioned
262 #                # by other RM. Note nodes_not_inslice is never empty
263 #                node_id = self._choose_random_node(nodes_not_inslice)
264 #                self._slicenode = False
265 #
266 #            if node_id:
267 #                self._node_to_provision = node_id
268 #                try:
269 #                    self._set_hostname_attr(node_id)
270 #                    self.info(" Selected node to provision ")
271 #                    super(PlanetlabSfaNode, self).do_discover()
272 #                except:
273 #                    with PlanetlabSfaNode.lock:
274 #                        self._blacklist_node(node_id)
275 #                    self.do_discover()
276 #            else:
277 #               self.fail_not_enough_nodes() 
278 #    
279     def _blacklisted(self, host_hrn):
280         if self.sfaapi.blacklisted(host_hrn):
281            self.fail_node_not_available(hostname)
282         return False
283
284     def _reserved(self, host_hrn):
285         if self.sfaapi.reserved(host_hrn):
286             self.fail_node_not_available(hostname)
287         return False
288             
289     def do_provision(self):
290         """
291         Add node to user's slice after verifing that the node is functioning
292         correctly.
293         """
294         if self._skip_provision():
295             super(PlanetlabSfaNode, self).do_provision()
296             return
297
298         provision_ok = False
299         ssh_ok = False
300         proc_ok = False
301         timeout = 1800
302
303         while not provision_ok:
304             node = self._node_to_provision
305             if not self._slicenode:
306                 self._add_node_to_slice(node)
307             
308                 # check ssh connection
309                 t = 0 
310                 while t < timeout and not ssh_ok:
311
312                     cmd = 'echo \'GOOD NODE\''
313                     ((out, err), proc) = self.execute(cmd)
314                     if out.find("GOOD NODE") < 0:
315                         self.debug( "No SSH connection, waiting 60s" )
316                         t = t + 60
317                         time.sleep(60)
318                         continue
319                     else:
320                         self.debug( "SSH OK" )
321                         ssh_ok = True
322                         continue
323             else:
324                 cmd = 'echo \'GOOD NODE\''
325                 ((out, err), proc) = self.execute(cmd)
326                 if not out.find("GOOD NODE") < 0:
327                     ssh_ok = True
328
329             if not ssh_ok:
330                 # the timeout was reach without establishing ssh connection
331                 # the node is blacklisted, deleted from the slice, and a new
332                 # node to provision is discovered
333                 self.warn(" Could not SSH login ")
334                 self._blacklist_node(node)
335                 self.do_discover()
336                 continue
337             
338             # check /proc directory is mounted (ssh_ok = True)
339             # and file system is not read only
340             else:
341                 cmd = 'mount |grep proc'
342                 ((out1, err1), proc1) = self.execute(cmd)
343                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
344                 ((out2, err2), proc2) = self.execute(cmd)
345                 if out1.find("/proc type proc") < 0 or \
346                     "Read-only file system".lower() in err2.lower():
347                     self.warn(" Corrupted file system ")
348                     self._blacklist_node(node)
349                     self.do_discover()
350                     continue
351             
352                 else:
353                     provision_ok = True
354                     if not self.get('hostname'):
355                         self._set_hostname_attr(node)            
356                     self.info(" Node provisioned ")            
357             
358         super(PlanetlabSfaNode, self).do_provision()
359
360 #    def _filter_based_on_attributes(self):
361 #        """
362 #        Retrive the list of nodes hrn that match user's constraints 
363 #        """
364 #        # Map user's defined attributes with tagnames of PlanetLab
365 #        timeframe = self.get("timeframe")[0]
366 #        attr_to_tags = {
367 #            'city' : 'city',
368 #            'country' : 'country',
369 #            'region' : 'region',
370 #            'architecture' : 'arch',
371 #            'operatingSystem' : 'fcdistro',
372 #            'minReliability' : 'reliability%s' % timeframe,
373 #            'maxReliability' : 'reliability%s' % timeframe,
374 #            'minBandwidth' : 'bw%s' % timeframe,
375 #            'maxBandwidth' : 'bw%s' % timeframe,
376 #            'minLoad' : 'load%s' % timeframe,
377 #            'maxLoad' : 'load%s' % timeframe,
378 #            'minCpu' : 'cpu%s' % timeframe,
379 #            'maxCpu' : 'cpu%s' % timeframe,
380 #        }
381 #        
382 #        nodes_hrn = []
383 #        filters = {}
384 #
385 #        for attr_name, attr_obj in self._attrs.iteritems():
386 #            attr_value = self.get(attr_name)
387 #            
388 #            if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
389 #                attr_name != 'timeframe':
390 #        
391 #                attr_tag = attr_to_tags[attr_name]
392 #                filters['tagname'] = attr_tag
393 #
394 #                # filter nodes by fixed constraints e.g. operating system
395 #                if not 'min' in attr_name and not 'max' in attr_name:
396 #                    filters['value'] = attr_value
397 #                    nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
398 #
399 #                # filter nodes by range constraints e.g. max bandwidth
400 #                elif ('min' or 'max') in attr_name:
401 #                    nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
402 #
403 #        if not filters:
404 #            nodes = self.sfaapi.get_resources_hrn()
405 #            for node in nodes:
406 #                nodes_hrn.append(node[node.key()])
407 #        return nodes_hrn
408 #                    
409 #    def _filter_by_fixed_attr(self, filters, nodes_hrn):
410 #        """
411 #        Query SFA API for nodes matching fixed attributes defined by the
412 #        user
413 #        """
414 #        pass
415 ##        node_tags = self.sfaapi.get_resources_tags(filters)
416 ##        if node_tags is not None:
417 ##
418 ##            if len(nodes_id) == 0:
419 ##                # first attribute being matched
420 ##                for node_tag in node_tags:
421 ##                    nodes_id.append(node_tag['node_id'])
422 ##            else:
423 ##                # remove the nodes ids that don't match the new attribute
424 ##                # that is being match
425 ##
426 ##                nodes_id_tmp = []
427 ##                for node_tag in node_tags:
428 ##                    if node_tag['node_id'] in nodes_id:
429 ##                        nodes_id_tmp.append(node_tag['node_id'])
430 ##
431 ##                if len(nodes_id_tmp):
432 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
433 ##                else:
434 ##                    # no node from before match the new constraint
435 ##                    self.fail_discovery()
436 ##        else:
437 ##            # no nodes match the filter applied
438 ##            self.fail_discovery()
439 ##
440 ##        return nodes_id
441 #
442 #    def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
443 #        """
444 #        Query PLCAPI for nodes ids matching attributes defined in a certain
445 #        range, by the user
446 #        """
447 #        pass
448 ##        node_tags = self.plapi.get_node_tags(filters)
449 ##        if node_tags:
450 ##            
451 ##            if len(nodes_id) == 0:
452 ##                # first attribute being matched
453 ##                for node_tag in node_tags:
454 ## 
455 ##                   # check that matches the min or max restriction
456 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
457 ##                        float(node_tag['value']) > attr_value:
458 ##                        nodes_id.append(node_tag['node_id'])
459 ##
460 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
461 ##                        float(node_tag['value']) < attr_value:
462 ##                        nodes_id.append(node_tag['node_id'])
463 ##            else:
464 ##
465 ##                # remove the nodes ids that don't match the new attribute
466 ##                # that is being match
467 ##                nodes_id_tmp = []
468 ##                for node_tag in node_tags:
469 ##
470 ##                    # check that matches the min or max restriction and was a
471 ##                    # matching previous filters
472 ##                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
473 ##                        float(node_tag['value']) > attr_value and \
474 ##                        node_tag['node_id'] in nodes_id:
475 ##                        nodes_id_tmp.append(node_tag['node_id'])
476 ##
477 ##                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
478 ##                        float(node_tag['value']) < attr_value and \
479 ##                        node_tag['node_id'] in nodes_id:
480 ##                        nodes_id_tmp.append(node_tag['node_id'])
481 ##
482 ##                if len(nodes_id_tmp):
483 ##                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
484 ##                else:
485 ##                    # no node from before match the new constraint
486 ##                    self.fail_discovery()
487 ##
488 ##        else: #TODO CHECK
489 ##            # no nodes match the filter applied
490 ##            self.fail_discovery()
491 ##
492 ##        return nodes_id
493 #        
494 #    def _choose_random_node(self, nodes):
495 #        """
496 #        From the possible nodes for provision, choose randomly to decrese the
497 #        probability of different RMs choosing the same node for provision
498 #        """
499 #        size = len(nodes)
500 #        while size:
501 #            size = size - 1
502 #            index = randint(0, size)
503 #            node_id = nodes[index]
504 #            nodes[index] = nodes[size]
505 #
506 #            # check the node is not blacklisted or being provision by other RM
507 #            # and perform ping to check that is really alive
508 #            with PlanetlabNode.lock:
509 #
510 #                blist = self.plapi.blacklisted()
511 #                plist = self.plapi.reserved()
512 #                if node_id not in blist and node_id not in plist:
513 #                    ping_ok = self._do_ping(node_id)
514 #                    if not ping_ok:
515 #                        self._set_hostname_attr(node_id)
516 #                        self.warn(" Node not responding PING ")
517 #                        self._blacklist_node(node_id)
518 #                    else:
519 #                        # discovered node for provision, added to provision list
520 #                        self._put_node_in_provision(node_id)
521 #                        return node_id
522 #
523 #    def _get_nodes_id(self, filters=None):
524 #        return self.plapi.get_nodes(filters, fields=['node_id'])
525 #
526     def _add_node_to_slice(self, host_hrn):
527         self.info(" Adding node to slice ")
528         slicename = self.get("username").replace('_', '.')
529         slicename = 'ple.' + slicename
530         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
531
532 #    def _delete_node_from_slice(self, node):
533 #        self.warn(" Deleting node from slice ")
534 #        slicename = self.get("username")
535 #        self.plapi.delete_slice_node(slicename, [node])
536 #
537     def _get_hostname(self):
538         hostname = self.get("hostname")
539         if hostname:
540             return hostname
541         else:
542             return None
543
544     def _set_hostname_attr(self, node):
545         """
546         Query SFAAPI for the hostname of a certain host hrn and sets the
547         attribute hostname, it will over write the previous value
548         """
549         hosts_hrn = self.sfaapi.get_resources_hrn()
550         for hostname, hrn  in hosts_hrn.iteritems():
551             if hrn == node:
552                 self.set("hostname", hostname)
553
554     def _check_if_in_slice(self, hosts_hrn):
555         """
556         Check using SFA API if any host hrn from hosts_hrn is in the user's
557         slice
558         """
559         slicename = self.get("username").replace('_', '.')
560         slicename = 'ple.' + slicename
561         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
562         slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes)
563         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn.values()))
564         return nodes_inslice
565
566     def _do_ping(self, hostname):
567         """
568         Perform ping command on node's IP matching hostname
569         """
570         ping_ok = False
571         ip = self._get_ip(hostname)
572         if ip:
573             command = "ping -c4 %s" % ip
574             (out, err) = lexec(command)
575
576             m = re.search("(\d+)% packet loss", str(out))
577             if m and int(m.groups()[0]) < 50:
578                 ping_ok = True
579
580         return ping_ok
581
582     def _blacklist_node(self, host_hrn):
583         """
584         Add node mal functioning node to blacklist
585         """
586         self.warning(" Blacklisting malfunctioning node ")
587         self.sfaapi.blacklist_resource(host_hrn)
588         if not self._hostname:
589             self.set('hostname', None)
590
591     def _reserve(self, host_hrn):
592         """
593         Add node to the list of nodes being provisioned, in order for other RMs
594         to not try to provision the same one again.
595         """
596         self.sfaapi.reserve_resource(host_hrn)
597
598     def _get_ip(self, hostname):
599         """
600         Query cache for the IP of a node with certain hostname
601         """
602         try:
603             ip = sshfuncs.gethostbyname(hostname)
604         except:
605             # Fail while trying to find the IP
606             return None
607         return ip
608
609     def fail_discovery(self):
610         msg = "Discovery failed. No candidates found for node"
611         self.error(msg)
612         raise RuntimeError, msg
613
614     def fail_node_not_alive(self, hostname=None):
615         msg = "Node %s not alive" % hostname
616         raise RuntimeError, msg
617     
618     def fail_node_not_available(self, hostname):
619         msg = "Node %s not available for provisioning" % hostname
620         raise RuntimeError, msg
621
622     def fail_not_enough_nodes(self):
623         msg = "Not enough nodes available for provisioning"
624         raise RuntimeError, msg
625
626     def fail_sfaapi(self):
627         msg = "Failing while trying to instanciate the SFA API.\nSet the" + \
628             " attributes sfauser and sfaPrivateKey."
629         raise RuntimeError, msg
630
631     def valid_connection(self, guid):
632         # TODO: Validate!
633         return True
634
635