Adding sfa support ple using hostname
[nepi.git] / src / nepi / resources / planetlab / sfa_node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
19
20 from nepi.execution.attribute import Attribute, Flags, Types
21 from nepi.execution.resource import ResourceManager, clsinit_copy, \
22         ResourceState, reschedule_delay 
23 from nepi.resources.linux.node import LinuxNode
24 from nepi.util.sfaapi import SFAAPIFactory 
25 from nepi.util.execfuncs import lexec
26 from nepi.util import sshfuncs
27
28 from random import randint
29 import time
30 import socket
31 import threading
32 import datetime
33
34 @clsinit_copy
35 class PlanetlabSfaNode(LinuxNode):
36     _rtype = "PlanetlabSfaNode"
37     _help = "Controls a PlanetLab host accessible using a SSH key " \
38             "and provisioned using SFA"
39     _backend = "planetlab"
40
41     lock = threading.Lock()
42
43     @classmethod
44     def _register_attributes(cls):
45
46         sfa_user = Attribute("sfauser", "SFA user",
47                     flags = Flags.Credential)
48
49         sfa_private_key = Attribute("sfaPrivateKey", "SFA path to the private key \
50                             used to generate the user credential")
51
52         city = Attribute("city", "Constrain location (city) during resource \
53                 discovery. May use wildcards.",
54                 flags = Flags.Filter)
55
56         country = Attribute("country", "Constrain location (country) during \
57                     resource discovery. May use wildcards.",
58                     flags = Flags.Filter)
59
60         region = Attribute("region", "Constrain location (region) during \
61                     resource discovery. May use wildcards.",
62                     flags = Flags.Filter)
63
64         architecture = Attribute("architecture", "Constrain architecture \
65                         during resource discovery.",
66                         type = Types.Enumerate,
67                         allowed = ["x86_64", 
68                                     "i386"],
69                         flags = Flags.Filter)
70
71         operating_system = Attribute("operatingSystem", "Constrain operating \
72                             system during resource discovery.",
73                             type = Types.Enumerate,
74                             allowed =  ["f8",
75                                         "f12",
76                                         "f14",
77                                         "centos",
78                                         "other"],
79                             flags = Flags.Filter)
80
81         min_reliability = Attribute("minReliability", "Constrain reliability \
82                             while picking PlanetLab nodes. Specifies a lower \
83                             acceptable bound.",
84                             type = Types.Double,
85                             range = (1, 100),
86                             flags = Flags.Filter)
87
88         max_reliability = Attribute("maxReliability", "Constrain reliability \
89                             while picking PlanetLab nodes. Specifies an upper \
90                             acceptable bound.",
91                             type = Types.Double,
92                             range = (1, 100),
93                             flags = Flags.Filter)
94
95         min_bandwidth = Attribute("minBandwidth", "Constrain available \
96                             bandwidth while picking PlanetLab nodes. \
97                             Specifies a lower acceptable bound.",
98                             type = Types.Double,
99                             range = (0, 2**31),
100                             flags = Flags.Filter)
101
102         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
103                             bandwidth while picking PlanetLab nodes. \
104                             Specifies an upper acceptable bound.",
105                             type = Types.Double,
106                             range = (0, 2**31),
107                             flags = Flags.Filter)
108
109         min_load = Attribute("minLoad", "Constrain node load average while \
110                     picking PlanetLab nodes. Specifies a lower acceptable \
111                     bound.",
112                     type = Types.Double,
113                     range = (0, 2**31),
114                     flags = Flags.Filter)
115
116         max_load = Attribute("maxLoad", "Constrain node load average while \
117                     picking PlanetLab nodes. Specifies an upper acceptable \
118                     bound.",
119                     type = Types.Double,
120                     range = (0, 2**31),
121                     flags = Flags.Filter)
122
123         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
124                     picking PlanetLab nodes. Specifies a lower acceptable \
125                     bound.",
126                     type = Types.Double,
127                     range = (0, 100),
128                     flags = Flags.Filter)
129
130         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
131                     picking PlanetLab nodes. Specifies an upper acceptable \
132                     bound.",
133                     type = Types.Double,
134                     range = (0, 100),
135                     flags = Flags.Filter)
136
137         timeframe = Attribute("timeframe", "Past time period in which to check\
138                         information about the node. Values are year,month, \
139                         week, latest",
140                         default = "week",
141                         type = Types.Enumerate,
142                         allowed = ["latest",
143                                     "week",
144                                     "month",
145                                     "year"],
146                         flags = Flags.Filter)
147
148 #        plblacklist = Attribute("blacklist", "Take into account the file plblacklist \
149 #                        in the user's home directory under .nepi directory. This file \
150 #                        contains a list of PL nodes to blacklist, and at the end \
151 #                        of the experiment execution the new blacklisted nodes are added.",
152 #                    type = Types.Bool,
153 #                    default = True,
154 #                    flags = Flags.ReadOnly)
155 #
156
157         cls._register_attribute(sfa_user)
158         cls._register_attribute(sfa_private_key)
159         cls._register_attribute(city)
160         cls._register_attribute(country)
161         cls._register_attribute(region)
162         cls._register_attribute(architecture)
163         cls._register_attribute(operating_system)
164         cls._register_attribute(min_reliability)
165         cls._register_attribute(max_reliability)
166         cls._register_attribute(min_bandwidth)
167         cls._register_attribute(max_bandwidth)
168         cls._register_attribute(min_load)
169         cls._register_attribute(max_load)
170         cls._register_attribute(min_cpu)
171         cls._register_attribute(max_cpu)
172         cls._register_attribute(timeframe)
173
174     def __init__(self, ec, guid):
175         super(PlanetlabSfaNode, self).__init__(ec, guid)
176
177         self._sfaapi = None
178         self._node_to_provision = None
179         self._slicenode = False
180         self._hostname = False
181
182         if self.get("gateway") or self.get("gatewayUser"):
183             self.set("gateway", None)
184             self.set("gatewayUser", None)
185
186     def _skip_provision(self):
187         sfa_user = self.get("sfauser")
188         if not sfa_user:
189             return True
190         else: return False
191     
192     @property
193     def sfaapi(self):
194         if not self._sfaapi:
195             sfa_user = self.get("sfauser")
196             sfa_sm = "http://sfa3.planet-lab.eu:12346/"
197             sfa_auth = '.'.join(sfa_user.split('.')[:2])
198             sfa_registry = "http://sfa3.planet-lab.eu:12345/"
199             sfa_private_key = self.get("sfaPrivateKey")
200
201             self._sfaapi = SFAAPIFactory.get_api(sfa_user, sfa_auth, 
202                 sfa_registry, sfa_sm, sfa_private_key)
203             
204             if not self._sfaapi:
205                 self.fail_sfaapi()
206
207         return self._sfaapi
208
209     def do_discover(self):
210         """
211         Based on the attributes defined by the user, discover the suitable 
212         nodes for provision.
213         """
214         if self._skip_provision():
215             super(PlanetlabSfaNode, self).do_discover()
216             return
217
218         nodes = self.sfaapi.get_resources_hrn()
219
220         hostname = self._get_hostname()
221         if hostname:
222             # the user specified one particular node to be provisioned
223             self._hostname = True
224             host_hrn = nodes[hostname]
225             print host_hrn
226
227             # check that the node is not blacklisted or being provisioned
228             # by other RM
229             with PlanetlabSfaNode.lock:
230                 plist = self.sfaapi.reserved()
231                 blist = self.sfaapi.blacklisted()
232                 if host_hrn not in blist and host_hrn not in plist:
233                 
234                     # check that is really alive, by performing ping
235                     ping_ok = self._do_ping(hostname)
236                     if not ping_ok:
237                         self._blacklist_node(host_hrn)
238                         self.fail_node_not_alive(hostname)
239                     else:
240                         if self._check_if_in_slice([host_hrn]):
241                             self._slicenode = True
242                         self._put_node_in_provision(host_hrn)
243                         self._node_to_provision = host_hrn
244                 else:
245                     self.fail_node_not_available(hostname)
246             super(PlanetlabSfaNode, self).do_discover()
247         
248         else:
249             # the user specifies constraints based on attributes, zero, one or 
250             # more nodes can match these constraints 
251             nodes = self._filter_based_on_attributes()
252
253             # nodes that are already part of user's slice have the priority to
254             # provisioned
255             nodes_inslice = self._check_if_in_slice(nodes)
256             nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
257             
258             node_id = None
259             if nodes_inslice:
260                 node_id = self._choose_random_node(nodes_inslice)
261                 self._slicenode = True                
262                 
263             if not node_id:
264                 # Either there were no matching nodes in the user's slice, or
265                 # the nodes in the slice  were blacklisted or being provisioned
266                 # by other RM. Note nodes_not_inslice is never empty
267                 node_id = self._choose_random_node(nodes_not_inslice)
268                 self._slicenode = False
269
270             if node_id:
271                 self._node_to_provision = node_id
272                 try:
273                     self._set_hostname_attr(node_id)
274                     self.info(" Selected node to provision ")
275                     super(PlanetlabSfaNode, self).do_discover()
276                 except:
277                     with PlanetlabSfaNode.lock:
278                         self._blacklist_node(node_id)
279                     self.do_discover()
280             else:
281                self.fail_not_enough_nodes() 
282             
283     def do_provision(self):
284         """
285         Add node to user's slice after verifing that the node is functioning
286         correctly.
287         """
288         if self._skip_provision():
289             super(PlanetlabSfaNode, self).do_provision()
290             return
291
292         provision_ok = False
293         ssh_ok = False
294         proc_ok = False
295         timeout = 1800
296
297         while not provision_ok:
298             node = self._node_to_provision
299             if not self._slicenode:
300                 self._add_node_to_slice(node)
301             
302                 # check ssh connection
303                 t = 0 
304                 while t < timeout and not ssh_ok:
305
306                     cmd = 'echo \'GOOD NODE\''
307                     ((out, err), proc) = self.execute(cmd)
308                     if out.find("GOOD NODE") < 0:
309                         t = t + 60
310                         time.sleep(60)
311                         continue
312                     else:
313                         ssh_ok = True
314                         continue
315             else:
316                 cmd = 'echo \'GOOD NODE\''
317                 ((out, err), proc) = self.execute(cmd)
318                 if not out.find("GOOD NODE") < 0:
319                     ssh_ok = True
320
321             if not ssh_ok:
322                 # the timeout was reach without establishing ssh connection
323                 # the node is blacklisted, deleted from the slice, and a new
324                 # node to provision is discovered
325                 with PlanetlabSfaNode.lock:
326                     self.warn(" Could not SSH login ")
327                     self._blacklist_node(node)
328                     #self._delete_node_from_slice(node)
329                 self.do_discover()
330                 continue
331             
332             # check /proc directory is mounted (ssh_ok = True)
333             # and file system is not read only
334             else:
335                 cmd = 'mount |grep proc'
336                 ((out1, err1), proc1) = self.execute(cmd)
337                 cmd = 'touch /tmp/tmpfile; rm /tmp/tmpfile'
338                 ((out2, err2), proc2) = self.execute(cmd)
339                 if out1.find("/proc type proc") < 0 or \
340                     "Read-only file system".lower() in err2.lower():
341                     with PlanetlabNode.lock:
342                         self.warn(" Corrupted file system ")
343                         self._blacklist_node(node)
344                         #self._delete_node_from_slice(node)
345                     self.do_discover()
346                     continue
347             
348                 else:
349                     provision_ok = True
350                     if not self.get('hostname'):
351                         self._set_hostname_attr(node)            
352                     self.info(" Node provisioned ")            
353             
354         super(PlanetlabSfaNode, self).do_provision()
355
356     def _filter_based_on_attributes(self):
357         """
358         Retrive the list of nodes hrn that match user's constraints 
359         """
360         # Map user's defined attributes with tagnames of PlanetLab
361         timeframe = self.get("timeframe")[0]
362         attr_to_tags = {
363             'city' : 'city',
364             'country' : 'country',
365             'region' : 'region',
366             'architecture' : 'arch',
367             'operatingSystem' : 'fcdistro',
368             'minReliability' : 'reliability%s' % timeframe,
369             'maxReliability' : 'reliability%s' % timeframe,
370             'minBandwidth' : 'bw%s' % timeframe,
371             'maxBandwidth' : 'bw%s' % timeframe,
372             'minLoad' : 'load%s' % timeframe,
373             'maxLoad' : 'load%s' % timeframe,
374             'minCpu' : 'cpu%s' % timeframe,
375             'maxCpu' : 'cpu%s' % timeframe,
376         }
377         
378         nodes_hrn = []
379         filters = {}
380
381         for attr_name, attr_obj in self._attrs.iteritems():
382             attr_value = self.get(attr_name)
383             
384             if attr_value is not None and attr_obj.has_flag(Flags.Filter) and \
385                 attr_name != 'timeframe':
386         
387                 attr_tag = attr_to_tags[attr_name]
388                 filters['tagname'] = attr_tag
389
390                 # filter nodes by fixed constraints e.g. operating system
391                 if not 'min' in attr_name and not 'max' in attr_name:
392                     filters['value'] = attr_value
393                     nodes_hrn = self._filter_by_fixed_attr(filters, nodes_hrn)
394
395                 # filter nodes by range constraints e.g. max bandwidth
396                 elif ('min' or 'max') in attr_name:
397                     nodes_hrn = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_hrn)
398
399         if not filters:
400             nodes = self.sfaapi.get_resources_hrn()
401             for node in nodes:
402                 nodes_hrn.append(node[node.key()])
403         return nodes_hrn
404                     
405     def _filter_by_fixed_attr(self, filters, nodes_hrn):
406         """
407         Query SFA API for nodes matching fixed attributes defined by the
408         user
409         """
410         pass
411 #        node_tags = self.sfaapi.get_resources_tags(filters)
412 #        if node_tags is not None:
413 #
414 #            if len(nodes_id) == 0:
415 #                # first attribute being matched
416 #                for node_tag in node_tags:
417 #                    nodes_id.append(node_tag['node_id'])
418 #            else:
419 #                # remove the nodes ids that don't match the new attribute
420 #                # that is being match
421 #
422 #                nodes_id_tmp = []
423 #                for node_tag in node_tags:
424 #                    if node_tag['node_id'] in nodes_id:
425 #                        nodes_id_tmp.append(node_tag['node_id'])
426 #
427 #                if len(nodes_id_tmp):
428 #                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
429 #                else:
430 #                    # no node from before match the new constraint
431 #                    self.fail_discovery()
432 #        else:
433 #            # no nodes match the filter applied
434 #            self.fail_discovery()
435 #
436 #        return nodes_id
437
438     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
439         """
440         Query PLCAPI for nodes ids matching attributes defined in a certain
441         range, by the user
442         """
443         pass
444 #        node_tags = self.plapi.get_node_tags(filters)
445 #        if node_tags:
446 #            
447 #            if len(nodes_id) == 0:
448 #                # first attribute being matched
449 #                for node_tag in node_tags:
450
451 #                   # check that matches the min or max restriction
452 #                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
453 #                        float(node_tag['value']) > attr_value:
454 #                        nodes_id.append(node_tag['node_id'])
455 #
456 #                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
457 #                        float(node_tag['value']) < attr_value:
458 #                        nodes_id.append(node_tag['node_id'])
459 #            else:
460 #
461 #                # remove the nodes ids that don't match the new attribute
462 #                # that is being match
463 #                nodes_id_tmp = []
464 #                for node_tag in node_tags:
465 #
466 #                    # check that matches the min or max restriction and was a
467 #                    # matching previous filters
468 #                    if 'min' in attr_name and node_tag['value'] != 'n/a' and \
469 #                        float(node_tag['value']) > attr_value and \
470 #                        node_tag['node_id'] in nodes_id:
471 #                        nodes_id_tmp.append(node_tag['node_id'])
472 #
473 #                    elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
474 #                        float(node_tag['value']) < attr_value and \
475 #                        node_tag['node_id'] in nodes_id:
476 #                        nodes_id_tmp.append(node_tag['node_id'])
477 #
478 #                if len(nodes_id_tmp):
479 #                    nodes_id = set(nodes_id) & set(nodes_id_tmp)
480 #                else:
481 #                    # no node from before match the new constraint
482 #                    self.fail_discovery()
483 #
484 #        else: #TODO CHECK
485 #            # no nodes match the filter applied
486 #            self.fail_discovery()
487 #
488 #        return nodes_id
489         
490     def _choose_random_node(self, nodes):
491         """
492         From the possible nodes for provision, choose randomly to decrese the
493         probability of different RMs choosing the same node for provision
494         """
495         size = len(nodes)
496         while size:
497             size = size - 1
498             index = randint(0, size)
499             node_id = nodes[index]
500             nodes[index] = nodes[size]
501
502             # check the node is not blacklisted or being provision by other RM
503             # and perform ping to check that is really alive
504             with PlanetlabNode.lock:
505
506                 blist = self.plapi.blacklisted()
507                 plist = self.plapi.reserved()
508                 if node_id not in blist and node_id not in plist:
509                     ping_ok = self._do_ping(node_id)
510                     if not ping_ok:
511                         self._set_hostname_attr(node_id)
512                         self.warn(" Node not responding PING ")
513                         self._blacklist_node(node_id)
514                     else:
515                         # discovered node for provision, added to provision list
516                         self._put_node_in_provision(node_id)
517                         return node_id
518
519     def _get_nodes_id(self, filters=None):
520         return self.plapi.get_nodes(filters, fields=['node_id'])
521
522     def _add_node_to_slice(self, host_hrn):
523         self.info(" Adding node to slice ")
524         slicename = self.get("username").replace('_', '.')
525         slicename = 'ple.' + slicename
526         self.sfaapi.add_resource_to_slice(slicename, host_hrn)
527
528     def _delete_node_from_slice(self, node):
529         self.warn(" Deleting node from slice ")
530         slicename = self.get("username")
531         self.plapi.delete_slice_node(slicename, [node])
532
533     def _get_hostname(self):
534         hostname = self.get("hostname")
535         if hostname:
536             return hostname
537         else:
538             return None
539
540     def _set_hostname_attr(self, node):
541         """
542         Query PLCAPI for the hostname of a certain node id and sets the
543         attribute hostname, it will over write the previous value
544         """
545         hostname = self.plapi.get_nodes(node, ['hostname'])
546         self.set("hostname", hostname[0]['hostname'])
547
548     def _check_if_in_slice(self, hosts_hrn):
549         """
550         Check using SFA API if any host hrn from hosts_hrn is in the user's
551         slice
552         """
553         slicename = self.get("username").replace('_', '.')
554         slicename = 'ple.' + slicename
555         slice_nodes = self.sfaapi.get_slice_resources(slicename)['resource']
556         slice_nodes_hrn = self.sfaapi.get_resources_hrn(slice_nodes)
557         nodes_inslice = list(set(hosts_hrn) & set(slice_nodes_hrn))
558         return nodes_inslice
559
560     def _do_ping(self, hostname):
561         """
562         Perform ping command on node's IP matching hostname
563         """
564         ping_ok = False
565         ip = self._get_ip(hostname)
566         if not ip: return ping_ok
567
568         command = "ping -c4 %s" % ip
569
570         (out, err) = lexec(command)
571         if not str(out).find("2 received") < 0 or not str(out).find("3 received") < 0 or not \
572             str(out).find("4 received") < 0:
573             ping_ok = True
574        
575         return ping_ok 
576
577     def _blacklist_node(self, host_hrn):
578         """
579         Add node mal functioning node to blacklist
580         """
581         self.warn(" Blacklisting malfunctioning node ")
582         self.sfaapi.blacklist_resource(host_hrn)
583         if not self._hostname:
584             self.set('hostname', None)
585
586     def _put_node_in_provision(self, host_hrn):
587         """
588         Add node to the list of nodes being provisioned, in order for other RMs
589         to not try to provision the same one again
590         """
591         self.sfaapi.reserve_resource(host_hrn)
592
593     def _get_ip(self, hostname):
594         """
595         Query PLCAPI for the IP of a node with certain node id
596         """
597         try:
598             ip = sshfuncs.gethostbyname(hostname)
599         except:
600             # Fail while trying to find the IP
601             return None
602         return ip
603
604     def _get_nodes_hrn(self):
605         nodes = self.sfaapi.get_resouces_hrn()
606
607         
608
609     def fail_discovery(self):
610         msg = "Discovery failed. No candidates found for node"
611         self.error(msg)
612         raise RuntimeError, msg
613
614     def fail_node_not_alive(self, hostname=None):
615         msg = "Node %s not alive" % hostname
616         raise RuntimeError, msg
617     
618     def fail_node_not_available(self, hostname):
619         msg = "Node %s not available for provisioning" % hostname
620         raise RuntimeError, msg
621
622     def fail_not_enough_nodes(self):
623         msg = "Not enough nodes available for provisioning"
624         raise RuntimeError, msg
625
626     def fail_plapi(self):
627         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
628             " attributes pluser and plpassword."
629         raise RuntimeError, msg
630
631     def valid_connection(self, guid):
632         # TODO: Validate!
633         return True
634
635