Merge from multihop_ssh to nepi-3-dev
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay 
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import socket
32 import threading
33 import datetime
34
35 @clsinit_copy
36 class PlanetlabNode(LinuxNode):
37     _rtype = "PlanetlabNode"
38     _help = "Controls a PlanetLab host accessible using a SSH key " \
39             "associated to a PlanetLab user account"
40     _backend = "planetlab"
41
42     lock = threading.Lock()
43
44     @classmethod
45     def _register_attributes(cls):
46         ip = Attribute("ip", "PlanetLab host public IP address",
47                 flags = Flags.ReadOnly)
48
49         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
50                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
51                     default = "www.planet-lab.eu",
52                     flags = Flags.Credential)
53
54         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
55                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
56                     default = "https://%(hostname)s:443/PLCAPI/",
57                     flags = Flags.ExecReadOnly)
58     
59         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
60                     authenticate in the website) ",
61                     flags = Flags.Credential)
62
63         pl_password = Attribute("plpassword", 
64                         "PlanetLab account password, as \
65                         the one to authenticate in the website) ",
66                         flags = Flags.Credential)
67
68         city = Attribute("city", "Constrain location (city) during resource \
69                 discovery. May use wildcards.",
70                 flags = Flags.Filter)
71
72         country = Attribute("country", "Constrain location (country) during \
73                     resource discovery. May use wildcards.",
74                     flags = Flags.Filter)
75
76         region = Attribute("region", "Constrain location (region) during \
77                     resource discovery. May use wildcards.",
78                     flags = Flags.Filter)
79
80         architecture = Attribute("architecture", "Constrain architecture \
81                         during resource discovery.",
82                         type = Types.Enumerate,
83                         allowed = ["x86_64", 
84                                     "i386"],
85                         flags = Flags.Filter)
86
87         operating_system = Attribute("operatingSystem", "Constrain operating \
88                             system during resource discovery.",
89                             type = Types.Enumerate,
90                             allowed =  ["f8",
91                                         "f12",
92                                         "f14",
93                                         "centos",
94                                         "other"],
95                             flags = Flags.Filter)
96
97         #site = Attribute("site", "Constrain the PlanetLab site this node \
98         #        should reside on.",
99         #        type = Types.Enumerate,
100         #        allowed = ["PLE",
101         #                    "PLC",
102         #                    "PLJ"],
103         #        flags = Flags.Filter)
104
105         min_reliability = Attribute("minReliability", "Constrain reliability \
106                             while picking PlanetLab nodes. Specifies a lower \
107                             acceptable bound.",
108                             type = Types.Double,
109                             range = (1, 100),
110                             flags = Flags.Filter)
111
112         max_reliability = Attribute("maxReliability", "Constrain reliability \
113                             while picking PlanetLab nodes. Specifies an upper \
114                             acceptable bound.",
115                             type = Types.Double,
116                             range = (1, 100),
117                             flags = Flags.Filter)
118
119         min_bandwidth = Attribute("minBandwidth", "Constrain available \
120                             bandwidth while picking PlanetLab nodes. \
121                             Specifies a lower acceptable bound.",
122                             type = Types.Double,
123                             range = (0, 2**31),
124                             flags = Flags.Filter)
125
126         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
127                             bandwidth while picking PlanetLab nodes. \
128                             Specifies an upper acceptable bound.",
129                             type = Types.Double,
130                             range = (0, 2**31),
131                             flags = Flags.Filter)
132
133         min_load = Attribute("minLoad", "Constrain node load average while \
134                     picking PlanetLab nodes. Specifies a lower acceptable \
135                     bound.",
136                     type = Types.Double,
137                     range = (0, 2**31),
138                     flags = Flags.Filter)
139
140         max_load = Attribute("maxLoad", "Constrain node load average while \
141                     picking PlanetLab nodes. Specifies an upper acceptable \
142                     bound.",
143                     type = Types.Double,
144                     range = (0, 2**31),
145                     flags = Flags.Filter)
146
147         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
148                     picking PlanetLab nodes. Specifies a lower acceptable \
149                     bound.",
150                     type = Types.Double,
151                     range = (0, 100),
152                     flags = Flags.Filter)
153
154         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
155                     picking PlanetLab nodes. Specifies an upper acceptable \
156                     bound.",
157                     type = Types.Double,
158                     range = (0, 100),
159                     flags = Flags.Filter)
160
161         timeframe = Attribute("timeframe", "Past time period in which to check\
162                         information about the node. Values are year,month, \
163                         week, latest",
164                         default = "week",
165                         type = Types.Enumerate,
166                         allowed = ["latest",
167                                     "week",
168                                     "month",
169                                     "year"],
170                         flags = Flags.Filter)
171
172         cls._register_attribute(ip)
173         cls._register_attribute(pl_url)
174         cls._register_attribute(pl_ptn)
175         cls._register_attribute(pl_user)
176         cls._register_attribute(pl_password)
177         #cls._register_attribute(site)
178         cls._register_attribute(city)
179         cls._register_attribute(country)
180         cls._register_attribute(region)
181         cls._register_attribute(architecture)
182         cls._register_attribute(operating_system)
183         cls._register_attribute(min_reliability)
184         cls._register_attribute(max_reliability)
185         cls._register_attribute(min_bandwidth)
186         cls._register_attribute(max_bandwidth)
187         cls._register_attribute(min_load)
188         cls._register_attribute(max_load)
189         cls._register_attribute(min_cpu)
190         cls._register_attribute(max_cpu)
191         cls._register_attribute(timeframe)
192
193     def __init__(self, ec, guid):
194         super(PlanetlabNode, self).__init__(ec, guid)
195
196         self._plapi = None
197         self._node_to_provision = None
198         self._slicenode = False
199         self._hostname = False
200
201         if self.get("gateway") or self.get("gatewayUser"):
202             self.set("gateway", None)
203             self.set("gatewayUser", None)
204
205     def _skip_provision(self):
206         pl_user = self.get("pluser")
207         pl_pass = self.get("plpassword")
208         if not pl_user and not pl_pass:
209             return True
210         else: return False
211     
212     @property
213     def plapi(self):
214         if not self._plapi:
215             pl_user = self.get("pluser")
216             pl_pass = self.get("plpassword")
217             pl_url = self.get("plcApiUrl")
218             pl_ptn = self.get("plcApiPattern")
219
220             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
221                 pl_ptn)
222             
223             if not self._plapi:
224                 self.fail_plapi()
225
226         return self._plapi
227
228     def do_discover(self):
229         """
230         Based on the attributes defined by the user, discover the suitable 
231         nodes for provision.
232         """
233         if self._skip_provision():
234             super(PlanetlabNode, self).do_discover()
235             return
236
237         hostname = self._get_hostname()
238         if hostname:
239             # the user specified one particular node to be provisioned
240             self._hostname = True
241             node_id = self._get_nodes_id({'hostname':hostname})
242             node_id = node_id.pop()['node_id']
243
244             # check that the node is not blacklisted or being provisioned
245             # by other RM
246             with PlanetlabNode.lock:
247                 plist = self.plapi.reserved()
248                 blist = self.plapi.blacklisted()
249                 if node_id not in blist and node_id not in plist:
250                 
251                     # check that is really alive, by performing ping
252                     ping_ok = self._do_ping(node_id)
253                     if not ping_ok:
254                         self._blacklist_node(node_id)
255                         self.fail_node_not_alive(hostname)
256                     else:
257                         if self._check_if_in_slice([node_id]):
258                             self._slicenode = True
259                         self._put_node_in_provision(node_id)
260                         self._node_to_provision = node_id
261                 else:
262                     self.fail_node_not_available(hostname)
263             super(PlanetlabNode, self).do_discover()
264         
265         else:
266             # the user specifies constraints based on attributes, zero, one or 
267             # more nodes can match these constraints 
268             nodes = self._filter_based_on_attributes()
269
270             # nodes that are already part of user's slice have the priority to
271             # provisioned
272             nodes_inslice = self._check_if_in_slice(nodes)
273             nodes_not_inslice = list(set(nodes) - set(nodes_inslice))
274             
275             node_id = None
276             if nodes_inslice:
277                 node_id = self._choose_random_node(nodes_inslice)
278                 self._slicenode = True                
279                 
280             if not node_id:
281                 # Either there were no matching nodes in the user's slice, or
282                 # the nodes in the slice  were blacklisted or being provisioned
283                 # by other RM. Note nodes_not_inslice is never empty
284                 node_id = self._choose_random_node(nodes_not_inslice)
285                 self._slicenode = False
286
287             if node_id:
288                 self._node_to_provision = node_id
289                 try:
290                     self._set_hostname_attr(node_id)
291                     self.info(" Selected node to provision ")
292                     super(PlanetlabNode, self).do_discover()
293                 except:
294                     with PlanetlabNode.lock:
295                         self._blacklist_node(node_id)
296                     self.do_discover()
297             else:
298                self.fail_not_enough_nodes() 
299             
300     def do_provision(self):
301         """
302         Add node to user's slice after verifing that the node is functioning
303         correctly
304         """
305         if self._skip_provision():
306             super(PlanetlabNode, self).do_provision()
307             return
308
309         provision_ok = False
310         ssh_ok = False
311         proc_ok = False
312         timeout = 1800
313
314         while not provision_ok:
315             node = self._node_to_provision
316             if not self._slicenode:
317                 self._add_node_to_slice(node)
318             
319                 # check ssh connection
320                 t = 0 
321                 while t < timeout and not ssh_ok:
322
323                     cmd = 'echo \'GOOD NODE\''
324                     ((out, err), proc) = self.execute(cmd)
325                     if out.find("GOOD NODE") < 0:
326                         t = t + 60
327                         time.sleep(60)
328                         continue
329                     else:
330                         ssh_ok = True
331                         continue
332             else:
333                 cmd = 'echo \'GOOD NODE\''
334                 ((out, err), proc) = self.execute(cmd)
335                 if not out.find("GOOD NODE") < 0:
336                     ssh_ok = True
337
338             if not ssh_ok:
339                 # the timeout was reach without establishing ssh connection
340                 # the node is blacklisted, deleted from the slice, and a new
341                 # node to provision is discovered
342                 with PlanetlabNode.lock:
343                     self.warn(" Could not SSH login ")
344                     self._blacklist_node(node)
345                     #self._delete_node_from_slice(node)
346                 self.do_discover()
347                 continue
348             
349             # check /proc directory is mounted (ssh_ok = True)
350             else:
351                 cmd = 'mount |grep proc'
352                 ((out, err), proc) = self.execute(cmd)
353                 if out.find("/proc type proc") < 0:
354                     with PlanetlabNode.lock:
355                         self.warn(" Could not find directory /proc ")
356                         self._blacklist_node(node)
357                         #self._delete_node_from_slice(node)
358                     self.do_discover()
359                     continue
360             
361                 else:
362                     provision_ok = True
363                     if not self.get('hostname'):
364                         self._set_hostname_attr(node)            
365                     # set IP attribute
366                     ip = self._get_ip(node)
367                     self.set("ip", ip)
368             
369         super(PlanetlabNode, self).do_provision()
370
371     def _filter_based_on_attributes(self):
372         """
373         Retrive the list of nodes ids that match user's constraints 
374         """
375         # Map user's defined attributes with tagnames of PlanetLab
376         timeframe = self.get("timeframe")[0]
377         attr_to_tags = {
378             'city' : 'city',
379             'country' : 'country',
380             'region' : 'region',
381             'architecture' : 'arch',
382             'operatingSystem' : 'fcdistro',
383             #'site' : 'pldistro',
384             'minReliability' : 'reliability%s' % timeframe,
385             'maxReliability' : 'reliability%s' % timeframe,
386             'minBandwidth' : 'bw%s' % timeframe,
387             'maxBandwidth' : 'bw%s' % timeframe,
388             'minLoad' : 'load%s' % timeframe,
389             'maxLoad' : 'load%s' % timeframe,
390             'minCpu' : 'cpu%s' % timeframe,
391             'maxCpu' : 'cpu%s' % timeframe,
392         }
393         
394         nodes_id = []
395         filters = {}
396
397         for attr_name, attr_obj in self._attrs.iteritems():
398             attr_value = self.get(attr_name)
399             
400             if attr_value is not None and attr_obj.flags == 8 and \
401                 attr_name != 'timeframe':
402         
403                 attr_tag = attr_to_tags[attr_name]
404                 filters['tagname'] = attr_tag
405
406                 # filter nodes by fixed constraints e.g. operating system
407                 if not 'min' in attr_name and not 'max' in attr_name:
408                     filters['value'] = attr_value
409                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
410
411                 # filter nodes by range constraints e.g. max bandwidth
412                 elif ('min' or 'max') in attr_name:
413                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
414
415         if not filters:
416             nodes = self._get_nodes_id()
417             for node in nodes:
418                 nodes_id.append(node['node_id'])
419         return nodes_id
420                     
421     def _filter_by_fixed_attr(self, filters, nodes_id):
422         """
423         Query PLCAPI for nodes ids matching fixed attributes defined by the
424         user
425         """
426         node_tags = self.plapi.get_node_tags(filters)
427         if node_tags is not None:
428
429             if len(nodes_id) == 0:
430                 # first attribute being matched
431                 for node_tag in node_tags:
432                     nodes_id.append(node_tag['node_id'])
433             else:
434                 # remove the nodes ids that don't match the new attribute
435                 # that is being match
436
437                 nodes_id_tmp = []
438                 for node_tag in node_tags:
439                     if node_tag['node_id'] in nodes_id:
440                         nodes_id_tmp.append(node_tag['node_id'])
441
442                 if len(nodes_id_tmp):
443                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
444                 else:
445                     # no node from before match the new constraint
446                     self.fail_discovery()
447         else:
448             # no nodes match the filter applied
449             self.fail_discovery()
450
451         return nodes_id
452
453     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
454         """
455         Query PLCAPI for nodes ids matching attributes defined in a certain
456         range, by the user
457         """
458         node_tags = self.plapi.get_node_tags(filters)
459         if node_tags:
460             
461             if len(nodes_id) == 0:
462                 # first attribute being matched
463                 for node_tag in node_tags:
464  
465                    # check that matches the min or max restriction
466                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
467                         float(node_tag['value']) > attr_value:
468                         nodes_id.append(node_tag['node_id'])
469
470                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
471                         float(node_tag['value']) < attr_value:
472                         nodes_id.append(node_tag['node_id'])
473             else:
474
475                 # remove the nodes ids that don't match the new attribute
476                 # that is being match
477                 nodes_id_tmp = []
478                 for node_tag in node_tags:
479
480                     # check that matches the min or max restriction and was a
481                     # matching previous filters
482                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
483                         float(node_tag['value']) > attr_value and \
484                         node_tag['node_id'] in nodes_id:
485                         nodes_id_tmp.append(node_tag['node_id'])
486
487                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
488                         float(node_tag['value']) < attr_value and \
489                         node_tag['node_id'] in nodes_id:
490                         nodes_id_tmp.append(node_tag['node_id'])
491
492                 if len(nodes_id_tmp):
493                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
494                 else:
495                     # no node from before match the new constraint
496                     self.fail_discovery()
497
498         else: #TODO CHECK
499             # no nodes match the filter applied
500             self.fail_discovery()
501
502         return nodes_id
503         
504     def _choose_random_node(self, nodes):
505         """
506         From the possible nodes for provision, choose randomly to decrese the
507         probability of different RMs choosing the same node for provision
508         """
509         size = len(nodes)
510         while size:
511             size = size - 1
512             index = randint(0, size)
513             node_id = nodes[index]
514             nodes[index] = nodes[size]
515
516             # check the node is not blacklisted or being provision by other RM
517             # and perform ping to check that is really alive
518             with PlanetlabNode.lock:
519
520                 blist = self.plapi.blacklisted()
521                 plist = self.plapi.reserved()
522                 if node_id not in blist and node_id not in plist:
523                     ping_ok = self._do_ping(node_id)
524                     if not ping_ok:
525                         self._set_hostname_attr(node_id)
526                         self.warn(" Node not responding PING ")
527                         self._blacklist_node(node_id)
528                     else:
529                         # discovered node for provision, added to provision list
530                         self._put_node_in_provision(node_id)
531                         return node_id
532
533     def _get_nodes_id(self, filters=None):
534         return self.plapi.get_nodes(filters, fields=['node_id'])
535
536     def _add_node_to_slice(self, node_id):
537         self.info(" Adding node to slice ")
538         slicename = self.get("username")
539         with PlanetlabNode.lock:
540             slice_nodes = self.plapi.get_slice_nodes(slicename)
541             slice_nodes.append(node_id)
542             self.plapi.add_slice_nodes(slicename, slice_nodes)
543
544     def _delete_node_from_slice(self, node):
545         self.warn(" Deleting node from slice ")
546         slicename = self.get("username")
547         self.plapi.delete_slice_node(slicename, [node])
548
549     def _get_hostname(self):
550         hostname = self.get("hostname")
551         ip = self.get("ip")
552         if hostname:
553             return hostname
554         elif ip:
555             hostname = socket.gethostbyaddr(ip)[0]
556             self.set('hostname', hostname)
557             return hostname
558         else:
559             return None
560
561     def _set_hostname_attr(self, node):
562         """
563         Query PLCAPI for the hostname of a certain node id and sets the
564         attribute hostname, it will over write the previous value
565         """
566         hostname = self.plapi.get_nodes(node, ['hostname'])
567         self.set("hostname", hostname[0]['hostname'])
568
569     def _check_if_in_slice(self, nodes_id):
570         """
571         Query PLCAPI to find out if any node id from nodes_id is in the user's
572         slice
573         """
574         slicename = self.get("username")
575         slice_nodes = self.plapi.get_slice_nodes(slicename)
576         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
577         return nodes_inslice
578
579     def _do_ping(self, node_id):
580         """
581         Perform ping command on node's IP matching node id
582         """
583         ping_ok = False
584         ip = self._get_ip(node_id)
585         if not ip: return ping_ok
586
587         command = ['ping', '-c4']
588         command.append(ip)
589
590         (out, err) = lexec(command)
591         if not out.find("2 received") or not out.find("3 received") or not \
592             out.find("4 received") < 0:
593             ping_ok = True
594         
595         return ping_ok 
596
597     def _blacklist_node(self, node):
598         """
599         Add node mal functioning node to blacklist
600         """
601         self.warn(" Blacklisting malfunctioning node ")
602         self.plapi.blacklist_host(node)
603         if not self._hostname:
604             self.set('hostname', None)
605
606     def _put_node_in_provision(self, node):
607         """
608         Add node to the list of nodes being provisioned, in order for other RMs
609         to not try to provision the same one again
610         """
611         self.plapi.reserve_host(node)
612
613     def _get_ip(self, node_id):
614         """
615         Query PLCAPI for the IP of a node with certain node id
616         """
617         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
618         try:
619             ip = sshfuncs.gethostbyname(hostname['hostname'])
620         except:
621             # Fail while trying to find the IP
622             return None
623         return ip
624
625     def fail_discovery(self):
626         msg = "Discovery failed. No candidates found for node"
627         self.error(msg)
628         raise RuntimeError, msg
629
630     def fail_node_not_alive(self, hostname=None):
631         msg = "Node %s not alive" % hostname
632         raise RuntimeError, msg
633     
634     def fail_node_not_available(self, hostname):
635         msg = "Node %s not available for provisioning" % hostname
636         raise RuntimeError, msg
637
638     def fail_not_enough_nodes(self):
639         msg = "Not enough nodes available for provisioning"
640         raise RuntimeError, msg
641
642     def fail_plapi(self):
643         msg = "Failing while trying to instanciate the PLC API.\nSet the" + \
644             " attributes pluser and plpassword."
645         raise RuntimeError, msg
646
647     def valid_connection(self, guid):
648         # TODO: Validate!
649         return True
650
651