2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
25 from nepi.util.logger import Logger
28 from sfa.client.sfi import Sfi
29 from sfa.util.xrn import hrn_to_urn
31 log = Logger("SFA API")
32 log.debug("Packages sfa-common or sfa-client not installed.\
33 Could not import sfa.client.sfi or sfa.util.xrn")
35 from nepi.util.sfarspec_proc import SfaRSpecProcessing
39 API for quering the SFA service. It uses Sfi class from the tool sfi client.
41 def __init__(self, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
42 batch, rtype, timeout):
44 self._blacklist = set()
45 self._reserved = set()
46 self._resources_cache = None
47 self._already_cached = False
52 self._testbed_res = rtype
54 self._total = self._get_total_res()
55 self._slice_resources_batch = list()
57 self._log = Logger("SFA API")
59 self.rspec_proc = SfaRSpecProcessing()
60 self.lock_slice = threading.Lock()
61 self.lock_blist = threading.Lock()
62 self.lock_resv = threading.Lock()
64 self.api.options.timeout = timeout
65 self.api.options.raw = None
66 self.api.options.user = sfi_user
67 self.api.options.auth = sfi_auth
68 self.api.options.registry = sfi_registry
69 self.api.options.sm = sfi_sm
70 self.api.options.user_private_key = private_key
72 # Load blacklist from file
73 if ec.get_global('PlanetlabNode', 'persist_blacklist'):
76 def _set_blacklist(self):
78 Initialize the blacklist with previous nodes blacklisted, in
81 nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
82 plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
83 with open(plblacklist_file, 'r') as f:
84 hosts_tobl = f.read().splitlines()
86 for host in hosts_tobl:
87 self._blacklist.add(host)
89 def _get_total_res(self):
91 Get the total amount of resources instanciated using this API,
92 to be able to add them using the same Allocate and Provision
93 call at once. Specially for Wilabt testbed that doesn't allow
94 to add slivers after the slice already has some.
97 res_gids = self._ec.resources
99 rm = self._ec.get_resource(gid)
100 if self._testbed_res.lower() in rm._rtype.lower():
104 def _sfi_exec_method(self, command, slicename=None, rspec=None, urn=None, action=None):
106 Execute sfi method, which correspond to SFA call. It can be the following
107 calls: Describe, Delete, Allocate, Provision, ListResources.
109 if command in ['describe', 'delete', 'allocate', 'provision', 'action']:
111 raise TypeError("The slice hrn is expected for this method %s" % command)
112 if command == 'allocate' and not rspec:
113 raise TypeError("RSpec is expected for this method %s" % command)
115 if command == 'allocate':
116 args_list = [slicename, rspec]
118 args_list = [slicename]
119 if command != 'delete':
120 args_list = args_list + ['-o', '/tmp/rspec_output']
121 if command == 'action':
122 args_list = [slicename, action]
124 elif command == 'resources':
125 args_list = ['-o', '/tmp/rspec_output']
127 else: raise TypeError("Sfi method not supported")
129 self.api.command = command
130 self.api.command_parser = self.api.create_parser_command(self.api.command)
131 (command_options, command_args) = self.api.command_parser.parse_args(args_list)
132 self.api.command_options = command_options
133 self.api.read_config()
137 os.remove("/tmp/rspec_output.rspec")
139 self._log.debug("Couldn't remove temporary output file for RSpec or it doesn't exist")
142 self.api.dispatch(command, command_options, command_args)
143 with open("/tmp/rspec_output.rspec", "r") as result_file:
144 result = result_file.read()
147 self._log.debug(" Couldn't retrive rspec output information from method %s " % command)
150 def get_resources_info(self):
152 Get all resources and its attributes from aggregate.
155 rspec_slice = self._sfi_exec_method('resources')
157 raise RuntimeError("Fail to list resources")
159 self._resources_cache = self.rspec_proc.parse_sfa_rspec(rspec_slice)
160 self._already_cached = True
161 return self._resources_cache
163 def get_resources_hrn(self, resources=None):
165 Get list of resources hrn, without the resource info.
168 if not self._already_cached:
169 resources = self.get_resources_info()['resource']
171 resources = self._resources_cache['resource']
173 component_tohrn = dict()
174 for resource in resources:
175 hrn = resource['hrn'].replace('\\', '')
176 component_tohrn[resource['component_name']] = hrn
178 return component_tohrn
180 def get_slice_resources(self, slicename):
182 Get resources and info from slice.
185 with self.lock_slice:
186 rspec_slice = self._sfi_exec_method('describe', slicename)
188 self._log.debug("Fail to describe resources for slice %s, slice may be empty" % slicename)
190 if rspec_slice is not None:
191 result = self.rspec_proc.parse_sfa_rspec(rspec_slice)
194 return {'resource':[],'lease':[]}
197 def add_resource_to_slice(self, slicename, resource_hrn, leases=None):
199 Get the list of resources' urn, build the rspec string and call the allocate
200 and provision method.
202 resources_hrn_new = list()
203 resource_parts = resource_hrn.split('.')
204 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
205 resources_hrn_new.append(resource_hrn)
207 with self.lock_slice:
208 rspec_slice = self._sfi_exec_method('describe', slicename)
209 if rspec_slice is not None:
210 slice_resources = self.rspec_proc.parse_sfa_rspec(rspec_slice)['resource']
211 else: slice_resources = []
213 slice_resources_hrn = self.get_resources_hrn(slice_resources)
214 for s_hrn_key, s_hrn_value in slice_resources_hrn.iteritems():
215 s_parts = s_hrn_value.split('.')
216 s_hrn = '.'.join(s_parts[:2]) + '.' + '\\.'.join(s_parts[2:])
217 resources_hrn_new.append(s_hrn)
220 resources_urn = self._get_resources_urn(resources_hrn_new)
221 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, leases)
222 f = open("/tmp/rspec_input.rspec", "w")
227 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
228 raise RuntimeError("Fail to create rspec file to allocate resource in slice %s" % slicename)
232 self._log.debug("Allocating resources in slice %s" % slicename)
233 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
235 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
240 self._log.debug("Provisioning resources in slice %s" % slicename)
241 self._sfi_exec_method('provision', slicename)
243 raise RuntimeError("Fail to provision resource for slice %s" % slicename)
246 def add_resource_to_slice_batch(self, slicename, resource_hrn, properties=None, leases=None):
248 Method to add all resources together to the slice. Previous deletion of slivers.
249 Specially used for wilabt that doesn't allow to add more resources to the slice
250 after some resources are added. Every sliver have to be deleted and the batch
251 has to be added at once.
254 self._slice_resources_batch.append(resource_hrn)
255 resources_hrn_new = list()
256 if self._count == len(self._total):
257 for resource_hrn in self._slice_resources_batch:
258 resource_parts = resource_hrn.split('.')
259 resource_hrn = '.'.join(resource_parts[:2]) + '.' + '\\.'.join(resource_parts[2:])
260 resources_hrn_new.append(resource_hrn)
261 with self.lock_slice:
262 self._sfi_exec_method('delete', slicename)
263 # Re implementing urn from hrn because the library sfa-common doesn't work for wilabt
264 resources_urn = self._get_urn(resources_hrn_new)
265 rspec = self.rspec_proc.build_sfa_rspec(slicename, resources_urn, properties, leases)
266 f = open("/tmp/rspec_input.rspec", "w")
271 if not os.path.getsize("/tmp/rspec_input.rspec") > 0:
272 raise RuntimeError("Fail to create rspec file to allocate resources in slice %s" % slicename)
276 self._log.debug("Allocating resources in slice %s" % slicename)
277 out = self._sfi_exec_method('allocate', slicename, "/tmp/rspec_input.rspec")
279 raise RuntimeError("Fail to allocate resource for slice %s" % slicename)
284 self._log.debug("Provisioning resources in slice %s" % slicename)
285 self._sfi_exec_method('provision', slicename)
286 self._sfi_exec_method('action', slicename=slicename, action='geni_start')
288 raise RuntimeError("Fail to provision resource for slice %s" % slicename)
291 raise RuntimeError("Fail to allocate resources for slice %s" % slicename)
294 self._log.debug(" Waiting for more nodes to add the batch to the slice ")
296 def _get_urn(self, resources_hrn):
300 resources_urn = list()
301 for hrn in resources_hrn:
302 hrn = hrn.replace("\\", "").split('.')
305 urn = ['urn:publicid:IDN+', auth, '+node+', node]
307 resources_urn.append(urn)
310 def remove_resource_from_slice(self, slicename, resource_hrn, leases=None):
312 Remove slivers from slice. Currently sfi doesn't support removing particular
315 resource_urn = self._get_resources_urn([resource_hrn]).pop()
316 with self.lock_slice:
318 self._sfi_exec_method('delete', slicename, urn=resource_urn)
320 raise RuntimeError("Fail to delete resource for slice %s" % slicename)
323 def remove_all_from_slice(self, slicename):
325 De-allocate and de-provision all slivers of the named slice.
326 Currently sfi doesn't support removing particular
327 slivers, so this method works only for removing every sliver. Setting the
328 resource_hrn parameter is not necessary.
330 with self.lock_slice:
332 self._sfi_exec_method('delete', slicename)
334 raise RuntimeError("Fail to delete slivers for slice %s" % slicename)
337 def _get_resources_urn(self, resources_hrn):
339 Builds list of resources' urn based on hrn.
341 resources_urn = list()
343 for resource in resources_hrn:
344 resources_urn.append(hrn_to_urn(resource, 'node'))
348 def blacklist_resource(self, resource_hrn):
350 Adding resource_hrn to blacklist, and taking
351 the resource from the reserved list.
353 with self.lock_blist:
354 self._blacklist.add(resource_hrn)
356 if resource_hrn in self._reserved:
357 self._reserved.remove(resource_hrn)
359 def blacklisted(self, resource_hrn):
361 Check if the resource is in the blacklist.
363 with self.lock_blist:
364 if resource_hrn in self._blacklist:
368 def reserve_resource(self, resource_hrn):
370 Add resource to the reserved list.
372 self._reserved.add(resource_hrn)
374 def reserved(self, resource_hrn):
376 Check that the resource in not reserved.
379 if resource_hrn in self._reserved:
382 self.reserve_resource(resource_hrn)
387 Remove hosts from the reserved and blacklist lists, and in case
388 the persist attribute is set, it saves the blacklisted hosts
389 in the blacklist file.
393 blacklist = self._blacklist
394 self._blacklist = set()
395 self._reserved = set()
396 # if self._ecobj.get_global('PlanetlabSfaNode', 'persist_blacklist'):
398 # to_blacklist = list()
399 # hostnames = self.get_nodes(list(blacklist), ['hostname'])
400 # for hostname in hostnames:
401 # to_blacklist.append(hostname['hostname'])
403 # nepi_home = os.path.join(os.path.expanduser("~"), ".nepi")
404 # plblacklist_file = os.path.join(nepi_home, "plblacklist.txt")
406 # with open(plblacklist_file, 'w') as f:
407 # for host in to_blacklist:
408 # f.write("%s\n" % host)
412 class SFAAPIFactory(object):
414 API Factory to manage a map of SFAAPI instances as key-value pairs, it
415 instanciate a single instance per key. The key represents the same SFA,
419 _lock = threading.Lock()
424 def get_api(cls, sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key, ec,
425 batch = False, rtype = None, timeout = None):
427 if sfi_user and sfi_sm:
428 key = cls.make_key(sfi_user, sfi_sm)
430 api = cls._apis.get(key)
433 api = SFAAPI(sfi_user, sfi_auth, sfi_registry, sfi_sm, private_key,
434 ec, batch, rtype, timeout)
444 def make_key(cls, *args):
445 skey = "".join(map(str, args))
446 return hashlib.md5(skey).hexdigest()