from sfa.trust.certificate import Certificate
from sfa.storage.parameter import Parameter, Mixed
-from sfa.storage.record import SfaRecord
class GetSelfCredential(Method):
"""
self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, hrn, self.name))
- # authenticate the gid
+ ### authenticate the gid
+ # import here so we can load this module at build-time for sfa2wsdl
+ #from sfa.storage.alchemy import dbsession
+ from sfa.storage.persistentobjs import RegRecord
+
+ # xxx-local - the current code runs Resolve, which would forward to
+ # another registry if needed
+ # I wonder if this is truly the intention, or shouldn't we instead
+ # only look in the local db ?
records = self.api.manager.Resolve(self.api, xrn, type)
if not records:
raise RecordNotFound(hrn)
- record = SfaRecord(dict=records[0])
- gid = record.get_gid_object()
+
+ record_obj = RegRecord ('unknown')
+ record_obj.set_from_dict (records[0])
+ # xxx-local the local-only version would read
+ #record_obj = dbsession.query(RegRecord).filter_by(hrn=hrn).first()
+ #if not record_obj: raise RecordNotFound(hrn)
+ gid = record_obj.get_gid_object()
gid_str = gid.save_to_string(save_parents=True)
self.api.auth.authenticateGid(gid_str, [cert, type, hrn])
# authenticate the certificate against the gid in the db
from sfa.trust.credential import Credential
from sfa.storage.parameter import Parameter, Mixed
-from sfa.storage.record import SfaRecord
class List(Method):
"""
Parameter(type([str]), "List of credentials")),
]
- returns = [SfaRecord]
+ # xxx used to be [SfaRecord]
+ returns = [Parameter(dict, "registry record")]
def call(self, xrn, creds):
hrn, type = urn_to_hrn(xrn)
from sfa.trust.credential import Credential
from sfa.storage.parameter import Parameter, Mixed
-from sfa.storage.record import SfaRecord
class Resolve(Method):
"""
Parameter(list, "List of credentials)"))
]
- returns = [SfaRecord]
+ # xxx used to be [SfaRecord]
+ returns = [Parameter(dict, "registry record")]
def call(self, xrns, creds):
type = None
gid_urns = [gid.get_urn() for gid in gids]
hrns_expected = [gid.get_hrn() for gid in gids]
records_found = dbsession.query(RegRecord).\
- filter_by(pointer=-1)filter(RegRecord.hrn.in_(hrns_expected)).all()
+ filter_by(pointer=-1).filter(RegRecord.hrn.in_(hrns_expected)).all()
# remove old records
for record in records_found:
from testKeypair import *
# xxx broken-test
#from testHierarchy import *
-from testRecord import *
+from testStorage import *
if __name__ == "__main__":
unittest.main()
import unittest
from sfa.trust.gid import *
from sfa.util.config import *
-from sfa.storage.record import *
+from sfa.storage.persistentobjs import RegRecord
-class TestRecord(unittest.TestCase):
+class TestStorage(unittest.TestCase):
def setUp(self):
pass
def testCreate(self):
- r = SfaRecord()
+ r = RegRecord('user')
if __name__ == "__main__":
unittest.main()