2 NEPI, a framework to manage network experiments
3 Copyright (C) 2013 INRIA
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 from parser import sfa_sfav1
def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
        sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
    """Initialise the client state and fetch the initial resource/slice data.

    aggregate    -- stored and later handed to the parser's reservation
                    helpers.
    slice_id     -- slice HRN; appended to the sfi.py slice query.
    sfi_auth, sfi_user, sfi_registry, sfi_sm, timeout, private_key --
                    values turned into sfi.py command-line options;
                    options whose value is None are omitted.

    Construction is not cheap: it immediately runs the resource query and
    the slice query through sfi.py.
    """
    self._resources = dict()
    self._reservable_resources = list()
    # Initialised here so the attribute exists even before the first fetch.
    self._leases = dict()
    self._slice_tags = dict()
    self._slice_resources = set()
    self._slice_leases = set()
    self._aggregate = aggregate
    self._slice_hrn = slice_id
    # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
    # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
    self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf'])
    self._lock = threading.Lock()

    # Parameters to contact the XMLRPC SFA service
    # NOTE(review): '-k' is assumed to be sfi.py's private-key option;
    # confirm against the installed sfi.py.
    self._sfi_parameters = {
        '-a': sfi_auth,
        '-u': sfi_user,
        '-r': sfi_registry,
        '-s': sfi_sm,
        '-t': timeout,
        '-k': private_key,
    }

    self._fetch_resources_info()
    self._fetch_slice_info()
56 def _sfi_command_options(self):
57 command_options = " ".join("%s %s" % (k,v) for (k,v) in \
58 self._sfi_parameters.iteritems() if v is not None)
59 return command_options
61 def _sfi_command_exec(self, command):
62 args = command.split(" ")
63 s = subprocess.Popen(args, stdout = subprocess.PIPE,
64 stdin = subprocess.PIPE)
65 xml, err = s.communicate()
67 raise RuntimeError("Command excecution problem, error: %s", err)
70 def _fetch_resources_info(self, resources = True):
71 command_options = self._sfi_command_options()
72 command = "sfi.py " + command_options + " resources -l all"
74 xml = self._sfi_command_exec(command)
76 #self._logger.error("Error in SFA responds: %s", xml)
79 self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
81 self._leases = self._parser.resources_from_xml(xml)
82 #self._update_reservable()
85 def _fetch_slice_info(self):
86 command_options = self._sfi_command_options()
87 command = "sfi.py " + command_options + " resources -l all"
88 command = command + " " + self._slice_hrn
90 xml = self._sfi_command_exec(command)
92 #self._logger.error("Error in SFA responds: %s", xml)
94 self._slice_resources, self._slice_leases, self._slice_tags = \
95 self._parser.resources_from_xml(xml, sliver = True, resources = True)
98 def _update_reservable(self):
99 for rid, r in self._resources.iteritems():
100 if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \
101 or (r['resource_type'] == 'channel'):
102 self._reservable_resources.append(rid)
def discover_resources(self, resourceId=None, fields=None, **kwargs):
    """Return cached resources, optionally filtered and trimmed.

    resourceId -- a single id or a list of ids; only those resources are
                  kept.
    fields     -- iterable of field names; when given and non-empty, each
                  returned resource is reduced to those fields.  None or
                  empty means "return the full resource info".
    kwargs     -- field=value pairs; a resource is kept only if it has the
                  field and the value matches exactly.

    Returns a dict mapping resource id to resource info.  Without any
    filter or field selection this is the cached dict itself, not a copy.
    """
    resources = self._resources

    if resourceId is not None:
        resource_ids = resourceId
        if not isinstance(resource_ids, list):
            resource_ids = [resource_ids]
        resources = self._filter_by_resourceId(resources, resource_ids)

    for name, value in kwargs.items():
        resources = self._filter_by_filter(resources, name, value)

    if not fields:
        return resources

    result = dict()
    for k, info in resources.items():
        result[k] = self._extract_fields(info, fields)
    return result
125 def _filter_by_resourceId(self, resources, resource_ids):
126 return dict((k, resources[k]) for k in resource_ids if k in resources)
128 def _filter_by_filter(self, resources, filter, value):
130 for k in resources.keys():
131 if filter in resources[k]:
132 if resources[k][filter] == value:
136 def _extract_fields(self, info, fields):
137 return dict((k, info[k]) for k in fields if k in info)
def discover_fields(self):
    """Return the list of distinct field names used by the cached resources.

    Every field name appears once, in the order it is first encountered
    while walking self._resources.
    """
    fields = []
    seen = set()  # constant-time membership test; `fields` keeps the order
    for data in self._resources.values():
        for field in data:
            if field not in seen:
                seen.add(field)
                fields.append(field)
    return fields
def discover_leases(self, resourceId=None):
    """Return the cached leases, optionally restricted to some resources.

    resourceId -- a single resource id or a list of ids; when given, only
                  the leases stored under those ids are returned.  With
                  None the cached lease dict itself is returned.
    """
    leases = self._leases

    if resourceId is not None:
        resource_ids = resourceId
        if not isinstance(resource_ids, list):
            resource_ids = [resource_ids]
        # The helper is named _filter_by_resourceId; the previous spelling
        # (_filterbyresourceId) does not exist and raised AttributeError.
        leases = self._filter_by_resourceId(leases, resource_ids)
    return leases
def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
    """Pick `quantity` resources of type `rtype` that are free in a time window.

    leases     -- dict mapping resource id to a list of reservations, each a
                  dict with 'type', 'start_time' and 'duration'.
    resources  -- dict mapping resource id to resource info.
    rtype      -- 'node' or 'channel'.
    quantity   -- number of resources wanted.
    start_time -- start of the requested window.
    duration   -- length of the window, counted in slots.
    slot       -- length of one slot, in the unit of start_time.

    Resources without any lease are preferred.  If they do not suffice,
    leased resources whose reservations do not touch the requested window
    are added.  Returns a dict {resource id: resource info}; when not enough
    resources can be found a warning is issued and None is returned.

    Raises RuntimeError for an unknown rtype.
    """
    if rtype not in ['node', 'channel']:
        raise RuntimeError("Unknown type")

    finish_time = start_time + duration * slot

    # Leases of the requested type, keyed by resource id.  An empty lease
    # list carries no reservation, so such a resource counts as unleased.
    leases_resources = dict(
        (lid, lease) for lid, lease in leases.items()
        if lease and lease[0]['type'] == rtype)

    reservable = []
    for rid, resource in resources.items():
        if rtype == 'node':
            if resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE':
                reservable.append(rid)
        else:
            # NOTE(review): for channels every entry of `resources` is
            # accepted, whatever its own type; callers are presumably
            # expected to pass channel resources only - confirm.
            reservable.append(rid)

    free_resources = [rid for rid in reservable if rid not in leases_resources]

    def _pick(ids):
        chosen = set(ids)
        return dict((rid, res) for rid, res in resources.items() if rid in chosen)

    if len(free_resources) >= quantity:
        return _pick(free_resources[:quantity])

    def _overlaps(reservation):
        st = int(reservation['start_time'])
        ft = st + int(reservation['duration']) * slot
        # Closed-interval intersection.  Testing only whether one of the
        # requested end points falls inside the reservation misses a
        # reservation that lies entirely inside the requested window.
        return st <= finish_time and start_time <= ft

    missing = quantity - len(free_resources)
    maybe_free = []
    # Only reservable resources are candidates, so a leased resource that
    # is not reservable (or not in `resources`) never counts towards the
    # requested quantity.
    for rid in reservable:
        lease = leases_resources.get(rid)
        if lease is None or any(_overlaps(l) for l in lease):
            continue
        maybe_free.append(rid)
        if len(maybe_free) >= missing:
            return _pick(free_resources + maybe_free)

    warnings.warn("There aren't enough nodes")
    return None
def provision_resource(self, new_resource, start_time = None, duration = None):
    """Add new_resource to the slice and report whether it is now reserved.

    The current slice description is fetched, the parser builds a new
    request that includes new_resource (with the optional start_time and
    duration), and the request is submitted with "sfi.py create" through a
    temporary file.  The slice is then fetched again and the parser's
    verification result is returned.
    """
    import os, tempfile

    # Hold the instance lock over the read-modify-write of the slice so
    # concurrent callers do not build requests from the same stale state.
    with self._lock:
        xml = self._fetch_slice_info()
        new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,
                new_resource, start_time, duration, self._aggregate)
        if not isinstance(new_xml, bytes):
            new_xml = new_xml.encode("utf-8")

        fh, fname = tempfile.mkstemp()
        try:
            # fdopen takes ownership of the descriptor and closes it.
            with os.fdopen(fh, "wb") as rspec_file:
                rspec_file.write(new_xml)
            command_options = self._sfi_command_options()
            command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
            self._sfi_command_exec(command)
        finally:
            # mkstemp leaves deletion to the caller.
            os.remove(fname)

    xml = self._fetch_slice_info()
    return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource,
            start_time, duration, self._aggregate)
def release_resource(self, resource, start_time = None, duration = None):
    """Remove resource from the slice and report whether it is gone.

    The current slice description is fetched, the parser builds a request
    without the given resource (and optional start_time / duration), and
    the request is submitted with "sfi.py create" through a temporary file.
    The slice is then fetched again; the method returns True when the
    parser's verification no longer finds the reservation.
    """
    import os, tempfile

    # Hold the instance lock over the read-modify-write of the slice so
    # concurrent callers do not build requests from the same stale state.
    with self._lock:
        xml = self._fetch_slice_info()
        new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,
                start_time, duration, self._aggregate)
        if not isinstance(new_xml, bytes):
            new_xml = new_xml.encode("utf-8")

        fh, fname = tempfile.mkstemp()
        try:
            # fdopen takes ownership of the descriptor and closes it.
            with os.fdopen(fh, "wb") as rspec_file:
                rspec_file.write(new_xml)
            command_options = self._sfi_command_options()
            command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
            self._sfi_command_exec(command)
        finally:
            # mkstemp leaves deletion to the caller.
            os.remove(fname)

    xml = self._fetch_slice_info()
    return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource,
            start_time, duration, self._aggregate)
class SFAApiFactory(object):
    """Cache of SFAApi instances, one per distinct set of connection settings."""

    # Guards both the cache and the refresh of a cached instance.
    lock = threading.Lock()
    # Maps make_key(...) digests to SFAApi instances.
    _apis = dict()

    @classmethod
    def get_api(cls, slice_id = None, sfi_auth = None, sfi_user = None,
            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None,
            aggregate = 'ple'):
        """Return the SFAApi for the given settings, creating it on first use.

        A cached instance has its leases and slice information refreshed
        before it is returned.  A new instance is built with exactly the
        settings passed in; its constructor performs the initial fetch.
        """
        key = cls.make_key(aggregate, slice_id, sfi_auth, sfi_user, sfi_registry,
                sfi_sm, timeout, private_key)
        with cls.lock:
            api = cls._apis.get(key)
            if api is None:
                api = SFAApi(aggregate = aggregate, slice_id = slice_id,
                        sfi_auth = sfi_auth, sfi_user = sfi_user,
                        sfi_registry = sfi_registry, sfi_sm = sfi_sm,
                        timeout = timeout, private_key = private_key)
                cls._apis[key] = api
            else:
                api._fetch_resources_info(resources = False)
                api._fetch_slice_info()
        return api

    @classmethod
    def make_key(cls, *args):
        """Return an MD5 hex digest of the string forms of args, concatenated."""
        skey = "".join(map(str, args))
        # hashlib needs bytes on Python 3; on Python 2 str already is bytes.
        if not isinstance(skey, bytes):
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()