48dfb48543b644045afb0173d857fe411eec5521
[nepi.git] / src / nepi / util / sfaapi.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
19
20 import threading
21 import hashlib
22 import re
23 import os
24
25 from nepi.util.logger import Logger
26
27 try:
28     from sfa.client.sfi import Sfi
29     from sfa.util.xrn import hrn_to_urn
30 except ImportError:
31     log = Logger("SFA API")
32     log.debug("Packages sfa-common or sfa-client not installed.\
33          Could not import sfa.client.sfi or sfa.util.xrn")
34
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
36
37 class SFAAPI(object):
38     """
39     API for quering the SFA service.
40     """
41     def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42         batch, rtype, timeout):
43
44         self._blacklist = set()
45         self._reserved = set()
46         self._resources_cache = None
47         self._already_cached = False
48         self._ec = ec 
49
50         if batch:
51             self._testbed_res = rtype
52             self._count = 0
53             self._total = self._get_total_res()
54             self._slice_resources_batch = list()
55
56         self._log = Logger("SFA API")
57         self.api = Sfi()
58         self.rspec_proc = SfaRSpecProcessing()
59         self.lock_slice = threading.Lock()
60         self.lock_blist = threading.Lock()
61         self.lock_resv = threading.Lock()
62
63         self.api.options.timeout = timeout
64         self.api.options.raw = None
65         self.api.options.user = sfi_user
66         self.api.options.auth = sfi_auth
67         self.api.options.registry = sfi_registry
68         self.api.options.sm = sfi_sm
69         self.api.options.user_private_key = private_key
70
71         # Load blacklist from file
72         if ec.get_global('PlanetlabNode', 'persist_blacklist'):
73             self._set_blacklist()
74
75     def _set_blacklist(self):
76         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
77         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
78         with open(plblacklist_file, 'r') as f:
79             hosts_tobl = f.read().splitlines()
80             if hosts_tobl:
81                 for host in hosts_tobl:
82                     self._blacklist.add(host)
83
84     def _get_total_res(self):
85         rms = list()
86         res_gids = self._ec.resources
87         for gid in res_gids:
88             rm = self._ec.get_resource(gid)
89             if self._testbed_res.lower() in rm._rtype.lower():
90                 rms.append(rm)
91         return rms
92
93     def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None):
94         """
95         Execute sfi method.
96         """
97         if command in ['describe', 'delete', 'allocate', 'provision']:
98             if not slicename:
99                 raise TypeError("The slice hrn is expected for this method %s" % command)
100             if command == 'allocate' and not rspec:
101                 raise TypeError("RSpec is expected for this method %s" % command)
102             
103             if command == 'allocate':
104                 args_list = [slicename, rspec]
105             else:
106                 args_list = [slicename]
107             if command != 'delete':
108                 args_list = args_list + ['-o', '/tmp/rspec_output']
109
110         elif command == 'resources':
111             args_list = ['-o', '/tmp/rspec_output']
112
113         else: raise TypeError("Sfi method not supported")
114
115         self.api.command = command
116         self.api.command_parser = self.api.create_parser_command(self.api.command)
117         (command_options, command_args) = self.api.command_parser.parse_args(args_list)
118         self.api.command_options = command_options
119         self.api.read_config()
120         self.api.bootstrap()
121
122         try:
123             os.remove("/tmp/rspec_output.rspec")
124         except OSError:
125             self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
126
127         try:
128             self.api.dispatch(command, command_options, command_args)
129             with open("/tmp/rspec_output.rspec", "r") as result_file:
130                 result = result_file.read()
131                 return result
132         except:
133             self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
134             return None
135
136     def get_resources_info(self):
137         """
138         Get all resources and its attributes from aggregate.
139         """
140         try:
141             rspec_slice = self._sfi_exec_method('resources')
142         except:
143             raise RuntimeError("Fail to list resources")
144    
145         self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
146         self._already_cached = True
147         return self._resources_cache
148
149     def get_resources_hrn(self, resources=None):
150         """
151         Get list of resources hrn, without the resource info.
152         """
153         if not resources:
154             if not self._already_cached:
155                 resources = self.get_resources_info()['resource']
156             else:
157                 resources = self._resources_cache['resource']
158
159         component_tohrn = dict()
160         for resource in resources:
161             hrn = resource['hrn'].replace('\\', '')
162             component_tohrn[resource['component_name']] = hrn
163
164         return component_tohrn
165             
166     def get_slice_resources(self, slicename):
167         """
168         Get resources and info from slice.
169         """
170         try:
171             with self.lock_slice:
172                 rspec_slice = self._sfi_exec_method('describe', slicename)
173         except:
174             self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
175
176         if rspec_slice is not None:
177             result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
178             return result
179         else:
180             return {'resource':[],'lease':[]}
181
182
183     def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
184         """
185         Get the list of resources' urn, build the rspec string and call the allocate 
186         and provision method.
187         """
188         resources_hrn_new = list()
189         resource_parts = resource_hrn.split('.')
190         resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
191         resources_hrn_new.append(resource_hrn)
192
193         with self.lock_slice:
194             rspec_slice = self._sfi_exec_method('describe', slicename)
195             if rspec_slice is not None:
196                 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
197             else: slice_resources = []
198             if slice_resources:
199                 slice_resources_hrn = self.get_resources_hrn(slice_resources)
200                 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
201                     s_parts = s_hrn_value.split('.')
202                     s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
203                     resources_hrn_new.append(s_hrn)
204
205
206             resources_urn = self._get_resources_urn(resources_hrn_new)
207             rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
208             f = open("/tmp/rspec_input.rspec", "w")
209             f.truncate(0)
210             f.write(rspec)
211             f.close()
212             
213             if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
214                 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
215
216             # ALLOCATE
217             try:
218                 self._log.debug("Allocating resources in slice %s" % slicename)
219                 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
220             except:
221                 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
222
223             if out is not None:
224                 # PROVISION
225                 try:
226                     self._log.debug("Provisioning resources in slice %s" % slicename)
227                     self._sfi_exec_method('provision', slicename) 
228                 except:
229                     raise RuntimeError("Fail to provision resource for slice %s" % slicename)
230                 return True
231
232     def add_resource_to_slice_batch(self, slicename, resource_hrn, leases=None):
233         """
234         Method to add all resources together to the slice. Previous deletion of slivers.
235         """
236         # Specially used for wilabt that doesn't allow to add more resources to the slice
237         # after some resources are added. Every sliver have to be deleted and the batch 
238         # has to be added at once.
239         self._count += 1
240         self._slice_resources_batch.append(resource_hrn)
241         resources_hrn_new = list()
242         if self._count == len(self._total):
243             for resource_hrn in self._slice_resources_batch:
244                 resource_parts = resource_hrn.split('.')
245                 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
246                 resources_hrn_new.append(resource_hrn)
247             with self.lock_slice:
248                 self._sfi_exec_method('delete', slicename)
249                 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
250                 resources_urn = self._get_urn(resources_hrn_new)
251                 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
252
253                 f = open("/tmp/rspec_input.rspec", "w")
254                 f.truncate(0)
255                 f.write(rspec)
256                 f.close()
257
258                 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
259                     raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
260
261                 # ALLOCATE    
262                 try:
263                     self._log.debug("Allocating resources in slice %s" % slicename)
264                     out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
265                 except:
266                     raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
267
268                 if out is not None:
269                     # PROVISION
270                     try:
271                         self._log.debug("Provisioning resources in slice %s" % slicename)
272                         self._sfi_exec_method('provision', slicename)
273                     except:
274                         raise RuntimeError("Fail to provision resource for slice %s" % slicename)
275                     return True
276                 else:
277                     raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
278     
279         else:
280             self._log.debug(" Waiting for more nodes to add the batch to the slice ")
281
282     def _get_urn(self, resources_hrn):
283         """
284         Get urn from hrn.
285         """
286         resources_urn = list()
287         for hrn in resources_hrn:
288             hrn = hrn.replace("\\", "").split('.')
289             node = hrn.pop()
290             auth = '.'.join(hrn)
291             urn = ['urn:publicid:IDN+', auth, '+node+', node]
292             urn = ''.join(urn)
293             resources_urn.append(urn)
294         return resources_urn
295
296     def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
297         """
298         Get the list of resources' urn, build the rspec string and call the allocate 
299         and provision method.
300         """
301         resource_urn = self._get_resources_urn([resource_hrn]).pop()
302         with self.lock_slice:
303             try:
304                 self._sfi_exec_method('delete', slicename, urn=resource_urn)
305             except:
306                 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
307             return True
308
309     def remove_all_from_slice(self, slicename):
310         """
311         De-allocate and de-provision all slivers of the named slice.
312         """
313         with self.lock_slice:
314             try:
315                 self._sfi_exec_method('delete', slicename)
316             except:
317                 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
318             return True
319
320     def _get_resources_urn(self, resources_hrn):
321         """
322         Builds list of resources' urn based on hrn.
323         """
324         resources_urn = list()
325
326         for resource in resources_hrn:
327             resources_urn.append(hrn_to_urn(resource, 'node'))
328             
329         return resources_urn
330
331     def blacklist_resource(self, resource_hrn):
332         with self.lock_blist:
333             self._blacklist.add(resource_hrn)
334         with self.lock_resv:
335             if resource_hrn in self._reserved:
336                 self._reserved.remove(resource_hrn)
337
338     def blacklisted(self, resource_hrn):
339         with self.lock_blist:
340             if resource_hrn in self._blacklist:
341                 return True
342         return False
343
344     def reserve_resource(self, resource_hrn):
345         self._reserved.add(resource_hrn)
346
347     def reserved(self, resource_hrn):
348         with self.lock_resv:
349             if resource_hrn in self._reserved:
350                 return True
351             else:
352                 self.reserve_resource(resource_hrn)
353                 return False
354
355 class SFAAPIFactory(object):
356     """
357     API Factory to manage a map of SFAAPI instances as key-value pairs, it
358     instanciate a single instance per key. The key represents the same SFA, 
359     credentials.
360     """
361
362     _lock = threading.Lock()
363     _apis = dict()
364
365    
366     @classmethod
367     def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
368             batch = False, rtype = None, timeout = None):
369
370         if sfi_user and sfi_sm:
371             key = cls.make_key(sfi_user, sfi_sm)
372             with cls._lock:
373                 api = cls._apis.get(key)
374
375                 if not api:
376                     api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
377                         ec, batch, rtype, timeout)
378                     cls._apis[key] = api
379
380                 return api
381
382         return None
383
384     @classmethod
385     def make_key(cls, *args):
386         skey = "".join(map(str, args))
387         return hashlib.md5(skey).hexdigest()
388