Adding warning method to utils Logger
[nepi.git] / src / nepi / util / sfa_api.py
1 #
2 #    NEPI, a framework to manage network experiments
3 #    Copyright (C) 2013 INRIA
4 #
5 #    This program is free software: you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation, either version 3 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # Author: Alina Quereilhac <alina.quereilhac@inria.fr>
19 #         Lucia Guevgeozian Odizzio <lucia.guevgeozian_odizzio@inria.fr>
20
21
22 import logging
23 import hashlib
24
25 from sfa_sfav1 import SFAResourcesParser
26 import subprocess
27
28 # TODO: Use nepi utils Logger instead of warnings!
29 import warnings
30
31 import threading
32
33 class SFAApi(object):
34
35     def __init__(self, aggregate = 'ple', slice_id = None, sfi_auth = None, sfi_user = None,
36             sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
37     
38         self._resources = dict()
39         self._reservable_resources = list()
40         self._leases = dict()
41         self._slice_tags = dict()
42         self._slice_resources = set()
43         self._slice_leases = set()
44         self._aggregate = aggregate
45         self._slice_hrn = slice_id
46         # TODO: take into account Rspec version, SFA V1, GENI V2, GENI V3
47         # For now is SFA V1 from PlanetLab and Nitos (wrong namespace)
48         self._parser = sfa_sfav1.SFAResourcesParser(['ple', 'omf'])
49         self._lock = threading.Lock()
50
51         # Paremeters to contact the XMLRPC SFA service
52         self._sfi_parameters = {'-a': sfi_auth, '-u': sfi_user,
53                 '-r': sfi_registry, '-s': sfi_sm, '-t': timeout,
54                 '-k': private_key}
55
56         #self._logger = logging.getLogger('nepi.utils.sfiapi')
57         self._fetch_resources_info()
58         self._fetch_slice_info()
59
60     def _sfi_command_options(self):
61         command_options = " ".join("%s %s" % (k,v) for (k,v) in \
62                 self._sfi_parameters.iteritems() if v is not None)
63         return command_options
64
65     def _sfi_command_exec(self, command):
66         args = command.split(" ")
67         s = subprocess.Popen(args, stdout = subprocess.PIPE,
68                 stdin = subprocess.PIPE)
69         xml, err = s.communicate()
70         if err:
71            raise RuntimeError("Command excecution problem, error: %s", err)
72         return xml
73
74     def _fetch_resources_info(self, resources = True):
75         command_options = self._sfi_command_options()
76         command = "sfi.py " + command_options + " resources -l all"
77         try:
78             xml = self._sfi_command_exec(command)
79         except:
80             #self._logger.error("Error in SFA responds: %s", xml)
81             raise
82         if resources:
83             self._resources, self._leases = self._parser.resources_from_xml(xml, resources = True)
84         else:
85             self._leases = self._parser.resources_from_xml(xml)
86         #self._update_reservable()
87         return xml
88     
89     def _fetch_slice_info(self):
90         command_options = self._sfi_command_options()
91         command = "sfi.py " + command_options + " resources -l all"
92         command = command + " " + self._slice_hrn
93         try:
94             xml = self._sfi_command_exec(command)
95         except:
96             #self._logger.error("Error in SFA responds: %s", xml)
97             raise
98         self._slice_resources, self._slice_leases, self._slice_tags = \
99             self._parser.resources_from_xml(xml, sliver = True, resources = True)
100         return xml
101
102     def _update_reservable(self):
103         for rid, r in self._resources.iteritems():
104             if (r['resource_type'] == 'node' and r['exclusive'].upper() == 'TRUE') \
105                  or (r['resource_type'] == 'channel'):
106                 self._reservable_resources.append(rid)
107
108
109     def discover_resources(self, resourceId=None, fields=[], **kwargs):
110         result = dict()
111         resources = self._resources
112
113         if resourceId is not None:
114             resource_ids = resourceId
115             if not isinstance(resource_ids, list):
116                 resource_ids = [resource_ids]
117             resources = self._filter_by_resourceId(resources, resource_ids)
118         else:
119             for filter, value in kwargs.items():
120                 resources = self._filter_by_filter(resources, filter, value)
121         if not fields:
122             return resources
123         else:
124             for k, info in resources.iteritems():
125                 info = self._extract_fields(info, fields)
126                 result[k] = info
127             return result
128                 
129     def _filter_by_resourceId(self, resources, resource_ids):
130         return dict((k, resources[k]) for k in resource_ids if k in resources)
131
132     def _filter_by_filter(self, resources, filter, value):
133         d = dict()
134         for k in resources.keys():
135             if filter in resources[k]:
136                 if resources[k][filter] == value:
137                     d[k] = resources[k]
138         return d
139                
140     def _extract_fields(self, info, fields):
141         return dict((k, info[k]) for k in fields if k in info)
142
143     def discover_fields(self):
144         resources = self._resources
145         fields = []
146         for k, data in resources.iteritems():
147             for field in data:
148                 if field not in fields:
149                     fields.append(field)
150         return fields
151
152     def discover_leases(self, resourceId=None):
153         leases = self._leases
154
155         if resourceId is not None:
156             resource_ids = resourceId
157             if not isinstance(resourceId, list):
158                 resource_ids = [resource_ids]
159             leases = self._filterbyresourceId(leases, resource_ids)
160         return leases
161
162     def find_resources(self, leases, resources, rtype, quantity, start_time, duration, slot):
163         result = dict()
164         if rtype not in ['node', 'channel']:
165             raise RuntimeError("Unknown type")
166
167         finish_time = start_time + duration * slot
168
169         leases_resources = dict()
170         reservable_resources = dict()
171         for lid, lease in leases.iteritems():
172             if lease[0]['type'] == rtype:
173                 leases_resources.update({lid: lease})
174         #print leases_resources
175         for rid, resource in resources.iteritems():
176             if rtype == 'node' and (resource['type'] == 'node' and resource['exclusive'].upper() == 'TRUE'):
177                 reservable_resources.update({rid: resource})
178             elif rtype == 'channel':
179                 reservable_resources.update({rid: resource})
180             #if resource['type'] == 'rtype' and resources['exclusive'].upper() == 'TRUE':\
181             # (in case adding exclusive tag to channels)
182
183         free_resources = list(set(reservable_resources.keys()) - set(leases_resources.keys()))
184     
185         if len(free_resources) >= quantity:
186             free_resources = free_resources[:quantity]
187             for rid, resource in resources.iteritems():
188                 if rid in free_resources:
189                     result[rid] = resource
190             return result
191         else:
192             maybe_free = []
193             new_quan = quantity - len(free_resources)
194             print new_quan
195
196             for lid, lease in leases_resources.iteritems():
197                 for l in lease:
198                     st = int(l['start_time'])
199                     ft = st + int(l['duration']) * slot
200                     if (st <= finish_time <= ft) or (st <= start_time <= ft):
201                         if lid in maybe_free:
202                             maybe_free.remove(lid)
203                         break
204                     else:
205                         if lid not in maybe_free:
206                             maybe_free.append(lid)
207                 if len(maybe_free) >= new_quan:
208                     free_resources = [free_resources, maybe_free]
209                     free_resources = sum(free_resources, [])
210                     for rid, resource in resources.iteritems():
211                         if rid in free_resources:
212                             result[rid] = resource
213                         return result
214                     #return free_resources
215             warnings.warn("There aren't enough nodes")
216
217                                  
218     def provision_resource(self, new_resource, start_time = None, duration = None):
219         import os, tempfile
220         with self._lock:
221             xml = self._fetch_slice_info()
222             new_xml = self._parser.create_reservation_xml(xml, self._slice_hrn,\
223             new_resource, start_time, duration, self._aggregate)
224             fh, fname = tempfile.mkstemp()
225             print fname
226             os.write(fh, new_xml)
227             os.close(fh)
228             try:
229                 command_options = self._sfi_command_options()
230                 command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
231                 out = self._sfi_command_exec(command)
232             except:
233                 raise
234         xml = self._fetch_slice_info()
235         return self._parser.verify_reservation_xml(xml, self._slice_hrn, new_resource, start_time,\
236                 duration, self._aggregate)
237
238     def release_resource(self, resource, start_time = None, duration = None):
239         import os, tempfile
240         with self._lock:
241             xml = self._fetch_slice_info()
242             new_xml = self._parser.release_reservation_xml(xml, self._slice_hrn, resource,\
243             start_time, duration, self._aggregate)
244             fh, fname = tempfile.mkstemp()
245             print fname
246             os.write(fh, new_xml)
247             os.close(fh)
248             try:
249                 command_options = self._sfi_command_options()
250                 command = "sfi.py " + command_options + " create %s %s" % (self._slice_hrn, fname)
251                 out = self._sfi_command_exec(command)
252             except:
253                 raise
254         xml = self._fetch_slice_info()
255         return not self._parser.verify_reservation_xml(xml, self._slice_hrn, resource, start_time,\
256             duration, self._aggregate)
257
258
259 class SFAApiFactory(object):
260     lock = threading.Lock()
261     _apis = dict()
262
263     @classmethod
264     def get_api(slice_id = None, sfi_auth = None, sfi_user = None,
265             sfi_registry = None, sfi_sm = None, timeout = None, private_key = None):
266
267         key = cls.make_key(slice_id, sfi_auth, sfi_user, sfi_registry,
268                            sfi_sm, timeout, private_key, aggregate = 'ple')
269         api = cls._apis.get(key)
270         cls.lock.acquire()
271         api._fetch_resources_info(resources = False)
272         api._fetch_slice_info()
273         cls.lock.release()
274
275         if not api:
276             api = SFAApi(slice_id = None, sfi_auth = None, sfi_user = None,
277             sfi_registry = None, sfi_sm = None, timeout = None, private_key = None)
278             cls._apis[key] = api
279
280         return api
281
282     @classmethod
283     def make_key(cls, *args):
284         skey = "".join(map(str, args))
285         return hashlib.md5(skey).hexdigest()
286