X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=sfa%2Fsenslab%2Fslabdriver.py;h=f9c715da1a56680602032b6b97fb285d860f4a85;hb=e26914db04c9a129cf4d10a6bb20025837151f07;hp=337c5de810d33abc6a1bb3cb3c3b8a0e074a0650;hpb=81386c70035f6a184696bb5d07e01fb42fb3af0d;p=sfa.git diff --git a/sfa/senslab/slabdriver.py b/sfa/senslab/slabdriver.py index 337c5de8..f9c715da 100644 --- a/sfa/senslab/slabdriver.py +++ b/sfa/senslab/slabdriver.py @@ -1,12 +1,12 @@ import sys import subprocess -import datetime -from time import gmtime, strftime + +from datetime import datetime +from dateutil import tz +from time import strftime,gmtime from sfa.util.faults import MissingSfaInfo , SliverDoesNotExist -#from sfa.util.sfatime import datetime_to_string from sfa.util.sfalogging import logger -#from sfa.storage.table import SfaTable from sfa.util.defaultdict import defaultdict from sfa.storage.record import Record @@ -32,9 +32,9 @@ from sfa.util.plxrn import slicename_to_hrn, hostname_to_hrn, hrn_to_pl_slicenam ## thierry : please avoid wildcard imports :) from sfa.senslab.OARrestapi import OARrestapi from sfa.senslab.LDAPapi import LDAPapi -#from sfa.senslab.SenslabImportUsers import SenslabImportUsers + from sfa.senslab.parsing import parse_filter -from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SlabSliceDB +from sfa.senslab.slabpostgres import SlabDB, slab_dbsession,SliceSenslab from sfa.senslab.slabaggregate import SlabAggregate from sfa.senslab.slabslices import SlabSlices @@ -81,41 +81,51 @@ class SlabDriver(Driver): # shall return a structure as described in # http://groups.geni.net/geni/wiki/GAPI_AM_API_V2#SliverStatus # NT : not sure if we should implement this or not, but used by sface. - slices = self.GetSlices([slice_hrn]) - if len(slices) is 0: + + + sl = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn') + if len(sl) is 0: raise SliverDoesNotExist("%s slice_hrn" % (slice_hrn)) - sl = slices[0] - print >>sys.stderr, "\r\n \r\n_____________ Sliver status urn %s hrn %s slices %s \r\n " %(slice_urn,slice_hrn,slices) + + print >>sys.stderr, "\r\n \r\n_____________ Sliver status urn %s hrn %s sl %s \r\n " %(slice_urn,slice_hrn,sl) if sl['oar_job_id'] is not -1: # report about the local nodes only - nodes = self.GetNodes({'hostname':sl['node_ids']}, - ['node_id', 'hostname','site_login_base','boot_state']) + nodes_all = self.GetNodes({'hostname':sl['node_ids']}, + ['node_id', 'hostname','site','boot_state']) + nodeall_byhostname = dict([(n['hostname'], n) for n in nodes_all]) + nodes = sl['node_ids'] if len(nodes) is 0: raise SliverDoesNotExist("No slivers allocated ") - site_logins = [node['site_login_base'] for node in nodes] + result = {} top_level_status = 'unknown' if nodes: top_level_status = 'ready' result['geni_urn'] = slice_urn - result['slab_login'] = sl['job_user'] + result['pl_login'] = sl['job_user'] + #result['slab_login'] = sl['job_user'] - timestamp = float(sl['startTime']) + float(sl['walltime']) - result['slab_expires'] = strftime(self.time_format, gmtime(float(timestamp))) + timestamp = float(sl['startTime']) + float(sl['walltime']) + result['pl_expires'] = strftime(self.time_format, gmtime(float(timestamp))) + #result['slab_expires'] = strftime(self.time_format, gmtime(float(timestamp))) resources = [] for node in nodes: res = {} - res['slab_hostname'] = node['hostname'] - res['slab_boot_state'] = node['boot_state'] + #res['slab_hostname'] = node['hostname'] + #res['slab_boot_state'] = node['boot_state'] - sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'], node['node_id']) - res['geni_urn'] = sliver_id - if node['boot_state'] == 'Alive': + res['pl_hostname'] = nodeall_byhostname[node]['hostname'] + res['pl_boot_state'] = nodeall_byhostname[node]['boot_state'] + res['pl_last_contact'] = strftime(self.time_format, gmtime(float(timestamp))) + sliver_id = urn_to_sliver_id(slice_urn, sl['record_id_slice'],nodeall_byhostname[node]['node_id'] ) + res['geni_urn'] = sliver_id + if nodeall_byhostname[node]['boot_state'] == 'Alive': + #if node['boot_state'] == 'Alive': res['geni_status'] = 'ready' else: res['geni_status'] = 'failed' @@ -133,27 +143,16 @@ class SlabDriver(Driver): def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options): aggregate = SlabAggregate(self) - #aggregate = SlabAggregate(self) + slices = SlabSlices(self) peer = slices.get_peer(slice_hrn) sfa_peer = slices.get_sfa_peer(slice_hrn) slice_record=None - #print>>sys.stderr, " \r\n \r\n create_sliver creds %s \r\n \r\n users %s " %(creds,users) + if not isinstance(creds, list): creds = [creds] - #for cred in creds: - #cred_obj=Credential(string=cred) - #print >>sys.stderr," \r\n \r\n create_sliver cred %s " %(cred) - #GIDcall = cred_obj.get_gid_caller() - #GIDobj = cred_obj.get_gid_object() - #print >>sys.stderr," \r\n \r\n create_sliver GIDobj pubkey %s hrn %s " %(GIDobj.get_pubkey().get_pubkey_string(), GIDobj.get_hrn()) - #print >>sys.stderr," \r\n \r\n create_sliver GIDcall pubkey %s hrn %s" %(GIDcall.get_pubkey().get_pubkey_string(),GIDobj.get_hrn()) - - - #tmpcert = GID(string = users[0]['gid']) - #print >>sys.stderr," \r\n \r\n create_sliver tmpcer pubkey %s hrn %s " %(tmpcert.get_pubkey().get_pubkey_string(), tmpcert.get_hrn()) if users: slice_record = users[0].get('slice_record', {}) @@ -186,7 +185,7 @@ class SlabDriver(Driver): def delete_sliver (self, slice_urn, slice_hrn, creds, options): - slices = self.GetSlices({'slice_hrn': slice_hrn}) + slices = self.GetSlices(slice_filter= slice_hrn, filter_type = 'slice_hrn') if not slices: return 1 slice = slices[0] @@ -285,7 +284,7 @@ class SlabDriver(Driver): if key not in acceptable_fields: slab_record.pop(key) print>>sys.stderr, " \r\n \t\t SLABDRIVER.PY register" - slices = self.GetSlices([slab_record['hrn']]) + slices = self.GetSlices(slice_filter =slab_record['hrn'], filter_type = 'slice_hrn') if not slices: pointer = self.AddSlice(slab_record) else: @@ -390,7 +389,7 @@ class SlabDriver(Driver): if persons and persons[0]['site_ids']: self.DeletePerson(username) elif type == 'slice': - if self.GetSlices(hrn): + if self.GetSlices(slice_filter = hrn, filter_type = 'slice_hrn'): self.DeleteSlice(hrn) #elif type == 'authority': @@ -403,17 +402,21 @@ class SlabDriver(Driver): existing_records = {} existing_hrns_by_types= {} - all_records = dbsession.query(RegRecord).all + print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers auth = %s, peer_filter %s, return_field %s " %(auth , peer_filter, return_fields) + all_records = dbsession.query(RegRecord).filter(RegRecord.type.like('%authority%')).all() for record in all_records: existing_records[record.hrn] = record if record.type not in existing_hrns_by_types: existing_hrns_by_types[record.type] = [record.hrn] + print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t NOT IN existing_hrns_by_types %s " %( existing_hrns_by_types) else: + + print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers \t INNN type %s hrn %s " %( record.type,record.hrn ) existing_hrns_by_types.update({record.type:(existing_hrns_by_types[record.type].append(record.hrn))}) print >>sys.stderr, "\r\n \r\n SLABDRIVER GetPeers existing_hrns_by_types %s " %( existing_hrns_by_types) - return_records = [] - #records_list = table.findObjects({'type':'authority+sa'}) + records_list= [] + try: for hrn in existing_hrns_by_types['authority+sa']: records_list.append(existing_records[hrn]) @@ -446,8 +449,8 @@ class SlabDriver(Driver): return return_person_list def GetTimezone(self): - time = self.oar.parser.SendRequest("GET_timezone") - return time + server_timestamp,server_tz = self.oar.parser.SendRequest("GET_timezone") + return server_timestamp,server_tz def DeleteJobs(self, job_id, username): @@ -465,7 +468,6 @@ class SlabDriver(Driver): #'api_timestamp'] #assigned_res = ['resource_id', 'resource_uri'] #assigned_n = ['node', 'node_uri'] - if job_id and resources is False: req = "GET_jobs_id" @@ -474,8 +476,6 @@ class SlabDriver(Driver): if job_id and resources : req = "GET_jobs_id_resources" node_list_k = 'reserved_resources' - - #Get job info from OAR job_info = self.oar.parser.SendRequest(req, job_id, username) @@ -495,19 +495,15 @@ class SlabDriver(Driver): for node in node_list: node_hostname_list.append(node['hostname']) node_dict = dict(zip(node_hostname_list,node_list)) - - #print>>sys.stderr, "\r\n \r\n \r\n \r\n \r\n \t\t GetJobs GetNODES %s " %(node_list) try : - - #for n in job_info[node_list]: - #n = str(self.root_auth) + str(n) - liste =job_info[node_list_k] print>>sys.stderr, "\r\n \r\n \t\t GetJobs resources job_info liste%s" %(liste) for k in range(len(liste)): job_info[node_list_k][k] = node_dict[job_info[node_list_k][k]]['hostname'] print>>sys.stderr, "\r\n \r\n \t\t YYYYYYYYYYYYGetJobs resources job_info %s" %(job_info) + #Replaces the previous entry "assigned_network_address" / "reserved_resources" + #with "node_ids" job_info.update({'node_ids':job_info[node_list_k]}) del job_info[node_list_k] return job_info @@ -515,10 +511,14 @@ class SlabDriver(Driver): except KeyError: print>>sys.stderr, "\r\n \r\n \t\t GetJobs KEYERROR " - - - - + def GetReservedNodes(self): + # this function returns a list of all the nodes already involved in an oar job + + jobs=self.oar.parser.SendRequest("GET_jobs_details") + nodes=[] + for j in jobs : + nodes=j['assigned_network_address']+nodes + return nodes def GetNodes(self,node_filter= None, return_fields=None): @@ -544,38 +544,52 @@ class SlabDriver(Driver): return_site_list = parse_filter(site_dict.values(), site_filter,'site', return_fields) return return_site_list - #TODO : filtrer au niveau de la query voir sqlalchemy - #http://docs.sqlalchemy.org/en/latest/orm/tutorial.html#returning-lists-and-scalars - def GetSlices(self,slice_filter = None, return_fields=None): - #sliceslist = self.db.find('slice_senslab',columns = ['oar_job_id', 'slice_hrn', 'record_id_slice','record_id_user'], record_filter=slice_filter) - sliceslist = slab_dbsession.query(SlabSliceDB).all() - #sliceslist = slices_records.order_by("record_id_slice").all() - - print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(sliceslist,slice_filter) - - return_slice_list = parse_filter(sliceslist, slice_filter,'slice', return_fields) - - if return_slice_list: - for sl in return_slice_list: - #login = sl['slice_hrn'].split(".")[1].split("_")[0] - login = sl.slice_hrn.split(".")[1].split("_")[0] - print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices sl %s " %(sl) - if sl.oar_job_id is not -1: - rslt = self.GetJobs( sl.oar_job_id,resources=False, username = login ) + def GetSlices(self,slice_filter = None, filter_type = None, return_fields=None): + return_slice_list = [] + slicerec = {} + rec = {} + ftypes = ['slice_hrn', 'record_id_user'] + if filter_type and filter_type in ftypes: + if filter_type == 'slice_hrn': + slicerec = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = slice_filter).first() + if filter_type == 'record_id_user': + slicerec = slab_dbsession.query(SliceSenslab).filter_by(record_id_user = slice_filter).first() + + if slicerec: + rec = slicerec.dumpquerytodict() + login = slicerec.slice_hrn.split(".")[1].split("_")[0] + print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY slicerec GetSlices %s " %(slicerec) + if slicerec.oar_job_id is not -1: + rslt = self.GetJobs( slicerec.oar_job_id, resources=False, username = login ) print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices GetJobs %s " %(rslt) if rslt : - sl.update(rslt) - sl.update({'hrn':str(sl['slice_hrn'])}) - #If GetJobs is empty, this means the job is now in the 'Terminated' state - #Update the slice record + rec.update(rslt) + rec.update({'hrn':str(rec['slice_hrn'])}) + #If GetJobs is empty, this means the job is now in the 'Terminated' state + #Update the slice record else : - sl['oar_job_id'] = '-1' - sl.update({'hrn':str(sl['slice_hrn'])}) - #self.db.update_senslab_slice(sl) + self.db.update_job(slice_filter, job_id = -1) + rec['oar_job_id'] = -1 + rec.update({'hrn':str(rec['slice_hrn'])}) - print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices return_slice_list %s" %(return_slice_list) - return return_slice_list + print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices rec %s" %(rec) + + return rec + + + else: + return_slice_list = slab_dbsession.query(SliceSenslab).all() + + print >>sys.stderr, " \r\n \r\n \tSLABDRIVER.PY GetSlices slices %s slice_filter %s " %(return_slice_list,slice_filter) + + #if return_fields: + #return_slice_list = parse_filter(sliceslist, slice_filter,'slice', return_fields) + + + + return return_slice_list + @@ -692,14 +706,22 @@ class SlabDriver(Driver): reqdict['type'] = "deploy" reqdict['directory']= "" reqdict['name']= "TestSandrine" - timestamp = self.GetTimezone() + # reservations are performed in the oar server timebase, so : + # 1- we get the server time(in UTC tz )/server timezone + # 2- convert the server UTC time in its timezone + # 3- add a custom delay to this time + # 4- convert this time to a readable form and it for the reservation request. + server_timestamp,server_tz = self.GetTimezone() + s_tz=tz.gettz(server_tz) + UTC_zone = tz.gettz("UTC") + #weird... datetime.fromtimestamp should work since we do from datetime import datetime + utc_server= datetime.datetime.fromtimestamp(float(server_timestamp)+20,UTC_zone) + server_localtime=utc_server.astimezone(s_tz) + print>>sys.stderr, "\r\n \r\n AddSliceToNodes slice_name %s added_nodes %s username %s reqdict %s " %(slice_name,added_nodes,slice_user, reqdict) - readable_time = strftime(self.time_format, gmtime(float(timestamp))) - print >>sys.stderr," \r\n \r\n \t\t\t\t AVANT ParseTimezone readable_time %s timestanp %s " %(readable_time, timestamp ) - timestamp = timestamp+ 3620 #Add 3 min to server time - readable_time = strftime(self.time_format, gmtime(float(timestamp))) + readable_time = server_localtime.strftime(self.time_format) - print >>sys.stderr," \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s " %(readable_time , timestamp) + print >>sys.stderr," \r\n \r\n \t\t\t\tAPRES ParseTimezone readable_time %s timestanp %s " %(readable_time ,server_timestamp) reqdict['reservation'] = readable_time # first step : start the OAR job @@ -707,8 +729,10 @@ class SlabDriver(Driver): #OAR = OARrestapi() answer = self.oar.POSTRequestToOARRestAPI('POST_job',reqdict,slice_user) print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid %s " %(answer) - self.db.update('slice',['oar_job_id'], [answer['id']], 'slice_hrn', slice_name) - + #self.db.update('slice',['oar_job_id'], [answer['id']], 'slice_hrn', slice_name) + + + self.db.update_job( slice_name, job_id = answer['id'] ) jobid=answer['id'] print>>sys.stderr, "\r\n \r\n AddSliceToNodes jobid %s added_nodes %s slice_user %s" %(jobid,added_nodes,slice_user) # second step : configure the experiment @@ -783,7 +807,7 @@ class SlabDriver(Driver): # get the sfa records #table = SfaTable() existing_records = {} - all_records = dbsession.query(RegRecord).all + all_records = dbsession.query(RegRecord).all() for record in all_records: existing_records[(record.type,record.pointer)] = record @@ -875,47 +899,54 @@ class SlabDriver(Driver): Given a SFA record, fill in the senslab specific and SFA specific fields in the record. """ - print >>sys.stderr, "\r\n \t\t BEFORE fill_record_info %s" %(records) + + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info 000000000 fill_record_info %s " %(records) if not isinstance(records, list): records = [records] - + parkour = records try: for record in parkour: if str(record['type']) == 'slice': - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info record %s" %(record) + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t \t record %s" %(record) #sfatable = SfaTable() - existing_records_by_id = {} - all_records = dbsession.query(RegRecord).all - for rec in all_records: - existing_records_by_id[rec.record_id] = rec - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info existing_records_by_id %s" %(existing_records_by_id) + #existing_records_by_id = {} + #all_records = dbsession.query(RegRecord).all() + #for rec in all_records: + #existing_records_by_id[rec.record_id] = rec + #print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t existing_records_by_id %s" %(existing_records_by_id[record['record_id']]) - recslice = self.db.find('slice',str(record['hrn'])) - if isinstance(recslice,list) and len(recslice) == 1: - recslice = recslice[0] - #recuser = sfatable.find( recslice['record_id_user'], ['hrn']) - recuser = existing_records_by_id[recslice['record_id_user']]['hrn'] - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info %s" %(recuser) + #recslice = self.db.find('slice',{'slice_hrn':str(record['hrn'])}) + #recslice = slab_dbsession.query(SliceSenslab).filter_by(slice_hrn = str(record['hrn'])).first() + recslice = self.GetSlices(slice_filter = str(record['hrn']), filter_type = 'slice_hrn') + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t HOY HOY reclise %s" %(recslice) + #if isinstance(recslice,list) and len(recslice) == 1: + #recslice = recslice[0] + + recuser = dbsession.query(RegRecord).filter_by(record_id = recslice['record_id_user']).first() + #existing_records_by_id[recslice['record_id_user']] + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info \t\t recuser %s" %(recuser) - if isinstance(recuser,list) and len(recuser) == 1: - recuser = recuser[0] - record.update({'PI':[recuser['hrn']], - 'researcher': [recuser['hrn']], + + record.update({'PI':[recuser.hrn], + 'researcher': [recuser.hrn], 'name':record['hrn'], 'oar_job_id':recslice['oar_job_id'], 'node_ids': [], 'person_ids':[recslice['record_id_user']]}) - elif str(record['type']) == 'user': - recslice = self.db.find('slice', record_filter={'record_id_user':record['record_id']}) - for rec in recslice: - rec.update({'type':'slice'}) - rec.update({'hrn':rec['slice_hrn'], 'record_id':rec['record_id_slice']}) - records.append(rec) - print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info ADDING SLIC EINFO recslice %s" %(recslice) + elif str(record['type']) == 'user': + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info USEEEEEEEEEERDESU!" + + rec = self.GetSlices(slice_filter = record['record_id'], filter_type = 'record_id_user') + #Append record in records list, therfore fetches user and slice info again(one more loop) + #Will update PIs and researcher for the slice + + rec.update({'type':'slice','hrn':rec['slice_hrn']}) + records.append(rec) + print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info ADDING SLIC EINFO rec %s" %(rec) print >>sys.stderr, "\r\n \t\t SLABDRIVER.PY fill_record_info OKrecords %s" %(records) except TypeError: @@ -927,6 +958,10 @@ class SlabDriver(Driver): #self.fill_record_sfa_info(records) #print >>sys.stderr, "\r\n \t\t after fill_record_sfa_info" + + + + #def update_membership_list(self, oldRecord, record, listName, addFunc, delFunc): ## get a list of the HRNs tht are members of the old and new records #if oldRecord: