9c3b5d34dd18ae8a9ec6d71347b8e8cb03275de3
[nepi.git] / src / nepi / util / sfaapi.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
19
20 import threading
21 import hashlib
22 import re
23 import os
24 import time
25
26 from nepi.util.logger import Logger
27
28 try:
29     from sfa.client.sfi import Sfi
30     from sfa.util.xrn import hrn_to_urn
31 except ImportError:
32     log = Logger("SFA API")
33     log.debug("Packages sfa-common or sfa-client not installed.\
34          Could not import sfa.client.sfi or sfa.util.xrn")
35
36 from nepi.util.sfarspec_proc import SfaRSpecProcessing
37
38 class SFAAPI(object):
39     """
40     API for quering the SFA service. It uses Sfi class from the tool sfi client.
41     """
42     def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
43         batch, rtype, timeout):
44
45         self._blacklist = set()
46         self._reserved = set()
47         self._resources_cache = None
48         self._already_cached = False
49         self._ec = ec 
50         self.apis = 1
51
52         if batch:
53             self._testbed_res = rtype
54             self._count = 0
55             self._total = self._get_total_res()
56             self._slice_resources_batch = list()
57
58         self._log = Logger("SFA API")
59         self.api = Sfi()
60         self.rspec_proc = SfaRSpecProcessing()
61         self.lock_slice = threading.Lock()
62         self.lock_blist = threading.Lock()
63         self.lock_resv = threading.Lock()
64
65         self.api.options.timeout = timeout
66         self.api.options.raw = None
67         self.api.options.user = sfi_user
68         self.api.options.auth = sfi_auth
69         self.api.options.registry = sfi_registry
70         self.api.options.sm = sfi_sm
71         self.api.options.user_private_key = private_key
72
73         # Load blacklist from file
74         if ec.get_global('PlanetlabNode', 'persist_blacklist'):
75             self._set_blacklist()
76
77     def _set_blacklist(self):
78         """
79         Initialize the blacklist with previous nodes blacklisted, in 
80         previous runs.
81         """
82         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
83         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
84         with open(plblacklist_file, 'r') as f:
85             hosts_tobl = f.read().splitlines()
86             if hosts_tobl:
87                 for host in hosts_tobl:
88                     self._blacklist.add(host)
89
90     def _get_total_res(self):
91         """
92         Get the total amount of resources instanciated using this API,
93         to be able to add them using the same Allocate and Provision
94         call at once. Specially for Wilabt testbed that doesn't allow 
95         to add slivers after the slice already has some.
96         """
97         rms = list()
98         res_gids = self._ec.resources
99         for gid in res_gids:
100             rm = self._ec.get_resource(gid)
101             if self._testbed_res.lower() in rm._rtype.lower():
102                 rms.append(rm)
103         return rms
104
105     def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
106         """
107         Execute sfi method, which correspond to SFA call. It can be the following
108         calls: Describe, Delete, Allocate, Provision, ListResources.
109         """
110         if command in ['describe', 'delete', 'allocate', 'provision', 'action']:
111             if not slicename:
112                 raise TypeError("The slice hrn is expected for this method %s" % command)
113             if command == 'allocate' and not rspec:
114                 raise TypeError("RSpec is expected for this method %s" % command)
115             
116             if command == 'allocate':
117                 args_list = [slicename, rspec]
118             else:
119                 args_list = [slicename]
120             if command != 'delete':
121                 args_list = args_list + ['-o', '/tmp/rspec_output']
122             if command == 'action':
123                 args_list = [slicename, action]
124
125         elif command == 'resources':
126             args_list = ['-o', '/tmp/rspec_output']
127
128         else: raise TypeError("Sfi method not supported")
129
130         self.api.command = command
131         self.api.command_parser = self.api.create_parser_command(self.api.command)
132         (command_options, command_args) = self.api.command_parser.parse_args(args_list)
133         self.api.command_options = command_options
134         self.api.read_config()
135         self.api.bootstrap()
136
137         try:
138             os.remove("/tmp/rspec_output.rspec")
139         except OSError:
140             self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
141
142         try:
143             self.api.dispatch(command, command_options, command_args)
144             with open("/tmp/rspec_output.rspec", "r") as result_file:
145                 result = result_file.read()
146                 return result
147         except:
148             self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
149             return None
150
151     def get_resources_info(self):
152         """
153         Get all resources and its attributes from aggregate.
154         """
155         try:
156             rspec_slice = self._sfi_exec_method('resources')
157         except:
158             raise RuntimeError("Fail to list resources")
159    
160         self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
161         self._already_cached = True
162         return self._resources_cache
163
164     def get_resources_hrn(self, resources=None):
165         """
166         Get list of resources hrn, without the resource info.
167         """
168         if not resources:
169             if not self._already_cached:
170                 resources = self.get_resources_info()['resource']
171             else:
172                 resources = self._resources_cache['resource']
173
174         component_tohrn = dict()
175         for resource in resources:
176             hrn = resource['hrn'].replace('\\', '')
177             component_tohrn[resource['component_name']] = hrn
178
179         return component_tohrn
180             
181     def get_slice_resources(self, slicename):
182         """
183         Get resources and info from slice.
184         """
185         try:
186             with self.lock_slice:
187                 rspec_slice = self._sfi_exec_method('describe', slicename)
188         except:
189             self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
190
191         if rspec_slice is not None:
192             result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
193             return result
194         else:
195             return {'resource':[],'lease':[]}
196
197
198     def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
199         """
200         Get the list of resources' urn, build the rspec string and call the allocate 
201         and provision method.
202         """
203         resources_hrn_new = list()
204         resource_parts = resource_hrn.split('.')
205         resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
206         resources_hrn_new.append(resource_hrn)
207
208         with self.lock_slice:
209             rspec_slice = self._sfi_exec_method('describe', slicename)
210             if rspec_slice is not None:
211                 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
212             else: slice_resources = []
213             if slice_resources:
214                 slice_resources_hrn = self.get_resources_hrn(slice_resources)
215                 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
216                     s_parts = s_hrn_value.split('.')
217                     s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
218                     resources_hrn_new.append(s_hrn)
219
220
221             resources_urn = self._get_resources_urn(resources_hrn_new)
222             rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, None, leases)
223             f = open("/tmp/rspec_input.rspec", "w")
224             f.truncate(0)
225             f.write(rspec)
226             f.close()
227             
228             if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
229                 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
230
231             # ALLOCATE
232             try:
233                 self._log.debug("Allocating resources in slice %s" % slicename)
234                 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
235             except:
236                 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
237
238             if out is not None:
239                 # PROVISION
240                 try:
241                     self._log.debug("Provisioning resources in slice %s" % slicename)
242                     self._sfi_exec_method('provision', slicename) 
243                 except:
244                     raise RuntimeError("Fail to provision resource for slice %s" % slicename)
245                 return True
246
247     def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
248         """
249         Method to add all resources together to the slice. Previous deletion of slivers.
250         Specially used for wilabt that doesn't allow to add more resources to the slice
251         after some resources are added. Every sliver have to be deleted and the batch 
252         has to be added at once.
253         """
254         self._count += 1
255         self._slice_resources_batch.append(resource_hrn)
256         resources_hrn_new = list()
257         if self._count == len(self._total):
258             check_all_inslice = self._check_all_inslice(self._slice_resources_batch, slicename)
259             if check_all_inslice == True:
260                 return True
261             for resource_hrn in self._slice_resources_batch:
262                 resource_parts = resource_hrn.split('.')
263                 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
264                 resources_hrn_new.append(resource_hrn)
265             with self.lock_slice:
266                 if check_all_inslice != 0:
267                     self._sfi_exec_method('delete', slicename)
268                     time.sleep(480)
269                 
270                 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
271                 resources_urn = self._get_urn(resources_hrn_new)
272                 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
273                 f = open("/tmp/rspec_input.rspec", "w")
274                 f.truncate(0)
275                 f.write(rspec)
276                 f.close()
277
278                 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
279                     raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
280
281                 # ALLOCATE    
282                 try:
283                     self._log.debug("Allocating resources in slice %s" % slicename)
284                     out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
285                 except:
286                     raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
287
288                 if out is not None:
289                     # PROVISION
290                     try:
291                         self._log.debug("Provisioning resources in slice %s" % slicename)
292                         self._sfi_exec_method('provision', slicename)
293                         self._sfi_exec_method('action', slicename=slicename, action='geni_start')
294                     except:
295                         raise RuntimeError("Fail to provision resource for slice %s" % slicename)
296                     return True
297                 else:
298                     raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
299     
300         else:
301             self._log.debug(" Waiting for more nodes to add the batch to the slice ")
302
303     def _check_all_inslice(self, resources_hrn, slicename):
304         slice_res = self.get_slice_resources(slicename)['resource']
305         if slice_res:
306             if len(slice_res[0]['services']) != 0:
307                 slice_res_hrn = self.get_resources_hrn(slice_res).values()
308                 if self._compare_lists(slice_res_hrn, resources_hrn):
309                     return True
310                 else: return len(slice_res_hrn)
311         return 0
312
313     def _compare_lists(self, list1, list2):
314         if len(list1) != len(list2):
315             return False
316         for item in list1:
317             if item not in list2:
318                 return False
319         return True
320
321     def _get_urn(self, resources_hrn):
322         """
323         Get urn from hrn.
324         """
325         resources_urn = list()
326         for hrn in resources_hrn:
327             hrn = hrn.replace("\\", "").split('.')
328             node = hrn.pop()
329             auth = '.'.join(hrn)
330             urn = ['urn:publicid:IDN+', auth, '+node+', node]
331             urn = ''.join(urn)
332             resources_urn.append(urn)
333         return resources_urn
334
335     def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
336         """
337         Remove slivers from slice. Currently sfi doesn't support removing particular
338         slivers.
339         """
340         resource_urn = self._get_resources_urn([resource_hrn]).pop()
341         with self.lock_slice:
342             try:
343                 self._sfi_exec_method('delete', slicename, urn=resource_urn)
344             except:
345                 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
346             return True
347
348     def remove_all_from_slice(self, slicename):
349         """
350         De-allocate and de-provision all slivers of the named slice.
351         Currently sfi doesn't support removing particular
352         slivers, so this method works only for removing every sliver. Setting the
353         resource_hrn parameter is not necessary.
354         """
355         with self.lock_slice:
356             try:
357                 self._sfi_exec_method('delete', slicename)
358             except:
359                 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
360             return True
361
362     def _get_resources_urn(self, resources_hrn):
363         """
364         Builds list of resources' urn based on hrn.
365         """
366         resources_urn = list()
367
368         for resource in resources_hrn:
369             resources_urn.append(hrn_to_urn(resource, 'node'))
370             
371         return resources_urn
372
373     def blacklist_resource(self, resource_hrn):
374         """
375         Adding resource_hrn to blacklist, and taking 
376         the resource from the reserved list.
377         """
378         with self.lock_blist:
379             self._blacklist.add(resource_hrn)
380         with self.lock_resv:
381             if resource_hrn in self._reserved:
382                 self._reserved.remove(resource_hrn)
383
384     def blacklisted(self, resource_hrn):
385         """
386         Check if the resource is in the blacklist. 
387         """
388         with self.lock_blist:
389             if resource_hrn in self._blacklist:
390                 return True
391         return False
392
393     def reserve_resource(self, resource_hrn):
394         """
395         Add resource to the reserved list.
396         """
397         self._reserved.add(resource_hrn)
398
399     def reserved(self, resource_hrn):
400         """
401         Check that the resource in not reserved.
402         """
403         with self.lock_resv:
404             if resource_hrn in self._reserved:
405                 return True
406             else:
407                 self.reserve_resource(resource_hrn)
408                 return False
409
410     def release(self):
411         """
412         Remove hosts from the reserved and blacklist lists, and in case
413         the persist attribute is set, it saves the blacklisted hosts
414         in the blacklist file.
415         """
416         self.apis -= 1
417         if self.apis == 0:
418             blacklist = self._blacklist
419             self._blacklist = set()
420             self._reserved = set()
421 #            if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
422 #                if blacklist:
423 #                    to_blacklist = list()
424 #                    hostnames = self.get_nodes(list(blacklist), ['hostname'])
425 #                    for hostname in hostnames:
426 #                        to_blacklist.append(hostname['hostname'])
427 #
428 #                    nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
429 #                    plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
430 #
431 #                    with open(plblacklist_file, 'w') as f:
432 #                        for host in to_blacklist:
433 #                            f.write("%s\n" % host)
434 #
435
436
437 class SFAAPIFactory(object):
438     """
439     API Factory to manage a map of SFAAPI instances as key-value pairs, it
440     instanciate a single instance per key. The key represents the same SFA, 
441     credentials.
442     """
443
444     _lock = threading.Lock()
445     _apis = dict()
446
447    
448     @classmethod
449     def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
450             batch = False, rtype = None, timeout = None):
451
452         if sfi_user and sfi_sm:
453             key = cls.make_key(sfi_user, sfi_sm)
454             with cls._lock:
455                 api = cls._apis.get(key)
456
457                 if not api:
458                     api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
459                         ec, batch, rtype, timeout)
460                     cls._apis[key] = api
461                 else:
462                     api.apis += 1
463
464                 return api
465
466         return None
467
468     @classmethod
469     def make_key(cls, *args):
470         skey = "".join(map(str, args))
471         return hashlib.md5(skey).hexdigest()
472