Adding doc strings and tests
[nepi.git] / src / nepi / util / sfaapi.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
19
20 import threading
21 import hashlib
22 import re
23 import os
24
25 from nepi.util.logger import Logger
26
27 try:
28     from sfa.client.sfi import Sfi
29     from sfa.util.xrn import hrn_to_urn
30 except ImportError:
31     log = Logger("SFA API")
32     log.debug("Packages sfa-common or sfa-client not installed.\
33          Could not import sfa.client.sfi or sfa.util.xrn")
34
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
36
37 class SFAAPI(object):
38     """
39     API for quering the SFA service. It uses Sfi class from the tool sfi client.
40     """
41     def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42         batch, rtype, timeout):
43
44         self._blacklist = set()
45         self._reserved = set()
46         self._resources_cache = None
47         self._already_cached = False
48         self._ec = ec 
49
50         if batch:
51             self._testbed_res = rtype
52             self._count = 0
53             self._total = self._get_total_res()
54             self._slice_resources_batch = list()
55
56         self._log = Logger("SFA API")
57         self.api = Sfi()
58         self.rspec_proc = SfaRSpecProcessing()
59         self.lock_slice = threading.Lock()
60         self.lock_blist = threading.Lock()
61         self.lock_resv = threading.Lock()
62
63         self.api.options.timeout = timeout
64         self.api.options.raw = None
65         self.api.options.user = sfi_user
66         self.api.options.auth = sfi_auth
67         self.api.options.registry = sfi_registry
68         self.api.options.sm = sfi_sm
69         self.api.options.user_private_key = private_key
70
71         # Load blacklist from file
72         if ec.get_global('PlanetlabNode', 'persist_blacklist'):
73             self._set_blacklist()
74
75     def _set_blacklist(self):
76         """
77         Initialize the blacklist with previous nodes blacklisted, in 
78         previous runs.
79         """
80         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
81         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
82         with open(plblacklist_file, 'r') as f:
83             hosts_tobl = f.read().splitlines()
84             if hosts_tobl:
85                 for host in hosts_tobl:
86                     self._blacklist.add(host)
87
88     def _get_total_res(self):
89         """
90         Get the total amount of resources instanciated using this API,
91         to be able to add them using the same Allocate and Provision
92         call at once. Specially for Wilabt testbed that doesn't allow 
93         to add slivers after the slice already has some.
94         """
95         rms = list()
96         res_gids = self._ec.resources
97         for gid in res_gids:
98             rm = self._ec.get_resource(gid)
99             if self._testbed_res.lower() in rm._rtype.lower():
100                 rms.append(rm)
101         return rms
102
103     def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None):
104         """
105         Execute sfi method, which correspond to SFA call. It can be the following
106         calls: Describe, Delete, Allocate, Provision, ListResources.
107         """
108         if command in ['describe', 'delete', 'allocate', 'provision']:
109             if not slicename:
110                 raise TypeError("The slice hrn is expected for this method %s" % command)
111             if command == 'allocate' and not rspec:
112                 raise TypeError("RSpec is expected for this method %s" % command)
113             
114             if command == 'allocate':
115                 args_list = [slicename, rspec]
116             else:
117                 args_list = [slicename]
118             if command != 'delete':
119                 args_list = args_list + ['-o', '/tmp/rspec_output']
120
121         elif command == 'resources':
122             args_list = ['-o', '/tmp/rspec_output']
123
124         else: raise TypeError("Sfi method not supported")
125
126         self.api.command = command
127         self.api.command_parser = self.api.create_parser_command(self.api.command)
128         (command_options, command_args) = self.api.command_parser.parse_args(args_list)
129         self.api.command_options = command_options
130         self.api.read_config()
131         self.api.bootstrap()
132
133         try:
134             os.remove("/tmp/rspec_output.rspec")
135         except OSError:
136             self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
137
138         try:
139             self.api.dispatch(command, command_options, command_args)
140             with open("/tmp/rspec_output.rspec", "r") as result_file:
141                 result = result_file.read()
142                 return result
143         except:
144             self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
145             return None
146
147     def get_resources_info(self):
148         """
149         Get all resources and its attributes from aggregate.
150         """
151         try:
152             rspec_slice = self._sfi_exec_method('resources')
153         except:
154             raise RuntimeError("Fail to list resources")
155    
156         self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
157         self._already_cached = True
158         return self._resources_cache
159
160     def get_resources_hrn(self, resources=None):
161         """
162         Get list of resources hrn, without the resource info.
163         """
164         if not resources:
165             if not self._already_cached:
166                 resources = self.get_resources_info()['resource']
167             else:
168                 resources = self._resources_cache['resource']
169
170         component_tohrn = dict()
171         for resource in resources:
172             hrn = resource['hrn'].replace('\\', '')
173             component_tohrn[resource['component_name']] = hrn
174
175         return component_tohrn
176             
177     def get_slice_resources(self, slicename):
178         """
179         Get resources and info from slice.
180         """
181         try:
182             with self.lock_slice:
183                 rspec_slice = self._sfi_exec_method('describe', slicename)
184         except:
185             self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
186
187         if rspec_slice is not None:
188             result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
189             return result
190         else:
191             return {'resource':[],'lease':[]}
192
193
194     def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
195         """
196         Get the list of resources' urn, build the rspec string and call the allocate 
197         and provision method.
198         """
199         resources_hrn_new = list()
200         resource_parts = resource_hrn.split('.')
201         resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
202         resources_hrn_new.append(resource_hrn)
203
204         with self.lock_slice:
205             rspec_slice = self._sfi_exec_method('describe', slicename)
206             if rspec_slice is not None:
207                 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
208             else: slice_resources = []
209             if slice_resources:
210                 slice_resources_hrn = self.get_resources_hrn(slice_resources)
211                 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
212                     s_parts = s_hrn_value.split('.')
213                     s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
214                     resources_hrn_new.append(s_hrn)
215
216
217             resources_urn = self._get_resources_urn(resources_hrn_new)
218             rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
219             f = open("/tmp/rspec_input.rspec", "w")
220             f.truncate(0)
221             f.write(rspec)
222             f.close()
223             
224             if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
225                 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
226
227             # ALLOCATE
228             try:
229                 self._log.debug("Allocating resources in slice %s" % slicename)
230                 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
231             except:
232                 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
233
234             if out is not None:
235                 # PROVISION
236                 try:
237                     self._log.debug("Provisioning resources in slice %s" % slicename)
238                     self._sfi_exec_method('provision', slicename) 
239                 except:
240                     raise RuntimeError("Fail to provision resource for slice %s" % slicename)
241                 return True
242
243     def add_resource_to_slice_batch(self, slicename, resource_hrn, leases=None):
244         """
245         Method to add all resources together to the slice. Previous deletion of slivers.
246         Specially used for wilabt that doesn't allow to add more resources to the slice
247         after some resources are added. Every sliver have to be deleted and the batch 
248         has to be added at once.
249         """
250         self._count += 1
251         self._slice_resources_batch.append(resource_hrn)
252         resources_hrn_new = list()
253         if self._count == len(self._total):
254             for resource_hrn in self._slice_resources_batch:
255                 resource_parts = resource_hrn.split('.')
256                 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
257                 resources_hrn_new.append(resource_hrn)
258             with self.lock_slice:
259                 self._sfi_exec_method('delete', slicename)
260                 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
261                 resources_urn = self._get_urn(resources_hrn_new)
262                 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
263
264                 f = open("/tmp/rspec_input.rspec", "w")
265                 f.truncate(0)
266                 f.write(rspec)
267                 f.close()
268
269                 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
270                     raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
271
272                 # ALLOCATE    
273                 try:
274                     self._log.debug("Allocating resources in slice %s" % slicename)
275                     out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
276                 except:
277                     raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
278
279                 if out is not None:
280                     # PROVISION
281                     try:
282                         self._log.debug("Provisioning resources in slice %s" % slicename)
283                         self._sfi_exec_method('provision', slicename)
284                     except:
285                         raise RuntimeError("Fail to provision resource for slice %s" % slicename)
286                     return True
287                 else:
288                     raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
289     
290         else:
291             self._log.debug(" Waiting for more nodes to add the batch to the slice ")
292
293     def _get_urn(self, resources_hrn):
294         """
295         Get urn from hrn.
296         """
297         resources_urn = list()
298         for hrn in resources_hrn:
299             hrn = hrn.replace("\\", "").split('.')
300             node = hrn.pop()
301             auth = '.'.join(hrn)
302             urn = ['urn:publicid:IDN+', auth, '+node+', node]
303             urn = ''.join(urn)
304             resources_urn.append(urn)
305         return resources_urn
306
307     def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
308         """
309         Remove slivers from slice. Currently sfi doesn't support removing particular
310         slivers.
311         """
312         resource_urn = self._get_resources_urn([resource_hrn]).pop()
313         with self.lock_slice:
314             try:
315                 self._sfi_exec_method('delete', slicename, urn=resource_urn)
316             except:
317                 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
318             return True
319
320     def remove_all_from_slice(self, slicename):
321         """
322         De-allocate and de-provision all slivers of the named slice.
323         Currently sfi doesn't support removing particular
324         slivers, so this method works only for removing every sliver. Setting the
325         resource_hrn parameter is not necessary.
326         """
327         with self.lock_slice:
328             try:
329                 self._sfi_exec_method('delete', slicename)
330             except:
331                 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
332             return True
333
334     def _get_resources_urn(self, resources_hrn):
335         """
336         Builds list of resources' urn based on hrn.
337         """
338         resources_urn = list()
339
340         for resource in resources_hrn:
341             resources_urn.append(hrn_to_urn(resource, 'node'))
342             
343         return resources_urn
344
345     def blacklist_resource(self, resource_hrn):
346         """
347         Adding resource_hrn to blacklist, and taking 
348         the resource from the reserved list.
349         """
350         with self.lock_blist:
351             self._blacklist.add(resource_hrn)
352         with self.lock_resv:
353             if resource_hrn in self._reserved:
354                 self._reserved.remove(resource_hrn)
355
356     def blacklisted(self, resource_hrn):
357         """
358         Check if the resource is in the blacklist. 
359         """
360         with self.lock_blist:
361             if resource_hrn in self._blacklist:
362                 return True
363         return False
364
365     def reserve_resource(self, resource_hrn):
366         """
367         Add resource to the reserved list.
368         """
369         self._reserved.add(resource_hrn)
370
371     def reserved(self, resource_hrn):
372         """
373         Check that the resource in not reserved.
374         """
375         with self.lock_resv:
376             if resource_hrn in self._reserved:
377                 return True
378             else:
379                 self.reserve_resource(resource_hrn)
380                 return False
381
382 class SFAAPIFactory(object):
383     """
384     API Factory to manage a map of SFAAPI instances as key-value pairs, it
385     instanciate a single instance per key. The key represents the same SFA, 
386     credentials.
387     """
388
389     _lock = threading.Lock()
390     _apis = dict()
391
392    
393     @classmethod
394     def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
395             batch = False, rtype = None, timeout = None):
396
397         if sfi_user and sfi_sm:
398             key = cls.make_key(sfi_user, sfi_sm)
399             with cls._lock:
400                 api = cls._apis.get(key)
401
402                 if not api:
403                     api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
404                         ec, batch, rtype, timeout)
405                     cls._apis[key] = api
406
407                 return api
408
409         return None
410
411     @classmethod
412     def make_key(cls, *args):
413         skey = "".join(map(str, args))
414         return hashlib.md5(skey).hexdigest()
415