merged ex_shutdown into nepi-3-dev
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, \
23         ResourceState, reschedule_delay, failtrap
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27 from nepi.util import sshfuncs
28
29 from random import randint
30 import time
31 import threading
32
33 @clsinit_copy
34 class PlanetlabNode(LinuxNode):
35     _rtype = "PlanetlabNode"
36     _help = "Controls a PlanetLab host accessible using a SSH key " \
37             "associated to a PlanetLab user account"
38     _backend = "planetlab"
39
40     lock = threading.Lock()
41
42     @classmethod
43     def _register_attributes(cls):
44         ip = Attribute("ip", "PlanetLab host public IP address",
45                 flags = Flags.ReadOnly)
46
47         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
48                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
49                     default = "www.planet-lab.eu",
50                     flags = Flags.Credential)
51
52         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
53                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
54                     default = "https://%(hostname)s:443/PLCAPI/",
55                     flags = Flags.ExecReadOnly)
56     
57         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
58                     authenticate in the website) ",
59                     flags = Flags.Credential)
60
61         pl_password = Attribute("plpassword", 
62                         "PlanetLab account password, as \
63                         the one to authenticate in the website) ",
64                         flags = Flags.Credential)
65
66         city = Attribute("city", "Constrain location (city) during resource \
67                 discovery. May use wildcards.",
68                 flags = Flags.Filter)
69
70         country = Attribute("country", "Constrain location (country) during \
71                     resource discovery. May use wildcards.",
72                     flags = Flags.Filter)
73
74         region = Attribute("region", "Constrain location (region) during \
75                     resource discovery. May use wildcards.",
76                     flags = Flags.Filter)
77
78         architecture = Attribute("architecture", "Constrain architecture \
79                         during resource discovery.",
80                         type = Types.Enumerate,
81                         allowed = ["x86_64", 
82                                     "i386"],
83                         flags = Flags.Filter)
84
85         operating_system = Attribute("operatingSystem", "Constrain operating \
86                             system during resource discovery.",
87                             type = Types.Enumerate,
88                             allowed =  ["f8",
89                                         "f12",
90                                         "f14",
91                                         "centos",
92                                         "other"],
93                             flags = Flags.Filter)
94
95         #site = Attribute("site", "Constrain the PlanetLab site this node \
96         #        should reside on.",
97         #        type = Types.Enumerate,
98         #        allowed = ["PLE",
99         #                    "PLC",
100         #                    "PLJ"],
101         #        flags = Flags.Filter)
102
103         min_reliability = Attribute("minReliability", "Constrain reliability \
104                             while picking PlanetLab nodes. Specifies a lower \
105                             acceptable bound.",
106                             type = Types.Double,
107                             range = (1, 100),
108                             flags = Flags.Filter)
109
110         max_reliability = Attribute("maxReliability", "Constrain reliability \
111                             while picking PlanetLab nodes. Specifies an upper \
112                             acceptable bound.",
113                             type = Types.Double,
114                             range = (1, 100),
115                             flags = Flags.Filter)
116
117         min_bandwidth = Attribute("minBandwidth", "Constrain available \
118                             bandwidth while picking PlanetLab nodes. \
119                             Specifies a lower acceptable bound.",
120                             type = Types.Double,
121                             range = (0, 2**31),
122                             flags = Flags.Filter)
123
124         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
125                             bandwidth while picking PlanetLab nodes. \
126                             Specifies an upper acceptable bound.",
127                             type = Types.Double,
128                             range = (0, 2**31),
129                             flags = Flags.Filter)
130
131         min_load = Attribute("minLoad", "Constrain node load average while \
132                     picking PlanetLab nodes. Specifies a lower acceptable \
133                     bound.",
134                     type = Types.Double,
135                     range = (0, 2**31),
136                     flags = Flags.Filter)
137
138         max_load = Attribute("maxLoad", "Constrain node load average while \
139                     picking PlanetLab nodes. Specifies an upper acceptable \
140                     bound.",
141                     type = Types.Double,
142                     range = (0, 2**31),
143                     flags = Flags.Filter)
144
145         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
146                     picking PlanetLab nodes. Specifies a lower acceptable \
147                     bound.",
148                     type = Types.Double,
149                     range = (0, 100),
150                     flags = Flags.Filter)
151
152         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
153                     picking PlanetLab nodes. Specifies an upper acceptable \
154                     bound.",
155                     type = Types.Double,
156                     range = (0, 100),
157                     flags = Flags.Filter)
158
159         timeframe = Attribute("timeframe", "Past time period in which to check\
160                         information about the node. Values are year,month, \
161                         week, latest",
162                         default = "week",
163                         type = Types.Enumerate,
164                         allowed = ["latest",
165                                     "week",
166                                     "month",
167                                     "year"],
168                         flags = Flags.Filter)
169
170         cls._register_attribute(ip)
171         cls._register_attribute(pl_url)
172         cls._register_attribute(pl_ptn)
173         cls._register_attribute(pl_user)
174         cls._register_attribute(pl_password)
175         #cls._register_attribute(site)
176         cls._register_attribute(city)
177         cls._register_attribute(country)
178         cls._register_attribute(region)
179         cls._register_attribute(architecture)
180         cls._register_attribute(operating_system)
181         cls._register_attribute(min_reliability)
182         cls._register_attribute(max_reliability)
183         cls._register_attribute(min_bandwidth)
184         cls._register_attribute(max_bandwidth)
185         cls._register_attribute(min_load)
186         cls._register_attribute(max_load)
187         cls._register_attribute(min_cpu)
188         cls._register_attribute(max_cpu)
189         cls._register_attribute(timeframe)
190
191     def __init__(self, ec, guid):
192         super(PlanetlabNode, self).__init__(ec, guid)
193
194         self._plapi = None
195         self._node_to_provision = None
196     
197     @property
198     def plapi(self):
199         if not self._plapi:
200             pl_user = self.get("pluser")
201             pl_pass = self.get("plpassword")
202             pl_url = self.get("plcApiUrl")
203             pl_ptn = self.get("plcApiPattern")
204
205             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
206                     pl_ptn)
207             
208         return self._plapi
209
210     def discover(self):
211         """
212         Based on the attributes defined by the user, discover the suitable nodes
213         """
214         hostname = self._get_hostname()
215         print self.guid, hostname 
216         if hostname:
217             # the user specified one particular node to be provisioned
218             # check with PLCAPI if it is alvive
219             node_id = self._query_if_alive(hostname=hostname)
220             node_id = node_id.pop()
221             print self.guid, node_id
222
223             # check that the node is not blacklisted or being provisioned
224             # by other RM
225             with PlanetlabNode.lock:
226                 plist = self.plapi.reserved()
227                 blist = self.plapi.blacklisted()
228                 print self.guid,plist
229                 print self.guid,blist
230                 if node_id not in blist and node_id not in plist:
231                 
232                     # check that is really alive, by performing ping
233                     ping_ok = self._do_ping(node_id)
234                     if not ping_ok:
235                         self._blacklist_node(node_id)
236                         self.fail_node_not_alive(hostname)
237                     else:
238                         self._put_node_in_provision(node_id)
239                         self._node_to_provision = node_id
240                         super(PlanetlabNode, self).discover()
241                 
242                 else:
243                     self.fail_node_not_available(hostname)
244         
245         else:
246             # the user specifies constraints based on attributes, zero, one or 
247             # more nodes can match these constraints 
248             nodes = self._filter_based_on_attributes()
249             nodes_alive = self._query_if_alive(nodes)
250     
251             # nodes that are already part of user's slice have the priority to
252             # provisioned
253             nodes_inslice = self._check_if_in_slice(nodes_alive)
254             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
255             
256             node_id = None
257             if nodes_inslice:
258                 node_id = self._choose_random_node(nodes_inslice)
259                 
260             if not node_id:
261                 # Either there were no matching nodes in the user's slice, or
262                 # the nodes in the slice  were blacklisted or being provisioned
263                 # by other RM. Note nodes_not_inslice is never empty
264                 node_id = self._choose_random_node(nodes_not_inslice)
265
266             if node_id:
267                 self._node_to_provision = node_id
268                 super(PlanetlabNode, self).discover()
269             else:
270                self.fail_not_enough_nodes() 
271             
272     def provision(self):
273         """
274         Add node to user's slice after verifing that the node is functioning
275         correctly
276         """
277         provision_ok = False
278         ssh_ok = False
279         proc_ok = False
280         timeout = 120
281
282         while not provision_ok:
283             node = self._node_to_provision
284             # Adding try catch to set hostname because sometimes MyPLC fails
285             # when trying to retrive node's hostname
286             try:
287                 self._set_hostname_attr(node)
288             except:
289                 with PlanetlabNode.lock:
290                     self._blacklist_node(node)
291                 self.discover()
292                 continue
293
294             self._add_node_to_slice(node)
295             
296             # check ssh connection
297             t = 0 
298             while t < timeout and not ssh_ok:
299
300                 cmd = 'echo \'GOOD NODE\''
301                 ((out, err), proc) = self.execute(cmd)
302                 if out.find("GOOD NODE") < 0:
303                     t = t + 60
304                     time.sleep(60)
305                     continue
306                 else:
307                     ssh_ok = True
308                     continue
309
310             if not ssh_ok:
311                 # the timeout was reach without establishing ssh connection
312                 # the node is blacklisted, deleted from the slice, and a new
313                 # node to provision is discovered
314                 with PlanetlabNode.lock:
315                     self._blacklist_node(node)
316                     self._delete_node_from_slice(node)
317                 self.set('hostname', None)
318                 self.discover()
319                 continue
320             
321             # check /proc directory is mounted (ssh_ok = True)
322             else:
323                 cmd = 'mount |grep proc'
324                 ((out, err), proc) = self.execute(cmd)
325                 if out.find("/proc type proc") < 0:
326                     with PlanetlabNode.lock:
327                         self._blacklist_node(node)
328                         self._delete_node_from_slice(node)
329                     self.set('hostname', None)
330                     self.discover()
331                     continue
332             
333                 else:
334                     provision_ok = True
335                     # set IP attribute
336                     ip = self._get_ip(node)
337                     self.set("ip", ip)
338             
339         super(PlanetlabNode, self).provision()
340
341     def _filter_based_on_attributes(self):
342         """
343         Retrive the list of nodes ids that match user's constraints 
344         """
345         # Map user's defined attributes with tagnames of PlanetLab
346         timeframe = self.get("timeframe")[0]
347         attr_to_tags = {
348             'city' : 'city',
349             'country' : 'country',
350             'region' : 'region',
351             'architecture' : 'arch',
352             'operatingSystem' : 'fcdistro',
353             #'site' : 'pldistro',
354             'minReliability' : 'reliability%s' % timeframe,
355             'maxReliability' : 'reliability%s' % timeframe,
356             'minBandwidth' : 'bw%s' % timeframe,
357             'maxBandwidth' : 'bw%s' % timeframe,
358             'minLoad' : 'load%s' % timeframe,
359             'maxLoad' : 'load%s' % timeframe,
360             'minCpu' : 'cpu%s' % timeframe,
361             'maxCpu' : 'cpu%s' % timeframe,
362         }
363         
364         nodes_id = []
365         filters = {}
366
367         for attr_name, attr_obj in self._attrs.iteritems():
368             attr_value = self.get(attr_name)
369             
370             if attr_value is not None and attr_obj.flags == 8 and \
371                 attr_name != 'timeframe':
372         
373                 attr_tag = attr_to_tags[attr_name]
374                 filters['tagname'] = attr_tag
375
376                 # filter nodes by fixed constraints e.g. operating system
377                 if not 'min' in attr_name and not 'max' in attr_name:
378                     filters['value'] = attr_value
379                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
380
381                 # filter nodes by range constraints e.g. max bandwidth
382                 elif ('min' or 'max') in attr_name:
383                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
384
385         if not filters:
386             nodes = self.plapi.get_nodes()
387             for node in nodes:
388                 nodes_id.append(node['node_id'])
389                 
390         return nodes_id
391                     
392
393     def _filter_by_fixed_attr(self, filters, nodes_id):
394         """
395         Query PLCAPI for nodes ids matching fixed attributes defined by the
396         user
397         """
398         node_tags = self.plapi.get_node_tags(filters)
399         if node_tags is not None:
400
401             if len(nodes_id) == 0:
402                 # first attribute being matched
403                 for node_tag in node_tags:
404                     nodes_id.append(node_tag['node_id'])
405             else:
406                 # remove the nodes ids that don't match the new attribute
407                 # that is being match
408
409                 nodes_id_tmp = []
410                 for node_tag in node_tags:
411                     if node_tag['node_id'] in nodes_id:
412                         nodes_id_tmp.append(node_tag['node_id'])
413
414                 if len(nodes_id_tmp):
415                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
416                 else:
417                     # no node from before match the new constraint
418                     self.fail_discovery()
419         else:
420             # no nodes match the filter applied
421             self.fail_discovery()
422
423         return nodes_id
424
425     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
426         """
427         Query PLCAPI for nodes ids matching attributes defined in a certain
428         range, by the user
429         """
430         node_tags = self.plapi.get_node_tags(filters)
431         if node_tags is not None:
432             
433             if len(nodes_id) == 0:
434                 # first attribute being matched
435                 for node_tag in node_tags:
436  
437                    # check that matches the min or max restriction
438                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
439                         float(node_tag['value']) > attr_value:
440                         nodes_id.append(node_tag['node_id'])
441
442                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
443                         float(node_tag['value']) < attr_value:
444                         nodes_id.append(node_tag['node_id'])
445             else:
446
447                 # remove the nodes ids that don't match the new attribute
448                 # that is being match
449                 nodes_id_tmp = []
450                 for node_tag in node_tags:
451
452                     # check that matches the min or max restriction and was a
453                     # matching previous filters
454                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
455                         float(node_tag['value']) > attr_value and \
456                         node_tag['node_id'] in nodes_id:
457                         nodes_id_tmp.append(node_tag['node_id'])
458
459                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
460                         float(node_tag['value']) < attr_value and \
461                         node_tag['node_id'] in nodes_id:
462                         nodes_id_tmp.append(node_tag['node_id'])
463
464                 if len(nodes_id_tmp):
465                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
466                 else:
467                     # no node from before match the new constraint
468                     self.fail_discovery()
469
470         else: #TODO CHECK
471             # no nodes match the filter applied
472             self.fail_discovery()
473
474         return nodes_id
475         
476     def _query_if_alive(self, nodes_id=None, hostname=None):
477         """
478         Query PLCAPI for nodes that register activity recently, using filters 
479         related to the state of the node, e.g. last time it was contacted
480         """
481         if nodes_id is None and hostname is None:
482             msg = "Specify nodes_id or hostname"
483             raise RuntimeError, msg
484
485         if nodes_id is not None and hostname is not None:
486             msg = "Specify either nodes_id or hostname"
487             raise RuntimeError, msg
488
489         # define PL filters to check the node is alive
490         filters = dict()
491         filters['run_level'] = 'boot'
492         filters['boot_state'] = 'boot'
493         filters['node_type'] = 'regular' 
494         #filters['>last_contact'] =  int(time.time()) - 2*3600
495
496         # adding node_id or hostname to the filters to check for the particular
497         # node
498         if nodes_id:
499             filters['node_id'] = list(nodes_id)
500             alive_nodes_id = self._get_nodes_id(filters)
501         elif hostname:
502             filters['hostname'] = hostname
503             alive_nodes_id = self._get_nodes_id(filters)
504
505         if len(alive_nodes_id) == 0:
506             self.fail_node_not_alive(self, hostname)
507         else:
508             nodes_id = list()
509             for node_id in alive_nodes_id:
510                 nid = node_id['node_id']
511                 nodes_id.append(nid)
512
513             return nodes_id
514
515     def _choose_random_node(self, nodes):
516         """
517         From the possible nodes for provision, choose randomly to decrese the
518         probability of different RMs choosing the same node for provision
519         """
520         size = len(nodes)
521         while size:
522             size = size - 1
523             index = randint(0, size)
524             node_id = nodes[index]
525             nodes[index] = nodes[size]
526
527             # check the node is not blacklisted or being provision by other RM
528             # and perform ping to check that is really alive
529             with PlanetlabNode.lock:
530
531                 blist = self.plapi.blacklisted()
532                 plist = self.plapi.reserved()
533                 if node_id not in blist and node_id not in plist:
534                     ping_ok = self._do_ping(node_id)
535                     print " ### ping_ok #### %s guid %s" % (ping_ok, self.guid)
536                     if not ping_ok:
537                         self._blacklist_node(node_id)
538                     else:
539                         # discovered node for provision, added to provision list
540                         self._put_node_in_provision(node_id)
541                         print "node_id %s , guid %s" % (node_id, self.guid)
542                         return node_id
543
544     def _get_nodes_id(self, filters):
545         return self.plapi.get_nodes(filters, fields=['node_id'])
546
547     def _add_node_to_slice(self, node_id):
548         self.info(" Selected node to provision ")
549         slicename = self.get("username")
550         with PlanetlabNode.lock:
551             slice_nodes = self.plapi.get_slice_nodes(slicename)
552             slice_nodes.append(node_id)
553             self.plapi.add_slice_nodes(slicename, slice_nodes)
554
555     def _delete_node_from_slice(self, node):
556         self.warn(" Deleting node from slice ")
557         slicename = self.get("username")
558         self.plapi.delete_slice_node(slicename, [node])
559
560     def _get_hostname(self):
561         hostname = self.get("hostname")
562         ip = self.get("ip")
563         if hostname:
564             return hostname
565         elif ip:
566             hostname = sshfuncs.gethostbyname(ip)
567             return hostname
568         else:
569             return None
570
571     def _set_hostname_attr(self, node):
572         """
573         Query PLCAPI for the hostname of a certain node id and sets the
574         attribute hostname, it will over write the previous value
575         """
576         hostname = self.plapi.get_nodes(node, ['hostname'])
577         self.set("hostname", hostname[0]['hostname'])
578
579     def _check_if_in_slice(self, nodes_id):
580         """
581         Query PLCAPI to find out if any node id from nodes_id is in the user's
582         slice
583         """
584         slicename = self.get("username")
585         slice_nodes = self.plapi.get_slice_nodes(slicename)
586         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
587         return nodes_inslice
588
589     def _do_ping(self, node_id):
590         """
591         Perform ping command on node's IP matching node id
592         """
593         ping_ok = False
594         ip = self._get_ip(node_id)
595         print "ip de do_ping %s, guid %s" % (ip, self.guid)
596         if not ip: return ping_ok
597
598         command = "ping -c2 %s" % ip
599
600         (out, err) = lexec(command)
601         print "out de do_ping %s, guid %s" % (out, self.guid)
602         if not out.find("2 received") < 0:
603             ping_ok = True
604         
605         print "ping_ok de do_ping %s, guid %s" % (ping_ok, self.guid)
606         return ping_ok 
607
608     def _blacklist_node(self, node):
609         """
610         Add node mal functioning node to blacklist
611         """
612         self.warn(" Blacklisting malfunctioning node ")
613         self._plapi.blacklist_host(node)
614
615     def _put_node_in_provision(self, node):
616         """
617         Add node to the list of nodes being provisioned, in order for other RMs
618         to not try to provision the same one again
619         """
620         self._plapi.reserve_host(node)
621
622     def _get_ip(self, node_id):
623         """
624         Query PLCAPI for the IP of a node with certain node id
625         """
626         hostname = self.plapi.get_nodes(node_id, ['hostname'])[0]
627         print "#### HOSTNAME ##### %s ### guid %s " % (hostname['hostname'], self.guid)
628         ip = sshfuncs.gethostbyname(hostname['hostname'])
629         if not ip:
630             # Fail while trying to find the IP
631             return None
632         return ip
633
634     def fail_discovery(self):
635         self.fail()
636         msg = "Discovery failed. No candidates found for node"
637         self.error(msg)
638         raise RuntimeError, msg
639
640     def fail_node_not_alive(self, hostname=None):
641         self.fail()
642         msg = "Node %s not alive" % hostname
643         raise RuntimeError, msg
644     
645     def fail_node_not_available(self, hostname):
646         self.fail()
647         msg = "Node %s not available for provisioning" % hostname
648         raise RuntimeError, msg
649
650     def fail_not_enough_nodes(self):
651         self.fail()
652         msg = "Not enough nodes available for provisioning"
653         raise RuntimeError, msg
654
655     def valid_connection(self, guid):
656         # TODO: Validate!
657         return True
658
659