2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
25 from nepi.util.logger import Logger
28 from sfa.client.sfi import Sfi
29 from sfa.util.xrn import hrn_to_urn
31 log = Logger("SFA API")
32 log.debug("Packages sfa-common or sfa-client not installed.\
33 Could not import sfa.client.sfi or sfa.util.xrn")
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
39 API for quering the SFA service. It uses Sfi class from the tool sfi client.
41 def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42 batch, rtype, timeout):
44 self._blacklist = set()
45 self._reserved = set()
46 self._resources_cache = None
47 self._already_cached = False
51 self._testbed_res = rtype
53 self._total = self._get_total_res()
54 self._slice_resources_batch = list()
56 self._log = Logger("SFA API")
58 self.rspec_proc = SfaRSpecProcessing()
59 self.lock_slice = threading.Lock()
60 self.lock_blist = threading.Lock()
61 self.lock_resv = threading.Lock()
63 self.api.options.timeout = timeout
64 self.api.options.raw = None
65 self.api.options.user = sfi_user
66 self.api.options.auth = sfi_auth
67 self.api.options.registry = sfi_registry
68 self.api.options.sm = sfi_sm
69 self.api.options.user_private_key = private_key
71 # Load blacklist from file
72 if ec.get_global('PlanetlabNode', 'persist_blacklist'):
75 def _set_blacklist(self):
77 Initialize the blacklist with previous nodes blacklisted, in
80 nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
81 plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
82 with open(plblacklist_file, 'r') as f:
83 hosts_tobl = f.read().splitlines()
85 for host in hosts_tobl:
86 self._blacklist.add(host)
88 def _get_total_res(self):
90 Get the total amount of resources instanciated using this API,
91 to be able to add them using the same Allocate and Provision
92 call at once. Specially for Wilabt testbed that doesn't allow
93 to add slivers after the slice already has some.
96 res_gids = self._ec.resources
98 rm = self._ec.get_resource(gid)
99 if self._testbed_res.lower() in rm._rtype.lower():
103 def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None):
105 Execute sfi method, which correspond to SFA call. It can be the following
106 calls: Describe, Delete, Allocate, Provision, ListResources.
108 if command in ['describe', 'delete', 'allocate', 'provision']:
110 raise TypeError("The slice hrn is expected for this method %s" % command)
111 if command == 'allocate' and not rspec:
112 raise TypeError("RSpec is expected for this method %s" % command)
114 if command == 'allocate':
115 args_list = [slicename, rspec]
117 args_list = [slicename]
118 if command != 'delete':
119 args_list = args_list + ['-o', '/tmp/rspec_output']
121 elif command == 'resources':
122 args_list = ['-o', '/tmp/rspec_output']
124 else: raise TypeError("Sfi method not supported")
126 self.api.command = command
127 self.api.command_parser = self.api.create_parser_command(self.api.command)
128 (command_options, command_args) = self.api.command_parser.parse_args(args_list)
129 self.api.command_options = command_options
130 self.api.read_config()
134 os.remove("/tmp/rspec_output.rspec")
136 self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
139 self.api.dispatch(command, command_options, command_args)
140 with open("/tmp/rspec_output.rspec", "r") as result_file:
141 result = result_file.read()
144 self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
147 def get_resources_info(self):
149 Get all resources and its attributes from aggregate.
152 rspec_slice = self._sfi_exec_method('resources')
154 raise RuntimeError("Fail to list resources")
156 self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
157 self._already_cached = True
158 return self._resources_cache
160 def get_resources_hrn(self, resources=None):
162 Get list of resources hrn, without the resource info.
165 if not self._already_cached:
166 resources = self.get_resources_info()['resource']
168 resources = self._resources_cache['resource']
170 component_tohrn = dict()
171 for resource in resources:
172 hrn = resource['hrn'].replace('\\', '')
173 component_tohrn[resource['component_name']] = hrn
175 return component_tohrn
177 def get_slice_resources(self, slicename):
179 Get resources and info from slice.
182 with self.lock_slice:
183 rspec_slice = self._sfi_exec_method('describe', slicename)
185 self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
187 if rspec_slice is not None:
188 result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
191 return {'resource':[],'lease':[]}
194 def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
196 Get the list of resources' urn, build the rspec string and call the allocate
197 and provision method.
199 resources_hrn_new = list()
200 resource_parts = resource_hrn.split('.')
201 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
202 resources_hrn_new.append(resource_hrn)
204 with self.lock_slice:
205 rspec_slice = self._sfi_exec_method('describe', slicename)
206 if rspec_slice is not None:
207 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
208 else: slice_resources = []
210 slice_resources_hrn = self.get_resources_hrn(slice_resources)
211 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
212 s_parts = s_hrn_value.split('.')
213 s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
214 resources_hrn_new.append(s_hrn)
217 resources_urn = self._get_resources_urn(resources_hrn_new)
218 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
219 f = open("/tmp/rspec_input.rspec", "w")
224 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
225 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
229 self._log.debug("Allocating resources in slice %s" % slicename)
230 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
232 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
237 self._log.debug("Provisioning resources in slice %s" % slicename)
238 self._sfi_exec_method('provision', slicename)
240 raise RuntimeError("Fail to provision resource for slice %s" % slicename)
243 def add_resource_to_slice_batch(self, slicename, resource_hrn, leases=None):
245 Method to add all resources together to the slice. Previous deletion of slivers.
246 Specially used for wilabt that doesn't allow to add more resources to the slice
247 after some resources are added. Every sliver have to be deleted and the batch
248 has to be added at once.
251 self._slice_resources_batch.append(resource_hrn)
252 resources_hrn_new = list()
253 if self._count == len(self._total):
254 for resource_hrn in self._slice_resources_batch:
255 resource_parts = resource_hrn.split('.')
256 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
257 resources_hrn_new.append(resource_hrn)
258 with self.lock_slice:
259 self._sfi_exec_method('delete', slicename)
260 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
261 resources_urn = self._get_urn(resources_hrn_new)
262 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
264 f = open("/tmp/rspec_input.rspec", "w")
269 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
270 raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
274 self._log.debug("Allocating resources in slice %s" % slicename)
275 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
277 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
282 self._log.debug("Provisioning resources in slice %s" % slicename)
283 self._sfi_exec_method('provision', slicename)
285 raise RuntimeError("Fail to provision resource for slice %s" % slicename)
288 raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
291 self._log.debug(" Waiting for more nodes to add the batch to the slice ")
293 def _get_urn(self, resources_hrn):
297 resources_urn = list()
298 for hrn in resources_hrn:
299 hrn = hrn.replace("\\", "").split('.')
302 urn = ['urn:publicid:IDN+', auth, '+node+', node]
304 resources_urn.append(urn)
307 def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
309 Remove slivers from slice. Currently sfi doesn't support removing particular
312 resource_urn = self._get_resources_urn([resource_hrn]).pop()
313 with self.lock_slice:
315 self._sfi_exec_method('delete', slicename, urn=resource_urn)
317 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
320 def remove_all_from_slice(self, slicename):
322 De-allocate and de-provision all slivers of the named slice.
323 Currently sfi doesn't support removing particular
324 slivers, so this method works only for removing every sliver. Setting the
325 resource_hrn parameter is not necessary.
327 with self.lock_slice:
329 self._sfi_exec_method('delete', slicename)
331 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
334 def _get_resources_urn(self, resources_hrn):
336 Builds list of resources' urn based on hrn.
338 resources_urn = list()
340 for resource in resources_hrn:
341 resources_urn.append(hrn_to_urn(resource, 'node'))
345 def blacklist_resource(self, resource_hrn):
347 Adding resource_hrn to blacklist, and taking
348 the resource from the reserved list.
350 with self.lock_blist:
351 self._blacklist.add(resource_hrn)
353 if resource_hrn in self._reserved:
354 self._reserved.remove(resource_hrn)
356 def blacklisted(self, resource_hrn):
358 Check if the resource is in the blacklist.
360 with self.lock_blist:
361 if resource_hrn in self._blacklist:
365 def reserve_resource(self, resource_hrn):
367 Add resource to the reserved list.
369 self._reserved.add(resource_hrn)
371 def reserved(self, resource_hrn):
373 Check that the resource in not reserved.
376 if resource_hrn in self._reserved:
379 self.reserve_resource(resource_hrn)
382 class SFAAPIFactory(object):
384 API Factory to manage a map of SFAAPI instances as key-value pairs, it
385 instanciate a single instance per key. The key represents the same SFA,
389 _lock = threading.Lock()
394 def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
395 batch = False, rtype = None, timeout = None):
397 if sfi_user and sfi_sm:
398 key = cls.make_key(sfi_user, sfi_sm)
400 api = cls._apis.get(key)
403 api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
404 ec, batch, rtype, timeout)
412 def make_key(cls, *args):
413 skey = "".join(map(str, args))
414 return hashlib.md5(skey).hexdigest()