fixed 2 bugs/typos found through testing
[nepi.git] / src / nepi / util / sfaapi.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License version 2 as
7 #    published by the Free Software Foundation;
8 #
9 #    This program is distributed in the hope that it will be useful,
10 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
11 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 #    GNU General Public License for more details.
13 #
14 #    You should have received a copy of the GNU General Public License
15 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
18
19 import threading
20 import hashlib
21 import re
22 import os
23 import time
24
25 from nepi.util.logger import Logger
26
27 try:
28     from sfa.client.sfi import Sfi
29     from sfa.util.xrn import hrn_to_urn
30 except ImportError:
31     log = Logger("SFA API")
32     log.debug("Packages sfa-common or sfa-client not installed.\
33          Could not import sfa.client.sfi or sfa.util.xrn")
34
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
36
37 class SFAAPI(object):
38     """
39     API for quering the SFA service. It uses Sfi class from the tool sfi client.
40     """
41     def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42         batch, rtype, timeout):
43
44         self._blacklist = set()
45         self._reserved = set()
46         self._resources_cache = None
47         self._already_cached = False
48         self._ec = ec 
49         self.apis = 1
50
51         if batch:
52             self._testbed_res = rtype
53             self._count = 0
54             self._total = self._get_total_res()
55             self._slice_resources_batch = list()
56
57         self._log = Logger("SFA API")
58         self.api = Sfi()
59         self.rspec_proc = SfaRSpecProcessing()
60         self.lock_slice = threading.Lock()
61         self.lock_blist = threading.Lock()
62         self.lock_resv = threading.Lock()
63
64         self.api.options.timeout = timeout
65         self.api.options.raw = None
66         self.api.options.user = sfi_user
67         self.api.options.auth = sfi_auth
68         self.api.options.registry = sfi_registry
69         self.api.options.sm = sfi_sm
70         self.api.options.user_private_key = private_key
71
72         # Load blacklist from file
73         if ec.get_global('PlanetlabNode', 'persist_blacklist'):
74             self._set_blacklist()
75
76     def _set_blacklist(self):
77         """
78         Initialize the blacklist with previous nodes blacklisted, in 
79         previous runs.
80         """
81         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
82         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
83         with open(plblacklist_file, 'r') as f:
84             hosts_tobl = f.read().splitlines()
85             if hosts_tobl:
86                 for host in hosts_tobl:
87                     self._blacklist.add(host)
88
89     def _get_total_res(self):
90         """
91         Get the total amount of resources instanciated using this API,
92         to be able to add them using the same Allocate and Provision
93         call at once. Specially for Wilabt testbed that doesn't allow 
94         to add slivers after the slice already has some.
95         """
96         rms = list()
97         res_gids = self._ec.resources
98         for gid in res_gids:
99             rm = self._ec.get_resource(gid)
100             if self._testbed_res.lower() in rm._rtype.lower():
101                 rms.append(rm)
102         return rms
103
104     def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
105         """
106         Execute sfi method, which correspond to SFA call. It can be the following
107         calls: Describe, Delete, Allocate, Provision, ListResources.
108         """
109         if command in ['describe', 'delete', 'allocate', 'provision', 'action']:
110             if not slicename:
111                 raise TypeError("The slice hrn is expected for this method %s" % command)
112             if command == 'allocate' and not rspec:
113                 raise TypeError("RSpec is expected for this method %s" % command)
114             
115             if command == 'allocate':
116                 args_list = [slicename, rspec]
117             else:
118                 args_list = [slicename]
119             if command != 'delete':
120                 args_list = args_list + ['-o', '/tmp/rspec_output']
121             if command == 'action':
122                 args_list = [slicename, action]
123
124         elif command == 'resources':
125             args_list = ['-o', '/tmp/rspec_output']
126
127         else: raise TypeError("Sfi method not supported")
128
129         self.api.command = command
130         self.api.command_parser = self.api.create_parser_command(self.api.command)
131         (command_options, command_args) = self.api.command_parser.parse_args(args_list)
132         self.api.command_options = command_options
133         self.api.read_config()
134         self.api.bootstrap()
135
136         try:
137             os.remove("/tmp/rspec_output.rspec")
138         except OSError:
139             self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
140
141         try:
142             self.api.dispatch(command, command_options, command_args)
143             with open("/tmp/rspec_output.rspec", "r") as result_file:
144                 result = result_file.read()
145                 return result
146         except:
147             self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
148             return None
149
150     def get_resources_info(self):
151         """
152         Get all resources and its attributes from aggregate.
153         """
154         try:
155             rspec_slice = self._sfi_exec_method('resources')
156         except:
157             raise RuntimeError("Fail to list resources")
158    
159         self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
160         self._already_cached = True
161         return self._resources_cache
162
163     def get_resources_hrn(self, resources=None):
164         """
165         Get list of resources hrn, without the resource info.
166         """
167         if not resources:
168             if not self._already_cached:
169                 resources = self.get_resources_info()['resource']
170             else:
171                 resources = self._resources_cache['resource']
172
173         component_tohrn = dict()
174         for resource in resources:
175             hrn = resource['hrn'].replace('\\', '')
176             component_tohrn[resource['component_name']] = hrn
177
178         return component_tohrn
179             
180     def get_slice_resources(self, slicename):
181         """
182         Get resources and info from slice.
183         """
184         try:
185             with self.lock_slice:
186                 rspec_slice = self._sfi_exec_method('describe', slicename)
187         except:
188             self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
189
190         if rspec_slice is not None:
191             result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
192             return result
193         else:
194             return {'resource':[],'lease':[]}
195
196
197     def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
198         """
199         Get the list of resources' urn, build the rspec string and call the allocate 
200         and provision method.
201         """
202         resources_hrn_new = list()
203         resource_parts = resource_hrn.split('.')
204         resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
205         resources_hrn_new.append(resource_hrn)
206
207         with self.lock_slice:
208             rspec_slice = self._sfi_exec_method('describe', slicename)
209             if rspec_slice is not None:
210                 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
211             else: slice_resources = []
212             if slice_resources:
213                 slice_resources_hrn = self.get_resources_hrn(slice_resources)
214                 for s_hrn_key, s_hrn_value in slice_resources_hrn.items():
215                     s_parts = s_hrn_value.split('.')
216                     s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
217                     resources_hrn_new.append(s_hrn)
218
219
220             resources_urn = self._get_resources_urn(resources_hrn_new)
221             rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, None, leases)
222             with open("/tmp/rspec_input.rspec", "w") as f:
223                 f.truncate(0)
224                 f.write(rspec)
225             
226             if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
227                 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
228
229             # ALLOCATE
230             try:
231                 self._log.debug("Allocating resources in slice %s" % slicename)
232                 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
233             except:
234                 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
235
236             if out is not None:
237                 # PROVISION
238                 try:
239                     self._log.debug("Provisioning resources in slice %s" % slicename)
240                     self._sfi_exec_method('provision', slicename) 
241                 except:
242                     raise RuntimeError("Fail to provision resource for slice %s" % slicename)
243                 return True
244
245     def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
246         """
247         Method to add all resources together to the slice. Previous deletion of slivers.
248         Specially used for wilabt that doesn't allow to add more resources to the slice
249         after some resources are added. Every sliver have to be deleted and the batch 
250         has to be added at once.
251         """
252         self._count += 1
253         self._slice_resources_batch.append(resource_hrn)
254         resources_hrn_new = list()
255         if self._count == len(self._total):
256             check_all_inslice = self._check_all_inslice(self._slice_resources_batch, slicename)
257             if check_all_inslice == True:
258                 return True
259             for resource_hrn in self._slice_resources_batch:
260                 resource_parts = resource_hrn.split('.')
261                 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
262                 resources_hrn_new.append(resource_hrn)
263             with self.lock_slice:
264                 if check_all_inslice != 0:
265                     self._sfi_exec_method('delete', slicename)
266                     time.sleep(480)
267                 
268                 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
269                 resources_urn = self._get_urn(resources_hrn_new)
270                 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
271                 with open("/tmp/rspec_input.rspec", "w") as f:
272                     f.truncate(0)
273                     f.write(rspec)
274
275                 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
276                     raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
277
278                 # ALLOCATE    
279                 try:
280                     self._log.debug("Allocating resources in slice %s" % slicename)
281                     out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
282                 except:
283                     raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
284
285                 if out is not None:
286                     # PROVISION
287                     try:
288                         self._log.debug("Provisioning resources in slice %s" % slicename)
289                         self._sfi_exec_method('provision', slicename)
290                         self._sfi_exec_method('action', slicename=slicename, action='geni_start')
291                     except:
292                         raise RuntimeError("Fail to provision resource for slice %s" % slicename)
293                     return True
294                 else:
295                     raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
296     
297         else:
298             self._log.debug(" Waiting for more nodes to add the batch to the slice ")
299
300     def _check_all_inslice(self, resources_hrn, slicename):
301         slice_res = self.get_slice_resources(slicename)['resource']
302         if slice_res:
303             if len(slice_res[0]['services']) != 0:
304                 # 2to3 added list() and it is useful
305                 slice_res_hrn = list(self.get_resources_hrn(slice_res).values())
306                 if self._compare_lists(slice_res_hrn, resources_hrn):
307                     return True
308                 else: return len(slice_res_hrn)
309         return 0
310
311     def _compare_lists(self, list1, list2):
312         if len(list1) != len(list2):
313             return False
314         for item in list1:
315             if item not in list2:
316                 return False
317         return True
318
319     def _get_urn(self, resources_hrn):
320         """
321         Get urn from hrn.
322         """
323         resources_urn = list()
324         for hrn in resources_hrn:
325             hrn = hrn.replace("\\", "").split('.')
326             node = hrn.pop()
327             auth = '.'.join(hrn)
328             urn = ['urn:publicid:IDN+', auth, '+node+', node]
329             urn = ''.join(urn)
330             resources_urn.append(urn)
331         return resources_urn
332
333     def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
334         """
335         Remove slivers from slice. Currently sfi doesn't support removing particular
336         slivers.
337         """
338         resource_urn = self._get_resources_urn([resource_hrn]).pop()
339         with self.lock_slice:
340             try:
341                 self._sfi_exec_method('delete', slicename, urn=resource_urn)
342             except:
343                 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
344             return True
345
346     def remove_all_from_slice(self, slicename):
347         """
348         De-allocate and de-provision all slivers of the named slice.
349         Currently sfi doesn't support removing particular
350         slivers, so this method works only for removing every sliver. Setting the
351         resource_hrn parameter is not necessary.
352         """
353         with self.lock_slice:
354             try:
355                 self._sfi_exec_method('delete', slicename)
356             except:
357                 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
358             return True
359
360     def _get_resources_urn(self, resources_hrn):
361         """
362         Builds list of resources' urn based on hrn.
363         """
364         resources_urn = list()
365
366         for resource in resources_hrn:
367             resources_urn.append(hrn_to_urn(resource, 'node'))
368             
369         return resources_urn
370
371     def blacklist_resource(self, resource_hrn):
372         """
373         Adding resource_hrn to blacklist, and taking 
374         the resource from the reserved list.
375         """
376         with self.lock_blist:
377             self._blacklist.add(resource_hrn)
378         with self.lock_resv:
379             if resource_hrn in self._reserved:
380                 self._reserved.remove(resource_hrn)
381
382     def blacklisted(self, resource_hrn):
383         """
384         Check if the resource is in the blacklist. 
385         """
386         with self.lock_blist:
387             if resource_hrn in self._blacklist:
388                 return True
389         return False
390
391     def reserve_resource(self, resource_hrn):
392         """
393         Add resource to the reserved list.
394         """
395         self._reserved.add(resource_hrn)
396
397     def reserved(self, resource_hrn):
398         """
399         Check that the resource in not reserved.
400         """
401         with self.lock_resv:
402             if resource_hrn in self._reserved:
403                 return True
404             else:
405                 self.reserve_resource(resource_hrn)
406                 return False
407
408     def release(self):
409         """
410         Remove hosts from the reserved and blacklist lists, and in case
411         the persist attribute is set, it saves the blacklisted hosts
412         in the blacklist file.
413         """
414         self.apis -= 1
415         if self.apis == 0:
416             blacklist = self._blacklist
417             self._blacklist = set()
418             self._reserved = set()
419 #            if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
420 #                if blacklist:
421 #                    to_blacklist = list()
422 #                    hostnames = self.get_nodes(list(blacklist), ['hostname'])
423 #                    for hostname in hostnames:
424 #                        to_blacklist.append(hostname['hostname'])
425 #
426 #                    nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
427 #                    plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
428 #
429 #                    with open(plblacklist_file, 'w') as f:
430 #                        for host in to_blacklist:
431 #                            f.write("%s\n" % host)
432 #
433
434
435 class SFAAPIFactory(object):
436     """
437     API Factory to manage a map of SFAAPI instances as key-value pairs, it
438     instanciate a single instance per key. The key represents the same SFA, 
439     credentials.
440     """
441
442     _lock = threading.Lock()
443     _apis = dict()
444
445    
446     @classmethod
447     def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
448             batch = False, rtype = None, timeout = None):
449
450         if sfi_user and sfi_sm:
451             key = cls.make_key(sfi_user, sfi_sm)
452             with cls._lock:
453                 api = cls._apis.get(key)
454
455                 if not api:
456                     api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
457                         ec, batch, rtype, timeout)
458                     cls._apis[key] = api
459                 else:
460                     api.apis += 1
461
462                 return api
463
464         return None
465
466     @classmethod
467     def make_key(cls, *args):
468         skey = "".join(map(str, args))
469         return hashlib.md5(skey).hexdigest()
470