2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
25 from nepi.util.logger import Logger
# Optional SFA client dependencies: the Sfi client class and the hrn_to_urn helper.
28 from sfa.client.sfi import Sfi
29 from sfa.util.xrn import hrn_to_urn
# NOTE(review): the statements framing this block are not visible in this
# listing (embedded numbering jumps 29 -> 31). Judging by the message text, the
# debug log below presumably runs only when the imports above fail -- confirm.
31 log = Logger("SFA API")
32 log.debug("Packages sfa-common or sfa-client not installed.\
33 Could not import sfa.client.sfi or sfa.util.xrn")
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
39 API for querying the SFA service.
# Constructor: sets up the bookkeeping sets, the resource cache fields, the
# locks, the RSpec processor and the Sfi client options.
#
# Parameters (as used by the visible lines):
#   sfi_user, sfi_auth, sfi_registry, sfi_sm -- copied into
#       self.api.options.user / .auth / .registry / .sm
#   private_key -- copied into self.api.options.user_private_key
#   ec          -- queried with get_global('PlanetlabNode', 'persist_blacklist')
#   rtype       -- stored as self._testbed_res
#   timeout     -- copied into self.api.options.timeout
#   batch       -- not referenced in the visible lines
#
# NOTE(review): several lines of this method are missing from this listing
# (embedded numbering has gaps at 43, 48-50, 52, 55, 57, 62, 70, 73). The
# creation of self.api and self._ec, both used elsewhere in the class, is
# presumably among them -- confirm against the full file.
41 def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42 batch, rtype, timeout):
# Sets of blacklisted / reserved resource names, and the cache that
# get_resources_info() fills with the parsed aggregate RSpec.
44 self._blacklist = set()
45 self._reserved = set()
46 self._resources_cache = None
47 self._already_cached = False
# rtype is matched case-insensitively against resource-manager types in
# _get_total_res(); add_resource_to_slice_batch() compares a counter with
# len(self._total).
51 self._testbed_res = rtype
53 self._total = self._get_total_res()
54 self._slice_resources_batch = list()
56 self._log = Logger("SFA API")
58 self.rspec_proc = SfaRSpecProcessing()
# lock_slice guards the slice-level sfi calls and lock_blist guards the
# blacklist in the methods below; no use of lock_resv is visible in this listing.
59 self.lock_slice = threading.Lock()
60 self.lock_blist = threading.Lock()
61 self.lock_resv = threading.Lock()
# Configure the Sfi client options from the constructor arguments.
63 self.api.options.timeout = timeout
64 self.api.options.raw = None
65 self.api.options.user = sfi_user
66 self.api.options.auth = sfi_auth
67 self.api.options.registry = sfi_registry
68 self.api.options.sm = sfi_sm
69 self.api.options.user_private_key = private_key
71 # Load blacklist from file
72 if ec.get_global('PlanetlabNode', 'persist_blacklist'):
# NOTE(review): the body of this `if` (embedded line 73) is not visible;
# presumably it calls self._set_blacklist() -- confirm.
# Loads host names from ~/.nepi/plblacklist.txt (one per line) and adds each of
# them to self._blacklist.
# No existence check is visible here, so a missing file makes open() raise.
75 def _set_blacklist(self):
76 nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
77 plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
78 with open(plblacklist_file, 'r') as f:
# splitlines() drops the line terminators, so entries are bare host strings.
79 hosts_tobl = f.read().splitlines()
81 for host in hosts_tobl:
82 self._blacklist.add(host)
# Looks at the experiment controller's resources (self._ec.resources) and tests,
# for a resource manager obtained by gid, whether the configured testbed type
# (self._testbed_res) occurs as a case-insensitive substring of its _rtype.
# NOTE(review): the loop header, the action taken on a match and the return
# statement are not visible (embedded lines 85, 87, 90-91 are missing). The
# result is stored in self._total and later passed to len(), so it is presumably
# a collection of the matching resources -- confirm.
84 def _get_total_res(self):
86 res_gids = self._ec.resources
88 rm = self._ec.get_resource(gid)
89 if self._testbed_res.lower() in rm._rtype.lower():
# Runs one sfi command through self.api: builds the argument list, parses it with
# the command's own parser, dispatches the command and reads the RSpec output
# back from /tmp/rspec_output.rspec.
#
# Parameters:
#   command   -- 'describe', 'delete', 'allocate', 'provision' or 'resources';
#                anything else raises TypeError("Sfi method not supported")
#   slicename -- slice hrn, first positional argument of the slice-level commands
#   rspec     -- path of the RSpec file; required for 'allocate' (TypeError if falsy)
#   urn       -- accepted, but not referenced in the visible lines
#
# NOTE(review): guard conditions, try/except framing and the return statement
# are missing from this listing (embedded lines 98, 102, 105, 120-122, 124,
# 126-127, 131-132, 134 are absent). Callers use the return value as the RSpec
# text, or None on failure -- confirm against the full file.
# NOTE(review): the input and output files use fixed /tmp paths, so concurrent
# calls would share the same files unless serialized by the caller.
93 def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None):
97 if command in ['describe', 'delete', 'allocate', 'provision']:
# Presumably raised when slicename is missing (the condition line is not visible).
99 raise TypeError("The slice hrn is expected for this method %s" % command)
100 if command == 'allocate' and not rspec:
101 raise TypeError("RSpec is expected for this method %s" % command)
103 if command == 'allocate':
104 args_list = [slicename, rspec]
106 args_list = [slicename]
# Every slice-level command except 'delete' also gets the '-o' output option.
107 if command != 'delete':
108 args_list = args_list + ['-o', '/tmp/rspec_output']
110 elif command == 'resources':
111 args_list = ['-o', '/tmp/rspec_output']
113 else: raise TypeError("Sfi method not supported")
# Configure the Sfi client for this command and parse the arguments with it.
115 self.api.command = command
116 self.api.command_parser = self.api.create_parser_command(self.api.command)
117 (command_options, command_args) = self.api.command_parser.parse_args(args_list)
118 self.api.command_options = command_options
119 self.api.read_config()
# Remove any output left by a previous call before dispatching. The file read
# below carries a '.rspec' suffix that the '-o' argument lacks; presumably sfi
# appends it -- confirm.
123 os.remove("/tmp/rspec_output.rspec")
125 self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
128 self.api.dispatch(command, command_options, command_args)
129 with open("/tmp/rspec_output.rspec", "r") as result_file:
130 result = result_file.read()
133 self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
# Fetches the aggregate's resources with the 'resources' sfi command, parses the
# RSpec with self.rspec_proc.parse_sfa_rspec, stores the parsed result in
# self._resources_cache, marks the cache as filled and returns it.
# Raises RuntimeError("Fail to list resources"); the condition guarding the
# raise is not visible in this listing (presumably a failed sfi call -- confirm).
136 def get_resources_info(self):
138 Get all resources and its attributes from aggregate.
141 rspec_slice = self._sfi_exec_method('resources')
143 raise RuntimeError("Fail to list resources")
145 self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
146 self._already_cached = True
147 return self._resources_cache
# Builds and returns a dict mapping each resource's 'component_name' to its
# 'hrn' with all backslashes removed. Despite the docstring wording, the value
# returned is a dict, not a list.
# NOTE(review): the lines deciding when the `resources` argument is replaced by
# the aggregate's resources are not visible (embedded lines 153 and 156 are
# missing); presumably that happens only when no resources are passed -- confirm.
149 def get_resources_hrn(self, resources=None):
151 Get list of resources hrn, without the resource info.
# Use the cached aggregate listing when available, otherwise fetch it first.
154 if not self._already_cached:
155 resources = self.get_resources_info()['resource']
157 resources = self._resources_cache['resource']
159 component_tohrn = dict()
160 for resource in resources:
# Strip the backslash escapes from the hrn.
161 hrn = resource['hrn'].replace('\\', '')
162 component_tohrn[resource['component_name']] = hrn
164 return component_tohrn
# Describes the slice `slicename` while holding self.lock_slice and parses the
# returned RSpec with self.rspec_proc.parse_sfa_rspec.
# An empty {'resource': [], 'lease': []} structure is returned on one branch,
# presumably when no RSpec was obtained.
# NOTE(review): the try/except framing, the else line and the return of the
# parsed result are not visible in this listing (embedded lines 173, 175,
# 178-179, 181 are missing) -- confirm.
166 def get_slice_resources(self, slicename):
168 Get resources and info from slice.
171 with self.lock_slice:
172 rspec_slice = self._sfi_exec_method('describe', slicename)
174 self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
176 if rspec_slice is not None:
177 result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
180 return {'resource':[],'lease':[]}
# Adds one resource to a slice. The new RSpec lists the resources the slice
# already has plus the new one; it is written to /tmp/rspec_input.rspec and then
# passed to the 'allocate' sfi command, followed by 'provision'.
#
# Parameters:
#   slicename    -- slice hrn handed to the sfi commands
#   resource_hrn -- dotted hrn of the resource to add
#   leases       -- forwarded unchanged to rspec_proc.build_sfa_rspec
#
# Raises RuntimeError when the RSpec file is empty, and with the allocate /
# provision messages below.
# NOTE(review): the conditions guarding the allocate / provision raises, and the
# lines writing and closing the RSpec file, are not visible in this listing
# (embedded lines 209-212, 215-217, 220, 222-225, 228 are missing) -- confirm.
# dict.iteritems() below exists only on Python 2.
183 def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
185 Get the list of resources' urn, build the rspec string and call the allocate
186 and provision method.
188 resources_hrn_new = list()
# Re-escape the hrn: the first two labels are joined with '.', the remaining
# labels with a backslash-escaped dot.
189 resource_parts = resource_hrn.split('.')
190 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
191 resources_hrn_new.append(resource_hrn)
# Read the resources currently in the slice so they are kept in the new RSpec.
193 with self.lock_slice:
194 rspec_slice = self._sfi_exec_method('describe', slicename)
195 if rspec_slice is not None:
196 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
197 else: slice_resources = []
199 slice_resources_hrn = self.get_resources_hrn(slice_resources)
# get_resources_hrn() returns unescaped hrns, so apply the same escaping again.
200 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
201 s_parts = s_hrn_value.split('.')
202 s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
203 resources_hrn_new.append(s_hrn)
206 resources_urn = self._get_resources_urn(resources_hrn_new)
207 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
208 f = open("/tmp/rspec_input.rspec", "w")
# Refuse to continue with an empty RSpec file.
213 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
214 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
218 self._log.debug("Allocating resources in slice %s" % slicename)
219 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
221 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
226 self._log.debug("Provisioning resources in slice %s" % slicename)
227 self._sfi_exec_method('provision', slicename)
229 raise RuntimeError("Fail to provision resource for slice %s" % slicename)
# Batch variant of add_resource_to_slice. Each call appends resource_hrn to
# self._slice_resources_batch. Once self._count equals len(self._total), the
# slice's slivers are deleted and the whole batch is allocated and provisioned
# with a single RSpec written to /tmp/rspec_input.rspec. Otherwise a debug
# message about waiting for more nodes is logged.
#
# Parameters:
#   slicename    -- slice hrn handed to the sfi commands
#   resource_hrn -- dotted hrn of the resource to queue
#   leases       -- forwarded unchanged to rspec_proc.build_sfa_rspec
#
# NOTE(review): the update of self._count, the lines writing and closing the
# RSpec file, and the conditions guarding the raises are not visible in this
# listing (embedded lines 239, 254-257, 260-262, 265, 267-270, 273, 275-276,
# 278-279 are missing) -- confirm.
232 def add_resource_to_slice_batch(self, slicename, resource_hrn, leases=None):
234 Method to add all resources together to the slice. Previous deletion of slivers.
236 # Specially used for wilabt that doesn't allow to add more resources to the slice
237 # after some resources are added. Every sliver have to be deleted and the batch
238 # has to be added at once.
240 self._slice_resources_batch.append(resource_hrn)
241 resources_hrn_new = list()
242 if self._count == len(self._total):
# Re-escape every queued hrn: first two labels joined with '.', the remaining
# labels with a backslash-escaped dot.
243 for resource_hrn in self._slice_resources_batch:
244 resource_parts = resource_hrn.split('.')
245 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
246 resources_hrn_new.append(resource_hrn)
# Existing slivers are deleted before the batch is allocated.
247 with self.lock_slice:
248 self._sfi_exec_method('delete', slicename)
249 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
250 resources_urn = self._get_urn(resources_hrn_new)
251 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
253 f = open("/tmp/rspec_input.rspec", "w")
# Refuse to continue with an empty RSpec file.
258 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
259 raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
263 self._log.debug("Allocating resources in slice %s" % slicename)
264 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
266 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
271 self._log.debug("Provisioning resources in slice %s" % slicename)
272 self._sfi_exec_method('provision', slicename)
274 raise RuntimeError("Fail to provision resource for slice %s" % slicename)
277 raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
280 self._log.debug(" Waiting for more nodes to add the batch to the slice ")
# Hand-rolled hrn -> urn conversion used by add_resource_to_slice_batch(). For
# each hrn the backslashes are removed, the string is split on '.', and the
# pieces 'urn:publicid:IDN+', auth, '+node+', node are assembled. One entry per
# hrn is collected in resources_urn.
# NOTE(review): how auth and node are taken from the split hrn, whether the
# pieces are joined into one string before the append, and the return statement
# are not visible in this listing (embedded lines 289-290, 292, 294 are
# missing) -- confirm.
282 def _get_urn(self, resources_hrn):
286 resources_urn = list()
287 for hrn in resources_hrn:
288 hrn = hrn.replace("\\", "").split('.')
291 urn = ['urn:publicid:IDN+', auth, '+node+', node]
293 resources_urn.append(urn)
# Removes one resource from a slice: converts resource_hrn to a urn with
# _get_resources_urn() and, holding self.lock_slice, issues the 'delete' sfi
# command with that urn.
# Raises RuntimeError("Fail to delete resource for slice ..."); the condition
# guarding the raise is not visible in this listing.
# NOTE(review): the docstring below mentions allocate/provision, but the code
# calls 'delete'. `leases` is not referenced in the visible lines, and the
# visible part of _sfi_exec_method() does not reference `urn` -- confirm that
# the urn actually reaches the sfi call.
296 def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
298 Get the list of resources' urn, build the rspec string and call the allocate
299 and provision method.
301 resource_urn = self._get_resources_urn([resource_hrn]).pop()
302 with self.lock_slice:
304 self._sfi_exec_method('delete', slicename, urn=resource_urn)
306 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
# Issues the 'delete' sfi command for the whole slice while holding
# self.lock_slice.
# Raises RuntimeError("Fail to delete slivers for slice ..."); the condition
# guarding the raise is not visible in this listing.
309 def remove_all_from_slice(self, slicename):
311 De-allocate and de-provision all slivers of the named slice.
313 with self.lock_slice:
315 self._sfi_exec_method('delete', slicename)
317 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
# Converts each hrn in resources_hrn to a urn with hrn_to_urn(resource, 'node')
# and collects the results, in input order, in resources_urn.
# NOTE(review): the return statement is not visible in this listing. Callers use
# the result as a list (e.g. .pop()), so presumably resources_urn is returned.
320 def _get_resources_urn(self, resources_hrn):
322 Builds list of resources' urn based on hrn.
324 resources_urn = list()
326 for resource in resources_hrn:
327 resources_urn.append(hrn_to_urn(resource, 'node'))
# Adds resource_hrn to the blacklist while holding self.lock_blist, and drops it
# from the reserved set if it is there.
# NOTE(review): embedded line 334 is missing from this listing, so whether the
# reserved-set update runs under a lock cannot be told from here.
331 def blacklist_resource(self, resource_hrn):
332 with self.lock_blist:
333 self._blacklist.add(resource_hrn)
335 if resource_hrn in self._reserved:
336 self._reserved.remove(resource_hrn)
# Tests, while holding self.lock_blist, whether resource_hrn is in the blacklist.
# NOTE(review): the return statements are not visible in this listing (embedded
# lines 341-342 are missing); presumably a boolean result -- confirm.
338 def blacklisted(self, resource_hrn):
339 with self.lock_blist:
340 if resource_hrn in self._blacklist:
# Marks resource_hrn as reserved by adding it to the self._reserved set.
# Adding an hrn that is already present leaves the set unchanged.
344 def reserve_resource(self, resource_hrn):
345 self._reserved.add(resource_hrn)
# Tests whether resource_hrn is already in self._reserved. The method also calls
# reserve_resource(), presumably on the not-yet-reserved branch, which would
# make this a test-and-set rather than a pure query.
# NOTE(review): the branch structure, any locking and the return statements are
# not visible in this listing (embedded lines 348, 350-351, 353 are missing)
# -- confirm.
347 def reserved(self, resource_hrn):
349 if resource_hrn in self._reserved:
352 self.reserve_resource(resource_hrn)
# Factory that hands out SFAAPI objects looked up by a key derived from
# (sfi_user, sfi_sm).
# NOTE(review): decorators, the definition of cls._apis, the use of _lock and
# the return of get_api() are not visible in this listing (embedded lines
# 359-361, 363-366, 369, 372, 374-375, 378-384 are missing). Both methods take
# `cls` first, so they are presumably classmethods -- confirm.
355 class SFAAPIFactory(object):
357 API Factory to manage a map of SFAAPI instances as key-value pairs, it
358 instanciate a single instance per key. The key represents the same SFA,
362 _lock = threading.Lock()
# get_api: when both sfi_user and sfi_sm are truthy, computes the key, looks the
# key up in cls._apis, and builds an SFAAPI from all the arguments. The
# condition guarding the creation is not visible; presumably it runs only when
# the lookup found nothing.
367 def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
368 batch = False, rtype = None, timeout = None):
370 if sfi_user and sfi_sm:
371 key = cls.make_key(sfi_user, sfi_sm)
373 api = cls._apis.get(key)
376 api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
377 ec, batch, rtype, timeout)
# make_key: MD5 hex digest of the str() forms of all arguments concatenated with
# no separator, so ("ab", "c") and ("a", "bc") produce the same key.
# NOTE(review): hashlib.md5() is given a text string here, which works on
# Python 2 only; Python 3 requires bytes.
385 def make_key(cls, *args):
386 skey = "".join(map(str, args))
387 return hashlib.md5(skey).hexdigest()