d2888fdbd575abdd3a9c056ac4c55d78be3195e3
[nepi.git] / src / nepi / util / sfaapi.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
19
20 import threading
21 import hashlib
22 import re
23 import os
24
25 from nepi.util.logger import Logger
26
27 try:
28     from sfa.client.sfi import Sfi
29     from sfa.util.xrn import hrn_to_urn
30 except ImportError:
31     log = Logger("SFA API")
32     log.debug("Packages sfa-common or sfa-client not installed.\
33          Could not import sfa.client.sfi or sfa.util.xrn")
34
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
36
37 class SFAAPI(object):
38     """
39     API for quering the SFA service.
40     """
41     def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42         timeout):
43
44         self._blacklist = set()
45         self._reserved = set()
46         self._resources_cache = None
47         self._already_cached = False
48         self._log = Logger("SFA API")
49         self.api = Sfi()
50         self.rspec_proc = SfaRSpecProcessing()
51         self.lock_slice = threading.Lock()
52         self.lock_blist = threading.Lock()
53         self.lock_resv = threading.Lock()
54
55         self.api.options.timeout = timeout
56         self.api.options.raw = None
57         self.api.options.user = sfi_user
58         self.api.options.auth = sfi_auth
59         self.api.options.registry = sfi_registry
60         self.api.options.sm = sfi_sm
61         self.api.options.user_private_key = private_key
62
63         # Load blacklist from file
64         if ec.get_global('PlanetlabNode', 'persist_blacklist'):
65             self._set_blacklist()
66
67     def _set_blacklist(self):
68         nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
69         plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
70         with open(plblacklist_file, 'r') as f:
71             hosts_tobl = f.read().splitlines()
72             if hosts_tobl:
73                 for host in hosts_tobl:
74                     self._blacklist.add(host)
75
76     def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None):
77         """
78         Execute sfi method.
79         """
80         if command in ['describe', 'delete', 'allocate', 'provision']:
81             if not slicename:
82                 raise TypeError("The slice hrn is expected for this method %s" % command)
83             if command == 'allocate' and not rspec:
84                 raise TypeError("RSpec is expected for this method %s" % command)
85             
86             if command == 'allocate':
87                 args_list = [slicename, rspec]
88             elif command == 'delete':
89                 args_list = [slicename, urn]
90             else: args_list = [slicename, '-o', '/tmp/rspec_output']
91
92         elif command == 'resources':
93             args_list = ['-o', '/tmp/rspec_output']
94
95         else: raise TypeError("Sfi method not supported")
96
97         self.api.command = command
98         self.api.command_parser = self.api.create_parser_command(self.api.command)
99         (command_options, command_args) = self.api.command_parser.parse_args(args_list)
100         #print "1 %s" % command_options.info
101         #command_options.info = ""
102         #print "2 %s" % command_options.info
103         self.api.command_options = command_options
104         self.api.read_config()
105         self.api.bootstrap()
106
107         self.api.dispatch(command, command_options, command_args)
108         with open("/tmp/rspec_output.rspec", "r") as result_file:
109             result = result_file.read()
110         return result
111
112     def get_resources_info(self):
113         """
114         Get all resources and its attributes from aggregate.
115         """
116         try:
117             rspec_slice = self._sfi_exec_method('resources')
118         except:
119             raise RuntimeError("Fail to list resources")
120    
121         self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
122         self._already_cached = True
123         return self._resources_cache
124
125     def get_resources_hrn(self, resources=None):
126         """
127         Get list of resources hrn, without the resource info.
128         """
129         if not resources:
130             if not self._already_cached:
131                 resources = self.get_resources_info()['resource']
132             else:
133                 resources = self._resources_cache['resource']
134
135         component_tohrn = dict()
136         for resource in resources:
137             hrn = resource['hrn'].replace('\\', '')
138             component_tohrn[resource['component_name']] = hrn
139
140         return component_tohrn
141             
142     def get_slice_resources(self, slicename):
143         """
144         Get resources and info from slice.
145         """
146         try:
147             with self.lock_slice:
148                 rspec_slice = self._sfi_exec_method('describe', slicename)
149         except:
150             raise RuntimeError("Fail to describe resource for slice %s" % slicename)
151
152         result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
153         return result
154
155
156     def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
157         """
158         Get the list of resources' urn, build the rspec string and call the allocate 
159         and provision method.
160         """
161         resources_hrn_new = list()
162         resource_parts = resource_hrn.split('.')
163         resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
164         resources_hrn_new.append(resource_hrn)
165
166         slice_resources = self.get_slice_resources(slicename)['resource']
167
168         with self.lock_slice:
169             if slice_resources:
170                 slice_resources_hrn = self.get_resources_hrn(slice_resources)
171                 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
172                     s_parts = s_hrn_value.split('.')
173                     s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
174                     resources_hrn_new.append(s_hrn)
175
176             resources_urn = self._get_resources_urn(resources_hrn_new)
177             rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
178             f = open("/tmp/rspec_input.rspec", "w")
179             f.truncate(0)
180             f.write(rspec)
181             f.close()
182             
183             if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
184                 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
185
186             try:
187                 self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
188             except:
189                 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)            
190             try:
191                 self._sfi_exec_method('provision', slicename) 
192             except:
193                 raise RuntimeError("Fail to provision resource for slice %s" % slicename)
194             return True
195
196     def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
197         """
198         Get the list of resources' urn, build the rspec string and call the allocate 
199         and provision method.
200         """
201         resource_urn = self._get_resources_urn([resource_hrn]).pop()
202         try:
203             self._sfi_exec_method('delete', slicename, urn=resource_urn)
204         except:
205             raise RuntimeError("Fail to delete resource for slice %s" % slicename)
206         return True
207
208
209     def _get_resources_urn(self, resources_hrn):
210         """
211         Builds list of resources' urn based on hrn.
212         """
213         resources_urn = list()
214
215         for resource in resources_hrn:
216             resources_urn.append(hrn_to_urn(resource, 'node'))
217             
218         return resources_urn
219
220     def blacklist_resource(self, resource_hrn):
221         with self.lock_blist:
222             self._blacklist.add(resource_hrn)
223         with self.lock_resv:
224             if resource_hrn in self._reserved:
225                 self._reserved.remove(resource_hrn)
226
227     def blacklisted(self, resource_hrn):
228         with self.lock_blist:
229             if resource_hrn in self._blacklist:
230                 return True
231         return False
232
233     def reserve_resource(self, resource_hrn):
234         self._reserved.add(resource_hrn)
235
236     def reserved(self, resource_hrn):
237         with self.lock_resv:
238             if resource_hrn in self._reserved:
239                 return True
240             else:
241                 self.reserve_resource(resource_hrn)
242                 return False
243
244 class SFAAPIFactory(object):
245     """
246     API Factory to manage a map of SFAAPI instances as key-value pairs, it
247     instanciate a single instance per key. The key represents the same SFA, 
248     credentials.
249     """
250
251     _lock = threading.Lock()
252     _apis = dict()
253
254    
255     @classmethod
256     def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
257             timeout = None):
258
259         if sfi_user and sfi_sm:
260             key = cls.make_key(sfi_user, sfi_sm)
261             with cls._lock:
262                 api = cls._apis.get(key)
263
264                 if not api:
265                     api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
266                         ec, timeout)
267                     cls._apis[key] = api
268
269                 return api
270
271         return None
272
273     @classmethod
274     def make_key(cls, *args):
275         skey = "".join(map(str, args))
276         return hashlib.md5(skey).hexdigest()
277