ec_shutdown
[nepi.git] / src / nepi / resources / planetlab / node.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian <lucia.guevgeozian_odizzio@inria.fr>
20
21 from nepi.execution.attribute import Attribute, Flags, Types
22 from nepi.execution.resource import ResourceManager, clsinit_copy, ResourceState, \
23         reschedule_delay
24 from nepi.resources.linux.node import LinuxNode
25 from nepi.resources.planetlab.plcapi import PLCAPIFactory 
26 from nepi.util.execfuncs import lexec
27
28 from random import randint
29 import time
30 import threading
31
32 @clsinit_copy
33 class PlanetlabNode(LinuxNode):
34     _rtype = "PlanetlabNode"
35     _help = "Controls a PlanetLab host accessible using a SSH key " \
36             "associated to a PlanetLab user account"
37     _backend = "planetlab"
38
39     blacklist = list()
40     provisionlist = list()
41
42     lock_blist = threading.Lock()
43     lock_plist = threading.Lock()
44
45     lock_slice = threading.Lock()
46
47
48     @classmethod
49     def _register_attributes(cls):
50         ip = Attribute("ip", "PlanetLab host public IP address",
51                 flags = Flags.ReadOnly)
52
53         pl_url = Attribute("plcApiUrl", "URL of PlanetLab PLCAPI host \
54                     (e.g. www.planet-lab.eu or www.planet-lab.org) ",
55                     default = "www.planet-lab.eu",
56                     flags = Flags.Credential)
57
58         pl_ptn = Attribute("plcApiPattern", "PLC API service regexp pattern \
59                     (e.g. https://%(hostname)s:443/PLCAPI/ ) ",
60                     default = "https://%(hostname)s:443/PLCAPI/",
61                     flags = Flags.ExecReadOnly)
62     
63         pl_user = Attribute("pluser", "PlanetLab account user, as the one to \
64                     authenticate in the website) ",
65                     flags = Flags.Credential)
66
67         pl_password = Attribute("password", 
68                         "PlanetLab account password, as \
69                         the one to authenticate in the website) ",
70                         flags = Flags.Credential)
71
72         city = Attribute("city", "Constrain location (city) during resource \
73                 discovery. May use wildcards.",
74                 flags = Flags.Filter)
75
76         country = Attribute("country", "Constrain location (country) during \
77                     resource discovery. May use wildcards.",
78                     flags = Flags.Filter)
79
80         region = Attribute("region", "Constrain location (region) during \
81                     resource discovery. May use wildcards.",
82                     flags = Flags.Filter)
83
84         architecture = Attribute("architecture", "Constrain architecture \
85                         during resource discovery.",
86                         type = Types.Enumerate,
87                         allowed = ["x86_64", 
88                                     "i386"],
89                         flags = Flags.Filter)
90
91         operating_system = Attribute("operatingSystem", "Constrain operating \
92                             system during resource discovery.",
93                             type = Types.Enumerate,
94                             allowed =  ["f8",
95                                         "f12",
96                                         "f14",
97                                         "centos",
98                                         "other"],
99                             flags = Flags.Filter)
100
101         site = Attribute("site", "Constrain the PlanetLab site this node \
102                 should reside on.",
103                 type = Types.Enumerate,
104                 allowed = ["PLE",
105                             "PLC",
106                             "PLJ"],
107                 flags = Flags.Filter)
108
109         min_reliability = Attribute("minReliability", "Constrain reliability \
110                             while picking PlanetLab nodes. Specifies a lower \
111                             acceptable bound.",
112                             type = Types.Double,
113                             range = (1, 100),
114                             flags = Flags.Filter)
115
116         max_reliability = Attribute("maxReliability", "Constrain reliability \
117                             while picking PlanetLab nodes. Specifies an upper \
118                             acceptable bound.",
119                             type = Types.Double,
120                             range = (1, 100),
121                             flags = Flags.Filter)
122
123         min_bandwidth = Attribute("minBandwidth", "Constrain available \
124                             bandwidth while picking PlanetLab nodes. \
125                             Specifies a lower acceptable bound.",
126                             type = Types.Double,
127                             range = (0, 2**31),
128                             flags = Flags.Filter)
129
130         max_bandwidth = Attribute("maxBandwidth", "Constrain available \
131                             bandwidth while picking PlanetLab nodes. \
132                             Specifies an upper acceptable bound.",
133                             type = Types.Double,
134                             range = (0, 2**31),
135                             flags = Flags.Filter)
136
137         min_load = Attribute("minLoad", "Constrain node load average while \
138                     picking PlanetLab nodes. Specifies a lower acceptable \
139                     bound.",
140                     type = Types.Double,
141                     range = (0, 2**31),
142                     flags = Flags.Filter)
143
144         max_load = Attribute("maxLoad", "Constrain node load average while \
145                     picking PlanetLab nodes. Specifies an upper acceptable \
146                     bound.",
147                     type = Types.Double,
148                     range = (0, 2**31),
149                     flags = Flags.Filter)
150
151         min_cpu = Attribute("minCpu", "Constrain available cpu time while \
152                     picking PlanetLab nodes. Specifies a lower acceptable \
153                     bound.",
154                     type = Types.Double,
155                     range = (0, 100),
156                     flags = Flags.Filter)
157
158         max_cpu = Attribute("maxCpu", "Constrain available cpu time while \
159                     picking PlanetLab nodes. Specifies an upper acceptable \
160                     bound.",
161                     type = Types.Double,
162                     range = (0, 100),
163                     flags = Flags.Filter)
164
165         timeframe = Attribute("timeframe", "Past time period in which to check\
166                         information about the node. Values are year,month, \
167                         week, latest",
168                         default = "week",
169                         type = Types.Enumerate,
170                         allowed = ["latest",
171                                     "week",
172                                     "month",
173                                     "year"],
174                         flags = Flags.Filter)
175
176         cls._register_attribute(ip)
177         cls._register_attribute(pl_url)
178         cls._register_attribute(pl_ptn)
179         cls._register_attribute(pl_user)
180         cls._register_attribute(pl_password)
181         cls._register_attribute(site)
182         cls._register_attribute(city)
183         cls._register_attribute(country)
184         cls._register_attribute(region)
185         cls._register_attribute(architecture)
186         cls._register_attribute(operating_system)
187         cls._register_attribute(min_reliability)
188         cls._register_attribute(max_reliability)
189         cls._register_attribute(min_bandwidth)
190         cls._register_attribute(max_bandwidth)
191         cls._register_attribute(min_load)
192         cls._register_attribute(max_load)
193         cls._register_attribute(min_cpu)
194         cls._register_attribute(max_cpu)
195         cls._register_attribute(timeframe)
196
197     def __init__(self, ec, guid):
198         super(PlanetlabNode, self).__init__(ec, guid)
199
200         self._plapi = None
201         self._node_to_provision = None
202     
203     @property
204     def plapi(self):
205         if not self._plapi:
206             pl_user = self.get("pluser")
207             pl_pass = self.get("password")
208             pl_url = self.get("plcApiUrl")
209             pl_ptn = self.get("plcApiPattern")
210
211             self._plapi =  PLCAPIFactory.get_api(pl_user, pl_pass, pl_url,
212                     pl_ptn)
213             
214         return self._plapi
215
216     def discoverl(self):
217         """
218         Based on the attributes defined by the user, discover the suitable nodes
219         """
220         hostname = self.get("hostname")
221         if hostname:
222             # the user specified one particular node to be provisioned
223             # check with PLCAPI if it is alvive
224             node_id = self._query_if_alive(hostname=hostname)
225             node_id = node_id.pop()
226
227             # check that the node is not blacklisted or already being provision 
228             # by other RM
229             blist = PlanetlabNode.blacklist
230             plist = PlanetlabNode.provisionlist
231             if node_id not in blist and node_id not in plist:
232                 
233                 # check that is really alive, by performing ping
234                 ping_ok = self._do_ping(node_id)
235                 if not ping_ok:
236                     self._blacklist_node(node_id)
237                     self.fail_node_not_alive(hostname)
238                 else:
239                     self._node_to_provision = node_id
240                     self._put_node_in_provision(node_id)
241                     super(PlanetlabNode, self).discover()
242                 
243             else:
244                 self.fail_node_not_available(hostname)                
245         
246         else:
247             # the user specifies constraints based on attributes, zero, one or 
248             # more nodes can match these constraints 
249             nodes = self._filter_based_on_attributes()
250             nodes_alive = self._query_if_alive(nodes)
251     
252             # nodes that are already part of user's slice have the priority to
253             # provisioned
254             nodes_inslice = self._check_if_in_slice(nodes_alive)
255             nodes_not_inslice = list(set(nodes_alive) - set(nodes_inslice))
256             
257             node_id = None
258             if nodes_inslice:
259                 node_id = self._choose_random_node(nodes_inslice)
260                 
261             if not node_id and nodes_not_inslice:
262                 # Either there were no matching nodes in the user's slice, or
263                 # the nodes in the slice  were blacklisted or being provisioned
264                 # by other RM. Note nodes_not_inslice is never empty
265                 node_id = self._choose_random_node(nodes_not_inslice)
266             if not node_id:
267                 self.fail_not_enough_nodes()
268
269             self._node_to_provision = node_id
270             super(PlanetlabNode, self).discover()
271             
272     def provisionl(self):
273         """
274         Add node to user's slice after verifing that the node is functioning
275         correctly
276         """
277         provision_ok = False
278         ssh_ok = False
279         proc_ok = False
280         timeout = 1200
281
282         while not provision_ok:
283             node = self._node_to_provision
284             self._set_hostname_attr(node)
285             self._add_node_to_slice(node)
286             
287             # check ssh connection
288             t = 0 
289             while t < timeout and not ssh_ok:
290
291                 cmd = 'echo \'GOOD NODE\''
292                 ((out, err), proc) = self.execute(cmd)
293                 if out.find("GOOD NODE") < 0:
294                     t = t + 60
295                     time.sleep(60)
296                     continue
297                 else:
298                     ssh_ok = True
299                     continue
300
301             if not ssh_ok:
302                 # the timeout was reach without establishing ssh connection
303                 # the node is blacklisted, deleted from the slice, and a new
304                 # node to provision is discovered
305                 self._blacklist_node(node)
306                 self._delete_node_from_slice(node)
307                 self.discover()
308                 continue
309             
310             # check /proc directory is mounted (ssh_ok = True)
311             else:
312                 cmd = 'mount |grep proc'
313                 ((out, err), proc) = self.execute(cmd)
314                 if out.find("/proc type proc") < 0:
315                     self._blacklist_node(node)
316                     self._delete_node_from_slice(node)
317                     self.discover()
318                     continue
319             
320                 else:
321                     provision_ok = True
322                     # set IP attribute
323                     ip = self._get_ip(node)
324                     self.set("ip", ip)
325             
326         super(PlanetlabNode, self).provision()
327
328     def _filter_based_on_attributes(self):
329         """
330         Retrive the list of nodes ids that match user's constraints 
331         """
332         # Map user's defined attributes with tagnames of PlanetLab
333         timeframe = self.get("timeframe")[0]
334         attr_to_tags = {
335             'city' : 'city',
336             'country' : 'country',
337             'region' : 'region',
338             'architecture' : 'arch',
339             'operatingSystem' : 'fcdistro',
340             #'site' : 'pldistro',
341             'minReliability' : 'reliability%s' % timeframe,
342             'maxReliability' : 'reliability%s' % timeframe,
343             'minBandwidth' : 'bw%s' % timeframe,
344             'maxBandwidth' : 'bw%s' % timeframe,
345             'minLoad' : 'load%s' % timeframe,
346             'maxLoad' : 'load%s' % timeframe,
347             'minCpu' : 'cpu%s' % timeframe,
348             'maxCpu' : 'cpu%s' % timeframe,
349         }
350         
351         nodes_id = []
352         filters = {}
353
354         for attr_name, attr_obj in self._attrs.iteritems():
355             attr_value = self.get(attr_name)
356             
357             if attr_value is not None and attr_obj.flags == 8 and \
358                 attr_name != 'timeframe':
359         
360                 attr_tag = attr_to_tags[attr_name]
361                 filters['tagname'] = attr_tag
362
363                 # filter nodes by fixed constraints e.g. operating system
364                 if not 'min' in attr_name and not 'max' in attr_name:
365                     filters['value'] = attr_value
366                     nodes_id = self._filter_by_fixed_attr(filters, nodes_id)
367
368                 # filter nodes by range constraints e.g. max bandwidth
369                 elif ('min' or 'max') in attr_name:
370                     nodes_id = self._filter_by_range_attr(attr_name, attr_value, filters, nodes_id)
371                 
372         return nodes_id
373                     
374
375     def _filter_by_fixed_attr(self, filters, nodes_id):
376         """
377         Query PLCAPI for nodes ids matching fixed attributes defined by the
378         user
379         """
380         node_tags = self.plapi.get_node_tags(filters)
381         if node_tags is not None:
382
383             if len(nodes_id) == 0:
384                 # first attribute being matched
385                 for node_tag in node_tags:
386                     nodes_id.append(node_tag['node_id'])
387             else:
388                 # remove the nodes ids that don't match the new attribute
389                 # that is being match
390
391                 nodes_id_tmp = []
392                 for node_tag in node_tags:
393                     if node_tag['node_id'] in nodes_id:
394                         nodes_id_tmp.append(node_tag['node_id'])
395
396                 if len(nodes_id_tmp):
397                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
398                 else:
399                     # no node from before match the new constraint
400                     self.fail_discovery()
401         else:
402             # no nodes match the filter applied
403             self.fail_discovery()
404
405         return nodes_id
406
407     def _filter_by_range_attr(self, attr_name, attr_value, filters, nodes_id):
408         """
409         Query PLCAPI for nodes ids matching attributes defined in a certain
410         range, by the user
411         """
412         node_tags = self.plapi.get_node_tags(filters)
413         if node_tags is not None:
414             
415             if len(nodes_id) == 0:
416                 # first attribute being matched
417                 for node_tag in node_tags:
418  
419                    # check that matches the min or max restriction
420                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
421                         float(node_tag['value']) > attr_value:
422                         nodes_id.append(node_tag['node_id'])
423
424                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
425                         float(node_tag['value']) < attr_value:
426                         nodes_id.append(node_tag['node_id'])
427             else:
428
429                 # remove the nodes ids that don't match the new attribute
430                 # that is being match
431                 nodes_id_tmp = []
432                 for node_tag in node_tags:
433
434                     # check that matches the min or max restriction and was a
435                     # matching previous filters
436                     if 'min' in attr_name and node_tag['value'] != 'n/a' and \
437                         float(node_tag['value']) > attr_value and \
438                         node_tag['node_id'] in nodes_id:
439                         nodes_id_tmp.append(node_tag['node_id'])
440
441                     elif 'max' in attr_name and node_tag['value'] != 'n/a' and \
442                         float(node_tag['value']) < attr_value and \
443                         node_tag['node_id'] in nodes_id:
444                         nodes_id_tmp.append(node_tag['node_id'])
445
446                 if len(nodes_id_tmp):
447                     nodes_id = set(nodes_id) & set(nodes_id_tmp)
448                 else:
449                     # no node from before match the new constraint
450                     self.fail_discovery()
451
452         else: #TODO CHECK
453             # no nodes match the filter applied
454             self.fail_discovery()
455
456         return nodes_id
457         
458     def _query_if_alive(self, nodes_id=None, hostname=None):
459         """
460         Query PLCAPI for nodes that register activity recently, using filters 
461         related to the state of the node, e.g. last time it was contacted
462         """
463         if nodes_id is None and hostname is None:
464             msg = "Specify nodes_id or hostname"
465             raise RuntimeError, msg
466
467         if nodes_id is not None and hostname is not None:
468             msg = "Specify either nodes_id or hostname"
469             raise RuntimeError, msg
470
471         # define PL filters to check the node is alive
472         filters = dict()
473         filters['run_level'] = 'boot'
474         filters['boot_state'] = 'boot'
475         filters['node_type'] = 'regular' 
476         filters['>last_contact'] =  int(time.time()) - 2*3600
477
478         # adding node_id or hostname to the filters to check for the particular
479         # node
480         if nodes_id:
481             filters['node_id'] = list(nodes_id)
482             alive_nodes_id = self._get_nodes_id(filters)
483         elif hostname:
484             filters['hostname'] = hostname
485             alive_nodes_id = self._get_nodes_id(filters)
486
487         if len(alive_nodes_id) == 0:
488             self.fail_discovery()
489         else:
490             nodes_id = list()
491             for node_id in alive_nodes_id:
492                 nid = node_id['node_id']
493                 nodes_id.append(nid)
494
495             return nodes_id
496
497     def _choose_random_node(self, nodes):
498         """
499         From the possible nodes for provision, choose randomly to decrese the
500         probability of different RMs choosing the same node for provision
501         """
502         blist = PlanetlabNode.blacklist
503         plist = PlanetlabNode.provisionlist
504
505         size = len(nodes)
506         while size:
507             size = size - 1
508             index = randint(0, size)
509             node_id = nodes[index]
510             nodes[index] = nodes[size]
511
512             # check the node is not blacklisted or being provision by other RM
513             # and perform ping to check that is really alive
514             if node_id not in blist and node_id not in plist:
515                 ping_ok = self._do_ping(node_id)
516                 if not ping_ok:
517                     self._blacklist_node(node_id)
518                 else:
519                     # discovered node for provision, added to provision list
520                     self._put_node_in_provision(node_id)
521                     return node_id
522
523     def _get_nodes_id(self, filters):
524         return self.plapi.get_nodes(filters, fields=['node_id'])
525
526     def _add_node_to_slice(self, node_id):
527         self.warn(" Adding node to slice ")
528         slicename = self.get("username")
529         with PlanetlabNode.lock_slice:
530             slice_nodes = self.plapi.get_slice_nodes(slicename)
531             slice_nodes.append(node_id)
532             self.plapi.add_slice_nodes(slicename, slice_nodes)
533
534     def _delete_node_from_slice(self, node):
535         self.warn(" Deleting node from slice ")
536         slicename = self.get("username")
537         self.plapi.delete_slice_node(slicename, [node])
538
539     def _set_hostname_attr(self, node):
540         """
541         Query PLCAPI for the hostname of a certain node id and sets the
542         attribute hostname, it will over write the previous value
543         """
544         hostname = self.plapi.get_nodes(node, ['hostname'])
545         self.set("hostname", hostname[0]['hostname'])
546
547     def _check_if_in_slice(self, nodes_id):
548         """
549         Query PLCAPI to find out if any node id from nodes_id is in the user's
550         slice
551         """
552         slicename = self.get("username")
553         slice_nodes = self.plapi.get_slice_nodes(slicename)
554         nodes_inslice = list(set(nodes_id) & set(slice_nodes))
555
556         return nodes_inslice
557
558     def _do_ping(self, node_id):
559         """
560         Perform ping command on node's IP matching node id
561         """
562         ping_ok = False
563         ip = self._get_ip(node_id)
564         command = "ping -c2 %s | echo \"PING OK\"" % ip
565
566         (out, err) = lexec(command)
567         if not out.find("PING OK") < 0:
568             ping_ok = True
569
570         return ping_ok 
571
572     def _blacklist_node(self, node):
573         """
574         Add node mal functioning node to blacklist
575         """
576         blist = PlanetlabNode.blacklist
577
578         self.warn(" Blacklisting malfunctioning node ")
579         with PlanetlabNode.lock_blist:
580             blist.append(node)
581
582     def _put_node_in_provision(self, node):
583         """
584         Add node to the list of nodes being provisioned, in order for other RMs
585         to not try to provision the same one again
586         """
587         plist = PlanetlabNode.provisionlist
588
589         self.warn(" Provisioning node ")
590         with PlanetlabNode.lock_plist:
591             plist.append(node)
592
593     def _get_ip(self, node_id):
594         """
595         Query PLCAPI for the IP of a node with certain node id
596         """
597         ip = self.plapi.get_interfaces({'node_id':node_id}, fields=['ip'])
598         ip = ip[0]['ip']
599         return ip
600
601     def fail_discovery(self):
602         self.fail()
603         msg = "Discovery failed. No candidates found for node"
604         self.error(msg)
605         raise RuntimeError, msg
606
607     def fail_node_not_alive(self, hostname):
608         msg = "Node %s not alive, pick another node" % hostname
609         raise RuntimeError, msg
610     
611     def fail_node_not_available(self, hostname):
612         msg = "Node %s not available for provisioning, pick another \
613                 node" % hostname
614         raise RuntimeError, msg
615
616     def fail_not_enough_nodes(self):
617         msg = "Not enough nodes available for provisioning"
618         raise RuntimeError, msg
619
620     def valid_connection(self, guid):
621         # TODO: Validate!
622         return True
623
624