Merge remote-tracking branch 'origin/geni-v3' into geni-v3
[sfa.git] / sfa / dummy / dummydriver.py
1 import time
2 import datetime
3 #
4 from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \
5     RecordNotFound, SfaNotImplemented, SliverDoesNotExist, SearchFailed, \
6     UnsupportedOperation, Forbidden
7 from sfa.util.sfalogging import logger
8 from sfa.util.defaultdict import defaultdict
9 from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch
10 from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf
11 from sfa.util.cache import Cache
12
13 # one would think the driver should not need to mess with the SFA db, but..
14 from sfa.storage.alchemy import dbsession
15 from sfa.storage.model import RegRecord, SliverAllocation
16 from sfa.trust.credential import Credential
17
18 # used to be used in get_ticket
19 #from sfa.trust.sfaticket import SfaTicket
20 from sfa.rspecs.version_manager import VersionManager
21 from sfa.rspecs.rspec import RSpec
22
23 # the driver interface, mostly provides default behaviours
24 from sfa.managers.driver import Driver
25
26 from sfa.dummy.dummyshell import DummyShell
27 from sfa.dummy.dummyaggregate import DummyAggregate
28 from sfa.dummy.dummyslices import DummySlices
29 from sfa.dummy.dummyxrn import DummyXrn, slicename_to_hrn, hostname_to_hrn, hrn_to_dummy_slicename, xrn_to_hostname
30
31
def list_to_dict(recs, key):
    """
    Index a list of dictionaries by one of their fields.

    Each element of recs is stored under its own rec[key]; when several
    elements share the same value, the one appearing last in recs wins.
    """
    indexed = {}
    for rec in recs:
        indexed[rec[key]] = rec
    return indexed
38
39 #
40 # DummyShell is just an xmlrpc serverproxy where methods can be sent as-is; 
41
42 class DummyDriver (Driver):
43
44     # the cache instance is a class member so it survives across incoming requests
45     cache = None
46
47     def __init__ (self, config):
48         Driver.__init__ (self, config)
49         self.config = config
50         self.hrn = config.SFA_INTERFACE_HRN
51         self.root_auth = config.SFA_REGISTRY_ROOT_AUTH
52         self.shell = DummyShell (config)
53         self.testbedInfo = self.shell.GetTestbedInfo()
54  
55     def check_sliver_credentials(self, creds, urns):
56         # build list of cred object hrns
57         slice_cred_names = []
58         for cred in creds:
59             slice_cred_hrn = Credential(cred=cred).get_gid_object().get_hrn()
60             slice_cred_names.append(DummyXrn(xrn=slice_cred_hrn).dummy_slicename())
61
62         # look up slice name of slivers listed in urns arg
63         slice_ids = []
64         for urn in urns:
65             sliver_id_parts = Xrn(xrn=urn).get_sliver_id_parts()
66             try:
67                 slice_ids.append(int(sliver_id_parts[0]))
68             except ValueError:
69                 pass
70
71         if not slice_ids:
72              raise Forbidden("sliver urn not provided")
73
74         slices = self.shell.GetSlices({'slice_ids': slice_ids})
75         sliver_names = [slice['slice_name'] for slice in slices]
76
77         # make sure we have a credential for every specified sliver ierd
78         for sliver_name in sliver_names:
79             if sliver_name not in slice_cred_names:
80                 msg = "Valid credential not found for target: %s" % sliver_name
81                 raise Forbidden(msg)
82
83     ########################################
84     ########## registry oriented
85     ########################################
86
87     def augment_records_with_testbed_info (self, sfa_records):
88         return self.fill_record_info (sfa_records)
89
90     ########## 
91     def register (self, sfa_record, hrn, pub_key):
92         type = sfa_record['type']
93         dummy_record = self.sfa_fields_to_dummy_fields(type, hrn, sfa_record)
94         
95         if type == 'authority':
96             pointer = -1
97
98         elif type == 'slice':
99             slices = self.shell.GetSlices({'slice_name': dummy_record['slice_name']})
100             if not slices:
101                  pointer = self.shell.AddSlice(dummy_record)
102             else:
103                  pointer = slices[0]['slice_id']
104
105         elif type == 'user':
106             users = self.shell.GetUsers({'email':sfa_record['email']})
107             if not users:
108                 pointer = self.shell.AddUser(dummy_record)
109             else:
110                 pointer = users[0]['user_id']
111     
112             # Add the user's key
113             if pub_key:
114                 self.shell.AddUserKey({'user_id' : pointer, 'key' : pub_key})
115
116         elif type == 'node':
117             nodes = self.shell.GetNodes(dummy_record['hostname'])
118             if not nodes:
119                 pointer = self.shell.AddNode(dummy_record)
120             else:
121                 pointer = users[0]['node_id']
122     
123         return pointer
124         
125     ##########
126     def update (self, old_sfa_record, new_sfa_record, hrn, new_key):
127         pointer = old_sfa_record['pointer']
128         type = old_sfa_record['type']
129         dummy_record=self.sfa_fields_to_dummy_fields(type, hrn, new_sfa_record)
130
131         # new_key implemented for users only
132         if new_key and type not in [ 'user' ]:
133             raise UnknownSfaType(type)
134
135     
136         if type == "slice":
137             self.shell.UpdateSlice({'slice_id': pointer, 'fields': dummy_record})
138     
139         elif type == "user":
140             self.shell.UpdateUser({'user_id': pointer, 'fields': dummy_record})
141
142             if new_key:
143                 self.shell.AddUserKey({'user_id' : pointer, 'key' : new_key})
144
145         elif type == "node":
146             self.shell.UpdateNode({'node_id': pointer, 'fields': dummy_record})
147
148
149         return True
150         
151
152     ##########
153     def remove (self, sfa_record):
154         type=sfa_record['type']
155         pointer=sfa_record['pointer']
156         if type == 'user':
157             self.shell.DeleteUser({'user_id': pointer})
158         elif type == 'slice':
159             self.shell.DeleteSlice({'slice_id': pointer})
160         elif type == 'node':
161             self.shell.DeleteNode({'node_id': pointer})
162
163         return True
164
165
166
167
168
169     ##
170     # Convert SFA fields to Dummy testbed fields for use when registering or updating
171     # registry record in the dummy testbed
172     #
173
174     def sfa_fields_to_dummy_fields(self, type, hrn, sfa_record):
175
176         dummy_record = {}
177  
178         if type == "slice":
179             dummy_record["slice_name"] = hrn_to_dummy_slicename(hrn)
180         
181         elif type == "node":
182             if "hostname" not in sfa_record:
183                 raise MissingSfaInfo("hostname")
184             dummy_record["hostname"] = sfa_record["hostname"]
185             if "type" in sfa_record:
186                dummy_record["type"] = sfa_record["type"]
187             else:
188                dummy_record["type"] = "dummy_type"
189  
190         elif type == "authority":
191             dummy_record["name"] = hrn
192
193         elif type == "user":
194             dummy_record["user_name"] = sfa_record["email"].split('@')[0]
195             dummy_record["email"] = sfa_record["email"]
196
197         return dummy_record
198
199     ####################
200     def fill_record_info(self, records):
201         """
202         Given a (list of) SFA record, fill in the DUMMY TESTBED specific 
203         and SFA specific fields in the record. 
204         """
205         if not isinstance(records, list):
206             records = [records]
207
208         self.fill_record_dummy_info(records)
209         self.fill_record_hrns(records)
210         self.fill_record_sfa_info(records)
211         return records
212
213     def fill_record_dummy_info(self, records):
214         """
215         Fill in the DUMMY specific fields of a SFA record. This
216         involves calling the appropriate DUMMY method to retrieve the 
217         database record for the object.
218             
219         @param record: record to fill in field (in/out param)     
220         """
221         # get ids by type
222         node_ids, slice_ids, user_ids = [], [], [] 
223         type_map = {'node': node_ids, 'slice': slice_ids, 'user': user_ids}
224                   
225         for record in records:
226             for type in type_map:
227                 if type == record['type']:
228                     type_map[type].append(record['pointer'])
229
230         # get dummy records
231         nodes, slices, users = {}, {}, {}
232         if node_ids:
233             node_list = self.shell.GetNodes({'node_ids':node_ids})
234             nodes = list_to_dict(node_list, 'node_id')
235         if slice_ids:
236             slice_list = self.shell.GetSlices({'slice_ids':slice_ids})
237             slices = list_to_dict(slice_list, 'slice_id')
238         if user_ids:
239             user_list = self.shell.GetUsers({'user_ids': user_ids})
240             users = list_to_dict(user_list, 'user_id')
241
242         dummy_records = {'node': nodes, 'slice': slices, 'user': users}
243
244
245         # fill record info
246         for record in records:
247             # records with pointer==-1 do not have dummy info.
248             if record['pointer'] == -1:
249                 continue
250            
251             for type in dummy_records:
252                 if record['type'] == type:
253                     if record['pointer'] in dummy_records[type]:
254                         record.update(dummy_records[type][record['pointer']])
255                         break
256             # fill in key info
257             if record['type'] == 'user':
258                 record['key_ids'] = []
259                 recors['keys'] = []
260                 for key in dummy_records['user'][record['pointer']]['keys']:
261                      record['key_ids'].append(-1)
262                      recors['keys'].append(key)
263
264         return records
265
266     def fill_record_hrns(self, records):
267         """
268         convert dummy ids to hrns
269         """
270
271         # get ids
272         slice_ids, user_ids, node_ids = [], [], []
273         for record in records:
274             if 'user_ids' in record:
275                 user_ids.extend(record['user_ids'])
276             if 'slice_ids' in record:
277                 slice_ids.extend(record['slice_ids'])
278             if 'node_ids' in record:
279                 node_ids.extend(record['node_ids'])
280
281         # get dummy records
282         slices, users, nodes = {}, {}, {}
283         if user_ids:
284             user_list = self.shell.GetUsers({'user_ids': user_ids})
285             users = list_to_dict(user_list, 'user_id')
286         if slice_ids:
287             slice_list = self.shell.GetSlices({'slice_ids': slice_ids})
288             slices = list_to_dict(slice_list, 'slice_id')       
289         if node_ids:
290             node_list = self.shell.GetNodes({'node_ids': node_ids})
291             nodes = list_to_dict(node_list, 'node_id')
292        
293         # convert ids to hrns
294         for record in records:
295             # get all relevant data
296             type = record['type']
297             pointer = record['pointer']
298             testbed_name = self.testbed_name()
299             auth_hrn = self.hrn
300             if pointer == -1:
301                 continue
302
303             if 'user_ids' in record:
304                 emails = [users[user_id]['email'] for user_id in record['user_ids'] \
305                           if user_id in  users]
306                 usernames = [email.split('@')[0] for email in emails]
307                 user_hrns = [".".join([auth_hrn, testbed_name, username]) for username in usernames]
308                 record['users'] = user_hrns 
309             if 'slice_ids' in record:
310                 slicenames = [slices[slice_id]['slice_name'] for slice_id in record['slice_ids'] \
311                               if slice_id in slices]
312                 slice_hrns = [slicename_to_hrn(auth_hrn, slicename) for slicename in slicenames]
313                 record['slices'] = slice_hrns
314             if 'node_ids' in record:
315                 hostnames = [nodes[node_id]['hostname'] for node_id in record['node_ids'] \
316                              if node_id in nodes]
317                 node_hrns = [hostname_to_hrn(auth_hrn, login_base, hostname) for hostname in hostnames]
318                 record['nodes'] = node_hrns
319
320             
321         return records   
322
323     def fill_record_sfa_info(self, records):
324
325         def startswith(prefix, values):
326             return [value for value in values if value.startswith(prefix)]
327
328         # get user ids
329         user_ids = []
330         for record in records:
331             user_ids.extend(record.get("user_ids", []))
332         
333         # get sfa records for all records associated with these records.   
334         # we'll replace pl ids (person_ids) with hrns from the sfa records
335         # we obtain
336         
337         # get the registry records
338         user_list, users = [], {}
339         user_list = dbsession.query (RegRecord).filter(RegRecord.pointer.in_(user_ids))
340         # create a hrns keyed on the sfa record's pointer.
341         # Its possible for multiple records to have the same pointer so
342         # the dict's value will be a list of hrns.
343         users = defaultdict(list)
344         for user in user_list:
345             users[user.pointer].append(user)
346
347         # get the dummy records
348         dummy_user_list, dummy_users = [], {}
349         dummy_user_list = self.shell.GetUsers({'user_ids': user_ids})
350         dummy_users = list_to_dict(dummy_user_list, 'user_id')
351
352         # fill sfa info
353         for record in records:
354             # skip records with no pl info (top level authorities)
355             #if record['pointer'] == -1:
356             #    continue 
357             sfa_info = {}
358             type = record['type']
359             logger.info("fill_record_sfa_info - incoming record typed %s"%type)
360             if (type == "slice"):
361                 # all slice users are researchers
362                 record['geni_urn'] = hrn_to_urn(record['hrn'], 'slice')
363                 record['PI'] = []
364                 record['researcher'] = []
365                 for user_id in record.get('user_ids', []):
366                     hrns = [user.hrn for user in users[user_id]]
367                     record['researcher'].extend(hrns)                
368
369             elif (type.startswith("authority")):
370                 record['url'] = None
371                 logger.info("fill_record_sfa_info - authority xherex")
372
373             elif (type == "node"):
374                 sfa_info['dns'] = record.get("hostname", "")
375                 # xxx TODO: URI, LatLong, IP, DNS
376     
377             elif (type == "user"):
378                 logger.info('setting user.email')
379                 sfa_info['email'] = record.get("email", "")
380                 sfa_info['geni_urn'] = hrn_to_urn(record['hrn'], 'user')
381                 sfa_info['geni_certificate'] = record['gid'] 
382                 # xxx TODO: PostalAddress, Phone
383             record.update(sfa_info)
384
385
386     ####################
387     def update_relation (self, subject_type, target_type, relation_name, subject_id, target_ids):
388         # hard-wire the code for slice/user for now, could be smarter if needed
389         if subject_type =='slice' and target_type == 'user' and relation_name == 'researcher':
390             subject=self.shell.GetSlices ({'slice_id': subject_id})[0]
391             if 'user_ids' not in subject.keys():
392                  subject['user_ids'] = []
393             current_target_ids = subject['user_ids']
394             add_target_ids = list ( set (target_ids).difference(current_target_ids))
395             del_target_ids = list ( set (current_target_ids).difference(target_ids))
396             logger.debug ("subject_id = %s (type=%s)"%(subject_id,type(subject_id)))
397             for target_id in add_target_ids:
398                 self.shell.AddUserToSlice ({'user_id': target_id, 'slice_id': subject_id})
399                 logger.debug ("add_target_id = %s (type=%s)"%(target_id,type(target_id)))
400             for target_id in del_target_ids:
401                 logger.debug ("del_target_id = %s (type=%s)"%(target_id,type(target_id)))
402                 self.shell.DeleteUserFromSlice ({'user_id': target_id, 'slice_id': subject_id})
403         else:
404             logger.info('unexpected relation %s to maintain, %s -> %s'%(relation_name,subject_type,target_type))
405
406         
407     ########################################
408     ########## aggregate oriented
409     ########################################
410
411     def testbed_name (self): return "dummy"
412
413     def aggregate_version (self):
414         return {}
415
416     def list_resources (self, version=None, options={}):
417         aggregate = DummyAggregate(self)
418         rspec =  aggregate.list_resources(version=version, options=options)
419         return rspec
420
421     def describe(self, urns, version, options={}):
422         aggregate = DummyAggregate(self)
423         return aggregate.describe(urns, version=version, options=options)
424     
425     def status (self, urns, options={}):
426         aggregate = DummyAggregate(self)
427         desc =  aggregate.describe(urns, version='GENI 3')
428         status = {'geni_urn': desc['geni_urn'],
429                   'geni_slivers': desc['geni_slivers']}
430         return status
431
432         
433     def allocate (self, urn, rspec_string, expiration, options={}):
434         xrn = Xrn(urn)
435         aggregate = DummyAggregate(self)
436         slices = DummySlices(self)
437         slice_record=None
438         users = options.get('geni_users', [])
439         if users:
440             slice_record = users[0].get('slice_record', {})
441
442         # parse rspec
443         rspec = RSpec(rspec_string)
444         requested_attributes = rspec.version.get_slice_attributes()
445
446         # ensure slice record exists
447         slice = slices.verify_slice(xrn.hrn, slice_record, expiration=expiration, options=options)
448         # ensure person records exists
449         #persons = slices.verify_persons(xrn.hrn, slice, users, peer, sfa_peer, options=options)
450
451         # add/remove slice from nodes
452         request_nodes = rspec.version.get_nodes_with_slivers()
453         nodes = slices.verify_slice_nodes(urn, slice, request_nodes)
454
455         return aggregate.describe([xrn.get_urn()], version=rspec.version)
456
457     def provision(self, urns, options={}):
458         # update users
459         slices = DummySlices(self)
460         aggregate = DummyAggregate(self)
461         slivers = aggregate.get_slivers(urns)
462         slice = slivers[0]
463         geni_users = options.get('geni_users', [])
464         #users = slices.verify_users(None, slice, geni_users, options=options)
465         # update sliver allocation states and set them to geni_provisioned
466         sliver_ids = [sliver['sliver_id'] for sliver in slivers]
467         SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned')
468         version_manager = VersionManager()
469         rspec_version = version_manager.get_version(options['geni_rspec_version'])
470         return self.describe(urns, rspec_version, options=options)
471
472     def delete(self, urns, options={}):
473         # collect sliver ids so we can update sliver allocation states after
474         # we remove the slivers.
475         aggregate = DummyAggregate(self)
476         slivers = aggregate.get_slivers(urns)
477         if slivers:
478             slice_id = slivers[0]['slice_id']
479             node_ids = []
480             sliver_ids = []
481             for sliver in slivers:
482                 node_ids.append(sliver['node_id'])
483                 sliver_ids.append(sliver['sliver_id'])
484
485             # determine if this is a peer slice
486             # xxx I wonder if this would not need to use PlSlices.get_peer instead 
487             # in which case plc.peers could be deprecated as this here
488             # is the only/last call to this last method in plc.peers
489             slice_hrn = DummyXrn(auth=self.hrn, slicename=slivers[0]['slice_name']).get_hrn()
490             try:
491                 self.shell.DeleteSliceFromNodes({'slice_id': slice_id, 'node_ids': node_ids})
492                 # delete sliver allocation states
493                 SliverAllocation.delete_allocations(sliver_ids)
494             finally:
495                 pass
496
497         # prepare return struct
498         geni_slivers = []
499         for sliver in slivers:
500             geni_slivers.append(
501                 {'geni_sliver_urn': sliver['sliver_id'],
502                  'geni_allocation_status': 'geni_unallocated',
503                  'geni_expires': datetime_to_string(utcparse(sliver['expires']))})  
504         return geni_slivers
505
506     def renew (self, urns, expiration_time, options={}):
507         aggregate = DummyAggregate(self)
508         slivers = aggregate.get_slivers(urns)
509         if not slivers:
510             raise SearchFailed(urns)
511         slice = slivers[0]
512         requested_time = utcparse(expiration_time)
513         record = {'expires': int(datetime_to_epoch(requested_time))}
514         self.shell.UpdateSlice({'slice_id': slice['slice_id'], 'fileds': record})
515         description = self.describe(urns, 'GENI 3', options)
516         return description['geni_slivers']
517
518     def perform_operational_action (self, urns, action, options={}):
519         # Dummy doesn't support operational actions. Lets pretend like it
520         # supports start, but reject everything else.
521         action = action.lower()
522         if action not in ['geni_start']:
523             raise UnsupportedOperation(action)
524
525         # fault if sliver is not full allocated (operational status is geni_pending_allocation)
526         description = self.describe(urns, 'GENI 3', options)
527         for sliver in description['geni_slivers']:
528             if sliver['geni_operational_status'] == 'geni_pending_allocation':
529                 raise UnsupportedOperation(action, "Sliver must be fully allocated (operational status is not geni_pending_allocation)")
530         #
531         # Perform Operational Action Here
532         #
533
534         geni_slivers = self.describe(urns, 'GENI 3', options)['geni_slivers']
535         return geni_slivers
536
537     def shutdown (self, xrn, options={}):
538         xrn = DummyXrn(xrn=xrn, type='slice')
539         slicename = xrn.pl_slicename()
540         slices = self.shell.GetSlices({'name': slicename}, ['slice_id'])
541         if not slices:
542             raise RecordNotFound(slice_hrn)
543         slice_id = slices[0]['slice_id']
544         slice_tags = self.shell.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'})
545         if not slice_tags:
546             self.shell.AddSliceTag(slice_id, 'enabled', '0')
547         elif slice_tags[0]['value'] != "0":
548             tag_id = slice_tags[0]['slice_tag_id']
549             self.shell.UpdateSliceTag(tag_id, '0')
550         return 1