from types import StringTypes, ListType
from optparse import OptionParser
import zlib
-import logging
+from sfa.util.sfalogging import sfa_logger,sfa_logger_goes_to_console
from sfa.trust.certificate import Keypair, Certificate
+ from sfa.trust.gid import GID
from sfa.trust.credential import Credential
from sfa.util.sfaticket import SfaTicket
from sfa.util.record import SfaRecord, UserRecord, SliceRecord, NodeRecord, AuthorityRecord
from sfa.util.xmlrpcprotocol import ServerException
import sfa.util.xmlrpcprotocol as xmlrpcprotocol
from sfa.util.config import Config
-from sfa.util.sfalogging import console_logger
-
AGGREGATE_PORT=12346
CM_PORT=12346
self.authority = None
self.options = None
self.hashrequest = False
- self.logger=console_logger
+ sfa_logger_goes_to_console()
+ self.logger=sfa_logger()
def create_cmd_parser(self, command, additional_cmdargs=None):
cmdargs = {"list": "name",
help="user name", metavar="HRN", default=None)
parser.add_option("-a", "--auth", dest="auth",
help="authority name", metavar="HRN", default=None)
- parser.add_option("-v", "--verbose",
- action="store_true", dest="verbose", default=False,
- help="verbose mode")
+ parser.add_option("-v", "--verbose", action="count", dest="verbose", default=0,
+ help="verbose mode - cumulative")
parser.add_option("-D", "--debug",
action="store_true", dest="debug", default=False,
help="Debug (xml-rpc) protocol messages")
- parser.add_option("-p", "--protocol",
- dest="protocol", default="xmlrpc",
+ parser.add_option("-p", "--protocol", dest="protocol", default="xmlrpc",
help="RPC protocol (xmlrpc or soap)")
parser.add_option("-k", "--hashrequest",
action="store_true", dest="hashrequest", default=False,
except:
self.logger.critical("Failed to read configuration file %s"%config_file)
self.logger.info("Make sure to remove the export clauses and to add quotes")
- if not self.options.verbose:
+ if self.options.verbose==0:
self.logger.info("Re-run with -v for more details")
else:
self.logger.log_exc("Could not read config file %s"%config_file)
self.key_file = key_file
self.cert_file = cert_file
self.cert = Certificate(filename=cert_file)
- # instruct xmlrpcprotocol to redirect logs to console_logger
- self.options.client=True
# Establish connection to server(s)
self.logger.info("Contacting Registry at: %s"%reg_url)
self.registry = xmlrpcprotocol.get_server(reg_url, key_file, cert_file, self.options)
if (os.path.isfile(file)):
credential = Credential(filename=file)
# make sure it isnt expired
- if not credential.get_lifetime or \
- datetime.datetime.today() < credential.get_lifetime():
+ if not credential.get_expiration or \
+ datetime.datetime.today() < credential.get_expiration():
return credential
return None
(options, args) = parser.parse_args()
self.options = options
- if self.options.verbose: self.logger.setLevel(logging.DEBUG)
+ self.logger.setLevelFromOptVerbose(self.options.verbose)
if options.hashrequest:
self.hashrequest = True
self.set_servers()
- self.logger.info("Command %s" % command)
- self.logger.info("dir %s, user %s, auth %s, reg %s, sm %s" % (
- self. options.sfi_dir, self.options.user,self.options.auth,
- self.options.registry, self.options.sm))
+ self.logger.info("Command=%s" % command)
if command in ("resources"):
self.logger.debug("resources cmd_opts %s" % cmd_opts.format)
elif command in ("list", "show", "remove"):
return
if __name__ == "__main__":
- Sfi().main()
+ Sfi().main()
-### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
-### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
-
import datetime
import time
import traceback
import sys
import re
from types import StringTypes
-from sfa.util.namespace import *
+
+from sfa.util.namespace import get_authority, urn_to_hrn, slicename_to_hrn, hrn_to_pl_slicename, hrn_to_urn
from sfa.util.rspec import *
from sfa.util.specdict import *
from sfa.util.faults import *
from sfa.plc.network import *
from sfa.plc.api import SfaAPI
from sfa.plc.slices import *
+ from dateutil.parser import parse
def __get_registry_objects(slice_xrn, creds, users):
reg_objects['site'] = site
slice = {}
- slice['expires'] = int(time.mktime(Credential(string=creds[0]).get_lifetime().timetuple()))
+ slice['expires'] = int(time.mktime(Credential(string=creds[0]).get_expiration().timetuple()))
slice['hrn'] = hrn
slice['name'] = hrn_to_pl_slicename(hrn)
slice['url'] = hrn
return version
def slice_status(api, slice_xrn, creds):
+ hrn, type = urn_to_hrn(slice_xrn)
+ # find out where this slice is currently running
+ api.logger.info(hrn)
+ slicename = hrn_to_pl_slicename(hrn)
+
+ slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids','person_ids','name','expires'])
+ if len(slices) == 0:
+ raise Exception("Slice %s not found (used %s as slicename internally)" % slice_xrn, slicename)
+ slice = slices[0]
+
+ nodes = api.plshell.GetNodes(api.plauth, slice['node_ids'],
+ ['hostname', 'boot_state', 'last_contact'])
+ api.logger.info(slice)
+ api.logger.info(nodes)
+
result = {}
result['geni_urn'] = slice_xrn
result['geni_status'] = 'unknown'
- result['geni_resources'] = {}
+ result['pl_login'] = slice['name']
+ result['pl_expires'] = slice['expires']
+
+ resources = []
+
+ for node in nodes:
+ res = {}
+ res['pl_hostname'] = node['hostname']
+ res['pl_boot_state'] = node['boot_state']
+ res['pl_last_contact'] = node['last_contact']
+ res['geni_urn'] = ''
+ res['geni_status'] = 'unknown'
+ res['geni_error'] = ''
+
+ resources.append(res)
+
+ result['geni_resources'] = resources
return result
def create_slice(api, slice_xrn, creds, rspec, users):
return True
- def renew_slice(api, xrn, creds, exipration_time):
+ def renew_slice(api, xrn, creds, expiration_time):
hrn, type = urn_to_hrn(xrn)
slicename = hrn_to_pl_slicename(hrn)
slices = api.plshell.GetSlices(api.plauth, {'name': slicename}, ['slice_id'])
if not slices:
raise RecordNotFound(hrn)
slice = slices[0]
- slice['expires'] = expiration_time
- api.plshell.UpdateSlice(api.plauth, slice['slice_id'], slice)
+ requested_time = parse(expiration_time)
+ record = {'expires': int(time.mktime(requested_time.timetuple()))}
+ api.plshell.UpdateSlice(api.plauth, slice['slice_id'], record)
return 1
def start_slice(api, xrn, creds):
from sfa.util.table import SfaTable
from sfa.util.record import SfaRecord
from sfa.trust.gid import GID
-from sfa.util.namespace import *
+from sfa.util.namespace import get_leaf, get_authority, hrn_to_urn, hrn_to_pl_login_base, urn_to_hrn
from sfa.trust.credential import *
from sfa.trust.certificate import *
from sfa.util.faults import *
#new_cred.set_pubkey(object_gid.get_pubkey())
new_cred.set_privileges(rights)
new_cred.get_privileges().delegate_all_privileges(True)
+ if 'expires' in record:
+ new_cred.set_expiration(int(record['expires']))
auth_kind = "authority,ma,sa"
# Parent not necessary, verify with certs
#new_cred.set_parent(api.auth.hierarchy.get_auth_cred(auth_hrn, kind=auth_kind))
### $Id: slices.py 15842 2009-11-22 09:56:13Z anil $
### $URL: https://svn.planet-lab.org/svn/sfa/trunk/sfa/plc/slices.py $
-import datetime
-import time
-import traceback
import sys
-from copy import deepcopy
-from lxml import etree
+import time,datetime
from StringIO import StringIO
from types import StringTypes
+from copy import deepcopy
+from copy import copy
+from lxml import etree
+
+from sfa.util.sfalogging import sfa_logger
from sfa.util.rspecHelper import merge_rspecs
-from sfa.util.namespace import *
+from sfa.util.namespace import urn_to_hrn, hrn_to_urn
from sfa.util.rspec import *
from sfa.util.specdict import *
from sfa.util.faults import *
from sfa.util.threadmanager import ThreadManager
import sfa.util.xmlrpcprotocol as xmlrpcprotocol
import sfa.plc.peers as peers
-from copy import copy
def get_version():
version = {}
return version
def slice_status(api, slice_xrn, creds ):
+ hrn, type = urn_to_hrn(slice_xrn)
+ # find out where this slice is currently running
+ api.logger.info(hrn)
+ slicename = hrn_to_pl_slicename(hrn)
+ api.logger.info("Checking status for %s" % slicename)
+ slices = api.plshell.GetSlices(api.plauth, [slicename], ['node_ids','person_ids','name','expires'])
+ if len(slices) == 0:
+ raise Exception("Slice %s not found (used %s as slicename internally)" % (slice_xrn, slicename))
+ slice = slices[0]
+
+ nodes = api.plshell.GetNodes(api.plauth, slice['node_ids'],
+ ['hostname', 'boot_state', 'last_contact'])
+ api.logger.info(slice)
+ api.logger.info(nodes)
+
result = {}
result['geni_urn'] = slice_xrn
result['geni_status'] = 'unknown'
- result['geni_resources'] = {}
+ result['pl_login'] = slice['name']
+ result['pl_expires'] = slice['expires']
+
+ resources = []
+
+ for node in nodes:
+ res = {}
+ res['pl_hostname'] = node['hostname']
+ res['pl_boot_state'] = node['boot_state']
+ res['pl_last_contact'] = node['last_contact']
+ res['geni_urn'] = ''
+ res['geni_status'] = 'unknown'
+ res['geni_error'] = ''
+
+ resources.append(res)
+
+ result['geni_resources'] = resources
return result
def create_slice(api, xrn, creds, rspec, users):
for request in root.iterfind("./request"):
rspec.append(deepcopy(request))
+ sfa_logger().debug('get_rspec: rspec=%r'%rspec)
rspec = etree.tostring(rspec, xml_declaration=True, pretty_print=True)
# cache the result
if api.cache and not xrn:
from sfa.util.faults import *
-from sfa.util.namespace import *
+from sfa.util.namespace import urn_to_hrn
from sfa.util.method import Method
from sfa.util.parameter import Parameter
from sfa.trust.credential import Credential
# Validate that the time does not go beyond the credential's expiration time
requested_time = parse(expiration_time)
- if requested_time > Credential(string=valid_creds[0]).get_lifetime():
+ if requested_time > Credential(string=valid_creds[0]).get_expiration():
raise InsufficientRights('SliverStatus: Credential expires before requested expiration time')
manager = self.api.get_interface_manager()
- manager.renew_slice(self.api, xrn, valid_creds, requested_time)
+ manager.renew_slice(self.api, slice_xrn, valid_creds, expiration_time)
return 1
# This module exports two classes: Keypair and Certificate.
##
#
-### $Id$
-### $URL$
-#
import os
import tempfile
try:
k.load_pubkey_from_file(ssl_fn)
except:
- sfa_logger.log_exc("convert_public_key caught exception")
+ sfa_logger().log_exc("convert_public_key caught exception")
k = None
# remove the temporary files
def save_to_file(self, filename):
open(filename, 'w').write(self.as_pem())
+ self.filename=filename
##
# Load the private key from a file. Implicity the private key includes the public key.
def load_from_file(self, filename):
buffer = open(filename, 'r').read()
self.load_from_string(buffer)
+ self.filename=filename
##
# Load the private key from a string. Implicitly the private key includes the public key.
# get the pyopenssl pkey from the pyopenssl x509
self.key = pyx509.get_pubkey()
+ self.filename=filename
##
# Load the public key from a string. No private key is loaded.
def get_openssl_pkey(self):
return self.key
-
##
# Given another Keypair object, return TRUE if the two keys are the same.
def compute_hash(self, value):
return self.sign_string(str(value))
+ # only informative
+ def get_filename(self):
+ return getattr(self,'filename',None)
+
+ def dump (self, *args, **kwargs):
+ print self.dump_string(*args, **kwargs)
+
+ def dump_string (self):
+ result=""
+ result += "KEYPAIR: pubkey=%40s..."%self.get_pubkey_string()
+ filename=self.get_filename()
+ if filename: result += "Filename %s\n"%filename
+ return result
+
##
# The certificate class implements a general purpose X509 certificate, making
# use of the appropriate pyOpenSSL or M2Crypto abstractions. It also adds
# load it (support for the ---parent--- tag as well as normal chained certs)
string = string.strip()
-
-
- if not string.startswith('-----'):
+
+ # If it's not in proper PEM format, wrap it
+ if string.count('-----BEGIN CERTIFICATE') == 0:
string = '-----BEGIN CERTIFICATE-----\n%s\n-----END CERTIFICATE-----' % string
+ # If there is a PEM cert in there, but there is some other text first
+ # such as the text of the certificate, skip the text
+ beg = string.find('-----BEGIN CERTIFICATE')
+ if beg > 0:
+ # skipping over non cert beginning
+ string = string[beg:]
+
parts = []
if string.count('-----BEGIN CERTIFICATE-----') > 1 and \
file = open(filename)
string = file.read()
self.load_from_string(string)
+ self.filename=filename
##
# Save the certificate to a string.
f = open(filename, 'w')
f.write(string)
f.close()
+ self.filename=filename
##
# Save the certificate to a random file in /tmp/
# Get an X509 extension from the certificate
def get_extension(self, name):
+
# pyOpenSSL does not have a way to get extensions
m2x509 = X509.load_cert_string(self.save_to_string())
value = m2x509.get_ext(name).get_value()
+
return value
##
# Sign the certificate using the issuer private key and issuer subject previous set with set_issuer().
def sign(self):
+ sfa_logger().debug('certificate.sign')
assert self.cert != None
assert self.issuerSubject != None
assert self.issuerKey != None
# @param cert certificate object
def is_signed_by_cert(self, cert):
+ print 'is_signed_by_cert'
k = cert.get_pubkey()
result = self.verify(k)
return result
# verify expiration time
if self.cert.has_expired():
+ sfa_logger().debug("verify_chain: NO our certificate has expired")
raise CertExpired(self.get_subject(), "client cert")
# if this cert is signed by a trusted_cert, then we are set
for trusted_cert in trusted_certs:
if self.is_signed_by_cert(trusted_cert):
- sfa_logger.debug("Cert %s signed by trusted cert %s", self.get_subject(), trusted_cert.get_subject())
# verify expiration of trusted_cert ?
if not trusted_cert.cert.has_expired():
+ sfa_logger().debug("verify_chain: YES cert %s signed by trusted cert %s"%(
+ self.get_subject(), trusted_cert.get_subject()))
return trusted_cert
else:
- sfa_logger.debug("Trusted cert %s is expired", trusted_cert.get_subject())
+ sfa_logger().debug("verify_chain: NO cert %s is signed by trusted_cert %s, but this is expired..."%(
+ self.get_subject(),trusted_cert.get_subject()))
+ raise CertExpired(self.get_subject(),"trusted_cert %s"%trusted_cert.get_subject())
# if there is no parent, then no way to verify the chain
if not self.parent:
- sfa_logger.debug("%r has no parent"%self.get_subject())
+ sfa_logger().debug("verify_chain: NO %s has no parent and is not in trusted roots"%self.get_subject())
raise CertMissingParent(self.get_subject())
# if it wasn't signed by the parent...
if not self.is_signed_by_cert(self.parent):
- sfa_logger.debug("%r is not signed by parent"%self.get_subject())
+ sfa_logger().debug("verify_chain: NO %s is not signed by parent"%self.get_subject())
return CertNotSignedByParent(self.get_subject())
# if the parent isn't verified...
+ sfa_logger().debug("verify_chain: .. %s, -> verifying parent %s",self.get_subject(),self.parent.get_subject())
self.parent.verify_chain(trusted_certs)
return
+
+ ### more introspection
+ def get_extensions(self):
+ # pyOpenSSL does not have a way to get extensions
+ triples=[]
+ m2x509 = X509.load_cert_string(self.save_to_string())
+ nb_extensions=m2x509.get_ext_count()
+ sfa_logger().debug("X509 had %d extensions"%nb_extensions)
+ for i in range(nb_extensions):
+ ext=m2x509.get_ext_at(i)
+ triples.append( (ext.get_name(), ext.get_value(), ext.get_critical(),) )
+ return triples
+
+ def get_data_names(self):
+ return self.data.keys()
+
+ def get_all_datas (self):
+ triples=self.get_extensions()
+ for name in self.get_data_names():
+ triples.append( (name,self.get_data(name),'data',) )
+ return triples
+
+ # only informative
+ def get_filename(self):
+ return getattr(self,'filename',None)
+
+ def dump (self, *args, **kwargs):
+ print self.dump_string(*args, **kwargs)
+
+ def dump_string (self,show_extensions=False):
+ result = ""
+ result += "CERTIFICATE for %s\n"%self.get_subject()
+ result += "Issued by %s\n"%self.get_issuer()
+ filename=self.get_filename()
+ if filename: result += "Filename %s\n"%filename
+ if show_extensions:
+ all_datas=self.get_all_datas()
+ result += " has %d extensions/data attached"%len(all_datas)
+ for (n,v,c) in all_datas:
+ if c=='data':
+ result += " data: %s=%s\n"%(n,v)
+ else:
+ result += " ext: %s (crit=%s)=<<<%s>>>\n"%(n,c,v)
+ return result
from sfa.trust.credential_legacy import CredentialLegacy
from sfa.trust.rights import Right, Rights
from sfa.trust.gid import GID
-from sfa.util.namespace import *
+from sfa.util.namespace import urn_to_hrn
- # Two years, in seconds
- DEFAULT_CREDENTIAL_LIFETIME = 60 * 60 * 24 * 365 * 2
+ # 2 weeks, in seconds
+ DEFAULT_CREDENTIAL_LIFETIME = 86400 * 14
# TODO:
str = string
elif filename:
str = file(filename).read()
+ self.filename=filename
if str.strip().startswith("-----"):
self.legacy = CredentialLegacy(False,string=str)
self.gidObject = legacy.get_gid_object()
lifetime = legacy.get_lifetime()
if not lifetime:
- # Default to two years
- self.set_lifetime(DEFAULT_CREDENTIAL_LIFETIME)
+ self.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=DEFAULT_CREDENTIAL_LIFETIME))
else:
- self.set_lifetime(int(lifetime))
+ self.set_expiration(int(lifetime))
self.lifeTime = legacy.get_lifetime()
self.set_privileges(legacy.get_privileges())
self.get_privileges().delegate_all_privileges(legacy.get_delegate())
self.decode()
return self.gidObject
+
+
##
- # set the lifetime of this credential
- #
- # @param lifetime lifetime of credential
- # . if lifeTime is a datetime object, it is used for the expiration time
- # . if lifeTime is an integer value, it is considered the number of seconds
- # remaining before expiration
-
- def set_lifetime(self, lifeTime):
- if isinstance(lifeTime, int):
- self.expiration = datetime.timedelta(seconds=lifeTime) + datetime.datetime.utcnow()
+ # Expiration: an absolute UTC time of expiration (as either an int or datetime)
+ #
+ def set_expiration(self, expiration):
+ if isinstance(expiration, int):
+ self.expiration = datetime.datetime.fromtimestamp(expiration)
else:
- self.expiration = lifeTime
+ self.expiration = expiration
+
##
# get the lifetime of the credential (in datetime format)
- def get_lifetime(self):
+ def get_expiration(self):
if not self.expiration:
self.decode()
return self.expiration
+ ##
+ # For legacy sake
+ def get_lifetime(self):
+ return self.get_expiration()
##
# set the privileges
append_sub(doc, cred, "target_urn", self.gidObject.get_urn())
append_sub(doc, cred, "uuid", "")
if not self.expiration:
- self.set_lifetime(DEFAULT_CREDENTIAL_LIFETIME)
+ self.set_expiration(datetime.datetime.utcnow() + datetime.timedelta(seconds=DEFAULT_CREDENTIAL_LIFETIME))
self.expiration = self.expiration.replace(microsecond=0)
append_sub(doc, cred, "expires", self.expiration.isoformat())
privileges = doc.createElement("privileges")
f = open(filename, "w")
f.write(self.xml)
f.close()
+ self.filename=filename
def save_to_string(self, save_parents=True):
if not self.xml:
self.set_refid(cred.getAttribute("xml:id"))
- self.set_lifetime(parse(getTextNode(cred, "expires")))
+ self.set_expiration(parse(getTextNode(cred, "expires")))
self.gidCaller = GID(string=getTextNode(cred, "owner_gid"))
self.gidObject = GID(string=getTextNode(cred, "target_gid"))
trusted_cert_objects.append(GID(filename=f))
ok_trusted_certs.append(f)
except Exception, exc:
- sfa_logger.error("Failed to load trusted cert from %s: %r", f, exc)
+ sfa_logger().error("Failed to load trusted cert from %s: %r", f, exc)
trusted_certs = ok_trusted_certs
# Use legacy verification if this is a legacy credential
return True
# make sure it is not expired
- if self.get_lifetime() < datetime.datetime.utcnow():
+ if self.get_expiration() < datetime.datetime.utcnow():
raise CredentialNotVerifiable("Credential expired at %s" % self.expiration.isoformat())
# Verify the signatures
# Maybe should be (hrn, type) = urn_to_hrn(root_cred_signer.get_urn())
root_cred_signer_type = root_cred_signer.get_type()
if (root_cred_signer_type == 'authority'):
- #sfa_logger.debug('Cred signer is an authority')
+ #sfa_logger().debug('Cred signer is an authority')
# signer is an authority, see if target is in authority's domain
hrn = root_cred_signer.get_hrn()
if root_target_gid.get_hrn().startswith(hrn):
raise CredentialNotVerifiable("Target gid not equal between parent and child")
# make sure my expiry time is <= my parent's
- if not parent_cred.get_lifetime() >= self.get_lifetime():
+ if not parent_cred.get_expiration() >= self.get_expiration():
raise CredentialNotVerifiable("Delegated credential expires after parent")
# make sure my signer is the parent's caller
dcred.set_gid_caller(delegee_gid)
dcred.set_gid_object(object_gid)
dcred.set_parent(self)
- dcred.set_lifetime(self.get_lifetime())
+ dcred.set_expiration(self.get_expiration())
dcred.set_privileges(self.get_privileges())
dcred.get_privileges().delegate_all_privileges(True)
#dcred.set_issuer_keys(keyfile, delegee_gidfile)
dcred.sign()
return dcred
- ##
- # Dump the contents of a credential to stdout in human-readable format
- #
- # @param dump_parents If true, also dump the parent certificates
- def dump(self, dump_parents=False):
- print "CREDENTIAL", self.get_subject()
+ # only informative
+ def get_filename(self):
+ return getattr(self,'filename',None)
- print " privs:", self.get_privileges().save_to_string()
-
- print " gidCaller:"
+ # @param dump_parents If true, also dump the parent certificates
+ def dump (self, *args, **kwargs):
+ print self.dump_string(*args, **kwargs)
+
+ def dump_string(self, dump_parents=False):
+ result=""
+ result += "CREDENTIAL %s\n" % self.get_subject()
+ filename=self.get_filename()
+ if filename: result += "Filename %s\n"%filename
+ result += " privs: %s\n" % self.get_privileges().save_to_string()
gidCaller = self.get_gid_caller()
if gidCaller:
- gidCaller.dump(8, dump_parents)
+ result += " gidCaller:\n"
+ result += gidCaller.dump_string(8, dump_parents)
- print " gidObject:"
gidObject = self.get_gid_object()
if gidObject:
- gidObject.dump(8, dump_parents)
-
+ result += " gidObject:\n"
+ result += gidObject.dump_string(8, dump_parents)
if self.parent and dump_parents:
- print "PARENT",
- self.parent.dump_parents()
+ result += "PARENT"
+ result += self.parent.dump_string(dump_parents)
+ return result
-### $Id$
-### $URL$
import re
from sfa.util.faults import *
URN_PREFIX = "urn:publicid:IDN"
+ def __get_hierarchy_delim_indexes(hrn):
+ # find all non escaped '.'
+ hierarchy_delim = '([a-zA-Z0-9][\.])'
+ parts = re.findall(hierarchy_delim, hrn)
+ # list of indexes for every hierarchy delimieter
+ indexes = []
+ for part in parts:
+ indexes.append(hrn.index(part) + 1)
+ return indexes
+
def get_leaf(hrn):
- parts = hrn.split(".")
- return ".".join(parts[-1:])
+ delim_indexes = __get_hierarchy_delim_indexes(hrn)
+ if not delim_indexes:
+ return hrn
+
+ last_delim_index = delim_indexes[-1:][0] + 1
+ return hrn[last_delim_index:]
def get_authority(xrn):
hrn, type = urn_to_hrn(xrn)
if type and type == 'authority':
return hrn
+
+ delim_indexes = __get_hierarchy_delim_indexes(hrn)
+ if not delim_indexes:
+ return ''
+ last_delim_index = delim_indexes[-1:][0]
+ return hrn[:last_delim_index]
- parts = hrn.split(".")
- return ".".join(parts[:-1])
-
def hrn_to_pl_slicename(hrn):
# remove any escaped no alpah numeric characters
#hrn = re.sub('\\\[^a-zA-Z0-9]', '', hrn)
- # remove any escaped '.' (i.e. '\.')
- hrn = hrn.replace('\\.', '')
+ hrn = re.sub(r'\\(.)', '', hrn)
parts = hrn.split(".")
return parts[-2] + "_" + parts[-1]
# assuming hrn is the hrn of an authority, return the plc authority name
def hrn_to_pl_authname(hrn):
- # remove any escaped '.' (i.e. '\.')
- hrn = hrn.replace('\\.', '')
+ # remove any escaped no alpah numeric characters
+ hrn = re.sub(r'\\(.)', '', hrn)
parts = hrn.split(".")
return parts[-1]
# assuming hrn is the hrn of an authority, return the plc login_base
def hrn_to_pl_login_base(hrn):
- # remove any escaped '.' (i.e. '\.')
- hrn = hrn.replace('\\.', '')
+ # remove any escaped no alpah numeric characters
+ hrn = re.sub(r'\\(.)', '', hrn)
return hrn_to_pl_authname(hrn)
def hostname_to_hrn(auth_hrn, login_base, hostname):
return urn, None
name = urn[len(URN_PREFIX):]
- hrn_parts = name.split("+")
- type = hrn_parts.pop(2)
+ urn_parts = name.split("+")
+ type = urn_parts.pop(2)
# Remove the authority name (e.g. '.sa')
if type == 'authority':
- hrn_parts = hrn_parts[:-1]
+ urn_parts = urn_parts[:-1]
# convert hrn_parts (list) into hrn (str) by doing the following
# 1. remove blank elements
- # 2. escape '.' # '.' exists in protogeni object names and are not delimiters
- # 3. replace ':' with '.' # ':' is the urn hierarchy delimiter
+ # 2. escape all non alpha numeric chars (excluding ':')
+ # 3. replace ':' with '.' (':' is the urn hierarchy delimiter)
# 4. join list elements using '.'
- hrn = '.'.join([part.replace('.', '\\.').replace(':', '.') for part in hrn_parts if part])
+ #hrn = '.'.join([part.replace('.', '\\.').replace(':', '.') for part in hrn_parts if part])
+ hrn = '.'.join([re.sub(r'([^a-zA-Z0-9\:])', r'\\\1', part).replace(':', '.') for part in urn_parts if part])
return str(hrn), str(type)
else:
authority = get_authority(hrn)
name = get_leaf(hrn)
-
- # We have to do the following conversion
- # '\\.' -> '.' # where '.' belongs in the urn name
- # '.' -> ':" # where ':' is the urn hierarchy delimiter
- # by doing the following
- # 1. split authority around '\\.'
- # 2. replace '.' with ':' in all parts
- # 3. join parts around '.'
- parts = authority.split('\\.')
- authority = '.'.join([part.replace('.', ':') for part in parts])
+
+ # convert from hierarchy delimiter from '.' to ':'
+ authority = re.sub(r'([a-zA-Z0-9])[\.]', r'\1:', authority)
+ # unescape escaped characters
+ authority = re.sub(r'\\(.)', r'\1', authority)
if type == None:
urn = "+".join(['',authority,name])