f3df754c95a0ff39d362a77f96752fa0d9045e5e
[nepi.git] / src / nepi / util / sfa_api.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
20
21
22 import logging
23 import hashlib
24
25 from sfa_sfav1 import SFAResourcesParser
26 import subprocess
27 import warnings
28
29 import threading
30
class SFAApi(object):
    """Client for an SFA aggregate, driven through the ``sfi.py`` command line.

    The instance shells out to ``sfi.py`` to list resources and to submit
    reservation documents, and delegates all XML handling to an
    ``SFAResourcesParser``. The last parsed answers are cached on the
    instance (``_resources``, ``_leases``, ``_slice_resources``,
    ``_slice_leases``, ``_slice_tags``).
    """

    def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
        """Store the connection parameters and fetch the initial state.

        :param aggregate: aggregate name handed to the parser when building
            and verifying reservation documents.
        :param slice_id: slice hrn appended to the ``sfi.py`` commands. It is
            concatenated to the command string, so leaving it as ``None``
            makes the slice fetch below fail with ``TypeError``.
        :param sfi_auth: value for the ``-a`` option of ``sfi.py``.
        :param sfi_user: value for the ``-u`` option of ``sfi.py``.
        :param sfi_registry: value for the ``-r`` option of ``sfi.py``.
        :param sfi_sm: value for the ``-s`` option of ``sfi.py``.
        :param timeout: value for the ``-t`` option of ``sfi.py``.
        :param private_key: value for the ``-k`` option of ``sfi.py``.

        Options left as ``None`` are not passed to ``sfi.py`` at all.
        """
        self._resources = dict()
        self._reservable_resources = list()
        self._leases = dict()
        self._slice_tags = dict()
        self._slice_resources = set()
        self._slice_leases = set()
        self._aggregate = aggregate
        self._slice_hrn = slice_id
        # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
        # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
        # The module imports the class name directly, so it must be used
        # unqualified here.
        self._parser = SFAResourcesParser(['ple', 'omf'])
        self._lock = threading.Lock()

        # Parameters to contact the XMLRPC SFA service
        self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
                '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
                '-k': private_key}

        self._fetch_resources_info()
        self._fetch_slice_info()

    def _sfi_command_options(self):
        """Return the configured ``sfi.py`` options as one ``"-x value"`` string.

        Options whose value is ``None`` are skipped.
        """
        command_options = " ".join("%s %s" % (k, v) for (k, v) in
                self._sfi_parameters.items() if v is not None)
        return command_options

    def _sfi_command_exec(self, command):
        """Run ``command`` and return whatever it wrote to stdout.

        :param command: whitespace separated command line.
        :raises RuntimeError: when the process exits with a non-zero status;
            the message carries the captured stderr.
        """
        # split() without argument collapses repeated blanks, so an empty
        # options string does not turn into an empty argv entry.
        args = command.split()
        proc = subprocess.Popen(args, stdout = subprocess.PIPE,
                stderr = subprocess.PIPE, stdin = subprocess.PIPE)
        xml, err = proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError("Command excecution problem, error: %s" % (err,))
        if err:
            # stderr is captured for the error message above; do not lose
            # diagnostics emitted by a successful run.
            logging.getLogger('nepi.utils.sfiapi').warning(
                    "sfi command wrote to stderr: %s", err)
        return xml

    def _fetch_resources_info(self, resources = True):
        """Query the aggregate for all resources and refresh the local cache.

        :param resources: when true both ``_resources`` and ``_leases`` are
            replaced; otherwise only ``_leases`` is.
        :returns: the raw output of the ``sfi.py`` command.
        """
        command = "sfi.py " + self._sfi_command_options() + " resources -l all"
        xml = self._sfi_command_exec(command)
        if resources:
            self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
        else:
            self._leases = self._parser.resources_from_xml(xml)
        return xml

    def _fetch_slice_info(self):
        """Query the resources of the slice and refresh the slice caches.

        :returns: the raw output of the ``sfi.py`` command.
        """
        command = "sfi.py " + self._sfi_command_options() + " resources -l all"
        command = command + " " + self._slice_hrn
        xml = self._sfi_command_exec(command)
        self._slice_resources, self._slice_leases, self._slice_tags = \
            self._parser.resources_from_xml(xml, sliver = True, resources = True)
        return xml

    def _update_reservable(self):
        """Rebuild ``_reservable_resources`` from the cached resources.

        Exclusive nodes and channels are considered reservable.
        """
        # NOTE(review): this reads r['resource_type'] while find_resources
        # reads resource['type'] -- confirm which key the parser emits.
        # The list is rebuilt rather than appended to, so repeated calls do
        # not accumulate duplicates.
        self._reservable_resources = [rid for rid, r in self._resources.items()
                if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE')
                or (r['resource_type'] == 'channel')]

    def discover_resources(self, resourceId=None, fields=None, **kwargs):
        """Return cached resources, optionally filtered and projected.

        :param resourceId: one id or a list of ids to keep. When given, the
            keyword filters are ignored.
        :param fields: names of the attributes to keep for every resource;
            empty/``None`` keeps the whole description.
        :param kwargs: ``attribute=value`` filters; a resource is kept only
            if it has every attribute with exactly that value.
        :returns: dict mapping resource id to its (possibly reduced) info.
        """
        resources = self._resources

        if resourceId is not None:
            resource_ids = resourceId
            if not isinstance(resource_ids, list):
                resource_ids = [resource_ids]
            resources = self._filter_by_resourceId(resources, resource_ids)
        else:
            for filter, value in kwargs.items():
                resources = self._filter_by_filter(resources, filter, value)

        if not fields:
            return resources
        return dict((k, self._extract_fields(info, fields))
                for k, info in resources.items())

    def _filter_by_resourceId(self, resources, resource_ids):
        """Keep only the entries of ``resources`` whose key is in ``resource_ids``."""
        return dict((k, resources[k]) for k in resource_ids if k in resources)

    def _filter_by_filter(self, resources, filter, value):
        """Keep the entries having attribute ``filter`` equal to ``value``."""
        return dict((k, info) for k, info in resources.items()
                if filter in info and info[filter] == value)

    def _extract_fields(self, info, fields):
        """Return the subset of ``info`` restricted to the keys in ``fields``."""
        return dict((k, info[k]) for k in fields if k in info)

    def discover_fields(self):
        """List every attribute name used by the cached resources.

        Names appear once, in order of first appearance.
        """
        fields = []
        seen = set()
        for data in self._resources.values():
            for field in data:
                if field not in seen:
                    seen.add(field)
                    fields.append(field)
        return fields

    def discover_leases(self, resourceId=None):
        """Return the cached leases, optionally restricted to some resources.

        :param resourceId: one id or a list of ids; ``None`` returns all.
        """
        leases = self._leases

        if resourceId is not None:
            resource_ids = resourceId
            if not isinstance(resource_ids, list):
                resource_ids = [resource_ids]
            leases = self._filter_by_resourceId(leases, resource_ids)
        return leases

    @staticmethod
    def _lease_overlaps(lease, start_time, finish_time, slot):
        """Tell whether any entry of ``lease`` intersects [start_time, finish_time]."""
        for entry in lease:
            st = int(entry['start_time'])
            ft = st + int(entry['duration']) * slot
            # Closed-interval intersection; this also covers a lease lying
            # completely inside the requested window.
            if st <= finish_time and start_time <= ft:
                return True
        return False

    def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
        """Pick ``quantity`` resources of type ``rtype`` free in a time window.

        The window is ``[start_time, start_time + duration * slot]``.
        Resources without any lease are preferred; leased ones are added only
        if none of their lease entries intersects the window.

        :param leases: dict mapping resource id to a list of lease entries,
            each with ``type``, ``start_time`` and ``duration``.
        :param resources: dict mapping resource id to its description.
        :param rtype: ``'node'`` or ``'channel'``.
        :param quantity: number of resources wanted.
        :param start_time: beginning of the window.
        :param duration: length of the window, in slots.
        :param slot: length of one slot, in the unit of ``start_time``.
        :returns: dict mapping the chosen ids to their description, or
            ``None`` (after emitting a warning) when not enough are free.
        :raises RuntimeError: if ``rtype`` is not a known type.
        """
        if rtype not in ['node', 'channel']:
            raise RuntimeError("Unknown type")

        finish_time = start_time + duration * slot

        leases_resources = dict((lid, lease) for lid, lease in leases.items()
                if lease and lease[0]['type'] == rtype)

        reservable_resources = dict()
        for rid, resource in resources.items():
            if rtype == 'node':
                if resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE':
                    reservable_resources[rid] = resource
            else:
                # NOTE(review): for channels every entry of `resources` is
                # accepted without looking at its type -- confirm callers
                # pass channel-only dicts.
                reservable_resources[rid] = resource

        free_resources = list(set(reservable_resources) - set(leases_resources))

        if len(free_resources) >= quantity:
            return dict((rid, resources[rid]) for rid in free_resources[:quantity])

        missing = quantity - len(free_resources)
        maybe_free = []
        for lid, lease in leases_resources.items():
            # Leases on resources that are unknown or not reservable can
            # never be handed out, so they must not count.
            if lid not in reservable_resources:
                continue
            if self._lease_overlaps(lease, start_time, finish_time, slot):
                continue
            maybe_free.append(lid)
            if len(maybe_free) >= missing:
                selected = free_resources + maybe_free
                return dict((rid, resources[rid]) for rid in selected)

        warnings.warn("There aren't enough nodes")

    def _sfi_create_from_rspec(self, rspec_xml):
        """Write ``rspec_xml`` to a temporary file and submit it with ``sfi.py create``.

        The temporary file is removed whether or not the command succeeds.
        """
        import os
        import tempfile
        fh, fname = tempfile.mkstemp()
        try:
            try:
                if not isinstance(rspec_xml, bytes):
                    rspec_xml = rspec_xml.encode('utf-8')
                os.write(fh, rspec_xml)
            finally:
                os.close(fh)
            command = "sfi.py " + self._sfi_command_options() + \
                    " create %s %s" % (self._slice_hrn, fname)
            return self._sfi_command_exec(command)
        finally:
            os.remove(fname)

    def provision_resource(self, new_resource, start_time = None, duration = None):
        """Add ``new_resource`` to the slice and verify the result.

        :returns: what the parser's ``verify_reservation_xml`` reports for
            the slice state fetched after the submission.
        """
        with self._lock:
            xml = self._fetch_slice_info()
            new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,
                    new_resource, start_time, duration, self._aggregate)
            self._sfi_create_from_rspec(new_xml)
        xml = self._fetch_slice_info()
        return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource,
                start_time, duration, self._aggregate)

    def release_resource(self, resource, start_time = None, duration = None):
        """Remove ``resource`` from the slice and verify the result.

        :returns: the negation of what the parser's
            ``verify_reservation_xml`` reports for the slice state fetched
            after the submission.
        """
        with self._lock:
            xml = self._fetch_slice_info()
            new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn,
                    resource, start_time, duration, self._aggregate)
            self._sfi_create_from_rspec(new_xml)
        xml = self._fetch_slice_info()
        return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource,
                start_time, duration, self._aggregate)
255
256
class SFAApiFactory(object):
    """Cache of ``SFAApi`` instances shared by the whole process.

    Instances are stored in the class-level ``_apis`` dict under a key
    derived from their connection parameters; ``lock`` serialises access to
    that dict and to the refresh of a cached instance.
    """
    lock = threading.Lock()
    _apis = dict()

    @classmethod
    def get_api(cls, slice_id = None, sfi_auth = None, sfi_user = None,
            sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
        """Return the ``SFAApi`` for these parameters, creating it if needed.

        A newly created instance fetches its state in its constructor. An
        instance found in the cache has its leases and slice information
        refreshed before being returned.

        The parameters are forwarded unchanged to ``SFAApi``; the aggregate
        is always ``'ple'``.
        """
        aggregate = 'ple'
        # make_key only takes positional arguments.
        key = cls.make_key(slice_id, sfi_auth, sfi_user, sfi_registry,
                           sfi_sm, timeout, private_key, aggregate)

        # Lookup, creation and refresh happen under the lock so two threads
        # cannot create the same api twice.
        with cls.lock:
            api = cls._apis.get(key)
            if api is None:
                api = SFAApi(aggregate = aggregate, slice_id = slice_id,
                        sfi_auth = sfi_auth, sfi_user = sfi_user,
                        sfi_registry = sfi_registry, sfi_sm = sfi_sm,
                        timeout = timeout, private_key = private_key)
                cls._apis[key] = api
            else:
                api._fetch_resources_info(resources = False)
                api._fetch_slice_info()

        return api

    @classmethod
    def make_key(cls, *args):
        """Return the md5 hex digest of the concatenated ``str()`` of ``args``."""
        skey = "".join(map(str, args))
        # hashlib wants bytes; on interpreters where str is already bytes
        # the value is passed through untouched.
        if not isinstance(skey, bytes):
            skey = skey.encode("utf-8")
        return hashlib.md5(skey).hexdigest()
284