Setting gateway to None, change in command for lexec
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import socket
32 import threading
33 import datetime
34
35 @clsinit_copy
36 class PlanetlabNode(LinuxNode):
37     _rtype = "PlanetlabNode"
38     _help = "Controls a PlanetLab host accessible using a SSH key " \
39             "associated to a PlanetLab user account"
40     _backend = "planetlab"
41
42     lock = threading.Lock()
43
44     @classmethod
45     def _register_attributes(cls):
46         ip = Attribute("ip", "PlanetLab host public IP address",
47                 flags = Flags.ReadOnly)
48
49         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
50                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
51                     default = "www.planet-lab.eu",
52                     flags = Flags.Credential)
53
54         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
55                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
56                     default = "https://%(hostname)s:443/PLCAPI/",
57                     flags = Flags.ExecReadOnly)
58     
59         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
60                     authenticate in the website) ",
61                     flags = Flags.Credential)
62
63         pl_password = Attribute("plpassword", 
64                         "PlanetLab account password, as \
65                         the one to authenticate in the website) ",
66                         flags = Flags.Credential)
67
68         city = Attribute("city", "Constrain location (city) during resource \
69                 discovery. May use wildcards.",
70                 flags = Flags.Filter)
71
72         country = Attribute("country", "Constrain location (country) during \
73                     resource discovery. May use wildcards.",
74                     flags = Flags.Filter)
75
76         region = Attribute("region", "Constrain location (region) during \
77                     resource discovery. May use wildcards.",
78                     flags = Flags.Filter)
79
80         architecture = Attribute("architecture", "Constrain architecture \
81                         during resource discovery.",
82                         type = Types.Enumerate,
83                         allowed = ["x86_64", 
84                                     "i386"],
85                         flags = Flags.Filter)
86
87         operating_system = Attribute("operatingSystem", "Constrain operating \
88                             system during resource discovery.",
89                             type = Types.Enumerate,
90                             allowed =  ["f8",
91                                         "f12",
92                                         "f14",
93                                         "centos",
94                                         "other"],
95                             flags = Flags.Filter)
96
97         #site = Attribute("site", "Constrain the PlanetLab site this node \
98         #        should reside on.",
99         #        type = Types.Enumerate,
100         #        allowed = ["PLE",
101         #                    "PLC",
102         #                    "PLJ"],
103         #        flags = Flags.Filter)
104
105         min_reliability = Attribute("minReliability", "Constrain reliability \
106                             while picking PlanetLab nodes. Specifies a lower \
107                             acceptable bound.",
108                             type = Types.Double,
109                             range = (1, 100),
110                             flags = Flags.Filter)
111
112         max_reliability = Attribute("maxReliability", "Constrain reliability \
113                             while picking PlanetLab nodes. Specifies an upper \
114                             acceptable bound.",
115                             type = Types.Double,
116                             range = (1, 100),
117                             flags = Flags.Filter)
118
119         min_bandwidth = Attribute("minBandwidth", "Constrain available \
120                             bandwidth while picking PlanetLab nodes. \
121                             Specifies a lower acceptable bound.",
122                             type = Types.Double,
123                             range = (0, 2**31),
124                             flags = Flags.Filter)
125
126         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
127                             bandwidth while picking PlanetLab nodes. \
128                             Specifies an upper acceptable bound.",
129                             type = Types.Double,
130                             range = (0, 2**31),
131                             flags = Flags.Filter)
132
133         min_load = Attribute("minLoad", "Constrain node load average while \
134                     picking PlanetLab nodes. Specifies a lower acceptable \
135                     bound.",
136                     type = Types.Double,
137                     range = (0, 2**31),
138                     flags = Flags.Filter)
139
140         max_load = Attribute("maxLoad", "Constrain node load average while \
141                     picking PlanetLab nodes. Specifies an upper acceptable \
142                     bound.",
143                     type = Types.Double,
144                     range = (0, 2**31),
145                     flags = Flags.Filter)
146
147         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
148                     picking PlanetLab nodes. Specifies a lower acceptable \
149                     bound.",
150                     type = Types.Double,
151                     range = (0, 100),
152                     flags = Flags.Filter)
153
154         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
155                     picking PlanetLab nodes. Specifies an upper acceptable \
156                     bound.",
157                     type = Types.Double,
158                     range = (0, 100),
159                     flags = Flags.Filter)
160
161         timeframe = Attribute("timeframe", "Past time period in which to check\
162                         information about the node. Values are year,month, \
163                         week, latest",
164                         default = "week",
165                         type = Types.Enumerate,
166                         allowed = ["latest",
167                                     "week",
168                                     "month",
169                                     "year"],
170                         flags = Flags.Filter)
171
172         cls._register_attribute(ip)
173         cls._register_attribute(pl_url)
174         cls._register_attribute(pl_ptn)
175         cls._register_attribute(pl_user)
176         cls._register_attribute(pl_password)
177         #cls._register_attribute(site)
178         cls._register_attribute(city)
179         cls._register_attribute(country)
180         cls._register_attribute(region)
181         cls._register_attribute(architecture)
182         cls._register_attribute(operating_system)
183         cls._register_attribute(min_reliability)
184         cls._register_attribute(max_reliability)
185         cls._register_attribute(min_bandwidth)
186         cls._register_attribute(max_bandwidth)
187         cls._register_attribute(min_load)
188         cls._register_attribute(max_load)
189         cls._register_attribute(min_cpu)
190         cls._register_attribute(max_cpu)
191         cls._register_attribute(timeframe)
192
193     def __init__(self, ec, guid):
194         super(PlanetlabNode, self).__init__(ec, guid)
195
196         self._plapi = None
197         self._node_to_provision = None
198         self._slicenode = False
199         self._hostname = False
200
201         if self.get("gateway") or self.get("gatewayUser"):
202             self.set("gateway", None)
203             self.set("gatewayUser", None)
204
205     def _skip_provision(self):
206         pl_user = self.get("pluser")
207         pl_pass = self.get("plpassword")
208         if not pl_user and not pl_pass:
209             return True
210         else: return False
211     
212     @property
213     def plapi(self):
214         if not self._plapi:
215             pl_user = self.get("pluser")
216             pl_pass = self.get("plpassword")
217             pl_url = self.get("plcApiUrl")
218             pl_ptn = self.get("plcApiPattern")
219
220             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
221                 pl_ptn)
222             
223             if not self._plapi:
224                 self.fail_plapi()
225
226         return self._plapi
227
228     def do_discover(self):
229         """
230         Based on the attributes defined by the user, discover the suitable 
231         nodes for provision.
232         """
233         if self._skip_provision():
234             super(PlanetlabNode, self).do_discover()
235             return
236
237         hostname = self._get_hostname()
238         if hostname:
239             # the user specified one particular node to be provisioned
240             # check with PLCAPI if it is alive
241             self._hostname = True
242             node_id = self._query_if_alive(hostname=hostname)
243             node_id = node_id.pop()
244
245             # check that the node is not blacklisted or being provisioned
246             # by other RM
247             with PlanetlabNode.lock:
248                 plist = self.plapi.reserved()
249                 blist = self.plapi.blacklisted()
250                 if node_id not in blist and node_id not in plist:
251                 
252                     # check that is really alive, by performing ping
253                     ping_ok = self._do_ping(node_id)
254                     if not ping_ok:
255                         self._blacklist_node(node_id)
256                         self.fail_node_not_alive(hostname)
257                     else:
258                         if self._check_if_in_slice([node_id]):
259                             self._slicenode = True
260                         self._put_node_in_provision(node_id)
261                         self._node_to_provision = node_id
262                 else:
263                     self.fail_node_not_available(hostname)
264             super(PlanetlabNode, self).do_discover()
265         
266         else:
267             # the user specifies constraints based on attributes, zero, one or 
268             # more nodes can match these constraints 
269             nodes = self._filter_based_on_attributes()
270             nodes_alive = self._query_if_alive(nodes)
271
272             # nodes that are already part of user's slice have the priority to
273             # provisioned
274             nodes_inslice = self._check_if_in_slice(nodes_alive)
275             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
276             
277             node_id = None
278             if nodes_inslice:
279                 node_id = self._choose_random_node(nodes_inslice)
280                 self._slicenode = True                
281                 
282             if not node_id:
283                 # Either there were no matching nodes in the user's slice, or
284                 # the nodes in the slice  were blacklisted or being provisioned
285                 # by other RM. Note nodes_not_inslice is never empty
286                 node_id = self._choose_random_node(nodes_not_inslice)
287                 self._slicenode = False
288
289             if node_id:
290                 self._node_to_provision = node_id
291                 try:
292                     self._set_hostname_attr(node_id)
293                     self.info(" Selected node to provision ")
294                     super(PlanetlabNode, self).do_discover()
295                 except:
296                     with PlanetlabNode.lock:
297                         self._blacklist_node(node_id)
298                     self.do_discover()
299             else:
300                self.fail_not_enough_nodes() 
301             
302     def do_provision(self):
303         """
304         Add node to user's slice after verifing that the node is functioning
305         correctly
306         """
307         if self._skip_provision():
308             super(PlanetlabNode, self).do_provision()
309             return
310
311         provision_ok = False
312         ssh_ok = False
313         proc_ok = False
314         timeout = 1800
315
316         while not provision_ok:
317             node = self._node_to_provision
318             if not self._slicenode:
319                 self._add_node_to_slice(node)
320             
321                 # check ssh connection
322                 t = 0 
323                 while t < timeout and not ssh_ok:
324
325                     cmd = 'echo \'GOOD NODE\''
326                     ((out, err), proc) = self.execute(cmd)
327                     if out.find("GOOD NODE") < 0:
328                         t = t + 60
329                         time.sleep(60)
330                         continue
331                     else:
332                         ssh_ok = True
333                         continue
334             else:
335                 cmd = 'echo \'GOOD NODE\''
336                 ((out, err), proc) = self.execute(cmd)
337                 if not out.find("GOOD NODE") < 0:
338                     ssh_ok = True
339
340             if not ssh_ok:
341                 # the timeout was reach without establishing ssh connection
342                 # the node is blacklisted, deleted from the slice, and a new
343                 # node to provision is discovered
344                 with PlanetlabNode.lock:
345                     self.warn(" Could not SSH login ")
346                     self._blacklist_node(node)
347                     #self._delete_node_from_slice(node)
348                 self.do_discover()
349                 continue
350             
351             # check /proc directory is mounted (ssh_ok = True)
352             else:
353                 cmd = 'mount |grep proc'
354                 ((out, err), proc) = self.execute(cmd)
355                 if out.find("/proc type proc") < 0:
356                     with PlanetlabNode.lock:
357                         self.warn(" Could not find directory /proc ")
358                         self._blacklist_node(node)
359                         #self._delete_node_from_slice(node)
360                     self.do_discover()
361                     continue
362             
363                 else:
364                     provision_ok = True
365                     if not self.get('hostname'):
366                         self._set_hostname_attr(node)            
367                     # set IP attribute
368                     ip = self._get_ip(node)
369                     self.set("ip", ip)
370             
371         super(PlanetlabNode, self).do_provision()
372
373     def _filter_based_on_attributes(self):
374         """
375         Retrive the list of nodes ids that match user's constraints 
376         """
377         # Map user's defined attributes with tagnames of PlanetLab
378         timeframe = self.get("timeframe")[0]
379         attr_to_tags = {
380             'city' : 'city',
381             'country' : 'country',
382             'region' : 'region',
383             'architecture' : 'arch',
384             'operatingSystem' : 'fcdistro',
385             #'site' : 'pldistro',
386             'minReliability' : 'reliability%s' % timeframe,
387             'maxReliability' : 'reliability%s' % timeframe,
388             'minBandwidth' : 'bw%s' % timeframe,
389             'maxBandwidth' : 'bw%s' % timeframe,
390             'minLoad' : 'load%s' % timeframe,
391             'maxLoad' : 'load%s' % timeframe,
392             'minCpu' : 'cpu%s' % timeframe,
393             'maxCpu' : 'cpu%s' % timeframe,
394         }
395         
396         nodes_id = []
397         filters = {}
398
399         for attr_name, attr_obj in self._attrs.iteritems():
400             attr_value = self.get(attr_name)
401             
402             if attr_value is not None and attr_obj.flags == 8 and \
403                 attr_name != 'timeframe':
404         
405                 attr_tag = attr_to_tags[attr_name]
406                 filters['tagname'] = attr_tag
407
408                 # filter nodes by fixed constraints e.g. operating system
409                 if not 'min' in attr_name and not 'max' in attr_name:
410                     filters['value'] = attr_value
411                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
412
413                 # filter nodes by range constraints e.g. max bandwidth
414                 elif ('min' or 'max') in attr_name:
415                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
416
417         if not filters:
418             nodes = self.plapi.get_nodes()
419             for node in nodes:
420                 nodes_id.append(node['node_id'])
421         
422         return nodes_id
423                     
424
425     def _filter_by_fixed_attr(self, filters, nodes_id):
426         """
427         Query PLCAPI for nodes ids matching fixed attributes defined by the
428         user
429         """
430         node_tags = self.plapi.get_node_tags(filters)
431         if node_tags is not None:
432
433             if len(nodes_id) == 0:
434                 # first attribute being matched
435                 for node_tag in node_tags:
436                     nodes_id.append(node_tag['node_id'])
437             else:
438                 # remove the nodes ids that don't match the new attribute
439                 # that is being match
440
441                 nodes_id_tmp = []
442                 for node_tag in node_tags:
443                     if node_tag['node_id'] in nodes_id:
444                         nodes_id_tmp.append(node_tag['node_id'])
445
446                 if len(nodes_id_tmp):
447                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
448                 else:
449                     # no node from before match the new constraint
450                     self.fail_discovery()
451         else:
452             # no nodes match the filter applied
453             self.fail_discovery()
454
455         return nodes_id
456
457     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
458         """
459         Query PLCAPI for nodes ids matching attributes defined in a certain
460         range, by the user
461         """
462         node_tags = self.plapi.get_node_tags(filters)
463         if node_tags:
464             
465             if len(nodes_id) == 0:
466                 # first attribute being matched
467                 for node_tag in node_tags:
468  
469                    # check that matches the min or max restriction
470                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
471                         float(node_tag['value']) > attr_value:
472                         nodes_id.append(node_tag['node_id'])
473
474                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
475                         float(node_tag['value']) < attr_value:
476                         nodes_id.append(node_tag['node_id'])
477             else:
478
479                 # remove the nodes ids that don't match the new attribute
480                 # that is being match
481                 nodes_id_tmp = []
482                 for node_tag in node_tags:
483
484                     # check that matches the min or max restriction and was a
485                     # matching previous filters
486                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
487                         float(node_tag['value']) > attr_value and \
488                         node_tag['node_id'] in nodes_id:
489                         nodes_id_tmp.append(node_tag['node_id'])
490
491                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
492                         float(node_tag['value']) < attr_value and \
493                         node_tag['node_id'] in nodes_id:
494                         nodes_id_tmp.append(node_tag['node_id'])
495
496                 if len(nodes_id_tmp):
497                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
498                 else:
499                     # no node from before match the new constraint
500                     self.fail_discovery()
501
502         else: #TODO CHECK
503             # no nodes match the filter applied
504             self.fail_discovery()
505
506         return nodes_id
507         
508     def _query_if_alive(self, nodes_id=None, hostname=None):
509         """
510         Query PLCAPI for nodes that register activity recently, using filters 
511         related to the state of the node, e.g. last time it was contacted
512         """
513         if nodes_id is None and hostname is None:
514             msg = "Specify nodes_id or hostname"
515             raise RuntimeError, msg
516
517         if nodes_id is not None and hostname is not None:
518             msg = "Specify either nodes_id or hostname"
519             raise RuntimeError, msg
520
521         # define PL filters to check the node is alive
522         filters = dict()
523         filters['run_level'] = 'boot'
524         filters['boot_state'] = 'boot'
525         filters['node_type'] = 'regular' 
526         #filters['>last_contact'] =  int(time.time()) - 2*3600
527
528         # adding node_id or hostname to the filters to check for the particular
529         # node
530         if nodes_id:
531             filters['node_id'] = list(nodes_id)
532             alive_nodes_id = self._get_nodes_id(filters)
533         elif hostname:
534             filters['hostname'] = hostname
535             alive_nodes_id = self._get_nodes_id(filters)
536
537         if len(alive_nodes_id) == 0:
538             self.fail_node_not_alive(hostname)
539         else:
540             nodes_id = list()
541             for node_id in alive_nodes_id:
542                 nid = node_id['node_id']
543                 nodes_id.append(nid)
544
545             return nodes_id
546
547     def _choose_random_node(self, nodes):
548         """
549         From the possible nodes for provision, choose randomly to decrese the
550         probability of different RMs choosing the same node for provision
551         """
552         size = len(nodes)
553         while size:
554             size = size - 1
555             index = randint(0, size)
556             node_id = nodes[index]
557             nodes[index] = nodes[size]
558
559             # check the node is not blacklisted or being provision by other RM
560             # and perform ping to check that is really alive
561             with PlanetlabNode.lock:
562
563                 blist = self.plapi.blacklisted()
564                 plist = self.plapi.reserved()
565                 if node_id not in blist and node_id not in plist:
566                     ping_ok = self._do_ping(node_id)
567                     if not ping_ok:
568                         self._set_hostname_attr(node_id)
569                         self.warn(" Node not responding PING ")
570                         self._blacklist_node(node_id)
571                     else:
572                         # discovered node for provision, added to provision list
573                         self._put_node_in_provision(node_id)
574                         return node_id
575
576     def _get_nodes_id(self, filters):
577         return self.plapi.get_nodes(filters, fields=['node_id'])
578
579     def _add_node_to_slice(self, node_id):
580         self.info(" Adding node to slice ")
581         slicename = self.get("username")
582         with PlanetlabNode.lock:
583             slice_nodes = self.plapi.get_slice_nodes(slicename)
584             slice_nodes.append(node_id)
585             self.plapi.add_slice_nodes(slicename, slice_nodes)
586
587     def _delete_node_from_slice(self, node):
588         self.warn(" Deleting node from slice ")
589         slicename = self.get("username")
590         self.plapi.delete_slice_node(slicename, [node])
591
592     def _get_hostname(self):
593         hostname = self.get("hostname")
594         ip = self.get("ip")
595         if hostname:
596             return hostname
597         elif ip:
598             hostname = socket.gethostbyaddr(ip)[0]
599             self.set('hostname', hostname)
600             return hostname
601         else:
602             return None
603
604     def _set_hostname_attr(self, node):
605         """
606         Query PLCAPI for the hostname of a certain node id and sets the
607         attribute hostname, it will over write the previous value
608         """
609         hostname = self.plapi.get_nodes(node, ['hostname'])
610         self.set("hostname", hostname[0]['hostname'])
611
612     def _check_if_in_slice(self, nodes_id):
613         """
614         Query PLCAPI to find out if any node id from nodes_id is in the user's
615         slice
616         """
617         slicename = self.get("username")
618         slice_nodes = self.plapi.get_slice_nodes(slicename)
619         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
620         return nodes_inslice
621
622     def _do_ping(self, node_id):
623         """
624         Perform ping command on node's IP matching node id
625         """
626         ping_ok = False
627         ip = self._get_ip(node_id)
628         if not ip: return ping_ok
629
630         command = ['ping', '-c4']
631         command.append(ip)
632
633         (out, err) = lexec(command)
634         if not out.find("2 received") or not out.find("3 received") or not \
635             out.find("4 received") < 0:
636             ping_ok = True
637         
638         return ping_ok 
639
640     def _blacklist_node(self, node):
641         """
642         Add node mal functioning node to blacklist
643         """
644         self.warn(" Blacklisting malfunctioning node ")
645         self.plapi.blacklist_host(node)
646         if not self._hostname:
647             self.set('hostname', None)
648
649     def _put_node_in_provision(self, node):
650         """
651         Add node to the list of nodes being provisioned, in order for other RMs
652         to not try to provision the same one again
653         """
654         self.plapi.reserve_host(node)
655
656     def _get_ip(self, node_id):
657         """
658         Query PLCAPI for the IP of a node with certain node id
659         """
660         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
661         try:
662             ip = sshfuncs.gethostbyname(hostname['hostname'])
663         except:
664             # Fail while trying to find the IP
665             return None
666         return ip
667
668     def fail_discovery(self):
669         msg = "Discovery failed. No candidates found for node"
670         self.error(msg)
671         raise RuntimeError, msg
672
673     def fail_node_not_alive(self, hostname=None):
674         msg = "Node %s not alive" % hostname
675         raise RuntimeError, msg
676     
677     def fail_node_not_available(self, hostname):
678         msg = "Node %s not available for provisioning" % hostname
679         raise RuntimeError, msg
680
681     def fail_not_enough_nodes(self):
682         msg = "Not enough nodes available for provisioning"
683         raise RuntimeError, msg
684
685     def fail_plapi(self):
686         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
687             " attributes pluser and plpassword."
688         raise RuntimeError, msg
689
690     def valid_connection(self, guid):
691         # TODO: Validate!
692         return True
693
694