2 # NEPI, a framework to manage network experiments
3 # Copyright (C) 2013 INRIA
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 # Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
25 from sfa_sfav1 import SFAResourcesParser
def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
        sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
    """Initialise the client state and load resource and slice information.

    :param aggregate: aggregate name, kept and forwarded to the parser when
        reservation documents are created or verified
    :param slice_id: slice HRN, stored as ``self._slice_hrn``
    :param sfi_auth: value for the sfi.py ``-a`` option
    :param sfi_user: value for the sfi.py ``-u`` option
    :param sfi_registry: value for the sfi.py ``-r`` option
    :param sfi_sm: value for the sfi.py ``-s`` option
    :param timeout: value for the sfi.py ``-t`` option
    :param private_key: value for the sfi.py ``-k`` option (see note below)

    Construction runs sfi.py twice (resources, then slice) through
    ``_fetch_resources_info`` and ``_fetch_slice_info``.
    """
    self._resources = dict()
    self._reservable_resources = list()
    # Initialised here so discover_leases() never hits a missing attribute.
    self._leases = dict()
    self._slice_tags = dict()
    self._slice_resources = set()
    self._slice_leases = set()
    self._aggregate = aggregate
    self._slice_hrn = slice_id
    # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
    # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
    # The class itself is what the module imports, so use it directly
    # instead of going through the (unimported) sfa_sfav1 module name.
    self._parser = SFAResourcesParser(['ple', 'omf'])
    self._lock = threading.Lock()

    # Parameters to contact the XMLRPC SFA service.
    # NOTE(review): '-k' is assumed to be the sfi.py private key option;
    # confirm against the sfi.py command line help.
    self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
            '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
            '-k': private_key}

    #self._logger = logging.getLogger('nepi.utils.sfiapi')
    self._fetch_resources_info()
    self._fetch_slice_info()
58 def _sfi_command_options(self):
59 command_options = " ".join("%s %s" % (k,v) for (k,v) in \
60 self._sfi_parameters.iteritems() if v is not None)
61 return command_options
63 def _sfi_command_exec(self, command):
64 args = command.split(" ")
65 s = subprocess.Popen(args, stdout = subprocess.PIPE,
66 stdin = subprocess.PIPE)
67 xml, err = s.communicate()
69 raise RuntimeError("Command excecution problem, error: %s", err)
72 def _fetch_resources_info(self, resources = True):
73 command_options = self._sfi_command_options()
74 command = "sfi.py " + command_options + " resources -l all"
76 xml = self._sfi_command_exec(command)
78 #self._logger.error("Error in SFA responds: %s", xml)
81 self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
83 self._leases = self._parser.resources_from_xml(xml)
84 #self._update_reservable()
87 def _fetch_slice_info(self):
88 command_options = self._sfi_command_options()
89 command = "sfi.py " + command_options + " resources -l all"
90 command = command + " " + self._slice_hrn
92 xml = self._sfi_command_exec(command)
94 #self._logger.error("Error in SFA responds: %s", xml)
96 self._slice_resources, self._slice_leases, self._slice_tags = \
97 self._parser.resources_from_xml(xml, sliver = True, resources = True)
100 def _update_reservable(self):
101 for rid, r in self._resources.iteritems():
102 if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \
103 or (r['resource_type'] == 'channel'):
104 self._reservable_resources.append(rid)
def discover_resources(self, resourceId=None, fields=[], **kwargs):
    """Return cached resources, optionally narrowed down and trimmed.

    :param resourceId: a single resource id or a list of ids to keep;
        None keeps every cached resource
    :param fields: names of the fields to keep for each resource; when
        empty the full resource descriptions are returned. The default
        list is never modified.
    :param kwargs: ``field=value`` pairs; only resources whose field
        equals the value are kept
    :returns: dictionary mapping resource id to resource description
    """
    resources = self._resources

    if resourceId is not None:
        resource_ids = resourceId
        if not isinstance(resource_ids, list):
            resource_ids = [resource_ids]
        resources = self._filter_by_resourceId(resources, resource_ids)

    # NOTE(review): the field filters are applied whether or not ids were
    # given - confirm they were not meant to be an alternative to the ids.
    for name, value in kwargs.items():
        resources = self._filter_by_filter(resources, name, value)

    # NOTE(review): assumes that an empty field list means "everything".
    if not fields:
        return resources

    result = dict()
    for k, info in resources.items():
        result[k] = self._extract_fields(info, fields)
    return result
127 def _filter_by_resourceId(self, resources, resource_ids):
128 return dict((k, resources[k]) for k in resource_ids if k in resources)
130 def _filter_by_filter(self, resources, filter, value):
132 for k in resources.keys():
133 if filter in resources[k]:
134 if resources[k][filter] == value:
138 def _extract_fields(self, info, fields):
139 return dict((k, info[k]) for k in fields if k in info)
def discover_fields(self):
    """Return the names of all fields used by the cached resources.

    :returns: list with each field name once, in the order in which the
        names are first met while walking ``self._resources``
    """
    resources = self._resources
    fields = []
    for data in resources.values():
        for field in data:
            if field not in fields:
                fields.append(field)
    return fields
def discover_leases(self, resourceId=None):
    """Return cached leases, optionally limited to some resource ids.

    :param resourceId: a single resource id or a list of ids; None
        returns every cached lease
    :returns: ``self._leases`` itself when no id is given, otherwise a
        new dictionary holding only the requested ids
    """
    leases = self._leases

    if resourceId is not None:
        resource_ids = resourceId
        if not isinstance(resourceId, list):
            resource_ids = [resource_ids]
        # The helper is spelled _filter_by_resourceId; the former name
        # _filterbyresourceId does not exist and raised AttributeError.
        leases = self._filter_by_resourceId(leases, resource_ids)
    return leases
def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
    """Pick ``quantity`` resources of type ``rtype`` that are free in a time window.

    The window runs from ``start_time`` to ``start_time + duration * slot``.
    Resources without any lease are taken first. When those are not enough,
    leased resources are added as long as none of their leases overlaps
    the window.

    :param leases: dictionary mapping resource id to a non-empty list of
        lease dictionaries with ``type``, ``start_time`` and ``duration``
    :param resources: dictionary mapping resource id to description; node
        descriptions need ``type`` and ``exclusive``
    :param rtype: ``'node'`` or ``'channel'``
    :param quantity: number of resources wanted
    :param start_time: start of the wanted window
    :param duration: length of the wanted window, in slots
    :param slot: factor that converts a number of slots into time units
    :returns: dictionary with ``quantity`` entries of ``resources``, or
        None (after a warning) when not enough resources are available
    :raises RuntimeError: when ``rtype`` is not a known type
    """
    if rtype not in ['node', 'channel']:
        raise RuntimeError("Unknown type")

    finish_time = start_time + duration * slot

    # Leases of the requested type; the type is read from the first lease.
    leases_resources = dict((lid, lease) for lid, lease in leases.items()
            if lease[0]['type'] == rtype)

    reservable_resources = dict()
    for rid, resource in resources.items():
        if rtype == 'node':
            if resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE':
                reservable_resources[rid] = resource
        else:
            # NOTE(review): for channels every entry of `resources` is
            # accepted, whatever its type - confirm callers only pass
            # channels here (or add an exclusive tag to channels).
            reservable_resources[rid] = resource

    free_resources = [rid for rid in reservable_resources
            if rid not in leases_resources]

    if len(free_resources) >= quantity:
        free_resources = free_resources[:quantity]
    else:
        new_quan = quantity - len(free_resources)
        maybe_free = []
        for lid, lease in leases_resources.items():
            # Only resources that can be reserved at all are candidates,
            # which keeps the size of the result equal to `quantity`.
            if lid not in reservable_resources:
                continue
            busy = False
            for l in lease:
                st = int(l['start_time'])
                ft = st + int(l['duration']) * slot
                # Two intervals overlap when each one starts before the
                # other ends; this also covers a lease that lies entirely
                # inside the requested window.
                if st <= finish_time and start_time <= ft:
                    busy = True
                    break
            if not busy:
                maybe_free.append(lid)
            if len(maybe_free) >= new_quan:
                break

        if len(maybe_free) < new_quan:
            warnings.warn("There aren't enough nodes")
            return None
        free_resources = free_resources + maybe_free

    return dict((rid, resources[rid]) for rid in free_resources)
def provision_resource(self, new_resource, start_time = None, duration = None):
    """Add a resource to the slice and report whether the slice now holds it.

    The current slice description is fetched, the parser builds a new
    description that contains the reservation, and that description is
    written to a temporary file which is handed to ``sfi.py create``.

    :param new_resource: resource to add, passed on to the parser
    :param start_time: start of the reservation, passed on to the parser
    :param duration: length of the reservation, passed on to the parser
    :returns: the result of the parser's ``verify_reservation_xml`` on the
        slice description fetched after the request
    """
    # NOTE(review): the per-instance lock is held while the slice is being
    # modified - confirm that callers never hold self._lock themselves.
    with self._lock:
        xml = self._fetch_slice_info()
        new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,
                new_resource, start_time, duration, self._aggregate)
        # os.write() needs bytes; text is encoded first.
        if not isinstance(new_xml, bytes):
            new_xml = new_xml.encode("utf-8")

        fh, fname = tempfile.mkstemp()
        try:
            try:
                os.write(fh, new_xml)
            finally:
                # Close before sfi.py reads the file, and never leak the
                # descriptor when the write fails.
                os.close(fh)

            command_options = self._sfi_command_options()
            command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
            self._sfi_command_exec(command)
        finally:
            # The command has finished (or failed), the file is not needed
            # any more.
            os.remove(fname)

    xml = self._fetch_slice_info()
    return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,
            duration, self._aggregate)
def release_resource(self, resource, start_time = None, duration = None):
    """Remove a resource from the slice and report whether it is gone.

    The current slice description is fetched, the parser builds a new
    description without the reservation, and that description is written
    to a temporary file which is handed to ``sfi.py create``.

    :param resource: resource to release, passed on to the parser
    :param start_time: start of the reservation, passed on to the parser
    :param duration: length of the reservation, passed on to the parser
    :returns: the negated result of the parser's ``verify_reservation_xml``
        on the slice description fetched after the request, so a true
        value means the reservation is no longer found
    """
    # NOTE(review): the per-instance lock is held while the slice is being
    # modified - confirm that callers never hold self._lock themselves.
    with self._lock:
        xml = self._fetch_slice_info()
        new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,
                start_time, duration, self._aggregate)
        # os.write() needs bytes; text is encoded first.
        if not isinstance(new_xml, bytes):
            new_xml = new_xml.encode("utf-8")

        fh, fname = tempfile.mkstemp()
        try:
            try:
                os.write(fh, new_xml)
            finally:
                # Close before sfi.py reads the file, and never leak the
                # descriptor when the write fails.
                os.close(fh)

            command_options = self._sfi_command_options()
            command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
            self._sfi_command_exec(command)
        finally:
            # The command has finished (or failed), the file is not needed
            # any more.
            os.remove(fname)

    xml = self._fetch_slice_info()
    return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,
            duration, self._aggregate)
class SFAApiFactory(object):
    """Hand out one shared SFAApi instance per set of connection parameters."""

    # Guards the instance cache below.
    lock = threading.Lock()
    # Cache of SFAApi instances, indexed by the digest from make_key().
    _apis = dict()

    @classmethod
    def get_api(cls, slice_id = None, sfi_auth = None, sfi_user = None,
            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
        """Return the SFAApi instance for the given parameters.

        An instance that is already cached gets its lease and slice
        information refreshed before it is returned; otherwise a new
        instance is created with the given parameters and cached.

        :returns: the cached or newly created SFAApi instance
        """
        # The aggregate goes in positionally: make_key() only takes *args.
        key = cls.make_key(slice_id, sfi_auth, sfi_user, sfi_registry,
                sfi_sm, timeout, private_key, 'ple')

        with cls.lock:
            api = cls._apis.get(key)
            if api is not None:
                api._fetch_resources_info(resources = False)
                api._fetch_slice_info()
            else:
                # Forward the caller's values; passing None for everything
                # would build an instance unrelated to the cache key.
                api = SFAApi(slice_id = slice_id, sfi_auth = sfi_auth,
                        sfi_user = sfi_user, sfi_registry = sfi_registry,
                        sfi_sm = sfi_sm, timeout = timeout,
                        private_key = private_key)
                cls._apis[key] = api
            return api

    @classmethod
    def make_key(cls, *args):
        """Return an MD5 hex digest of the concatenated ``str()`` of all arguments."""
        skey = "".join(map(str, args))
        # hashlib works on bytes; text is encoded first.
        if not isinstance(skey, bytes):
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()