From: Mohamed Larabi Date: Mon, 25 Mar 2013 16:06:12 +0000 (+0100) Subject: Merge Master in geni-v3 conflict resolution X-Git-Tag: sfa-3.0-0 X-Git-Url: http://git.onelab.eu/?p=sfa.git;a=commitdiff_plain;h=1cc8e9613cab8b5b22478de369f259e591c54e6d;hp=f357d5c677573e29f260f82318c9450119474dce Merge Master in geni-v3 conflict resolution --- diff --git a/init.d/sfa b/init.d/sfa index 4815e372..08e6893e 100755 --- a/init.d/sfa +++ b/init.d/sfa @@ -166,87 +166,90 @@ function db_start () { # only if enabled [ "$SFA_DB_ENABLED" == 1 -o "$SFA_DB_ENABLED" == True ] || return - if [ ! -f /etc/myplc-release ] ; then - - ######## standalone deployment - no colocated myplc - - ######## sysconfig - # Set data directory and redirect startup output to /var/log/pgsql - mkdir -p $(dirname $postgresql_sysconfig) - # remove previous definitions - touch $postgresql_sysconfig - tmp=${postgresql_sysconfig}.new - ( egrep -v '^(PGDATA=|PGLOG=|PGPORT=)' $postgresql_sysconfig - echo "PGDATA=$PGDATA" - echo "PGLOG=/var/log/pgsql" - echo "PGPORT=$SFA_DB_PORT" - ) >> $tmp ; mv -f $tmp $postgresql_sysconfig - - ######## /var/lib/pgsql/data - # Fix ownership (rpm installation may have changed it) - chown -R -H postgres:postgres $(dirname $PGDATA) - - # PostgreSQL must be started at least once to bootstrap - # /var/lib/pgsql/data - if [ ! -f $postgresql_conf ] ; then - service postgresql initdb &> /dev/null || : - check - fi + #if ! rpm -q myplc >& /dev/null; then + + ######## standalone deployment - no colocated myplc + + ######## sysconfig + # Set data directory and redirect startup output to /var/log/pgsql + mkdir -p $(dirname $postgresql_sysconfig) + # remove previous definitions + touch $postgresql_sysconfig + tmp=${postgresql_sysconfig}.new + ( egrep -v '^(PGDATA=|PGLOG=|PGPORT=)' $postgresql_sysconfig + echo "PGDATA=$PGDATA" + echo "PGLOG=/var/log/pgsql" + echo "PGPORT=$SFA_DB_PORT" + ) >> $tmp ; mv -f $tmp $postgresql_sysconfig + + ######## /var/lib/pgsql/data + # Fix ownership (rpm installation may have changed it) + chown -R -H postgres:postgres $(dirname $PGDATA) + + # PostgreSQL must be started at least once to bootstrap + # /var/lib/pgsql/data + if [ ! -f $postgresql_conf ] ; then + service postgresql initdb &> /dev/null || : + check + fi + + ######## /var/lib/pgsql/data/postgresql.conf + registry_ip="" + foo=$(python -c "import socket; print socket.gethostbyname(\"$SFA_REGISTRY_HOST\")") && registry_ip="$foo" + # Enable DB server. drop Postgresql<=7.x + # PostgreSQL >=8.0 defines listen_addresses + # listen on a specific IP + localhost, more robust when run within a vserver + sed -i -e '/^listen_addresses/d' $postgresql_conf + if [ -z "$registry_ip" ] ; then + echo "listen_addresses = 'localhost'" >> $postgresql_conf + else + echo "listen_addresses = '${registry_ip},localhost'" >> $postgresql_conf + fi + # tweak timezone to be 'UTC' + sed -i -e '/^timezone=/d' $postgresql_conf + echo "timezone='UTC'" >> $postgresql_conf + + ######## /var/lib/pgsql/data/pg_hba.conf + # Disable access to all DBs from all hosts + sed -i -e '/^\(host\|local\)/d' $pghba_conf + + # Enable passwordless localhost access + echo "local all all trust" >>$pghba_conf + # grant access + ( + echo "host $SFA_DB_NAME $SFA_DB_USER 127.0.0.1/32 password" + [ -n "$registry_ip" ] && echo "host $SFA_DB_NAME $SFA_DB_USER ${registry_ip}/32 password" + ) >>$pghba_conf + + if [ "$SFA_GENERIC_FLAVOUR" == "openstack" ] ; then + [ -n "$registry_ip" ] && echo "host nova nova ${registry_ip}/32 password" >> $pghba_conf + fi + + # Fix ownership (sed -i changes it) + chown postgres:postgres $postgresql_conf $pghba_conf - ######## /var/lib/pgsql/data/postgresql.conf - registry_ip="" - foo=$(python -c "import socket; print socket.gethostbyname(\"$SFA_REGISTRY_HOST\")") && registry_ip="$foo" - # Enable DB server. drop Postgresql<=7.x - # PostgreSQL >=8.0 defines listen_addresses - # listen on a specific IP + localhost, more robust when run within a vserver - sed -i -e '/^listen_addresses/d' $postgresql_conf - if [ -z "$registry_ip" ] ; then - echo "listen_addresses = 'localhost'" >> $postgresql_conf - else - echo "listen_addresses = '${registry_ip},localhost'" >> $postgresql_conf - fi - # tweak timezone to be 'UTC' - sed -i -e '/^timezone=/d' $postgresql_conf - echo "timezone='UTC'" >> $postgresql_conf - - ######## /var/lib/pgsql/data/pg_hba.conf - # Disable access to all DBs from all hosts - sed -i -e '/^\(host\|local\)/d' $pg_hba_conf - - # Enable passwordless localhost access - echo "local all all trust" >>$pg_hba_conf - # grant access - ( - echo "host $SFA_DB_NAME $SFA_DB_USER 127.0.0.1/32 password" - [ -n "$registry_ip" ] && echo "host $SFA_DB_NAME $SFA_DB_USER ${registry_ip}/32 password" - ) >>$pg_hba_conf - - if [ "$SFA_GENERIC_FLAVOUR" == "openstack" ] ; then - [ -n "$registry_ip" ] && echo "host nova nova ${registry_ip}/32 password" >> $pg_hba_conf - fi - - # Fix ownership (sed -i changes it) - chown postgres:postgres $postgresql_conf $pg_hba_conf - - ######## compute a password if needed - if [ -z "$SFA_DB_PASSWORD" ] ; then - SFA_DB_PASSWORD=$(uuidgen) - sfa-config --category=sfa_db --variable=password --value="$SFA_DB_PASSWORD" --save=$sfa_local_config $sfa_local_config >& /dev/null - reload force - fi + ######## compute a password if needed + if [ -z "$SFA_DB_PASSWORD" ] ; then + SFA_DB_PASSWORD=$(uuidgen) + sfa-config --category=sfa_db --variable=password --value="$SFA_DB_PASSWORD" --save=$sfa_local_config $sfa_local_config >& /dev/null + reload force + fi - else + #else ######## we are colocated with a myplc - # no need to worry about the pgsql setup (see /etc/plc.d/postgresql) - # myplc enforces the password for its user - PLC_DB_USER=$(plc-config --category=plc_db --variable=user) - PLC_DB_PASSWORD=$(plc-config --category=plc_db --variable=password) - # store this as the SFA user/password - sfa-config --category=sfa_db --variable=user --value=$PLC_DB_USER --save=$sfa_local_config $sfa_local_config >& /dev/null - sfa-config --category=sfa_db --variable=password --value=$PLC_DB_PASSWORD --save=$sfa_local_config $sfa_local_config >& /dev/null - reload force - fi + # no need to worry about the pgsql setup (see /etc/plc.d/postgresql) + # myplc enforces the password for its user + + # The code below overwrites the site specific sfa db info with myplc db info. + # This is most likely unncecessary and wrong so I'm commenting it out for now. + # PLC_DB_USER=$(plc-config --category=plc_db --variable=user) + # PLC_DB_PASSWORD=$(plc-config --category=plc_db --variable=password) + # store this as the SFA user/password + # sfa-config --category=sfa_db --variable=user --value=$PLC_DB_USER --save=$sfa_local_config $sfa_local_config >& /dev/null + # sfa-config --category=sfa_db --variable=password --value=$PLC_DB_PASSWORD --save=$sfa_local_config $sfa_local_config >& /dev/null + # reload force + #fi ######## Start up the server # not too nice, but.. when co-located with myplc we'll let it start/stop postgresql diff --git a/setup.py b/setup.py index 28f7ec96..8ce7dbae 100755 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ packages = [ 'sfa/rspecs', 'sfa/rspecs/elements', 'sfa/rspecs/elements/versions', + 'sfa/rspecs/elements/v3', 'sfa/rspecs/versions', 'sfa/client', 'sfa/planetlab', diff --git a/sfa/client/client_helper.py b/sfa/client/client_helper.py index 2117b146..48160a49 100644 --- a/sfa/client/client_helper.py +++ b/sfa/client/client_helper.py @@ -19,9 +19,9 @@ def pg_users_arg(records): for record in records: if record['type'] != 'user': continue - user = {'urn': record['reg-urn'], - 'keys': record['reg-keys'], - } + user = {'urn': record['geni_urn'], + 'keys': record['keys'], + 'email': record['email']} users.append(user) return users diff --git a/sfa/client/sfaadmin.py b/sfa/client/sfaadmin.py index df4629fe..662592ae 100755 --- a/sfa/client/sfaadmin.py +++ b/sfa/client/sfaadmin.py @@ -354,66 +354,81 @@ class AggregateCommands(Commands): version = self.api.manager.GetVersion(self.api, {}) pprinter.pprint(version) - def slices(self): - """List the running slices at this Aggregate""" - print self.api.manager.ListSlices(self.api, [], {}) @args('-x', '--xrn', dest='xrn', metavar='', help='object hrn/urn (mandatory)') def status(self, xrn): """Display the status of a slice or slivers""" urn = Xrn(xrn, 'slice').get_urn() - status = self.api.manager.SliverStatus(self.api, urn, [], {}) + status = self.api.manager.SliverStatus(self.api, [urn], {}, {}) pprinter.pprint(status) - @args('-x', '--xrn', dest='xrn', metavar='', help='object hrn/urn', default=None) @args('-r', '--rspec-version', dest='rspec_version', metavar='', default='GENI', help='version/format of the resulting rspec response') - def resources(self, xrn=None, rspec_version='GENI'): - """Display the available resources at an aggregate -or the resources allocated by a slice""" + def resources(self, rspec_version='GENI'): + """Display the available resources at an aggregate""" options = {'geni_rspec_version': rspec_version} if xrn: options['geni_slice_urn'] = Xrn(xrn, 'slice').get_urn() - print options - resources = self.api.manager.ListResources(self.api, [], options) + resources = self.api.manager.ListResources(self.api, {}, options) print resources - + + @args('-x', '--xrn', dest='xrn', metavar='', help='object hrn/urn', default=None) + @args('-r', '--rspec-version', dest='rspec_version', metavar='', + default='GENI', help='version/format of the resulting rspec response') + def describe(self, xrn, rspec_version='GENI'): + """Display the resources allocated by a slice or slivers""" + urn = Xrn(xrn, 'slice').get_urn() + options = {'geni_rspec_version': rspec_version} + status = self.api.manager.Describe(self.api, {}, [urn], options) + print status + @args('-x', '--xrn', dest='xrn', metavar='', help='slice hrn/urn (mandatory)') @args('-r', '--rspec', dest='rspec', metavar='', help='rspec file (mandatory)') @args('-u', '--user', dest='user', metavar='', help='hrn/urn of slice user (mandatory)') @args('-k', '--key', dest='key', metavar='', help="path to user's public key file (mandatory)") - def create(self, xrn, rspec, user, key): + def allocate(self, xrn, rspec, user, key): """Allocate slivers""" xrn = Xrn(xrn, 'slice') - slice_urn=xrn.get_urn() + urn=xrn.get_urn() rspec_string = open(rspec).read() user_xrn = Xrn(user, 'user') user_urn = user_xrn.get_urn() user_key_string = open(key).read() users = [{'urn': user_urn, 'keys': [user_key_string]}] - options={} - self.api.manager.CreateSliver(self, slice_urn, [], rspec_string, users, options) + options={'geni_users': users} + status = self.api.manager.Allocate(self.api, urn, {}, rspec_string, options) + print status + + @args('-x', '--xrn', dest='xrn', metavar='', help='slice hrn/urn (mandatory)') + def provision(self, xrns): + status = self.api.manager.Provision(self.api, [xrns], {}, {}) + print status @args('-x', '--xrn', dest='xrn', metavar='', help='slice hrn/urn (mandatory)') def delete(self, xrn): """Delete slivers""" - self.api.manager.DeleteSliver(self.api, xrn, [], {}) - + result = self.api.manager.DeleteSliver(self.api, [xrn], {}, {}) + print result + @args('-x', '--xrn', dest='xrn', metavar='', help='slice hrn/urn (mandatory)') - def start(self, xrn): + @args('-e', '--expiration', dest='expiration', metavar='', help='Expiration date (mandatory)') + def renew(self, xrn, expiration): """Start slivers""" - self.api.manager.start_slice(self.api, xrn, []) + result = self.api.manager.start_slice(self.api, xrn, {}, expiration, {}) + print result @args('-x', '--xrn', dest='xrn', metavar='', help='slice hrn/urn (mandatory)') - def stop(self, xrn): + def shutdown(self, xrn): """Stop slivers""" - self.api.manager.stop_slice(self.api, xrn, []) + result = self.api.manager.Shutdown(self.api, xrn, {}, {}) + print result @args('-x', '--xrn', dest='xrn', metavar='', help='slice hrn/urn (mandatory)') - def reset(self, xrn): + @args('-a', '--action', dest='action', metavar='', help='Action name (mandatory)') + def operation(self, xrn, action): """Reset sliver""" - self.api.manager.reset_slice(self.api, xrn) - + result = self.api.manager.PerformOperationalAction(self.api, [xrn], {}, action, {}) + print result # @args('-x', '--xrn', dest='xrn', metavar='', help='object hrn/urn', default=None) # @args('-r', '--rspec', dest='rspec', metavar='', help='request rspec', default=None) diff --git a/sfa/client/sfi.py b/sfa/client/sfi.py index 15c83cd6..53d655b8 100644 --- a/sfa/client/sfi.py +++ b/sfa/client/sfi.py @@ -85,14 +85,23 @@ def display_record(record, dump=False): return -def credential_printable (credential_string): - credential=Credential(string=credential_string) +def filter_records(type, records): + filtered_records = [] + for record in records: + if (record['type'] == type) or (type == "all"): + filtered_records.append(record) + return filtered_records + + +def credential_printable (cred): + credential=Credential(cred=cred) result="" result += credential.get_summary_tostring() result += "\n" rights = credential.get_privileges() - result += "rights=%s"%rights - result += "\n" + result += "type=%s\n" % credential.type + result += "version=%s\n" % credential.version + result += "rights=%s\n"%rights return result def show_credentials (cred_s): @@ -253,14 +262,14 @@ class Sfi: ("add", "[record]"), ("update", "[record]"), ("remove", "name"), - ("slices", ""), - ("resources", "[slice_hrn]"), + ("resources", ""), + ("describe", "slice_hrn"), ("create", "slice_hrn rspec"), + ("allocate", "slice_hrn rspec"), + ("provision", "slice_hrn"), + ("action", "slice_hrn action"), ("delete", "slice_hrn"), ("status", "slice_hrn"), - ("start", "slice_hrn"), - ("stop", "slice_hrn"), - ("reset", "slice_hrn"), ("renew", "slice_hrn time"), ("shutdown", "slice_hrn"), ("get_ticket", "slice_hrn rspec"), @@ -323,8 +332,16 @@ class Sfi: action="callback", callback=optparse_dictvalue_callback, nargs=1, help="set extra/testbed-dependent flags, e.g. --extra enabled=true") + # user specifies remote aggregate/sm/component + if command in ("resources", "describe", "allocate", "provision", "create", "delete", "allocate", "provision", + "action", "shutdown", "get_ticket", "renew", "status"): + parser.add_option("-d", "--delegate", dest="delegate", default=None, + action="store_true", + help="Include a credential delegated to the user's root"+\ + "authority in set of credentials for this call") + # show_credential option - if command in ("list","resources","create","add","update","remove","slices","delete","status","renew"): + if command in ("list","resources", "describe", "provision", "allocate", "create","add","update","remove","slices","delete","status","renew"): parser.add_option("-C","--credential",dest='show_credential',action='store_true',default=False, help="show credential(s) used in human-readable form") # registy filter option @@ -336,7 +353,7 @@ class Sfi: if command in ("show"): parser.add_option("-k","--key",dest="keys",action="append",default=[], help="specify specific keys to be displayed from record") - if command in ("resources"): + if command in ("resources", "describe"): # rspec version parser.add_option("-r", "--rspec-version", dest="rspec_version", default="SFA 1", help="schema type and version of resulting RSpec") @@ -358,7 +375,7 @@ class Sfi: # 'create' does return the new rspec, makes sense to save that too - if command in ("resources", "show", "list", "gid", 'create'): + if command in ("resources", "describe", "allocate", "provision", "show", "list", "gid", 'create'): parser.add_option("-o", "--output", dest="file", help="output XML to file", metavar="FILE", default=None) @@ -619,6 +636,9 @@ use this if you mean an authority instead""") # extract what's needed self.private_key = client_bootstrap.private_key() self.my_credential_string = client_bootstrap.my_credential_string () + self.my_credential = {'geni_type': 'geni_sfa', + 'geni_version': '3.0', + 'geni_value': self.my_credential_string} self.my_gid = client_bootstrap.my_gid () self.client_bootstrap = client_bootstrap @@ -635,6 +655,32 @@ use this if you mean an authority instead""") def slice_credential_string(self, name): return self.client_bootstrap.slice_credential_string (name) + def slice_credential(self, name): + return {'geni_type': 'geni_sfa', + 'geni_version': '3.0', + 'geni_value': self.slice_credential_string(name)} + + # xxx should be supported by sfaclientbootstrap as well + def delegate_cred(self, object_cred, hrn, type='authority'): + # the gid and hrn of the object we are delegating + if isinstance(object_cred, str): + object_cred = Credential(string=object_cred) + object_gid = object_cred.get_gid_object() + object_hrn = object_gid.get_hrn() + + if not object_cred.get_privileges().get_all_delegate(): + self.logger.error("Object credential %s does not have delegate bit set"%object_hrn) + return + + # the delegating user's gid + caller_gidfile = self.my_gid() + + # the gid of the user who will be delegated to + delegee_gid = self.client_bootstrap.gid(hrn,type) + delegee_hrn = delegee_gid.get_hrn() + dcred = object_cred.delegate(delegee_gid, self.private_key, caller_gidfile) + return dcred.save_to_string(save_parents=True) + # # Management of the servers # @@ -943,7 +989,7 @@ or version information about sfi itself creds = [self.my_credential_string] # options and call_id when supported api_options = {} - api_options['call_id']=unique_call_id() + api_options['call_id']=unique_call_id() if options.show_credential: show_credentials(creds) result = server.ListSlices(creds, *self.ois(server,api_options)) @@ -957,19 +1003,15 @@ or version information about sfi itself # show rspec for named slice def resources(self, options, args): """ - with no arg, discover available resources, (ListResources) + discover available resources or with an slice hrn, shows currently provisioned resources """ server = self.sliceapi() # set creds - creds = [] - if args: - the_credential=self.slice_credential_string(args[0]) - creds.append(the_credential) - else: - the_credential=self.my_credential_string - creds.append(the_credential) + creds = [self.my_credential] + if options.delegate: + creds.append(self.delegate_cred(cred, get_authority(self.authority))) if options.show_credential: show_credentials(creds) @@ -980,9 +1022,6 @@ or with an slice hrn, shows currently provisioned resources api_options ['call_id'] = unique_call_id() # ask for cached value if available api_options ['cached'] = True - if args: - hrn = args[0] - api_options['geni_slice_urn'] = hrn_to_urn(hrn, 'slice') if options.info: api_options['info'] = options.info if options.list_leases: @@ -1013,6 +1052,45 @@ or with an slice hrn, shows currently provisioned resources return + def describe(self, options, args): + """ + Shows currently provisioned resources. + """ + server = self.sliceapi() + + # set creds + creds = [self.slice_credential(args[0])] + if options.delegate: + creds.append(self.delegate_cred(cred, get_authority(self.authority))) + if options.show_credential: + show_credentials(creds) + + api_options = {'call_id': unique_call_id(), + 'cached': True, + 'info': options.info, + 'list_leases': options.list_leases, + 'geni_rspec_version': {'type': 'geni', 'version': '3.0'}, + } + if options.rspec_version: + version_manager = VersionManager() + server_version = self.get_cached_server_version(server) + if 'sfa' in server_version: + # just request the version the client wants + api_options['geni_rspec_version'] = version_manager.get_version(options.rspec_version).to_dict() + else: + api_options['geni_rspec_version'] = {'type': 'geni', 'version': '3.0'} + urn = Xrn(args[0], type='slice').get_urn() + result = server.Describe([urn], creds, api_options) + value = ReturnValue.get_value(result) + if self.options.raw: + save_raw_to_file(result, self.options.raw, self.options.rawformat, self.options.rawbanner) + if options.file is not None: + save_rspec_to_file(value, options.file) + if (self.options.raw is None) and (options.file is None): + display_rspec(value, options.format) + + return + def create(self, options, args): """ create or update named slice with given rspec @@ -1074,8 +1152,6 @@ or with an slice hrn, shows currently provisioned resources # do not append users, keys, or slice tags. Anything # not contained in this request will be removed from the slice - # CreateSliver has supported the options argument for a while now so it should - # be safe to assume this server support it api_options = {} api_options ['append'] = False api_options ['call_id'] = unique_call_id() @@ -1101,7 +1177,7 @@ or with an slice hrn, shows currently provisioned resources slice_urn = hrn_to_urn(slice_hrn, 'slice') # creds - slice_cred = self.slice_credential_string(slice_hrn) + slice_cred = self.slice_credential(slice_hrn) creds = [slice_cred] # options and call_id when supported @@ -1109,14 +1185,108 @@ or with an slice hrn, shows currently provisioned resources api_options ['call_id'] = unique_call_id() if options.show_credential: show_credentials(creds) - result = server.DeleteSliver(slice_urn, creds, *self.ois(server, api_options ) ) + result = server.Delete([slice_urn], creds, *self.ois(server, api_options ) ) value = ReturnValue.get_value(result) if self.options.raw: save_raw_to_file(result, self.options.raw, self.options.rawformat, self.options.rawbanner) else: print value - return value - + return value + + def allocate(self, options, args): + server = self.sliceapi() + server_version = self.get_cached_server_version(server) + slice_hrn = args[0] + slice_urn = Xrn(slice_hrn, type='slice').get_urn() + + # credentials + creds = [self.slice_credential(slice_hrn)] + + delegated_cred = None + if server_version.get('interface') == 'slicemgr': + # delegate our cred to the slice manager + # do not delegate cred to slicemgr...not working at the moment + pass + #if server_version.get('hrn'): + # delegated_cred = self.delegate_cred(slice_cred, server_version['hrn']) + #elif server_version.get('urn'): + # delegated_cred = self.delegate_cred(slice_cred, urn_to_hrn(server_version['urn'])) + + if options.show_credential: + show_credentials(creds) + + # rspec + rspec_file = self.get_rspec_file(args[1]) + rspec = open(rspec_file).read() + api_options = {} + api_options ['call_id'] = unique_call_id() + result = server.Allocate(slice_urn, creds, rspec, api_options) + value = ReturnValue.get_value(result) + if self.options.raw: + save_raw_to_file(result, self.options.raw, self.options.rawformat, self.options.rawbanner) + if options.file is not None: + save_rspec_to_file (value, options.file) + if (self.options.raw is None) and (options.file is None): + print value + + return value + + + def provision(self, options, args): + server = self.sliceapi() + server_version = self.get_cached_server_version(server) + slice_hrn = args[0] + slice_urn = Xrn(slice_hrn, type='slice').get_urn() + + # credentials + creds = [self.slice_credential(slice_hrn)] + delegated_cred = None + if server_version.get('interface') == 'slicemgr': + # delegate our cred to the slice manager + # do not delegate cred to slicemgr...not working at the moment + pass + #if server_version.get('hrn'): + # delegated_cred = self.delegate_cred(slice_cred, server_version['hrn']) + #elif server_version.get('urn'): + # delegated_cred = self.delegate_cred(slice_cred, urn_to_hrn(server_version['urn'])) + + if options.show_credential: + show_credentials(creds) + + api_options = {} + api_options ['call_id'] = unique_call_id() + + # set the requtested rspec version + version_manager = VersionManager() + rspec_version = version_manager._get_version('geni', '3.0').to_dict() + api_options['geni_rspec_version'] = rspec_version + + # users + # need to pass along user keys to the aggregate. + # users = [ + # { urn: urn:publicid:IDN+emulab.net+user+alice + # keys: [, ] + # }] + users = [] + slice_records = self.registry().Resolve(slice_urn, [self.my_credential_string]) + if slice_records and 'researcher' in slice_records[0] and slice_records[0]['researcher']!=[]: + slice_record = slice_records[0] + user_hrns = slice_record['researcher'] + user_urns = [hrn_to_urn(hrn, 'user') for hrn in user_hrns] + user_records = self.registry().Resolve(user_urns, [self.my_credential_string]) + users = pg_users_arg(user_records) + + api_options['geni_users'] = users + result = server.Provision([slice_urn], creds, api_options) + value = ReturnValue.get_value(result) + if self.options.raw: + save_raw_to_file(result, self.options.raw, self.options.rawformat, self.options.rawbanner) + if options.file is not None: + save_rspec_to_file (value, options.file) + if (self.options.raw is None) and (options.file is None): + print value + return value + def status(self, options, args): """ retrieve slice status (SliverStatus) @@ -1128,7 +1298,7 @@ or with an slice hrn, shows currently provisioned resources slice_urn = hrn_to_urn(slice_hrn, 'slice') # creds - slice_cred = self.slice_credential_string(slice_hrn) + slice_cred = self.slice_credential(slice_hrn) creds = [slice_cred] # options and call_id when supported @@ -1136,7 +1306,7 @@ or with an slice hrn, shows currently provisioned resources api_options['call_id']=unique_call_id() if options.show_credential: show_credentials(creds) - result = server.SliverStatus(slice_urn, creds, *self.ois(server,api_options)) + result = server.Status([slice_urn], creds, *self.ois(server,api_options)) value = ReturnValue.get_value(result) if self.options.raw: save_raw_to_file(result, self.options.raw, self.options.rawformat, self.options.rawbanner) @@ -1185,18 +1355,24 @@ or with an slice hrn, shows currently provisioned resources return value # reset named slice - def reset(self, options, args): + def action(self, options, args): """ - reset named slice (reset_slice) + Perform the named operational action on the named slivers """ server = self.sliceapi() + api_options = {} # slice urn slice_hrn = args[0] - slice_urn = hrn_to_urn(slice_hrn, 'slice') + action = args[1] + slice_urn = Xrn(slice_hrn, type='slice').get_urn() # cred - slice_cred = self.slice_credential_string(args[0]) + slice_cred = self.slice_credential(args[0]) creds = [slice_cred] - result = server.reset_slice(creds, slice_urn) + if options.delegate: + delegated_cred = self.delegate_cred(slice_cred, get_authority(self.authority)) + creds.append(delegated_cred) + + result = server.PerformOperationalAction([slice_urn], creds, action , api_options) value = ReturnValue.get_value(result) if self.options.raw: save_raw_to_file(result, self.options.raw, self.options.rawformat, self.options.rawbanner) @@ -1217,14 +1393,14 @@ or with an slice hrn, shows currently provisioned resources slice_urn = hrn_to_urn(slice_hrn, 'slice') # time: don't try to be smart on the time format, server-side will # creds - slice_cred = self.slice_credential_string(args[0]) + slice_cred = self.slice_credential(args[0]) creds = [slice_cred] # options and call_id when supported api_options = {} - api_options['call_id']=unique_call_id() + api_options['call_id']=unique_call_id() if options.show_credential: show_credentials(creds) - result = server.RenewSliver(slice_urn, creds, input_time, *self.ois(server,api_options)) + result = server.Renew([slice_urn], creds, input_time, *self.ois(server,api_options)) value = ReturnValue.get_value(result) if self.options.raw: save_raw_to_file(result, self.options.raw, self.options.rawformat, self.options.rawbanner) @@ -1242,7 +1418,7 @@ or with an slice hrn, shows currently provisioned resources slice_hrn = args[0] slice_urn = hrn_to_urn(slice_hrn, 'slice') # creds - slice_cred = self.slice_credential_string(slice_hrn) + slice_cred = self.slice_credential(slice_hrn) creds = [slice_cred] result = server.Shutdown(slice_urn, creds) value = ReturnValue.get_value(result) @@ -1269,7 +1445,7 @@ or with an slice hrn, shows currently provisioned resources rspec = open(rspec_file).read() # options and call_id when supported api_options = {} - api_options['call_id']=unique_call_id() + api_options['call_id']=unique_call_id() # get ticket at the server ticket_string = server.GetTicket(slice_urn, creds, rspec, *self.ois(server,api_options)) # save diff --git a/sfa/importer/openstackimporter.py b/sfa/importer/openstackimporter.py index 1f2af928..0cf729c3 100644 --- a/sfa/importer/openstackimporter.py +++ b/sfa/importer/openstackimporter.py @@ -73,7 +73,7 @@ class OpenstackImporter: else: self.logger.warn("OpenstackImporter: person %s does not have a PL public key"%hrn) pkey = Keypair(create=True) - user_gid = self.auth_hierarchy.create_gid(urn, create_uuid(), pkey) + user_gid = self.auth_hierarchy.create_gid(urn, create_uuid(), pkey, email=user.email) user_record = RegUser () user_record.type='user' user_record.hrn=hrn diff --git a/sfa/importer/plimporter.py b/sfa/importer/plimporter.py index 7994c8c9..8d197b63 100644 --- a/sfa/importer/plimporter.py +++ b/sfa/importer/plimporter.py @@ -254,7 +254,7 @@ class PlImporter: pass node_record.stale=False - site_pis=[] + site_pis=set() # import persons for person_id in site['person_ids']: proceed=False @@ -362,17 +362,12 @@ class PlImporter: # this is valid for all sites she is in.. # PI is coded with role_id==20 if 20 in person['role_ids']: - site_pis.append (user_record) + site_pis.add (user_record) except: self.logger.log_exc("PlImporter: failed to import person %d %s"%(person['person_id'],person['email'])) # maintain the list of PIs for a given site - # for the record, Jordan had proposed the following addition as a welcome hotfix to a previous version: - # site_pis = list(set(site_pis)) - # this was likely due to a bug in the above logic, that had to do with disabled persons - # being improperly handled, and where the whole loop on persons - # could be performed twice with the same person... - # so hopefully we do not need to eliminate duplicates explicitly here anymore + site_record.reg_pis = list(site_pis) site_record.reg_pis = site_pis dbsession.commit() @@ -400,10 +395,13 @@ class PlImporter: except: self.logger.log_exc("PlImporter: failed to import slice %s (%s)"%(slice_hrn,slice['name'])) else: + # update the pointer if it has changed + if slice_id != slice_record.pointer: + self.logger.info("updating record (slice) pointer") + slice_record.pointer = slice_id + dbsession.commit() # xxx update the record ... - # given that we record the current set of users anyways, there does not seem to be much left to do here - # self.logger.warning ("Slice update not yet implemented on slice %s (%s)"%(slice_hrn,slice['name'])) - pass + #self.logger.warning ("Slice update not yet implemented") # record current users affiliated with the slice slice_record.reg_researchers = \ [ self.locate_by_type_pointer ('user',user_id) for user_id in slice['person_ids'] ] diff --git a/sfa/managers/aggregate_manager.py b/sfa/managers/aggregate_manager.py index fe6b07b4..0ba512e8 100644 --- a/sfa/managers/aggregate_manager.py +++ b/sfa/managers/aggregate_manager.py @@ -1,120 +1,158 @@ import socket +from sfa.rspecs.version_manager import VersionManager from sfa.util.version import version_core from sfa.util.xrn import Xrn from sfa.util.callids import Callids +from sfa.util.sfalogging import logger +from sfa.util.faults import SfaInvalidArgument, InvalidRSpecVersion from sfa.server.api_versions import ApiVersions + class AggregateManager: def __init__ (self, config): pass # essentially a union of the core version, the generic version (this code) and # whatever the driver needs to expose + + def rspec_versions(self): + version_manager = VersionManager() + ad_rspec_versions = [] + request_rspec_versions = [] + for rspec_version in version_manager.versions: + if rspec_version.content_type in ['*', 'ad']: + ad_rspec_versions.append(rspec_version.to_dict()) + if rspec_version.content_type in ['*', 'request']: + request_rspec_versions.append(rspec_version.to_dict()) + return { + 'geni_request_rspec_versions': request_rspec_versions, + 'geni_ad_rspec_versions': ad_rspec_versions, + } + + def get_rspec_version_string(self, rspec_version, options={}): + version_string = "rspec_%s" % (rspec_version) + + #panos adding the info option to the caching key (can be improved) + if options.get('info'): + version_string = version_string + "_"+options.get('info', 'default') + + # Adding the list_leases option to the caching key + if options.get('list_leases'): + version_string = version_string + "_"+options.get('list_leases', 'default') + + # Adding geni_available to caching key + if options.get('geni_available'): + version_string = version_string + "_" + str(options.get('geni_available')) + + return version_string + def GetVersion(self, api, options): - xrn=Xrn(api.hrn) + xrn=Xrn(api.hrn, type='authority') version = version_core() + cred_types = [{'geni_type': 'geni_sfa', 'geni_version': str(i)} for i in range(4)[-2:]] geni_api_versions = ApiVersions().get_versions() - geni_api_versions['2'] = 'http://%s:%s' % (api.config.SFA_AGGREGATE_HOST, api.config.SFA_AGGREGATE_PORT) + geni_api_versions['3'] = 'http://%s:%s' % (api.config.sfa_aggregate_host, api.config.sfa_aggregate_port) version_generic = { + 'testbed': self.driver.testbed_name(), 'interface':'aggregate', - 'sfa': 2, - 'geni_api': 2, - 'geni_api_versions': geni_api_versions, 'hrn':xrn.get_hrn(), 'urn':xrn.get_urn(), - } + 'geni_api': 3, + 'geni_api_versions': geni_api_versions, + 'geni_single_allocation': 0, # Accept operations that act on as subset of slivers in a given state. + 'geni_allocate': 'geni_many',# Multiple slivers can exist and be incrementally added, including those which connect or overlap in some way. + 'geni_credential_types': cred_types, + } version.update(version_generic) + version.update(self.rspec_versions()) testbed_version = self.driver.aggregate_version() version.update(testbed_version) return version - def ListSlices(self, api, creds, options): + def ListResources(self, api, creds, options): call_id = options.get('call_id') - if Callids().already_handled(call_id): return [] - return self.driver.list_slices (creds, options) + if Callids().already_handled(call_id): return "" - def ListResources(self, api, creds, options): + # get the rspec's return format from options + version_manager = VersionManager() + rspec_version = version_manager.get_version(options.get('geni_rspec_version')) + version_string = self.get_rspec_version_string(rspec_version, options) + + # look in cache first + cached_requested = options.get('cached', True) + if cached_requested and self.driver.cache: + rspec = self.driver.cache.get(version_string) + if rspec: + logger.debug("%s.ListResources returning cached advertisement" % (self.driver.__module__)) + return rspec + + rspec = self.driver.list_resources (rspec_version, options) + if self.driver.cache: + logger.debug("%s.ListResources stores advertisement in cache" % (self.driver.__module__)) + self.driver.cache.add(version_string, rspec) + return rspec + + def Describe(self, api, creds, urns, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return "" - # get slice's hrn from options - slice_xrn = options.get('geni_slice_urn', None) - # pass None if no slice is specified - if not slice_xrn: - slice_hrn, slice_urn = None, None - else: - xrn = Xrn(slice_xrn) - slice_urn=xrn.get_urn() - slice_hrn=xrn.get_hrn() - return self.driver.list_resources (slice_urn, slice_hrn, creds, options) + version_manager = VersionManager() + rspec_version = version_manager.get_version(options.get('geni_rspec_version')) + return self.driver.describe(urns, rspec_version, options) + - def SliverStatus (self, api, xrn, creds, options): + def Status (self, api, urns, creds, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return {} - - xrn = Xrn(xrn,'slice') - slice_urn=xrn.get_urn() - slice_hrn=xrn.get_hrn() - return self.driver.sliver_status (slice_urn, slice_hrn) - - def CreateSliver(self, api, xrn, creds, rspec_string, users, options): + return self.driver.status (urns, options=options) + + + def Allocate(self, api, xrn, creds, rspec_string, expiration, options): + """ + Allocate resources as described in a request RSpec argument + to a slice with the named URN. + """ + call_id = options.get('call_id') + if Callids().already_handled(call_id): return "" + return self.driver.allocate(xrn, rspec_string, expiration, options) + + def Provision(self, api, xrns, creds, options): """ Create the sliver[s] (slice) at this aggregate. Verify HRN and initialize the slice record in PLC if necessary. """ call_id = options.get('call_id') if Callids().already_handled(call_id): return "" - - xrn = Xrn(xrn, 'slice') - slice_urn=xrn.get_urn() - slice_hrn=xrn.get_hrn() - return self.driver.create_sliver (slice_urn, slice_hrn, creds, rspec_string, users, options) + # make sure geni_rspec_version is specified in options + if 'geni_rspec_version' not in options: + msg = 'geni_rspec_version is required and must be set in options struct' + raise SfaInvalidArgument(msg, 'geni_rspec_version') + # make sure we support the requested rspec version + version_manager = VersionManager() + rspec_version = version_manager.get_version(options['geni_rspec_version']) + if not rspec_version: + raise InvalidRSpecVersion(options['geni_rspec_version']) + + return self.driver.provision(xrns, options) - def DeleteSliver(self, api, xrn, creds, options): + def Delete(self, api, xrns, creds, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return True + return self.driver.delete(xrns, options) - xrn = Xrn(xrn, 'slice') - slice_urn=xrn.get_urn() - slice_hrn=xrn.get_hrn() - return self.driver.delete_sliver (slice_urn, slice_hrn, creds, options) - - def RenewSliver(self, api, xrn, creds, expiration_time, options): + def Renew(self, api, xrns, creds, expiration_time, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return True - - xrn = Xrn(xrn, 'slice') - slice_urn=xrn.get_urn() - slice_hrn=xrn.get_hrn() - return self.driver.renew_sliver (slice_urn, slice_hrn, creds, expiration_time, options) - - ### these methods could use an options extension for at least call_id - def start_slice(self, api, xrn, creds): - xrn = Xrn(xrn) - slice_urn=xrn.get_urn() - slice_hrn=xrn.get_hrn() - return self.driver.start_slice (slice_urn, slice_hrn, creds) - - def stop_slice(self, api, xrn, creds): - xrn = Xrn(xrn) - slice_urn=xrn.get_urn() - slice_hrn=xrn.get_hrn() - return self.driver.stop_slice (slice_urn, slice_hrn, creds) - - def reset_slice(self, api, xrn): - xrn = Xrn(xrn) - slice_urn=xrn.get_urn() - slice_hrn=xrn.get_hrn() - return self.driver.reset_slice (slice_urn, slice_hrn) - - def GetTicket(self, api, xrn, creds, rspec, users, options): - - xrn = Xrn(xrn) - slice_urn=xrn.get_urn() - slice_hrn=xrn.get_hrn() + return self.driver.renew(xrns, expiration_time, options) - # xxx sounds like GetTicket is dead, but if that gets resurrected we might wish - # to pass 'users' over to the driver as well - return self.driver.get_ticket (slice_urn, slice_hrn, creds, rspec, options) + def PerformOperationalAction(self, api, xrns, creds, action, options={}): + call_id = options.get('call_id') + if Callids().already_handled(call_id): return True + return self.driver.perform_operational_action(xrns, action, options) + def Shutdown(self, api, xrn, creds, options={}): + call_id = options.get('call_id') + if Callids().already_handled(call_id): return True + return self.driver.shutdown(xrn, options) + diff --git a/sfa/managers/registry_manager.py b/sfa/managers/registry_manager.py index c29130ae..c24c1f51 100644 --- a/sfa/managers/registry_manager.py +++ b/sfa/managers/registry_manager.py @@ -46,7 +46,13 @@ class RegistryManager: hrn = urn_to_hrn(xrn)[0] else: hrn, type = urn_to_hrn(xrn) - + + # Slivers don't have credentials but users should be able to + # specify a sliver xrn and receive the slice's credential + if type == 'sliver' or '-' in Xrn(hrn).leaf: + slice_xrn = self.driver.sliver_to_slice_xrn(hrn) + hrn = slice_xrn.hrn + # Is this a root or sub authority auth_hrn = api.auth.get_authority(hrn) if not auth_hrn or hrn == api.config.SFA_INTERFACE_HRN: diff --git a/sfa/managers/slice_manager.py b/sfa/managers/slice_manager.py index 02729b00..1c1c0622 100644 --- a/sfa/managers/slice_manager.py +++ b/sfa/managers/slice_manager.py @@ -42,6 +42,7 @@ class SliceManager: version_manager = VersionManager() ad_rspec_versions = [] request_rspec_versions = [] + cred_types = [{'geni_type': 'geni_sfa', 'geni_version': str(i)} for i in range(4)[-2:]] for rspec_version in version_manager.versions: if rspec_version.content_type in ['*', 'ad']: ad_rspec_versions.append(rspec_version.to_dict()) @@ -51,13 +52,14 @@ class SliceManager: version_more = { 'interface':'slicemgr', 'sfa': 2, - 'geni_api': 2, - 'geni_api_versions': {'2': 'http://%s:%s' % (api.config.SFA_SM_HOST, api.config.SFA_SM_PORT)}, + 'geni_api': 3, + 'geni_api_versions': {'3': 'http://%s:%s' % (api.config.SFA_SM_HOST, api.config.SFA_SM_PORT)}, 'hrn' : xrn.get_hrn(), 'urn' : xrn.get_urn(), 'peers': peers, - 'geni_request_rspec_versions': request_rspec_versions, - 'geni_ad_rspec_versions': ad_rspec_versions, + 'geni_single_allocation': 0, # Accept operations that act on as subset of slivers in a given state. + 'geni_allocate': 'geni_many',# Multiple slivers can exist and be incrementally added, including those which connect or overlap in some way. + 'geni_credential_types': cred_types, } sm_version=version_core(version_more) # local aggregate if present needs to have localhost resolved @@ -112,13 +114,9 @@ class SliceManager: try: version = api.get_cached_server_version(server) # force ProtoGENI aggregates to give us a v2 RSpec - if 'sfa' in version.keys(): - forward_options['rspec_version'] = version_manager.get_version('SFA 1').to_dict() - else: - forward_options['rspec_version'] = version_manager.get_version('ProtoGENI 2').to_dict() - forward_options['geni_rspec_version'] = {'type': 'geni', 'version': '3.0'} - rspec = server.ListResources(credential, forward_options) - return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} + forward_options['geni_rspec_version'] = options.get('geni_rspec_version') + result = server.ListResources(credential, forward_options) + return {"aggregate": aggregate, "result": result, "elapsed": time.time()-tStart, "status": "success"} except Exception, e: api.logger.log_exc("ListResources failed at %s" %(server.url)) return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception", "exc_info": sys.exc_info()} @@ -143,7 +141,7 @@ class SliceManager: # get the callers hrn valid_cred = api.auth.checkCredentials(creds, 'listnodes', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first cred = api.getDelegatedCredential(creds) @@ -173,8 +171,9 @@ class SliceManager: self.add_slicemgr_stat(rspec, "ListResources", result["aggregate"], result["elapsed"], result["status"], result.get("exc_info",None)) if result["status"]=="success": + res = result['result']['value'] try: - rspec.version.merge(ReturnValue.get_value(result["rspec"])) + rspec.version.merge(ReturnValue.get_value(res)) except: api.logger.log_exc("SM.ListResources: Failed to merge aggregate rspec") @@ -186,30 +185,28 @@ class SliceManager: return rspec.toxml() - def CreateSliver(self, api, xrn, creds, rspec_str, users, options): + def Allocate(self, api, xrn, creds, rspec_str, expiration, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return "" version_manager = VersionManager() - def _CreateSliver(aggregate, server, xrn, credential, rspec, users, options): + def _Allocate(aggregate, server, xrn, credential, rspec, options): tStart = time.time() try: # Need to call GetVersion at an aggregate to determine the supported # rspec type/format beofre calling CreateSliver at an Aggregate. - server_version = api.get_cached_server_version(server) - requested_users = users - if 'sfa' not in server_version and 'geni_api' in server_version: + #server_version = api.get_cached_server_version(server) + #if 'sfa' not in server_version and 'geni_api' in server_version: # sfa aggregtes support both sfa and pg rspecs, no need to convert # if aggregate supports sfa rspecs. otherwise convert to pg rspec - rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request')) - filter = {'component_manager_id': server_version['urn']} - rspec.filter(filter) - rspec = rspec.toxml() - requested_users = sfa_to_pg_users_arg(users) - rspec = server.CreateSliver(xrn, credential, rspec, requested_users, options) - return {"aggregate": aggregate, "rspec": rspec, "elapsed": time.time()-tStart, "status": "success"} + #rspec = RSpec(RSpecConverter.to_pg_rspec(rspec, 'request')) + #filter = {'component_manager_id': server_version['urn']} + #rspec.filter(filter) + #rspec = rspec.toxml() + result = server.Allocate(xrn, credential, rspec, options) + return {"aggregate": aggregate, "result": result, "elapsed": time.time()-tStart, "status": "success"} except: - logger.log_exc('Something wrong in _CreateSliver with URL %s'%server.url) + logger.log_exc('Something wrong in _Allocate with URL %s'%server.url) return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception", "exc_info": sys.exc_info()} # Validate the RSpec against PlanetLab's schema --disabled for now @@ -232,7 +229,7 @@ class SliceManager: # get the callers hrn hrn, type = urn_to_hrn(xrn) valid_cred = api.auth.checkCredentials(creds, 'createsliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn() threads = ThreadManager() for aggregate in api.aggregates: # prevent infinite loop. Dont send request back to caller @@ -242,28 +239,99 @@ class SliceManager: interface = api.aggregates[aggregate] server = api.server_proxy(interface, cred) # Just send entire RSpec to each aggregate - threads.run(_CreateSliver, aggregate, server, xrn, [cred], rspec.toxml(), users, options) + threads.run(_Allocate, aggregate, server, xrn, [cred], rspec.toxml(), options) results = threads.get_results() manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest') result_rspec = RSpec(version=manifest_version) + geni_urn = None + geni_slivers = [] + for result in results: - self.add_slicemgr_stat(result_rspec, "CreateSliver", result["aggregate"], result["elapsed"], + self.add_slicemgr_stat(result_rspec, "Allocate", result["aggregate"], result["elapsed"], result["status"], result.get("exc_info",None)) if result["status"]=="success": try: - result_rspec.version.merge(ReturnValue.get_value(result["rspec"])) + res = result['result']['value'] + geni_urn = res['geni_urn'] + result_rspec.version.merge(ReturnValue.get_value(res['geni_rspec'])) + geni_slivers.extend(res['geni_slivers']) except: - api.logger.log_exc("SM.CreateSliver: Failed to merge aggregate rspec") - return result_rspec.toxml() + api.logger.log_exc("SM.Allocate: Failed to merge aggregate rspec") + return { + 'geni_urn': geni_urn, + 'geni_rspec': result_rspec.toxml(), + 'geni_slivers': geni_slivers + } + + + def Provision(self, api, xrn, creds, options): + call_id = options.get('call_id') + if Callids().already_handled(call_id): return "" + + version_manager = VersionManager() + def _Provision(aggregate, server, xrn, credential, options): + tStart = time.time() + try: + # Need to call GetVersion at an aggregate to determine the supported + # rspec type/format beofre calling CreateSliver at an Aggregate. + server_version = api.get_cached_server_version(server) + result = server.Provision(xrn, credential, options) + return {"aggregate": aggregate, "result": result, "elapsed": time.time()-tStart, "status": "success"} + except: + logger.log_exc('Something wrong in _Allocate with URL %s'%server.url) + return {"aggregate": aggregate, "elapsed": time.time()-tStart, "status": "exception", "exc_info": sys.exc_info()} + + # attempt to use delegated credential first + cred = api.getDelegatedCredential(creds) + if not cred: + cred = api.getCredential() + + # get the callers hrn + valid_cred = api.auth.checkCredentials(creds, 'createsliver', xrn)[0] + caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn() + threads = ThreadManager() + for aggregate in api.aggregates: + # prevent infinite loop. Dont send request back to caller + # unless the caller is the aggregate's SM + if caller_hrn == aggregate and aggregate != api.hrn: + continue + interface = api.aggregates[aggregate] + server = api.server_proxy(interface, cred) + # Just send entire RSpec to each aggregate + threads.run(_Provision, aggregate, server, xrn, [cred], options) + + results = threads.get_results() + manifest_version = version_manager._get_version('GENI', '3', 'manifest') + result_rspec = RSpec(version=manifest_version) + geni_slivers = [] + geni_urn = None + for result in results: + self.add_slicemgr_stat(result_rspec, "Provision", result["aggregate"], result["elapsed"], + result["status"], result.get("exc_info",None)) + if result["status"]=="success": + try: + res = result['result']['value'] + geni_urn = res['geni_urn'] + result_rspec.version.merge(ReturnValue.get_value(res['geni_rspec'])) + geni_slivers.extend(res['geni_slivers']) + except: + api.logger.log_exc("SM.Provision: Failed to merge aggregate rspec") + return { + 'geni_urn': geni_urn, + 'geni_rspec': result_rspec.toxml(), + 'geni_slivers': geni_slivers + } + + - def RenewSliver(self, api, xrn, creds, expiration_time, options): + def Renew(self, api, xrn, creds, expiration_time, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return True - def _RenewSliver(aggregate, server, xrn, creds, expiration_time, options): + def _Renew(aggregate, server, xrn, creds, expiration_time, options): try: - result=server.RenewSliver(xrn, creds, expiration_time, options) + result=server.Renew(xrn, creds, expiration_time, options) if type(result)!=dict: result = {'code': {'geni_code': 0}, 'value': result} result['aggregate'] = aggregate @@ -274,10 +342,9 @@ class SliceManager: 'code': {'geni_code': -1}, 'value': False, 'output': ""} - (hrn, urn_type) = urn_to_hrn(xrn) # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'renewsliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + valid_cred = api.auth.checkCredentials(creds, 'renewsliver', xrn)[0] + caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first cred = api.getDelegatedCredential(creds) @@ -291,7 +358,7 @@ class SliceManager: continue interface = api.aggregates[aggregate] server = api.server_proxy(interface, cred) - threads.run(_RenewSliver, aggregate, server, xrn, [cred], expiration_time, options) + threads.run(_Renew, aggregate, server, xrn, [cred], expiration_time, options) results = threads.get_results() @@ -307,17 +374,17 @@ class SliceManager: return results - def DeleteSliver(self, api, xrn, creds, options): + def Delete(self, api, xrn, creds, options): call_id = options.get('call_id') if Callids().already_handled(call_id): return "" - def _DeleteSliver(server, xrn, creds, options): - return server.DeleteSliver(xrn, creds, options) + def _Delete(server, xrn, creds, options): + return server.Delete(xrn, creds, options) (hrn, type) = urn_to_hrn(xrn) # get the callers hrn valid_cred = api.auth.checkCredentials(creds, 'deletesliver', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first cred = api.getDelegatedCredential(creds) @@ -337,9 +404,9 @@ class SliceManager: # first draft at a merging SliverStatus - def SliverStatus(self, api, slice_xrn, creds, options): - def _SliverStatus(server, xrn, creds, options): - return server.SliverStatus(xrn, creds, options) + def Status(self, api, slice_xrn, creds, options): + def _Status(server, xrn, creds, options): + return server.Status(xrn, creds, options) call_id = options.get('call_id') if Callids().already_handled(call_id): return {} @@ -351,84 +418,71 @@ class SliceManager: for aggregate in api.aggregates: interface = api.aggregates[aggregate] server = api.server_proxy(interface, cred) - threads.run (_SliverStatus, server, slice_xrn, [cred], options) + threads.run (_Status, server, slice_xrn, [cred], options) results = [ReturnValue.get_value(result) for result in threads.get_results()] # get rid of any void result - e.g. when call_id was hit, where by convention we return {} - results = [ result for result in results if result and result['geni_resources']] + results = [ result for result in results if result and result['geni_slivers']] # do not try to combine if there's no result if not results : return {} # otherwise let's merge stuff - overall = {} - - # mmh, it is expected that all results carry the same urn - overall['geni_urn'] = results[0]['geni_urn'] - overall['pl_login'] = None + geni_slivers = [] + geni_urn = None for result in results: - if result.get('pl_login'): - overall['pl_login'] = result['pl_login'] - break - elif isinstance(result.get('value'), dict) and result['value'].get('pl_login'): - overall['pl_login'] = result['value']['pl_login'] - break - # append all geni_resources - overall['geni_resources'] = \ - reduce (lambda x,y: x+y, [ result['geni_resources'] for result in results] , []) - overall['status'] = 'unknown' - if overall['geni_resources']: - overall['status'] = 'ready' - - return overall - - def ListSlices(self, api, creds, options): - call_id = options.get('call_id') - if Callids().already_handled(call_id): return [] - - def _ListSlices(server, creds, options): - return server.ListSlices(creds, options) + try: + geni_urn = result['geni_urn'] + geni_slivers.extend(result['result']['geni_slivers']) + except: + api.logger.log_exc("SM.Provision: Failed to merge aggregate rspec") + return { + 'geni_urn': geni_urn, + 'geni_slivers': geni_slivers + } - # look in cache first - # xxx is this really frequent enough that it is worth being cached ? - if self.cache: - slices = self.cache.get('slices') - if slices: - api.logger.debug("SliceManager.ListSlices returns from cache") - return slices - - # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'listslices', None)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() - + + def Describe(self, api, xrns, creds, options): + def _Describe(server, xrn, creds, options): + return server.Describe(xrn, creds, options) + + call_id = options.get('call_id') + if Callids().already_handled(call_id): return {} # attempt to use delegated credential first - cred= api.getDelegatedCredential(creds) + cred = api.getDelegatedCredential(creds) if not cred: cred = api.getCredential() threads = ThreadManager() - # fetch from aggregates for aggregate in api.aggregates: - # prevent infinite loop. Dont send request back to caller - # unless the caller is the aggregate's SM - if caller_hrn == aggregate and aggregate != api.hrn: - continue interface = api.aggregates[aggregate] server = api.server_proxy(interface, cred) - threads.run(_ListSlices, server, [cred], options) - - # combime results + threads.run (_Describe, server, slice_xrn, [cred], options) results = [ReturnValue.get_value(result) for result in threads.get_results()] - slices = [] + + # get rid of any void result - e.g. when call_id was hit, where by convention we return {} + results = [ result for result in results if result and result.get('geni_urn')] + + # do not try to combine if there's no result + if not results : return {} + + # otherwise let's merge stuff + manifest_version = version_manager._get_version('GENI', '3', 'manifest') + result_rspec = RSpec(version=manifest_version) + geni_slivers = [] + geni_urn = None for result in results: - slices.extend(result) - - # cache the result - if self.cache: - api.logger.debug("SliceManager.ListSlices caches value") - self.cache.add('slices', slices) - - return slices - + try: + geni_urn = result['geni_urn'] + result_rspec.version.merge(ReturnValue.get_value(result['result']['geni_rspec'])) + geni_slivers.extend(result['result']['geni_slivers']) + except: + api.logger.log_exc("SM.Provision: Failed to merge aggregate rspec") + return { + 'geni_urn': geni_urn, + 'geni_rspec': result_rspec.toxml(), + 'geni_slivers': geni_slivers + } + def GetTicket(self, api, xrn, creds, rspec, users, options): slice_hrn, type = urn_to_hrn(xrn) @@ -442,7 +496,7 @@ class SliceManager: # get the callers hrn valid_cred = api.auth.checkCredentials(creds, 'getticket', slice_hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first cred = api.getDelegatedCredential(creds) @@ -496,12 +550,10 @@ class SliceManager: ticket.sign() return ticket.save_to_string(save_parents=True) - def start_slice(self, api, xrn, creds): - hrn, type = urn_to_hrn(xrn) - + def PerformOperationalAction(self, api, xrn, creds, action, options): # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'startslice', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + valid_cred = api.auth.checkCredentials(creds, 'createsliver', xrn)[0] + caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first cred = api.getDelegatedCredential(creds) @@ -515,16 +567,15 @@ class SliceManager: continue interface = api.aggregates[aggregate] server = api.server_proxy(interface, cred) - threads.run(server.Start, xrn, cred) + threads.run(server.PerformOperationalAction, xrn, cred, action, options) threads.get_results() return 1 - def stop_slice(self, api, xrn, creds): - hrn, type = urn_to_hrn(xrn) - + def Shutdown(self, api, xrn, creds, options={}): + xrn = Xrn(xrn) # get the callers hrn - valid_cred = api.auth.checkCredentials(creds, 'stopslice', hrn)[0] - caller_hrn = Credential(string=valid_cred).get_gid_caller().get_hrn() + valid_cred = api.auth.checkCredentials(creds, 'stopslice', xrn.hrn)[0] + caller_hrn = Credential(cred=valid_cred).get_gid_caller().get_hrn() # attempt to use delegated credential first cred = api.getDelegatedCredential(creds) @@ -538,25 +589,7 @@ class SliceManager: continue interface = api.aggregates[aggregate] server = api.server_proxy(interface, cred) - threads.run(server.Stop, xrn, cred) + threads.run(server.Shutdown, xrn.urn, cred) threads.get_results() return 1 - def reset_slice(self, api, xrn): - """ - Not implemented - """ - return 1 - - def shutdown(self, api, xrn, creds): - """ - Not implemented - """ - return 1 - - def status(self, api, xrn, creds): - """ - Not implemented - """ - return 1 - diff --git a/sfa/methods/Allocate.py b/sfa/methods/Allocate.py new file mode 100644 index 00000000..4105f31f --- /dev/null +++ b/sfa/methods/Allocate.py @@ -0,0 +1,66 @@ +from sfa.util.faults import SfaInvalidArgument, InvalidRSpec, SfatablesRejected +from sfa.util.sfatime import datetime_to_string +from sfa.util.xrn import Xrn +from sfa.util.method import Method +from sfa.util.sfatablesRuntime import run_sfatables +from sfa.trust.credential import Credential +from sfa.storage.parameter import Parameter, Mixed +from sfa.rspecs.rspec import RSpec +from sfa.util.sfalogging import logger + +class Allocate(Method): + """ + Allocate resources as described in a request RSpec argument + to a slice with the named URN. On success, one or more slivers + are allocated, containing resources satisfying the request, and + assigned to the given slice. This method returns a listing and + description of the resources reserved for the slice by this + operation, in the form of a manifest RSpec. Allocated slivers + are held for an aggregate-determined period. Clients must Renew + or Provision slivers before the expiration time (given in the + return struct), or the aggregate will automatically Delete them. + + @param slice_urn (string) URN of slice to allocate to + @param credentials (dict) of credentials + @param rspec (string) rspec to allocate + + """ + interfaces = ['aggregate', 'slicemgr'] + accepts = [ + Parameter(str, "Slice URN"), + Parameter(type([dict]), "List of credentials"), + Parameter(str, "RSpec"), + Parameter(dict, "options"), + ] + returns = Parameter(str, "Allocated RSpec") + + def call(self, xrn, creds, rspec, options): + xrn = Xrn(xrn, type='slice') + self.api.logger.info("interface: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, xrn.get_hrn(), self.name)) + + # Find the valid credentials + valid_creds = self.api.auth.checkCredentials(creds, 'createsliver', xrn.get_hrn()) + # use the expiration from the first valid credential to determine when + # the slivers should expire. + expiration = datetime_to_string(Credential(cred=valid_creds[0]).expiration) + + # make sure request is not empty + slivers = RSpec(rspec).version.get_nodes_with_slivers() + if not slivers: + raise InvalidRSpec("Missing or element. Request rspec must explicitly allocate slivers") + + # flter rspec through sfatables + if self.api.interface in ['aggregate']: + chain_name = 'INCOMING' + elif self.api.interface in ['slicemgr']: + chain_name = 'FORWARD-INCOMING' + self.api.logger.debug("Allocate: sfatables on chain %s"%chain_name) + origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn() + self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, xrn, self.name)) + rspec = run_sfatables(chain_name, xrn.get_hrn(), origin_hrn, rspec) + slivers = RSpec(rspec).version.get_nodes_with_slivers() + if not slivers: + raise SfatablesRejected(slice_xrn) + + result = self.api.manager.Allocate(self.api, xrn.get_urn(), creds, rspec, expiration, options) + return result diff --git a/sfa/methods/CreateSliver.py b/sfa/methods/CreateSliver.py deleted file mode 100644 index 27974891..00000000 --- a/sfa/methods/CreateSliver.py +++ /dev/null @@ -1,57 +0,0 @@ -from sfa.util.faults import SfaInvalidArgument, InvalidRSpec -from sfa.util.xrn import urn_to_hrn -from sfa.util.method import Method -from sfa.util.sfatablesRuntime import run_sfatables -from sfa.trust.credential import Credential -from sfa.storage.parameter import Parameter, Mixed -from sfa.rspecs.rspec import RSpec - -class CreateSliver(Method): - """ - Allocate resources to a slice. This operation is expected to - start the allocated resources asynchornously after the operation - has successfully completed. Callers can check on the status of - the resources using SliverStatus. - - @param slice_urn (string) URN of slice to allocate to - @param credentials ([string]) of credentials - @param rspec (string) rspec to allocate - - """ - interfaces = ['aggregate', 'slicemgr'] - accepts = [ - Parameter(str, "Slice URN"), - Mixed(Parameter(str, "Credential string"), - Parameter(type([str]), "List of credentials")), - Parameter(str, "RSpec"), - Parameter(type([]), "List of user information"), - Parameter(dict, "options"), - ] - returns = Parameter(str, "Allocated RSpec") - - def call(self, slice_xrn, creds, rspec, users, options): - hrn, type = urn_to_hrn(slice_xrn) - - self.api.logger.info("interface: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, hrn, self.name)) - - # Find the valid credentials - valid_creds = self.api.auth.checkCredentials(creds, 'createsliver', hrn) - origin_hrn = Credential(string=valid_creds[0]).get_gid_caller().get_hrn() - - # make sure users info is specified - if not users: - msg = "'users' must be specified and cannot be null. You may need to update your client." - raise SfaInvalidArgument(name='users', extra=msg) - - # flter rspec through sfatables - if self.api.interface in ['aggregate']: - chain_name = 'INCOMING' - elif self.api.interface in ['slicemgr']: - chain_name = 'FORWARD-INCOMING' - self.api.logger.debug("CreateSliver: sfatables on chain %s"%chain_name) - rspec = run_sfatables(chain_name, hrn, origin_hrn, rspec) - slivers = RSpec(rspec).version.get_nodes_with_slivers() - if not slivers: - raise InvalidRSpec("Missing or element. Request rspec must explicitly allocate slivers") - result = self.api.manager.CreateSliver(self.api, slice_xrn, creds, rspec, users, options) - return result diff --git a/sfa/methods/Delete.py b/sfa/methods/Delete.py new file mode 100644 index 00000000..e8c5128f --- /dev/null +++ b/sfa/methods/Delete.py @@ -0,0 +1,34 @@ +from sfa.util.xrn import urn_to_hrn +from sfa.util.method import Method +from sfa.storage.parameter import Parameter, Mixed +from sfa.trust.auth import Auth +from sfa.trust.credential import Credential + +class Delete(Method): + """ + Remove the slice or slivers and free the allocated resources + + @param xrns human readable name of slice to instantiate (hrn or urn) + @param creds credential string specifying the rights of the caller + @return 1 is successful, faults otherwise + """ + + interfaces = ['aggregate', 'slicemgr', 'component'] + + accepts = [ + Parameter(type([str]), "Human readable name of slice to delete (hrn or urn)"), + Parameter(type([dict]), "Credentials"), + Parameter(dict, "options"), + ] + + returns = Parameter(int, "1 if successful") + + def call(self, xrns, creds, options): + valid_creds = self.api.auth.checkCredentials(creds, 'deletesliver', xrns, + check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + + #log the call + origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn() + self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, xrns, self.name)) + + return self.api.manager.Delete(self.api, xrns, creds, options) diff --git a/sfa/methods/DeleteSliver.py b/sfa/methods/DeleteSliver.py deleted file mode 100644 index c9e40a4a..00000000 --- a/sfa/methods/DeleteSliver.py +++ /dev/null @@ -1,36 +0,0 @@ -from sfa.util.xrn import urn_to_hrn -from sfa.util.method import Method -from sfa.storage.parameter import Parameter, Mixed -from sfa.trust.auth import Auth -from sfa.trust.credential import Credential - -class DeleteSliver(Method): - """ - Remove the slice from all nodes and free the allocated resources - - @param xrn human readable name of slice to instantiate (hrn or urn) - @param cred credential string specifying the rights of the caller - @return 1 is successful, faults otherwise - """ - - interfaces = ['aggregate', 'slicemgr', 'component'] - - accepts = [ - Parameter(str, "Human readable name of slice to delete (hrn or urn)"), - Mixed(Parameter(str, "Credential string"), - Parameter(type([str]), "List of credentials")), - Parameter(dict, "options"), - ] - - returns = Parameter(bool, "True if successful") - - def call(self, xrn, creds, options): - (hrn, type) = urn_to_hrn(xrn) - valid_creds = self.api.auth.checkCredentials(creds, 'deletesliver', hrn) - - #log the call - origin_hrn = Credential(string=valid_creds[0]).get_gid_caller().get_hrn() - self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, hrn, self.name)) - - return self.api.manager.DeleteSliver(self.api, xrn, creds, options) - diff --git a/sfa/methods/Describe.py b/sfa/methods/Describe.py new file mode 100644 index 00000000..b66780a4 --- /dev/null +++ b/sfa/methods/Describe.py @@ -0,0 +1,62 @@ +import zlib + +from sfa.util.xrn import urn_to_hrn +from sfa.util.method import Method +from sfa.util.sfatablesRuntime import run_sfatables +from sfa.util.faults import SfaInvalidArgument +from sfa.trust.credential import Credential + +from sfa.storage.parameter import Parameter, Mixed + +class Describe(Method): + """ + Retrieve a manifest RSpec describing the resources contained by the + named entities, e.g. a single slice or a set of the slivers in a + slice. This listing and description should be sufficiently + descriptive to allow experimenters to use the resources. + @param credential list + @param options dictionary + @return dict + """ + interfaces = ['aggregate', 'slicemgr'] + accepts = [ + Parameter(type([str]), "List of URNs"), + Mixed(Parameter(str, "Credential string"), + Parameter(type([str]), "List of credentials")), + Parameter(dict, "Options") + ] + returns = Parameter(str, "List of resources") + + def call(self, urns, creds, options): + self.api.logger.info("interface: %s\tmethod-name: %s" % (self.api.interface, self.name)) + + # client must specify a version + if not options.get('geni_rspec_version'): + if options.get('rspec_version'): + options['geni_rspec_version'] = options['rspec_version'] + else: + raise SfaInvalidArgument('Must specify an rspec version option. geni_rspec_version cannot be null') + + valid_creds = self.api.auth.checkCredentials(creds, 'listnodes', urns, \ + check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + + # get hrn of the original caller + origin_hrn = options.get('origin_hrn', None) + if not origin_hrn: + origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn() + desc = self.api.manager.Describe(self.api, creds, urns, options) + + # filter rspec through sfatables + if self.api.interface in ['aggregate']: + chain_name = 'OUTGOING' + elif self.api.interface in ['slicemgr']: + chain_name = 'FORWARD-OUTGOING' + self.api.logger.debug("ListResources: sfatables on chain %s"%chain_name) + desc['geni_rspec'] = run_sfatables(chain_name, '', origin_hrn, desc['geni_rspec']) + + if options.has_key('geni_compressed') and options['geni_compressed'] == True: + desc['geni_rspec'] = zlib.compress(desc['geni_rspec']).encode('base64') + + return desc + + diff --git a/sfa/methods/GetTicket.py b/sfa/methods/GetTicket.py deleted file mode 100644 index 54978e7e..00000000 --- a/sfa/methods/GetTicket.py +++ /dev/null @@ -1,57 +0,0 @@ -from sfa.util.xrn import urn_to_hrn -from sfa.util.method import Method -from sfa.util.sfatablesRuntime import run_sfatables - -from sfa.trust.credential import Credential - -from sfa.storage.parameter import Parameter, Mixed - -class GetTicket(Method): - """ - Retrieve a ticket. This operation is currently implemented on PLC - only (see SFA, engineering decisions); it is not implemented on - components. - - The ticket is filled in with information from the PLC database. This - information includes resources, and attributes such as user keys and - initscripts. - - @param cred credential string - @param name name of the slice to retrieve a ticket for (hrn or urn) - @param rspec resource specification dictionary - - @return the string representation of a ticket object - """ - - interfaces = ['aggregate', 'slicemgr'] - - accepts = [ - Parameter(str, "Human readable name of slice to retrive a ticket for (hrn or urn)"), - Mixed(Parameter(str, "Credential string"), - Parameter(type([str]), "List of credentials")), - Parameter(str, "Resource specification (rspec)"), - Parameter(type([]), "List of user information"), - Parameter(dict, "Options") - ] - - returns = Parameter(str, "String representation of a ticket object") - - def call(self, xrn, creds, rspec, users, options): - hrn, type = urn_to_hrn(xrn) - # Find the valid credentials - valid_creds = self.api.auth.checkCredentials(creds, 'getticket', hrn) - origin_hrn = Credential(string=valid_creds[0]).get_gid_caller().get_hrn() - - #log the call - self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, hrn, self.name)) - - # filter rspec through sfatables - if self.api.interface in ['aggregate']: - chain_name = 'OUTGOING' - elif self.api.interface in ['slicemgr']: - chain_name = 'FORWARD-OUTGOING' - rspec = run_sfatables(chain_name, hrn, origin_hrn, rspec) - - # remove nodes that are not available at this interface from the rspec - return self.api.manager.GetTicket(self.api, xrn, creds, rspec, users, options) - diff --git a/sfa/methods/ListResources.py b/sfa/methods/ListResources.py index 04359a04..b7ac0b72 100644 --- a/sfa/methods/ListResources.py +++ b/sfa/methods/ListResources.py @@ -10,7 +10,7 @@ from sfa.storage.parameter import Parameter, Mixed class ListResources(Method): """ - Returns information about available resources or resources allocated to this slice + Returns information about available resources @param credential list @param options dictionary @return string @@ -33,17 +33,13 @@ class ListResources(Method): else: raise SfaInvalidArgument('Must specify an rspec version option. geni_rspec_version cannot be null') - # get slice's hrn from options - xrn = options.get('geni_slice_urn', '') - (hrn, _) = urn_to_hrn(xrn) - # Find the valid credentials - valid_creds = self.api.auth.checkCredentials(creds, 'listnodes', hrn) + valid_creds = self.api.auth.checkCredentials(creds, 'listnodes') # get hrn of the original caller origin_hrn = options.get('origin_hrn', None) if not origin_hrn: - origin_hrn = Credential(string=valid_creds[0]).get_gid_caller().get_hrn() + origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn() rspec = self.api.manager.ListResources(self.api, creds, options) # filter rspec through sfatables @@ -52,7 +48,7 @@ class ListResources(Method): elif self.api.interface in ['slicemgr']: chain_name = 'FORWARD-OUTGOING' self.api.logger.debug("ListResources: sfatables on chain %s"%chain_name) - filtered_rspec = run_sfatables(chain_name, hrn, origin_hrn, rspec) + filtered_rspec = run_sfatables(chain_name, '', origin_hrn, rspec) if options.has_key('geni_compressed') and options['geni_compressed'] == True: filtered_rspec = zlib.compress(filtered_rspec).encode('base64') diff --git a/sfa/methods/PerformOperationalAction.py b/sfa/methods/PerformOperationalAction.py new file mode 100644 index 00000000..073929f3 --- /dev/null +++ b/sfa/methods/PerformOperationalAction.py @@ -0,0 +1,37 @@ +from sfa.util.faults import SfaInvalidArgument, InvalidRSpec +from sfa.util.xrn import urn_to_hrn +from sfa.util.method import Method +from sfa.util.sfatablesRuntime import run_sfatables +from sfa.trust.credential import Credential +from sfa.storage.parameter import Parameter, Mixed + +class PerformOperationalAction(Method): + """ + Request that the named geni_allocated slivers be made + geni_provisioned, instantiating or otherwise realizing the + resources, such that they have a valid geni_operational_status + and may possibly be made geni_ready for experimenter use. This + operation is synchronous, but may start a longer process, such + as creating and imaging a virtual machine + + @param slice urns ([string]) URNs of slivers to provision to + @param credentials (dict) of credentials + @param options (dict) options + + """ + interfaces = ['aggregate', 'slicemgr'] + accepts = [ + Parameter(type([str]), "URNs"), + Parameter(type([dict]), "Credentials"), + Parameter(str, "Action"), + Parameter(dict, "Options"), + ] + returns = Parameter(dict, "Provisioned Resources") + + def call(self, xrns, creds, action, options): + self.api.logger.info("interface: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, xrns, self.name)) + + # Find the valid credentials + valid_creds = self.api.auth.checkCredentials(creds, 'createsliver', xrns) + result = self.api.manager.PerformOperationalAction(self.api, xrns, creds, action, options) + return result diff --git a/sfa/methods/Provision.py b/sfa/methods/Provision.py new file mode 100644 index 00000000..74ee350e --- /dev/null +++ b/sfa/methods/Provision.py @@ -0,0 +1,40 @@ +from sfa.util.faults import SfaInvalidArgument, InvalidRSpec +from sfa.util.xrn import urn_to_hrn +from sfa.util.method import Method +from sfa.util.sfatablesRuntime import run_sfatables +from sfa.trust.credential import Credential +from sfa.storage.parameter import Parameter, Mixed +from sfa.rspecs.rspec import RSpec + +class Provision(Method): + """ + Request that the named geni_allocated slivers be made + geni_provisioned, instantiating or otherwise realizing the + resources, such that they have a valid geni_operational_status + and may possibly be made geni_ready for experimenter use. This + operation is synchronous, but may start a longer process, such + as creating and imaging a virtual machine + + @param slice urns ([string]) URNs of slivers to provision to + @param credentials (dict) of credentials + @param options (dict) options + + """ + interfaces = ['aggregate', 'slicemgr'] + accepts = [ + Parameter(type([str]), "URNs"), + Parameter(type([dict]), "Credentials"), + Parameter(dict, "options"), + ] + returns = Parameter(dict, "Provisioned Resources") + + def call(self, xrns, creds, options): + self.api.logger.info("interface: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, xrns, self.name)) + + # Find the valid credentials + valid_creds = self.api.auth.checkCredentials(creds, 'createsliver', xrns, + check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn() + self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, xrns, self.name)) + result = self.api.manager.Provision(self.api, xrns, creds, options) + return result diff --git a/sfa/methods/RenewSliver.py b/sfa/methods/Renew.py similarity index 66% rename from sfa/methods/RenewSliver.py rename to sfa/methods/Renew.py index c64b8841..288e9700 100644 --- a/sfa/methods/RenewSliver.py +++ b/sfa/methods/Renew.py @@ -9,38 +9,39 @@ from sfa.trust.credential import Credential from sfa.storage.parameter import Parameter -class RenewSliver(Method): +class Renew(Method): """ - Renews the resources in a sliver, extending the lifetime of the slice. - @param slice_urn (string) URN of slice to renew + Renews the resources in the specified slice or slivers by + extending the lifetime. + + @param surn ([string]) List of URNs of to renew @param credentials ([string]) of credentials @param expiration_time (string) requested time of expiration - + @param options (dict) options """ interfaces = ['aggregate', 'slicemgr'] accepts = [ - Parameter(str, "Slice URN"), + Parameter(type([str]), "Slice URN"), Parameter(type([str]), "List of credentials"), Parameter(str, "Expiration time in RFC 3339 format"), Parameter(dict, "Options"), ] returns = Parameter(bool, "Success or Failure") - def call(self, slice_xrn, creds, expiration_time, options): - - (hrn, type) = urn_to_hrn(slice_xrn) + def call(self, urns, creds, expiration_time, options): - self.api.logger.info("interface: %s\ttarget-hrn: %s\tcaller-creds: %s\tmethod-name: %s"%(self.api.interface, hrn, creds, self.name)) + self.api.logger.info("interface: %s\ttarget-hrn: %s\tcaller-creds: %s\tmethod-name: %s"%(self.api.interface, urns, creds, self.name)) # Find the valid credentials - valid_creds = self.api.auth.checkCredentials(creds, 'renewsliver', hrn) + valid_creds = self.api.auth.checkCredentials(creds, 'renewsliver', urns, + check_sliver_callback = self.api.manager.driver.check_sliver_credentials) # Validate that the time does not go beyond the credential's expiration time requested_time = utcparse(expiration_time) max_renew_days = int(self.api.config.SFA_MAX_SLICE_RENEW) - if requested_time > Credential(string=valid_creds[0]).get_expiration(): + if requested_time > Credential(cred=valid_creds[0]).get_expiration(): raise InsufficientRights('Renewsliver: Credential expires before requested expiration time') if requested_time > datetime.datetime.utcnow() + datetime.timedelta(days=max_renew_days): raise Exception('Cannot renew > %s days from now' % max_renew_days) - return self.api.manager.RenewSliver(self.api, slice_xrn, valid_creds, expiration_time, options) + return self.api.manager.Renew(self.api, urns, creds, expiration_time, options) diff --git a/sfa/methods/Shutdown.py b/sfa/methods/Shutdown.py index 273b9914..8641bd0e 100644 --- a/sfa/methods/Shutdown.py +++ b/sfa/methods/Shutdown.py @@ -1,8 +1,8 @@ from sfa.storage.parameter import Parameter +from sfa.trust.credential import Credential +from sfa.util.method import Method -from sfa.methods.Stop import Stop - -class Shutdown(Stop): +class Shutdown(Method): """ Perform an emergency shut down of a sliver. This operation is intended for administrative use. The sliver is shut down but remains available for further forensics. @@ -13,11 +13,17 @@ class Shutdown(Stop): interfaces = ['aggregate', 'slicemgr'] accepts = [ Parameter(str, "Slice URN"), - Parameter(type([str]), "List of credentials"), + Parameter(type([dict]), "Credentials"), ] returns = Parameter(bool, "Success or Failure") - def call(self, slice_xrn, creds): + def call(self, xrn, creds): + + valid_creds = self.api.auth.checkCredentials(creds, 'stopslice', xrn, + check_sliver_callback = self.api.manager.driver.check_sliver_credentials) + #log the call + origin_hrn = Credential(cred=valid_creds[0]).get_gid_caller().get_hrn() + self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, xrn, self.name)) - return Stop.call(self, slice_xrn, creds) + return self.api.manager.Shutdown(self.api, xrn, creds) diff --git a/sfa/methods/Start.py b/sfa/methods/Start.py deleted file mode 100644 index 14122225..00000000 --- a/sfa/methods/Start.py +++ /dev/null @@ -1,37 +0,0 @@ -from sfa.util.xrn import urn_to_hrn -from sfa.util.method import Method - -from sfa.trust.credential import Credential - -from sfa.storage.parameter import Parameter, Mixed - -class Start(Method): - """ - Start the specified slice - - @param xrn human readable name of slice to instantiate (hrn or urn) - @param cred credential string specifying the rights of the caller - @return 1 is successful, faults otherwise - """ - - interfaces = ['aggregate', 'slicemgr', 'component'] - - accepts = [ - Parameter(str, "Human readable name of slice to start (hrn or urn)"), - Mixed(Parameter(str, "Credential string"), - Parameter(type([str]), "List of credentials")), - ] - - returns = Parameter(int, "1 if successful") - - def call(self, xrn, creds): - hrn, type = urn_to_hrn(xrn) - valid_creds = self.api.auth.checkCredentials(creds, 'startslice', hrn) - - #log the call - origin_hrn = Credential(string=valid_creds[0]).get_gid_caller().get_hrn() - self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, hrn, self.name)) - - self.api.manager.start_slice(self.api, xrn, creds) - - return 1 diff --git a/sfa/methods/SliverStatus.py b/sfa/methods/Status.py similarity index 54% rename from sfa/methods/SliverStatus.py rename to sfa/methods/Status.py index deb79983..044e2529 100644 --- a/sfa/methods/SliverStatus.py +++ b/sfa/methods/Status.py @@ -3,7 +3,7 @@ from sfa.util.method import Method from sfa.storage.parameter import Parameter, Mixed -class SliverStatus(Method): +class Status(Method): """ Get the status of a sliver @@ -12,20 +12,16 @@ class SliverStatus(Method): """ interfaces = ['aggregate', 'slicemgr', 'component'] accepts = [ - Parameter(str, "Slice URN"), - Mixed(Parameter(str, "Credential string"), - Parameter(type([str]), "List of credentials")), + Parameter(type([str]), "Slice or sliver URNs"), + Parameter(type([dict]), "credentials"), Parameter(dict, "Options") ] returns = Parameter(dict, "Status details") - def call(self, slice_xrn, creds, options): - hrn, type = urn_to_hrn(slice_xrn) - valid_creds = self.api.auth.checkCredentials(creds, 'sliverstatus', hrn) + def call(self, xrns, creds, options): + valid_creds = self.api.auth.checkCredentials(creds, 'sliverstatus', xrns, + check_sliver_callback = self.api.manager.driver.check_sliver_credentials) - self.api.logger.info("interface: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, hrn, self.name)) - - status = self.api.manager.SliverStatus(self.api, hrn, valid_creds, options) - - return status + self.api.logger.info("interface: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, xrns, self.name)) + return self.api.manager.Status(self.api, xrns, creds, options) diff --git a/sfa/methods/Stop.py b/sfa/methods/Stop.py deleted file mode 100644 index 0d802827..00000000 --- a/sfa/methods/Stop.py +++ /dev/null @@ -1,37 +0,0 @@ -from sfa.util.xrn import urn_to_hrn -from sfa.util.method import Method - -from sfa.trust.credential import Credential - -from sfa.storage.parameter import Parameter, Mixed - -class Stop(Method): - """ - Stop the specified slice - - @param cred credential string specifying the rights of the caller - @param xrn human readable name of slice to instantiate (hrn or urn) - @return 1 is successful, faults otherwise - """ - - interfaces = ['aggregate', 'slicemgr', 'component'] - - accepts = [ - Parameter(str, "Human readable name of slice to instantiate (hrn or urn)"), - Mixed(Parameter(str, "Credential string"), - Parameter(type([str]), "List of credentials")), - ] - - returns = Parameter(int, "1 if successful") - - def call(self, xrn, creds): - hrn, type = urn_to_hrn(xrn) - valid_creds = self.api.auth.checkCredentials(creds, 'stopslice', hrn) - - #log the call - origin_hrn = Credential(string=valid_creds[0]).get_gid_caller().get_hrn() - self.api.logger.info("interface: %s\tcaller-hrn: %s\ttarget-hrn: %s\tmethod-name: %s"%(self.api.interface, origin_hrn, hrn, self.name)) - - self.api.manager.stop_slice(self.api, xrn, creds) - - return 1 diff --git a/sfa/methods/UpdateSliver.py b/sfa/methods/UpdateSliver.py deleted file mode 100644 index 72910d74..00000000 --- a/sfa/methods/UpdateSliver.py +++ /dev/null @@ -1,33 +0,0 @@ -from sfa.storage.parameter import Parameter, Mixed - -from sfa.methods.CreateSliver import CreateSliver - -class UpdateSliver(CreateSliver): - """ - Allocate resources to a slice. This operation is expected to - start the allocated resources asynchornously after the operation - has successfully completed. Callers can check on the status of - the resources using SliverStatus. - - @param slice_urn (string) URN of slice to allocate to - @param credentials ([string]) of credentials - @param rspec (string) rspec to allocate - - """ - interfaces = ['aggregate', 'slicemgr'] - accepts = [ - Parameter(str, "Slice URN"), - Mixed(Parameter(str, "Credential string"), - Parameter(type([str]), "List of credentials")), - Parameter(str, "RSpec"), - Parameter(type([]), "List of user information"), - Parameter(dict, "options"), - ] - returns = Parameter(str, "Allocated RSpec") - - - - def call(self, slice_xrn, creds, rspec, users, options): - - return CreateSliver.call(self, slice_xrn, creds, rspec, users, options) - diff --git a/sfa/methods/__init__.py b/sfa/methods/__init__.py index 8f35200e..10766fe5 100644 --- a/sfa/methods/__init__.py +++ b/sfa/methods/__init__.py @@ -1,29 +1,27 @@ ## Please use make index to update this file all = """ -CreateSliver +Allocate CreateGid -DeleteSliver +Delete +Describe GetCredential GetGids GetSelfCredential -GetTicket GetVersion List ListResources ListSlices +PerformOperationalAction +Provision RedeemTicket Register Remove -RenewSliver +Renew Resolve ResolveGENI Shutdown -SliverStatus -Start -Stop +Status Update -UpdateSliver get_key_from_incoming_ip get_trusted_certs -reset_slice """.split() diff --git a/sfa/methods/reset_slice.py b/sfa/methods/reset_slice.py deleted file mode 100644 index c3975ffd..00000000 --- a/sfa/methods/reset_slice.py +++ /dev/null @@ -1,30 +0,0 @@ -from sfa.util.xrn import urn_to_hrn -from sfa.util.method import Method - -from sfa.storage.parameter import Parameter, Mixed - -class reset_slice(Method): - """ - Reset the specified slice - - @param cred credential string specifying the rights of the caller - @param xrn human readable name of slice to instantiate (hrn or urn) - @return 1 is successful, faults otherwise - """ - - interfaces = ['aggregate', 'slicemgr', 'component'] - - accepts = [ - Parameter(str, "Credential string"), - Parameter(str, "Human readable name of slice to instantiate (hrn or urn)"), - Mixed(Parameter(str, "Human readable name of the original caller"), - Parameter(None, "Origin hrn not specified")) - ] - - returns = Parameter(int, "1 if successful") - - def call(self, cred, xrn, origin_hrn=None): - hrn, type = urn_to_hrn(xrn) - self.api.auth.check(cred, 'resetslice', hrn) - self.api.manager.reset_slice (self.api, xrn) - return 1 diff --git a/sfa/openstack/nova_driver.py b/sfa/openstack/nova_driver.py index d9fa24e6..e0afd07f 100644 --- a/sfa/openstack/nova_driver.py +++ b/sfa/openstack/nova_driver.py @@ -2,21 +2,21 @@ import time import datetime from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \ - RecordNotFound, SfaNotImplemented, SliverDoesNotExist, \ - SfaInvalidArgument + RecordNotFound, SfaNotImplemented, SfaInvalidArgument, UnsupportedOperation from sfa.util.sfalogging import logger from sfa.util.defaultdict import defaultdict from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch -from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf +from sfa.util.xrn import Xrn, hrn_to_urn, get_leaf from sfa.openstack.osxrn import OSXrn, hrn_to_os_slicename, hrn_to_os_tenant_name from sfa.util.cache import Cache from sfa.trust.credential import Credential # used to be used in get_ticket #from sfa.trust.sfaticket import SfaTicket - from sfa.rspecs.version_manager import VersionManager from sfa.rspecs.rspec import RSpec +from sfa.storage.alchemy import dbsession +from sfa.storage.model import RegRecord, SliverAllocation # the driver interface, mostly provides default behaviours from sfa.managers.driver import Driver @@ -49,6 +49,41 @@ class NovaDriver(Driver): if NovaDriver.cache is None: NovaDriver.cache = Cache() self.cache = NovaDriver.cache + + def sliver_to_slice_xrn(self, xrn): + sliver_id_parts = Xrn(xrn).get_sliver_id_parts() + slice = self.shell.auth_manager.tenants.find(id=sliver_id_parts[0]) + if not slice: + raise Forbidden("Unable to locate slice record for sliver: %s" % xrn) + slice_xrn = OSXrn(name=slice.name, type='slice') + return slice_xrn + + def check_sliver_credentials(self, creds, urns): + # build list of cred object hrns + slice_cred_names = [] + for cred in creds: + slice_cred_hrn = Credential(cred=cred).get_gid_object().get_hrn() + slice_cred_names.append(OSXrn(xrn=slice_cred_hrn).get_slicename()) + + # look up slice name of slivers listed in urns arg + slice_ids = [] + for urn in urns: + sliver_id_parts = Xrn(xrn=urn).get_sliver_id_parts() + slice_ids.append(sliver_id_parts[0]) + + if not slice_ids: + raise Forbidden("sliver urn not provided") + + sliver_names = [] + for slice_id in slice_ids: + slice = self.shell.auth_manager.tenants.find(slice_id) + sliver_names.append(slice['name']) + + # make sure we have a credential for every specified sliver ierd + for sliver_name in sliver_names: + if sliver_name not in slice_cred_names: + msg = "Valid credential not found for target: %s" % sliver_name + raise Forbidden(msg) ######################################## ########## registry oriented @@ -316,127 +351,28 @@ class NovaDriver(Driver): def testbed_name (self): return "openstack" - # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory def aggregate_version (self): - version_manager = VersionManager() - ad_rspec_versions = [] - request_rspec_versions = [] - for rspec_version in version_manager.versions: - if rspec_version.content_type in ['*', 'ad']: - ad_rspec_versions.append(rspec_version.to_dict()) - if rspec_version.content_type in ['*', 'request']: - request_rspec_versions.append(rspec_version.to_dict()) - return { - 'testbed':self.testbed_name(), - 'geni_request_rspec_versions': request_rspec_versions, - 'geni_ad_rspec_versions': ad_rspec_versions, - } - - def list_slices (self, creds, options): - # look in cache first - if self.cache: - slices = self.cache.get('slices') - if slices: - logger.debug("OpenStackDriver.list_slices returns from cache") - return slices - - # get data from db - instance_urns = [] - instances = self.shell.nova_manager.servers.findall() - for instance in instances: - if instance.name not in instance_urns: - instance_urns.append(OSXrn(instance.name, type='slice').urn) + return {} - # cache the result - if self.cache: - logger.debug ("OpenStackDriver.list_slices stores value in cache") - self.cache.add('slices', instance_urns) - - return instance_urns - # first 2 args are None in case of resource discovery - def list_resources (self, slice_urn, slice_hrn, creds, options): - cached_requested = options.get('cached', True) - - version_manager = VersionManager() - # get the rspec's return format from options - rspec_version = version_manager.get_version(options.get('geni_rspec_version')) - version_string = "rspec_%s" % (rspec_version) - - #panos adding the info option to the caching key (can be improved) - if options.get('info'): - version_string = version_string + "_"+options.get('info', 'default') - - # look in cache first - if cached_requested and self.cache and not slice_hrn: - rspec = self.cache.get(version_string) - if rspec: - logger.debug("OpenStackDriver.ListResources: returning cached advertisement") - return rspec - - #panos: passing user-defined options - #print "manager options = ",options + def list_resources (self, version=None, options={}): aggregate = OSAggregate(self) - rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, - options=options) - - # cache the result - if self.cache and not slice_hrn: - logger.debug("OpenStackDriver.ListResources: stores advertisement in cache") - self.cache.add(version_string, rspec) - + rspec = aggregate.list_resources(version=version, options=options) return rspec - - def sliver_status (self, slice_urn, slice_hrn): - # update nova connection - tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name() - self.shell.nova_manager.connect(tenant=tenant_name) - # find out where this slice is currently running - project_name = hrn_to_os_slicename(slice_hrn) - instances = self.shell.nova_manager.servers.findall(name=project_name) - if len(instances) == 0: - raise SliverDoesNotExist("You have not allocated any slivers here") - - result = {} - top_level_status = 'ready' - result['geni_urn'] = slice_urn - result['plos_login'] = 'root' - # do we need real dates here? - result['plos_expires'] = None - result['geni_expires'] = None - - resources = [] - for instance in instances: - res = {} - # instances are accessed by ip, not hostname. We need to report the ip - # somewhere so users know where to ssh to. - res['geni_expires'] = None - #res['plos_hostname'] = instance.hostname - res['plos_created_at'] = datetime_to_string(utcparse(instance.created)) - res['plos_boot_state'] = instance.status - res['plos_sliver_type'] = self.shell.nova_manager.flavors.find(id=instance.flavor['id']).name - res['geni_urn'] = Xrn(slice_urn, type='slice', id=instance.id).get_urn() - - if instance.status.lower() == 'active': - res['boot_state'] = 'ready' - res['geni_status'] = 'ready' - elif instance.status.lower() == 'error': - res['boot_state'] = 'failed' - res['geni_status'] = 'failed' - top_level_status = 'failed' - else: - res['boot_state'] = 'notready' - res['geni_status'] = 'notready' - top_level_status = 'notready' - resources.append(res) - - result['geni_status'] = top_level_status - result['geni_resources'] = resources - return result + def describe(self, urns, version=None, options={}): + aggregate = OSAggregate(self) + return aggregate.describe(urns, version=version, options=options) + + def status (self, urns, options={}): + aggregate = OSAggregate(self) + desc = aggregate.describe(urns) + status = {'geni_urn': desc['geni_urn'], + 'geni_slivers': desc['geni_slivers']} + return status - def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options): - + def allocate (self, urn, rspec_string, expiration, options={}): + xrn = Xrn(urn) aggregate = OSAggregate(self) # assume first user is the caller and use their context @@ -444,9 +380,10 @@ class NovaDriver(Driver): # key as the project key. key_name = None if len(users) > 1: - key_name = aggregate.create_instance_key(slice_hrn, users[0]) + key_name = aggregate.create_instance_key(xrn.get_hrn(), users[0]) # collect public keys + users = options.get('geni_users', []) pubkeys = [] for user in users: pubkeys.extend(user['keys']) @@ -454,110 +391,92 @@ class NovaDriver(Driver): rspec = RSpec(rspec_string) instance_name = hrn_to_os_slicename(slice_hrn) tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name() - instances = aggregate.run_instances(instance_name, tenant_name, rspec_string, key_name, pubkeys) - rspec_nodes = [] + slivers = aggregate.run_instances(instance_name, tenant_name, \ + rspec_string, key_name, pubkeys) + + # update all sliver allocation states setting then to geni_allocated + sliver_ids = [sliver.id for sliver in slivers] + SliverAllocation.set_allocations(sliver_ids, 'geni_allocated') + + return aggregate.describe(urns=[urn], version=rspec.version) + + def provision(self, urns, options={}): + # update sliver allocation states and set them to geni_provisioned + aggregate = OSAggregate(self) + instances = aggregate.get_instances(urns) + sliver_ids = [] for instance in instances: - rspec_nodes.append(aggregate.instance_to_rspec_node(slice_urn, instance)) + sliver_hrn = "%s.%s" % (self.driver.hrn, instance.id) + sliver_ids.append(Xrn(sliver_hrn, type='sliver').urn) + SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned') version_manager = VersionManager() - manifest_version = version_manager._get_version(rspec.version.type, rspec.version.version, 'manifest') - manifest_rspec = RSpec(version=manifest_version, user_options=options) - manifest_rspec.version.add_nodes(rspec_nodes) - - return manifest_rspec.toxml() + rspec_version = version_manager.get_version(options['geni_rspec_version']) + return self.describe(urns, rspec_version, options=options) - def delete_sliver (self, slice_urn, slice_hrn, creds, options): + def delete (self, urns, options={}): + # collect sliver ids so we can update sliver allocation states after + # we remove the slivers. aggregate = OSAggregate(self) - tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name() - project_name = hrn_to_os_slicename(slice_hrn) - return aggregate.delete_instances(project_name, tenant_name) - - def update_sliver(self, slice_urn, slice_hrn, rspec, creds, options): - name = hrn_to_os_slicename(slice_hrn) - tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name() + instances = aggregate.get_instances(urns) + sliver_ids = [] + for instance in instances: + sliver_hrn = "%s.%s" % (self.driver.hrn, instance.id) + sliver_ids.append(Xrn(sliver_hrn, type='sliver').urn) + + # delete the instance + aggregate.delete_instance(instance) + + # delete sliver allocation states + SliverAllocation.delete_allocations(sliver_ids) + + # return geni_slivers + geni_slivers = [] + for sliver_id in sliver_ids: + geni_slivers.append( + {'geni_sliver_urn': sliver['sliver_id'], + 'geni_allocation_status': 'geni_unallocated', + 'geni_expires': None}) + return geni_slivers + + def renew (self, urns, expiration_time, options={}): + description = self.describe(urns, None, options) + return description['geni_slivers'] + + def perform_operational_action (self, urns, action, options={}): aggregate = OSAggregate(self) - return aggregate.update_instances(name) - - def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options): + action = action.lower() + if action == 'geni_start': + action_method = aggregate.start_instances + elif action == 'geni_stop': + action_method = aggregate.stop_instances + elif action == 'geni_restart': + action_method = aggreate.restart_instances + else: + raise UnsupportedOperation(action) + + # fault if sliver is not full allocated (operational status is geni_pending_allocation) + description = self.describe(urns, None, options) + for sliver in description['geni_slivers']: + if sliver['geni_operational_status'] == 'geni_pending_allocation': + raise UnsupportedOperation(action, "Sliver must be fully allocated (operational status is not geni_pending_allocation)") + # + # Perform Operational Action Here + # + + instances = aggregate.get_instances(urns) + for instance in instances: + tenant_name = self.driver.shell.auth_manager.client.tenant_name + action_method(tenant_name, instance.name, instance.id) + description = self.describe(urns) + geni_slivers = self.describe(urns, None, options)['geni_slivers'] + return geni_slivers + + def shutdown(self, xrn, options={}): + xrn = OSXrn(xrn=xrn, type='slice') + tenant_name = xrn.get_tenant_name() + name = xrn.get_slicename() + self.driver.shell.nova_manager.connect(tenant=tenant_name) + instances = self.driver.shell.nova_manager.servers.findall(name=name) + for instance in instances: + self.driver.shell.nova_manager.servers.shutdown(instance) return True - - def start_slice (self, slice_urn, slice_hrn, creds): - return 1 - - def stop_slice (self, slice_urn, slice_hrn, creds): - tenant_name = OSXrn(xrn=slice_hrn, type='slice').get_tenant_name() - name = OSXrn(xrn=slice_urn).name - aggregate = OSAggregate(self) - return aggregate.stop_instances(name, tenant_name) - - def reset_slice (self, slice_urn, slice_hrn, creds): - raise SfaNotImplemented ("reset_slice not available at this interface") - - # xxx this code is quite old and has not run for ages - # it is obviously totally broken and needs a rewrite - def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options): - raise SfaNotImplemented,"OpenStackDriver.get_ticket needs a rewrite" -# please keep this code for future reference -# slices = PlSlices(self) -# peer = slices.get_peer(slice_hrn) -# sfa_peer = slices.get_sfa_peer(slice_hrn) -# -# # get the slice record -# credential = api.getCredential() -# interface = api.registries[api.hrn] -# registry = api.server_proxy(interface, credential) -# records = registry.Resolve(xrn, credential) -# -# # make sure we get a local slice record -# record = None -# for tmp_record in records: -# if tmp_record['type'] == 'slice' and \ -# not tmp_record['peer_authority']: -# #Error (E0602, GetTicket): Undefined variable 'SliceRecord' -# slice_record = SliceRecord(dict=tmp_record) -# if not record: -# raise RecordNotFound(slice_hrn) -# -# # similar to CreateSliver, we must verify that the required records exist -# # at this aggregate before we can issue a ticket -# # parse rspec -# rspec = RSpec(rspec_string) -# requested_attributes = rspec.version.get_slice_attributes() -# -# # ensure site record exists -# site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer) -# # ensure slice record exists -# slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer) -# # ensure person records exists -# # xxx users is undefined in this context -# persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer) -# # ensure slice attributes exists -# slices.verify_slice_attributes(slice, requested_attributes) -# -# # get sliver info -# slivers = slices.get_slivers(slice_hrn) -# -# if not slivers: -# raise SliverDoesNotExist(slice_hrn) -# -# # get initscripts -# initscripts = [] -# data = { -# 'timestamp': int(time.time()), -# 'initscripts': initscripts, -# 'slivers': slivers -# } -# -# # create the ticket -# object_gid = record.get_gid_object() -# new_ticket = SfaTicket(subject = object_gid.get_subject()) -# new_ticket.set_gid_caller(api.auth.client_gid) -# new_ticket.set_gid_object(object_gid) -# new_ticket.set_issuer(key=api.key, subject=self.hrn) -# new_ticket.set_pubkey(object_gid.get_pubkey()) -# new_ticket.set_attributes(data) -# new_ticket.set_rspec(rspec) -# #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) -# new_ticket.encode() -# new_ticket.sign() -# -# return new_ticket.save_to_string(save_parents=True) diff --git a/sfa/openstack/osaggregate.py b/sfa/openstack/osaggregate.py index d603b083..de349955 100644 --- a/sfa/openstack/osaggregate.py +++ b/sfa/openstack/osaggregate.py @@ -4,11 +4,12 @@ import socket import base64 import string import random -import time +import time from collections import defaultdict from nova.exception import ImageNotFound from nova.api.ec2.cloud import CloudController -from sfa.util.faults import SfaAPIError, InvalidRSpec +from sfa.util.faults import SfaAPIError, SliverDoesNotExist +from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch from sfa.rspecs.rspec import RSpec from sfa.rspecs.elements.hardware_type import HardwareType from sfa.rspecs.elements.node import Node @@ -36,15 +37,6 @@ def pubkeys_to_user_data(pubkeys): user_data += "\n" return user_data -def instance_to_sliver(instance, slice_xrn=None): - sliver_id = None - sliver = Sliver({'name': instance.name, - 'type': instance.name, - 'cpus': str(instance.vcpus), - 'memory': str(instance.ram), - 'storage': str(instance.disk)}) - return sliver - def image_to_rspec_disk_image(image): img = DiskImage() img['name'] = image['name'] @@ -58,30 +50,78 @@ class OSAggregate: def __init__(self, driver): self.driver = driver - def get_rspec(self, slice_xrn=None, version=None, options={}): - version_manager = VersionManager() - version = version_manager.get_version(version) - if not slice_xrn: - rspec_version = version_manager._get_version(version.type, version.version, 'ad') - nodes = self.get_aggregate_nodes() - else: - rspec_version = version_manager._get_version(version.type, version.version, 'manifest') - nodes = self.get_slice_nodes(slice_xrn) - rspec = RSpec(version=rspec_version, user_options=options) - rspec.version.add_nodes(nodes) - return rspec.toxml() - def get_availability_zones(self): - # essex release zones = self.driver.shell.nova_manager.dns_domains.domains() - if not zones: zones = ['cloud'] else: zones = [zone.name for zone in zones] return zones - def instance_to_rspec_node(self, slice_xrn, instance): + def list_resources(self, version=None, options={}): + version_manager = VersionManager() + version = version_manager.get_version(version) + rspec_version = version_manager._get_version(version.type, version.version, 'ad') + rspec = RSpec(version=version, user_options=options) + nodes = self.get_aggregate_nodes() + rspec.version.add_nodes(nodes) + return rspec.toxml() + + def describe(self, urns, version=None, options={}): + # update nova connection + tenant_name = OSXrn(xrn=urns[0], type='slice').get_tenant_name() + self.driver.shell.nova_manager.connect(tenant=tenant_name) + instances = self.get_instances(urns) + # lookup the sliver allocations + sliver_ids = [sliver['sliver_id'] for sliver in slivers] + constraint = SliverAllocation.sliver_id.in_(sliver_ids) + sliver_allocations = dbsession.query(SliverAllocation).filter(constraint) + sliver_allocation_dict = {} + for sliver_allocation in sliver_allocations: + sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation + + geni_slivers = [] + rspec_nodes = [] + for instance in instances: + rspec_nodes.append(self.instance_to_rspec_node(instance)) + geni_sliver = self.instance_to_geni_sliver(instance, sliver_sllocation_dict) + geni_slivers.append(geni_sliver) + version_manager = VersionManager() + version = version_manager.get_version(version) + rspec_version = version_manager._get_version(version.type, version.version, 'manifest') + rspec = RSpec(version=rspec_version, user_options=options) + rspec.xml.set('expires', datetime_to_string(utcparse(time.time()))) + rspec.version.add_nodes(rspec_nodes) + result = {'geni_urn': Xrn(urns[0]).get_urn(), + 'geni_rspec': rspec.toxml(), + 'geni_slivers': geni_slivers} + + return result + + def get_instances(self, urns): + # parse slice names and sliver ids + names = set() + ids = set() + for urn in urns: + xrn = OSXrn(xrn=urn) + if xrn.type == 'slice': + names.add(xrn.get_slice_name()) + elif xrn.type == 'sliver': + ids.add(xrn.leaf) + + # look up instances + instances = [] + filter = {} + if names: + filter['name'] = names + if ids: + filter['id'] = ids + servers = self.driver.shell.nova_manager.servers.findall(**filter) + instances.extend(servers) + + return instances + + def instance_to_rspec_node(self, instance): # determine node urn node_xrn = instance.metadata.get('component_id') if not node_xrn: @@ -93,14 +133,13 @@ class OSAggregate: rspec_node['component_id'] = node_xrn.urn rspec_node['component_name'] = node_xrn.name rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn() + rspec_node['sliver_id'] = OSXrn(name=instance.name, type='slice', id=instance.id).get_urn() if instance.metadata.get('client_id'): rspec_node['client_id'] = instance.metadata.get('client_id') # get sliver details - sliver_xrn = OSXrn(xrn=slice_xrn, type='slice', id=instance.id) - rspec_node['sliver_id'] = sliver_xrn.get_urn() flavor = self.driver.shell.nova_manager.flavors.find(id=instance.flavor['id']) - sliver = instance_to_sliver(flavor) + sliver = self.instance_to_sliver(flavor) # get firewall rules fw_rules = [] group_name = instance.metadata.get('security_groups') @@ -112,8 +151,9 @@ class OSAggregate: 'port_range': port_range, 'cidr_ip': rule['ip_range']['cidr']}) fw_rules.append(fw_rule) - sliver['fw_rules'] = fw_rules - rspec_node['slivers']= [sliver] + sliver['fw_rules'] = fw_rules + rspec_node['slivers'] = [sliver] + # get disk image image = self.driver.shell.image_manager.get_images(id=instance.image['id']) if isinstance(image, list) and len(image) > 0: @@ -121,24 +161,23 @@ class OSAggregate: disk_image = image_to_rspec_disk_image(image) sliver['disk_image'] = [disk_image] - # get interfaces + # get interfaces rspec_node['services'] = [] rspec_node['interfaces'] = [] addresses = instance.addresses - # HACK: public ips are stored in the list of private, but - # this seems wrong. Assume pub ip is the last in the list of - # private ips until openstack bug is fixed. + # HACK: public ips are stored in the list of private, but + # this seems wrong. Assume pub ip is the last in the list of + # private ips until openstack bug is fixed. if addresses.get('private'): login = Login({'authentication': 'ssh-keys', 'hostname': addresses.get('private')[-1]['addr'], 'port':'22', 'username': 'root'}) service = Services({'login': login}) - rspec_node['services'].append(service) - - if_index = 0 + rspec_node['services'].append(service) + for private_ip in addresses.get('private', []): - if_xrn = PlXrn(auth=self.driver.hrn, - interface='node%s' % (instance.hostId)) + if_xrn = PlXrn(auth=self.driver.hrn, + interface='node%s' % (instance.hostId)) if_client_id = Xrn(if_xrn.urn, type='interface', id="eth%s" %if_index).urn if_sliver_id = Xrn(rspec_node['sliver_id'], type='slice', id="eth%s" %if_index).urn interface = Interface({'component_id': if_xrn.urn, @@ -146,32 +185,66 @@ class OSAggregate: 'sliver_id': if_sliver_id}) interface['ips'] = [{'address': private_ip['addr'], #'netmask': private_ip['network'], - 'type': 'ipv%s' % str(private_ip['version'])}] - rspec_node['interfaces'].append(interface) - if_index += 1 - + 'type': private_ip['version']}] + rspec_node['interfaces'].append(interface) + # slivers always provide the ssh service for public_ip in addresses.get('public', []): - login = Login({'authentication': 'ssh-keys', - 'hostname': public_ip['addr'], + login = Login({'authentication': 'ssh-keys', + 'hostname': public_ip['addr'], 'port':'22', 'username': 'root'}) service = Services({'login': login}) rspec_node['services'].append(service) - return rspec_node + return rspec_node - def get_slice_nodes(self, slice_xrn): - # update nova connection - tenant_name = OSXrn(xrn=slice_xrn, type='slice').get_tenant_name() - self.driver.shell.nova_manager.connect(tenant=tenant_name) - - zones = self.get_availability_zones() - name = hrn_to_os_slicename(slice_xrn) - instances = self.driver.shell.nova_manager.servers.findall(name=name) - rspec_nodes = [] - for instance in instances: - rspec_nodes.append(self.instance_to_rspec_node(slice_xrn, instance)) - return rspec_nodes + def instance_to_sliver(self, instance, xrn=None): + if xrn: + sliver_hrn = '%s.%s' % (self.driver.hrn, instance.id) + sliver_id = Xrn(sliver_hrn, type='sliver').urn + + sliver = Sliver({'sliver_id': sliver_id, + 'name': instance.name, + 'type': instance.name, + 'cpus': str(instance.vcpus), + 'memory': str(instance.ram), + 'storage': str(instance.disk)}) + return sliver + + def instance_to_geni_sliver(self, instance, sliver_allocations = {}): + sliver_hrn = '%s.%s' % (self.driver.hrn, instance.id) + sliver_id = Xrn(sliver_hrn, type='sliver').urn + + # set sliver allocation and operational status + sliver_allocation = sliver_allocations[sliver_id] + if sliver_allocation: + allocation_status = sliver_allocation.allocation_state + if allocation_status == 'geni_allocated': + op_status = 'geni_pending_allocation' + elif allocation_status == 'geni_provisioned': + state = instance.state.lower() + if state == 'active': + op_status = 'geni_ready' + elif state == 'building': + op_status = 'geni_notready' + elif state == 'failed': + op_status =' geni_failed' + else: + op_status = 'geni_unknown' + else: + allocation_status = 'geni_unallocated' + # required fields + geni_sliver = {'geni_sliver_urn': sliver_id, + 'geni_expires': None, + 'geni_allocation_status': allocation_status, + 'geni_operational_status': op_status, + 'geni_error': None, + 'plos_created_at': datetime_to_string(utcparse(instance.created)), + 'plos_sliver_type': self.shell.nova_manager.flavors.find(id=instance.flavor['id']).name, + } + + return geni_sliver + def get_aggregate_nodes(self): zones = self.get_availability_zones() # available sliver/instance/vm types @@ -193,16 +266,15 @@ class OSAggregate: HardwareType({'name': 'pc'})] slivers = [] for instance in instances: - sliver = instance_to_sliver(instance) + sliver = self.instance_to_sliver(instance) sliver['disk_image'] = disk_images slivers.append(sliver) - + rspec_node['available'] = 'true' rspec_node['slivers'] = slivers rspec_nodes.append(rspec_node) return rspec_nodes - def create_tenant(self, tenant_name): tenants = self.driver.shell.auth_manager.tenants.findall(name=tenant_name) if not tenants: @@ -212,7 +284,6 @@ class OSAggregate: tenant = tenants[0] return tenant - def create_instance_key(self, slice_hrn, user): slice_name = Xrn(slice_hrn).leaf user_name = Xrn(user['urn']).leaf @@ -289,7 +360,7 @@ class OSAggregate: requested_instances = defaultdict(list) # iterate over clouds/zones/nodes - created_instances = [] + slivers = [] for node in rspec.version.get_nodes_with_slivers(): instances = node.get('slivers', []) if not instances: @@ -310,55 +381,75 @@ class OSAggregate: if node.get('component_id'): metadata['component_id'] = node['component_id'] if node.get('client_id'): - metadata['client_id'] = node['client_id'] - server = self.driver.shell.nova_manager.servers.create(flavor=flavor_id, + metadata['client_id'] = node['client_id'] + server = self.driver.shell.nova_manager.servers.create( + flavor=flavor_id, image=image_id, key_name = key_name, security_groups = [group_name], files=files, meta=metadata, name=instance_name) - created_instances.append(server) - + slivers.append(server) except Exception, err: logger.log_exc(err) - return created_instances - - - def delete_instances(self, instance_name, tenant_name): + return slivers - def _delete_security_group(instance): - security_group = instance.metadata.get('security_groups', '') + def delete_instance(self, instance): + + def _delete_security_group(inst): + security_group = inst.metadata.get('security_groups', '') if security_group: manager = SecurityGroup(self.driver) timeout = 10.0 # wait a maximum of 10 seconds before forcing the security group delete start_time = time.time() instance_deleted = False while instance_deleted == False and (time.time() - start_time) < timeout: - inst = self.driver.shell.nova_manager.servers.findall(id=instance.id) - if not inst: + tmp_inst = self.driver.shell.nova_manager.servers.findall(id=inst.id) + if not tmp_inst: instance_deleted = True - time.sleep(1) + time.sleep(.5) manager.delete_security_group(security_group) thread_manager = ThreadManager() - self.driver.shell.nova_manager.connect(tenant=tenant_name) - instances = self.driver.shell.nova_manager.servers.findall(name=instance_name) + tenant = self.driver.shell.auth_manager.tenants.find(id=instance.tenant_id) + self.driver.shell.nova_manager.connect(tenant=tenant.name) + args = {'name': instance.name, + 'id': instance.id} + instances = self.driver.shell.nova_manager.servers.findall(**args) + security_group_manager = SecurityGroup(self.driver) for instance in instances: # destroy instance self.driver.shell.nova_manager.servers.delete(instance) # deleate this instance's security groups thread_manager.run(_delete_security_group, instance) - return True - + return 1 - def stop_instances(self, instance_name, tenant_name): + def stop_instances(self, instance_name, tenant_name, id=None): self.driver.shell.nova_manager.connect(tenant=tenant_name) - instances = self.driver.shell.nova_manager.servers.findall(name=instance_name) + args = {'name': instance_name} + if id: + args['id'] = id + instances = self.driver.shell.nova_manager.servers.findall(**args) for instance in instances: self.driver.shell.nova_manager.servers.pause(instance) return 1 + def start_instances(self, instance_name, tenant_name, id=None): + self.driver.shell.nova_manager.connect(tenant=tenant_name) + args = {'name': instance_name} + if id: + args['id'] = id + instances = self.driver.shell.nova_manager.servers.findall(**args) + for instance in instances: + self.driver.shell.nova_manager.servers.resume(instance) + return 1 + + def restart_instances(self, instacne_name, tenant_name, id=None): + self.stop_instances(instance_name, tenant_name, id) + self.start_instances(instance_name, tenant_name, id) + return 1 + def update_instances(self, project_name): pass diff --git a/sfa/openstack/osxrn.py b/sfa/openstack/osxrn.py index e7d1d21d..6a3944c9 100644 --- a/sfa/openstack/osxrn.py +++ b/sfa/openstack/osxrn.py @@ -17,13 +17,15 @@ class OSXrn(Xrn): def __init__(self, name=None, auth=None, **kwds): config = Config() + self.id = id if name is not None: + Xrn.__init__(self, **kwds) if 'type' in kwds: self.type = kwds['type'] if auth is not None: self.hrn='.'.join([auth, cleanup_name(name)]) else: - self.hrn = config.SFA_INTERFACE_HRN + "." + cleanup_name(name) + self.hrn = name.replace('_', '.') self.hrn_to_urn() else: Xrn.__init__(self, **kwds) @@ -50,6 +52,4 @@ class OSXrn(Xrn): self._normalize() tenant_name = self.hrn.replace('\.', '') return tenant_name - - - + diff --git a/sfa/planetlab/plaggregate.py b/sfa/planetlab/plaggregate.py index 47c637bc..c6006fbb 100644 --- a/sfa/planetlab/plaggregate.py +++ b/sfa/planetlab/plaggregate.py @@ -1,17 +1,18 @@ #!/usr/bin/python +from collections import defaultdict from sfa.util.xrn import Xrn, hrn_to_urn, urn_to_hrn from sfa.util.sfatime import utcparse, datetime_to_string from sfa.util.sfalogging import logger - +from sfa.util.faults import SliverDoesNotExist from sfa.rspecs.rspec import RSpec from sfa.rspecs.elements.hardware_type import HardwareType -from sfa.rspecs.elements.node import Node +from sfa.rspecs.elements.node import NodeElement from sfa.rspecs.elements.link import Link from sfa.rspecs.elements.sliver import Sliver from sfa.rspecs.elements.login import Login from sfa.rspecs.elements.location import Location from sfa.rspecs.elements.interface import Interface -from sfa.rspecs.elements.services import Services +from sfa.rspecs.elements.services import ServicesElement from sfa.rspecs.elements.pltag import PLTag from sfa.rspecs.elements.lease import Lease from sfa.rspecs.elements.granularity import Granularity @@ -20,6 +21,9 @@ from sfa.rspecs.version_manager import VersionManager from sfa.planetlab.plxrn import PlXrn, hostname_to_urn, hrn_to_pl_slicename, slicename_to_hrn from sfa.planetlab.vlink import get_tc_rate from sfa.planetlab.topology import Topology +from sfa.storage.alchemy import dbsession +from sfa.storage.model import SliverAllocation + import time @@ -27,6 +31,15 @@ class PlAggregate: def __init__(self, driver): self.driver = driver + + def get_nodes(self, options={}): + filter = {'peer_id': None} + geni_available = options.get('geni_available') + if geni_available == True: + filter['boot_state'] = 'boot' + nodes = self.driver.shell.GetNodes(filter) + + return nodes def get_sites(self, filter={}): sites = {} @@ -99,164 +112,212 @@ class PlAggregate: pl_initscripts[initscript['initscript_id']] = initscript return pl_initscripts + def get_slivers(self, urns, options={}): + names = set() + slice_ids = set() + node_ids = [] + for urn in urns: + xrn = PlXrn(xrn=urn) + if xrn.type == 'sliver': + # id: slice_id-node_id + try: + sliver_id_parts = xrn.get_sliver_id_parts() + slice_id = int(sliver_id_parts[0]) + node_id = int(sliver_id_parts[1]) + slice_ids.add(slice_id) + node_ids.append(node_id) + except ValueError: + pass + else: + names.add(xrn.pl_slicename()) - def get_slice_and_slivers(self, slice_xrn): - """ - Returns a dict of slivers keyed on the sliver's node_id - """ - slivers = {} - slice = None - if not slice_xrn: - return (slice, slivers) - slice_urn = hrn_to_urn(slice_xrn, 'slice') - slice_hrn, _ = urn_to_hrn(slice_xrn) - slice_name = hrn_to_pl_slicename(slice_hrn) - slices = self.driver.shell.GetSlices(slice_name) + filter = {} + if names: + filter['name'] = list(names) + if slice_ids: + filter['slice_id'] = list(slice_ids) + # get slices + slices = self.driver.shell.GetSlices(filter) if not slices: - return (slice, slivers) - slice = slices[0] - - # sort slivers by node id - for node_id in slice['node_ids']: - sliver_xrn = Xrn(slice_urn, type='sliver', id=node_id) - sliver_xrn.set_authority(self.driver.hrn) - sliver = Sliver({'sliver_id': sliver_xrn.urn, - 'name': slice['name'], - 'type': 'plab-vserver', - 'tags': []}) - slivers[node_id]= sliver - - # sort sliver attributes by node id - tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice['slice_tag_ids']}) - for tag in tags: - # most likely a default/global sliver attribute (node_id == None) - if tag['node_id'] not in slivers: - sliver_xrn = Xrn(slice_urn, type='sliver', id=tag['node_id']) - sliver_xrn.set_authority(self.driver.hrn) - sliver = Sliver({'sliver_id': sliver_xrn.urn, - 'name': slice['name'], - 'type': 'plab-vserver', - 'tags': []}) - slivers[tag['node_id']] = sliver - slivers[tag['node_id']]['tags'].append(tag) + return [] + slice = slices[0] + slice['hrn'] = PlXrn(auth=self.driver.hrn, slicename=slice['name']).hrn + + # get sliver users + persons = [] + person_ids = [] + for slice in slices: + person_ids.extend(slice['person_ids']) + if person_ids: + persons = self.driver.shell.GetPersons(person_ids) + + # get user keys + keys = {} + key_ids = [] + for person in persons: + key_ids.extend(person['key_ids']) - return (slice, slivers) - - def get_nodes_and_links(self, slice_xrn, slice=None,slivers=[], options={}): - # if we are dealing with a slice that has no node just return - # and empty list - if slice_xrn: - if not slice or not slice['node_ids']: - return ([],[]) + if key_ids: + key_list = self.driver.shell.GetKeys(key_ids) + for key in key_list: + keys[key['key_id']] = key + + # construct user key info + users = [] + for person in persons: + name = person['email'][0:person['email'].index('@')] + user = { + 'login': slice['name'], + 'user_urn': Xrn('%s.%s' % (self.driver.hrn, name), type='user').urn, + 'keys': [keys[k_id]['key'] for k_id in person['key_ids'] if k_id in keys] + } + users.append(user) + + if node_ids: + node_ids = [node_id for node_id in node_ids if node_id in slice['node_ids']] + slice['node_ids'] = node_ids + tags_dict = self.get_slice_tags(slice) + nodes_dict = self.get_slice_nodes(slice, options) + slivers = [] + for node in nodes_dict.values(): + node.update(slice) + node['tags'] = tags_dict[node['node_id']] + sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id']) + node['sliver_id'] = Xrn(sliver_hrn, type='sliver').urn + node['urn'] = node['sliver_id'] + node['services_user'] = users + slivers.append(node) + return slivers + + def node_to_rspec_node(self, node, sites, interfaces, node_tags, pl_initscripts=[], grain=None, options={}): + rspec_node = NodeElement() + # xxx how to retrieve site['login_base'] + site=sites[node['site_id']] + rspec_node['component_id'] = PlXrn(self.driver.hrn, hostname=node['hostname']).get_urn() + rspec_node['component_name'] = node['hostname'] + rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn() + rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa') + # do not include boot state ( element) in the manifest rspec + rspec_node['boot_state'] = node['boot_state'] + if node['boot_state'] == 'boot': + rspec_node['available'] = 'true' + else: + rspec_node['available'] = 'false' + rspec_node['exclusive'] = 'false' + rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}), + HardwareType({'name': 'pc'})] + # only doing this because protogeni rspec needs + # to advertise available initscripts + rspec_node['pl_initscripts'] = pl_initscripts.values() + # add site/interface info to nodes. + # assumes that sites, interfaces and tags have already been prepared. + if site['longitude'] and site['latitude']: + location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'}) + rspec_node['location'] = location + # Granularity + granularity = Granularity({'grain': grain}) + rspec_node['granularity'] = granularity + rspec_node['interfaces'] = [] + if_count=0 + for if_id in node['interface_ids']: + interface = Interface(interfaces[if_id]) + interface['ipv4'] = interface['ip'] + interface['component_id'] = PlXrn(auth=self.driver.hrn, + interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn() + # interfaces in the manifest need a client id + if slice: + interface['client_id'] = "%s:%s" % (node['node_id'], if_id) + rspec_node['interfaces'].append(interface) + if_count+=1 + tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids'] if tag_id in node_tags] + rspec_node['tags'] = tags + return rspec_node + + def sliver_to_rspec_node(self, sliver, sites, interfaces, node_tags, \ + pl_initscripts, sliver_allocations): + # get the granularity in second for the reservation system + grain = self.driver.shell.GetLeaseGranularity() + rspec_node = self.node_to_rspec_node(sliver, sites, interfaces, node_tags, pl_initscripts, grain) + # xxx how to retrieve site['login_base'] + rspec_node['expires'] = datetime_to_string(utcparse(sliver['expires'])) + # remove interfaces from manifest + rspec_node['interfaces'] = [] + # add sliver info + rspec_sliver = Sliver({'sliver_id': sliver['urn'], + 'name': sliver['name'], + 'type': 'plab-vserver', + 'tags': []}) + rspec_node['sliver_id'] = rspec_sliver['sliver_id'] + rspec_node['client_id'] = sliver_allocations[sliver['urn']].client_id + if sliver_allocations[sliver['urn']].component_id: + rspec_node['component_id'] = sliver_allocations[sliver['urn']].component_id + rspec_node['slivers'] = [rspec_sliver] + + # slivers always provide the ssh service + login = Login({'authentication': 'ssh-keys', + 'hostname': sliver['hostname'], + 'port':'22', + 'username': sliver['name'], + 'login': sliver['name'] + }) + service = ServicesElement({'login': login, + 'services_user': sliver['services_user']}) + rspec_node['services'] = [service] + return rspec_node + + def get_slice_tags(self, slice): + slice_tag_ids = [] + slice_tag_ids.extend(slice['slice_tag_ids']) + tags = self.driver.shell.GetSliceTags({'slice_tag_id': slice_tag_ids}) + # sorted by node_id + tags_dict = defaultdict(list) + for tag in tags: + tags_dict[tag['node_id']] = tag + return tags_dict - filter = {} + def get_slice_nodes(self, slice, options={}): + nodes_dict = {} + filter = {'peer_id': None} tags_filter = {} - if slice and 'node_ids' in slice and slice['node_ids']: + if slice and slice.get('node_ids'): filter['node_id'] = slice['node_ids'] - tags_filter=filter.copy() - - geni_available = options.get('geni_available') + else: + # there are no nodes to look up + return nodes_dict + tags_filter=filter.copy() + geni_available = options.get('geni_available') if geni_available == True: - filter['boot_state'] = 'boot' - - filter.update({'peer_id': None}) + filter['boot_state'] = 'boot' nodes = self.driver.shell.GetNodes(filter) - - # get the granularity in second for the reservation system - grain = self.driver.shell.GetLeaseGranularity() - - site_ids = [] - interface_ids = [] - tag_ids = [] - nodes_dict = {} for node in nodes: - site_ids.append(node['site_id']) - interface_ids.extend(node['interface_ids']) - tag_ids.extend(node['node_tag_ids']) nodes_dict[node['node_id']] = node - - # get sites - sites_dict = self.get_sites({'site_id': site_ids}) - # get interfaces - interfaces = self.get_interfaces({'interface_id':interface_ids}) - # get tags - node_tags = self.get_node_tags(tags_filter) - # get initscripts - pl_initscripts = self.get_pl_initscripts() - - links = self.get_links(sites_dict, nodes_dict, interfaces) - - rspec_nodes = [] - for node in nodes: - # skip whitelisted nodes - if node['slice_ids_whitelist']: - if not slice or slice['slice_id'] not in node['slice_ids_whitelist']: - continue - rspec_node = Node() - # xxx how to retrieve site['login_base'] - site_id=node['site_id'] - site=sites_dict[site_id] - rspec_node['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], node['hostname']) - rspec_node['component_name'] = node['hostname'] - rspec_node['component_manager_id'] = Xrn(self.driver.hrn, 'authority+cm').get_urn() - rspec_node['authority_id'] = hrn_to_urn(PlXrn.site_hrn(self.driver.hrn, site['login_base']), 'authority+sa') - # do not include boot state ( element) in the manifest rspec - if not slice: - rspec_node['boot_state'] = node['boot_state'] - - #add the exclusive tag to distinguish between Shared and Reservable nodes - if node['node_type'] == 'reservable': - rspec_node['exclusive'] = 'true' + return nodes_dict + + def rspec_node_to_geni_sliver(self, rspec_node, sliver_allocations = {}): + if rspec_node['sliver_id'] in sliver_allocations: + # set sliver allocation and operational status + sliver_allocation = sliver_allocations[rspec_node['sliver_id']] + if sliver_allocation: + allocation_status = sliver_allocation.allocation_state + if allocation_status == 'geni_allocated': + op_status = 'geni_pending_allocation' + elif allocation_status == 'geni_provisioned': + if rspec_node['boot_state'] == 'boot': + op_status = 'geni_ready' + else: + op_status = 'geni_failed' + else: + op_status = 'geni_unknown' else: - rspec_node['exclusive'] = 'false' - - rspec_node['hardware_types'] = [HardwareType({'name': 'plab-pc'}), - HardwareType({'name': 'pc'})] - # only doing this because protogeni rspec needs - # to advertise available initscripts - rspec_node['pl_initscripts'] = pl_initscripts.values() - # add site/interface info to nodes. - # assumes that sites, interfaces and tags have already been prepared. - site = sites_dict[node['site_id']] - if site['longitude'] and site['latitude']: - location = Location({'longitude': site['longitude'], 'latitude': site['latitude'], 'country': 'unknown'}) - rspec_node['location'] = location - # Granularity - granularity = Granularity({'grain': grain}) - rspec_node['granularity'] = granularity - - rspec_node['interfaces'] = [] - if_count=0 - for if_id in node['interface_ids']: - interface = Interface(interfaces[if_id]) - interface['ipv4'] = interface['ip'] - interface['component_id'] = PlXrn(auth=self.driver.hrn, - interface='node%s:eth%s' % (node['node_id'], if_count)).get_urn() - # interfaces in the manifest need a client id - if slice: - interface['client_id'] = "%s:%s" % (node['node_id'], if_id) - rspec_node['interfaces'].append(interface) - if_count+=1 - - tags = [PLTag(node_tags[tag_id]) for tag_id in node['node_tag_ids']\ - if tag_id in node_tags] - rspec_node['tags'] = tags - if node['node_id'] in slivers: - # add sliver info - sliver = slivers[node['node_id']] - rspec_node['sliver_id'] = sliver['sliver_id'] - rspec_node['slivers'] = [sliver] - for tag in sliver['tags']: - if tag['tagname'] == 'client_id': - rspec_node['client_id'] = tag['value'] - - # slivers always provide the ssh service - login = Login({'authentication': 'ssh-keys', 'hostname': node['hostname'], 'port':'22', 'username': sliver['name']}) - service = Services({'login': login}) - rspec_node['services'] = [service] - rspec_nodes.append(rspec_node) - return (rspec_nodes, links) - + allocation_status = 'geni_unallocated' + # required fields + geni_sliver = {'geni_sliver_urn': rspec_node['sliver_id'], + 'geni_expires': rspec_node['expires'], + 'geni_allocation_status' : allocation_status, + 'geni_operational_status': op_status, + 'geni_error': '', + } + return geni_sliver def get_leases(self, slice_xrn=None, slice=None, options={}): @@ -288,7 +349,6 @@ class PlAggregate: site_id=lease['site_id'] site=sites_dict[site_id] - #rspec_lease['lease_id'] = lease['lease_id'] rspec_lease['component_id'] = hostname_to_urn(self.driver.hrn, site['login_base'], lease['hostname']) if slice_xrn: slice_urn = slice_xrn @@ -303,39 +363,116 @@ class PlAggregate: return rspec_leases - def get_rspec(self, slice_xrn=None, version = None, options={}): + def list_resources(self, version = None, options={}): version_manager = VersionManager() version = version_manager.get_version(version) - if not slice_xrn: - rspec_version = version_manager._get_version(version.type, version.version, 'ad') - else: - rspec_version = version_manager._get_version(version.type, version.version, 'manifest') - - slice, slivers = self.get_slice_and_slivers(slice_xrn) + rspec_version = version_manager._get_version(version.type, version.version, 'ad') rspec = RSpec(version=rspec_version, user_options=options) - if slice and 'expires' in slice: - rspec.xml.set('expires', datetime_to_string(utcparse(slice['expires']))) - - if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'leases': - if slice_xrn and not slivers: - nodes, links = [], [] - else: - nodes, links = self.get_nodes_and_links(slice_xrn, slice, slivers, options) - rspec.version.add_nodes(nodes) + + if not options.get('list_leases') or options['list_leases'] != 'leases': + # get nodes + nodes = self.get_nodes(options) + site_ids = [] + interface_ids = [] + tag_ids = [] + nodes_dict = {} + for node in nodes: + site_ids.append(node['site_id']) + interface_ids.extend(node['interface_ids']) + tag_ids.extend(node['node_tag_ids']) + nodes_dict[node['node_id']] = node + sites = self.get_sites({'site_id': site_ids}) + interfaces = self.get_interfaces({'interface_id':interface_ids}) + node_tags = self.get_node_tags({'node_tag_id': tag_ids}) + pl_initscripts = self.get_pl_initscripts() + # convert nodes to rspec nodes + rspec_nodes = [] + for node in nodes: + rspec_node = self.node_to_rspec_node(node, sites, interfaces, node_tags, pl_initscripts) + rspec_nodes.append(rspec_node) + rspec.version.add_nodes(rspec_nodes) + + # add links + links = self.get_links(sites, nodes_dict, interfaces) rspec.version.add_links(links) - # add sliver defaults - default_sliver = slivers.get(None, []) - if default_sliver: - default_sliver_attribs = default_sliver.get('tags', []) - for attrib in default_sliver_attribs: - logger.info(attrib) - rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value']) - + if not options.get('list_leases') or options.get('list_leases') and options['list_leases'] != 'resources': leases = self.get_leases(slice_xrn, slice) rspec.version.add_leases(leases) return rspec.toxml() + def describe(self, urns, version=None, options={}): + version_manager = VersionManager() + version = version_manager.get_version(version) + rspec_version = version_manager._get_version(version.type, version.version, 'manifest') + rspec = RSpec(version=rspec_version, user_options=options) + + # get slivers + geni_slivers = [] + slivers = self.get_slivers(urns, options) + if slivers: + rspec_expires = datetime_to_string(utcparse(slivers[0]['expires'])) + else: + rspec_expires = datetime_to_string(utcparse(time.time())) + rspec.xml.set('expires', rspec_expires) + + # lookup the sliver allocations + geni_urn = urns[0] + sliver_ids = [sliver['sliver_id'] for sliver in slivers] + constraint = SliverAllocation.sliver_id.in_(sliver_ids) + sliver_allocations = dbsession.query(SliverAllocation).filter(constraint) + sliver_allocation_dict = {} + for sliver_allocation in sliver_allocations: + geni_urn = sliver_allocation.slice_urn + sliver_allocation_dict[sliver_allocation.sliver_id] = sliver_allocation + + if not options.get('list_leases') or options['list_leases'] != 'leases': + # add slivers + site_ids = [] + interface_ids = [] + tag_ids = [] + nodes_dict = {} + for sliver in slivers: + site_ids.append(sliver['site_id']) + interface_ids.extend(sliver['interface_ids']) + tag_ids.extend(sliver['node_tag_ids']) + nodes_dict[sliver['node_id']] = sliver + sites = self.get_sites({'site_id': site_ids}) + interfaces = self.get_interfaces({'interface_id':interface_ids}) + node_tags = self.get_node_tags({'node_tag_id': tag_ids}) + pl_initscripts = self.get_pl_initscripts() + rspec_nodes = [] + for sliver in slivers: + if sliver['slice_ids_whitelist'] and sliver['slice_id'] not in sliver['slice_ids_whitelist']: + continue + rspec_node = self.sliver_to_rspec_node(sliver, sites, interfaces, node_tags, + pl_initscripts, sliver_allocation_dict) + # manifest node element shouldn't contain available attribute + rspec_node.pop('available') + rspec_nodes.append(rspec_node) + geni_sliver = self.rspec_node_to_geni_sliver(rspec_node, sliver_allocation_dict) + geni_slivers.append(geni_sliver) + rspec.version.add_nodes(rspec_nodes) + + # add sliver defaults + #default_sliver = slivers.get(None, []) + #if default_sliver: + # default_sliver_attribs = default_sliver.get('tags', []) + # for attrib in default_sliver_attribs: + # rspec.version.add_default_sliver_attribute(attrib['tagname'], attrib['value']) + + # add links + links = self.get_links(sites, nodes_dict, interfaces) + rspec.version.add_links(links) + + if not options.get('list_leases') or options['list_leases'] != 'resources': + if slivers: + leases = self.get_leases(slivers[0]) + rspec.version.add_leases(leases) + + return {'geni_urn': geni_urn, + 'geni_rspec': rspec.toxml(), + 'geni_slivers': geni_slivers} diff --git a/sfa/planetlab/pldriver.py b/sfa/planetlab/pldriver.py index 6c88a14f..4bcb7937 100644 --- a/sfa/planetlab/pldriver.py +++ b/sfa/planetlab/pldriver.py @@ -1,9 +1,8 @@ -import time import datetime # from sfa.util.faults import MissingSfaInfo, UnknownSfaType, \ - RecordNotFound, SfaNotImplemented, SliverDoesNotExist - + RecordNotFound, SfaNotImplemented, SliverDoesNotExist, SearchFailed, \ + UnsupportedOperation, Forbidden from sfa.util.sfalogging import logger from sfa.util.defaultdict import defaultdict from sfa.util.sfatime import utcparse, datetime_to_string, datetime_to_epoch @@ -12,17 +11,16 @@ from sfa.util.cache import Cache # one would think the driver should not need to mess with the SFA db, but.. from sfa.storage.alchemy import dbsession -from sfa.storage.model import RegRecord +from sfa.storage.model import RegRecord, SliverAllocation +from sfa.trust.credential import Credential # used to be used in get_ticket #from sfa.trust.sfaticket import SfaTicket - from sfa.rspecs.version_manager import VersionManager from sfa.rspecs.rspec import RSpec # the driver interface, mostly provides default behaviours from sfa.managers.driver import Driver - from sfa.planetlab.plshell import PlShell import sfa.planetlab.peers as peers from sfa.planetlab.plaggregate import PlAggregate @@ -55,7 +53,49 @@ class PlDriver (Driver): if PlDriver.cache is None: PlDriver.cache = Cache() self.cache = PlDriver.cache + + def sliver_to_slice_xrn(self, xrn): + sliver_id_parts = Xrn(xrn).get_sliver_id_parts() + filter = {} + try: + filter['slice_id'] = int(sliver_id_parts[0]) + except ValueError: + fliter['name'] = sliver_id_parts[0] + slices = self.shell.GetSlices(filter) + if not slices: + raise Forbidden("Unable to locate slice record for sliver: %s" % xrn) + slice = slices[0] + slice_xrn = PlXrn(auth=self.hrn, slicename=slice['name']) + return slice_xrn + def check_sliver_credentials(self, creds, urns): + # build list of cred object hrns + slice_cred_names = [] + for cred in creds: + slice_cred_hrn = Credential(cred=cred).get_gid_object().get_hrn() + slice_cred_names.append(PlXrn(xrn=slice_cred_hrn).pl_slicename()) + + # look up slice name of slivers listed in urns arg + slice_ids = [] + for urn in urns: + sliver_id_parts = Xrn(xrn=urn).get_sliver_id_parts() + try: + slice_ids.append(int(sliver_id_parts[0])) + except ValueError: + pass + + if not slice_ids: + raise Forbidden("sliver urn not provided") + + slices = self.shell.GetSlices(slice_ids) + sliver_names = [slice['name'] for slice in slices] + + # make sure we have a credential for every specified sliver ierd + for sliver_name in sliver_names: + if sliver_name not in slice_cred_names: + msg = "Valid credential not found for target: %s" % sliver_name + raise Forbidden(msg) + ######################################## ########## registry oriented ######################################## @@ -566,160 +606,34 @@ class PlDriver (Driver): def testbed_name (self): return "myplc" - # 'geni_request_rspec_versions' and 'geni_ad_rspec_versions' are mandatory def aggregate_version (self): - version_manager = VersionManager() - ad_rspec_versions = [] - request_rspec_versions = [] - for rspec_version in version_manager.versions: - if rspec_version.content_type in ['*', 'ad']: - ad_rspec_versions.append(rspec_version.to_dict()) - if rspec_version.content_type in ['*', 'request']: - request_rspec_versions.append(rspec_version.to_dict()) - return { - 'testbed':self.testbed_name(), - 'geni_request_rspec_versions': request_rspec_versions, - 'geni_ad_rspec_versions': ad_rspec_versions, - } - - def list_slices (self, creds, options): - # look in cache first - if self.cache: - slices = self.cache.get('slices') - if slices: - logger.debug("PlDriver.list_slices returns from cache") - return slices - - # get data from db - slices = self.shell.GetSlices({'peer_id': None}, ['name']) - slice_hrns = [slicename_to_hrn(self.hrn, slice['name']) for slice in slices] - slice_urns = [hrn_to_urn(slice_hrn, 'slice') for slice_hrn in slice_hrns] - - # cache the result - if self.cache: - logger.debug ("PlDriver.list_slices stores value in cache") - self.cache.add('slices', slice_urns) - - return slice_urns - - # first 2 args are None in case of resource discovery - def list_resources (self, slice_urn, slice_hrn, creds, options): - cached_requested = options.get('cached', True) - - version_manager = VersionManager() - # get the rspec's return format from options - rspec_version = version_manager.get_version(options.get('geni_rspec_version')) - version_string = "rspec_%s" % (rspec_version) - - #panos adding the info option to the caching key (can be improved) - if options.get('info'): - version_string = version_string + "_"+options.get('info', 'default') + return {} - # Adding the list_leases option to the caching key - if options.get('list_leases'): - version_string = version_string + "_"+options.get('list_leases', 'default') - - # Adding geni_available to caching key - if options.get('geni_available'): - version_string = version_string + "_" + str(options.get('geni_available')) - - # look in cache first - if cached_requested and self.cache and not slice_hrn: - rspec = self.cache.get(version_string) - if rspec: - logger.debug("PlDriver.ListResources: returning cached advertisement") - return rspec - - #panos: passing user-defined options - #print "manager options = ",options + # first 2 args are None in case of resource discovery + def list_resources (self, version=None, options={}): aggregate = PlAggregate(self) - rspec = aggregate.get_rspec(slice_xrn=slice_urn, version=rspec_version, - options=options) - - # cache the result - if self.cache and not slice_hrn: - logger.debug("PlDriver.ListResources: stores advertisement in cache") - self.cache.add(version_string, rspec) - + rspec = aggregate.list_resources(version=version, options=options) return rspec - - def sliver_status (self, slice_urn, slice_hrn): - # find out where this slice is currently running - slicename = hrn_to_pl_slicename(slice_hrn) - - slices = self.shell.GetSlices([slicename], ['slice_id', 'node_ids','person_ids','name','expires']) - if len(slices) == 0: - raise SliverDoesNotExist("%s (used %s as slicename internally)" % (slice_hrn, slicename)) - slice = slices[0] - - # report about the local nodes only - nodes = self.shell.GetNodes({'node_id':slice['node_ids'],'peer_id':None}, - ['node_id', 'hostname', 'site_id', 'boot_state', 'last_contact']) - - if len(nodes) == 0: - raise SliverDoesNotExist("You have not allocated any slivers here") - - # get login info - user = {} - if slice['person_ids']: - persons = self.shell.GetPersons(slice['person_ids'], ['key_ids']) - key_ids = [key_id for person in persons for key_id in person['key_ids']] - person_keys = self.shell.GetKeys(key_ids) - keys = [key['key'] for key in person_keys] - - user.update({'urn': slice_urn, - 'login': slice['name'], - 'protocol': ['ssh'], - 'port': ['22'], - 'keys': keys}) - - site_ids = [node['site_id'] for node in nodes] - - result = {} - top_level_status = 'unknown' - if nodes: - top_level_status = 'ready' - result['geni_urn'] = slice_urn - result['pl_login'] = slice['name'] - result['pl_expires'] = datetime_to_string(utcparse(slice['expires'])) - result['geni_expires'] = datetime_to_string(utcparse(slice['expires'])) - - resources = [] - for node in nodes: - res = {} - res['pl_hostname'] = node['hostname'] - res['pl_boot_state'] = node['boot_state'] - res['pl_last_contact'] = node['last_contact'] - res['geni_expires'] = datetime_to_string(utcparse(slice['expires'])) - if node['last_contact'] is not None: - - res['pl_last_contact'] = datetime_to_string(utcparse(node['last_contact'])) - sliver_xrn = Xrn(slice_urn, type='sliver', id=node['node_id']) - sliver_xrn.set_authority(self.hrn) - - res['geni_urn'] = sliver_xrn.urn - if node['boot_state'] == 'boot': - res['geni_status'] = 'ready' - else: - res['geni_status'] = 'failed' - top_level_status = 'failed' - - res['geni_error'] = '' - res['users'] = [user] - - resources.append(res) - - result['geni_status'] = top_level_status - result['geni_resources'] = resources - return result - def create_sliver (self, slice_urn, slice_hrn, creds, rspec_string, users, options): + def describe(self, urns, version, options={}): + aggregate = PlAggregate(self) + return aggregate.describe(urns, version=version, options=options) + + def status (self, urns, options={}): + aggregate = PlAggregate(self) + desc = aggregate.describe(urns) + status = {'geni_urn': desc['geni_urn'], + 'geni_slivers': desc['geni_slivers']} + return status + def allocate (self, urn, rspec_string, expiration, options={}): + xrn = Xrn(urn) aggregate = PlAggregate(self) slices = PlSlices(self) - peer = slices.get_peer(slice_hrn) - sfa_peer = slices.get_sfa_peer(slice_hrn) + peer = slices.get_peer(xrn.get_hrn()) + sfa_peer = slices.get_sfa_peer(xrn.get_hrn()) slice_record=None + users = options.get('geni_users', []) if users: slice_record = users[0].get('slice_record', {}) @@ -728,19 +642,18 @@ class PlDriver (Driver): requested_attributes = rspec.version.get_slice_attributes() # ensure site record exists - site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer, options=options) + site = slices.verify_site(xrn.hrn, slice_record, peer, sfa_peer, options=options) # ensure slice record exists - slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer, options=options) + slice = slices.verify_slice(xrn.hrn, slice_record, peer, sfa_peer, expiration=expiration, options=options) # ensure person records exists - persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer, options=options) + #persons = slices.verify_persons(xrn.hrn, slice, users, peer, sfa_peer, options=options) # ensure slice attributes exists slices.verify_slice_attributes(slice, requested_attributes, options=options) - + # add/remove slice from nodes - requested_slivers = {} - slivers = rspec.version.get_nodes_with_slivers() - nodes = slices.verify_slice_nodes(slice, slivers, peer) - + request_nodes = rspec.version.get_nodes_with_slivers() + nodes = slices.verify_slice_nodes(urn, slice, request_nodes, peer) + # add/remove links links slices.verify_slice_links(slice, rspec.version.get_link_requests(), nodes) @@ -750,84 +663,107 @@ class PlDriver (Driver): leases = slices.verify_slice_leases(slice, rspec_requested_leases, peer) except: pass - #requested_leases = [] - #kept_leases = [] - #for lease in rspec.version.get_leases(): - # requested_lease = {} - # if not lease.get('lease_id'): - # requested_lease['hostname'] = xrn_to_hostname(lease.get('component_id').strip()) - # requested_lease['start_time'] = lease.get('start_time') - # requested_lease['duration'] = lease.get('duration') - # else: - # kept_leases.append(int(lease['lease_id'])) - # if requested_lease.get('hostname'): - # requested_leases.append(requested_lease) - - #leases = slices.verify_slice_leases(slice, requested_leases, kept_leases, peer) - + # handle MyPLC peer association. # only used by plc and ple. - slices.handle_peer(site, slice, persons, peer) + slices.handle_peer(site, slice, None, peer) - return aggregate.get_rspec(slice_xrn=slice_urn, - version=rspec.version) + return aggregate.describe([xrn.get_urn()], version=rspec.version) - def delete_sliver (self, slice_urn, slice_hrn, creds, options): - slicename = hrn_to_pl_slicename(slice_hrn) - slices = self.shell.GetSlices({'name': slicename}) - if not slices: - return True - slice = slices[0] - - # leases - leases = self.shell.GetLeases({'name': slicename}) - leases_ids = [lease['lease_id'] for lease in leases ] - # determine if this is a peer slice - # xxx I wonder if this would not need to use PlSlices.get_peer instead - # in which case plc.peers could be deprecated as this here - # is the only/last call to this last method in plc.peers - peer = peers.get_peer(self, slice_hrn) - try: - if peer: - self.shell.UnBindObjectFromPeer('slice', slice['slice_id'], peer) - self.shell.DeleteSliceFromNodes(slicename, slice['node_ids']) - if len(leases_ids) > 0: - self.shell.DeleteLeases(leases_ids) - finally: - if peer: - self.shell.BindObjectToPeer('slice', slice['slice_id'], peer, slice['peer_slice_id']) - return True - - def renew_sliver (self, slice_urn, slice_hrn, creds, expiration_time, options): - slicename = hrn_to_pl_slicename(slice_hrn) - slices = self.shell.GetSlices({'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(slice_hrn) - slice = slices[0] + def provision(self, urns, options={}): + # update users + slices = PlSlices(self) + aggregate = PlAggregate(self) + slivers = aggregate.get_slivers(urns) + slice = slivers[0] + peer = slices.get_peer(slice['hrn']) + sfa_peer = slices.get_sfa_peer(slice['hrn']) + users = options.get('geni_users', []) + persons = slices.verify_persons(None, slice, users, peer, sfa_peer, options=options) + slices.handle_peer(None, None, persons, peer) + # update sliver allocation states and set them to geni_provisioned + sliver_ids = [sliver['sliver_id'] for sliver in slivers] + SliverAllocation.set_allocations(sliver_ids, 'geni_provisioned') + version_manager = VersionManager() + rspec_version = version_manager.get_version(options['geni_rspec_version']) + return self.describe(urns, rspec_version, options=options) + + def delete(self, urns, options={}): + # collect sliver ids so we can update sliver allocation states after + # we remove the slivers. + aggregate = PlAggregate(self) + slivers = aggregate.get_slivers(urns) + if slivers: + slice_id = slivers[0]['slice_id'] + node_ids = [] + sliver_ids = [] + for sliver in slivers: + node_ids.append(sliver['node_id']) + sliver_ids.append(sliver['sliver_id']) + + # determine if this is a peer slice + # xxx I wonder if this would not need to use PlSlices.get_peer instead + # in which case plc.peers could be deprecated as this here + # is the only/last call to this last method in plc.peers + slice_hrn = PlXrn(auth=self.hrn, slicename=slivers[0]['name']).get_hrn() + peer = peers.get_peer(self, slice_hrn) + try: + if peer: + self.shell.UnBindObjectFromPeer('slice', slice_id, peer) + + self.shell.DeleteSliceFromNodes(slice_id, node_ids) + + # delete sliver allocation states + SliverAllocation.delete_allocations(sliver_ids) + finally: + if peer: + self.shell.BindObjectToPeer('slice', slice_id, peer, slice['peer_slice_id']) + + # prepare return struct + geni_slivers = [] + for sliver in slivers: + geni_slivers.append( + {'geni_sliver_urn': sliver['sliver_id'], + 'geni_allocation_status': 'geni_unallocated', + 'geni_expires': datetime_to_string(utcparse(sliver['expires']))}) + return geni_slivers + + def renew (self, urns, expiration_time, options={}): + aggregate = PlAggregate(self) + slivers = aggregate.get_slivers(urns) + if not slivers: + raise SearchFailed(urns) + slice = slivers[0] requested_time = utcparse(expiration_time) record = {'expires': int(datetime_to_epoch(requested_time))} - try: - self.shell.UpdateSlice(slice['slice_id'], record) - return True - except: - return False + self.shell.UpdateSlice(slice['slice_id'], record) + description = self.describe(urns, None, options) + return description['geni_slivers'] + - # remove the 'enabled' tag - def start_slice (self, slice_urn, slice_hrn, creds): - slicename = hrn_to_pl_slicename(slice_hrn) - slices = self.shell.GetSlices({'name': slicename}, ['slice_id']) - if not slices: - raise RecordNotFound(slice_hrn) - slice_id = slices[0]['slice_id'] - slice_tags = self.shell.GetSliceTags({'slice_id': slice_id, 'tagname': 'enabled'}, ['slice_tag_id']) - # just remove the tag if it exists - if slice_tags: - self.shell.DeleteSliceTag(slice_tags[0]['slice_tag_id']) - return 1 + def perform_operational_action (self, urns, action, options={}): + # MyPLC doesn't support operational actions. Lets pretend like it + # supports start, but reject everything else. + action = action.lower() + if action not in ['geni_start']: + raise UnsupportedOperation(action) + + # fault if sliver is not full allocated (operational status is geni_pending_allocation) + description = self.describe(urns, None, options) + for sliver in description['geni_slivers']: + if sliver['geni_operational_status'] == 'geni_pending_allocation': + raise UnsupportedOperation(action, "Sliver must be fully allocated (operational status is not geni_pending_allocation)") + # + # Perform Operational Action Here + # + + geni_slivers = self.describe(urns, None, options)['geni_slivers'] + return geni_slivers # set the 'enabled' tag to 0 - def stop_slice (self, slice_urn, slice_hrn, creds): - slicename = hrn_to_pl_slicename(slice_hrn) + def shutdown (self, xrn, options={}): + xrn = PlXrn(xrn=xrn, type='slice') + slicename = xrn.pl_slicename() slices = self.shell.GetSlices({'name': slicename}, ['slice_id']) if not slices: raise RecordNotFound(slice_hrn) @@ -839,76 +775,3 @@ class PlDriver (Driver): tag_id = slice_tags[0]['slice_tag_id'] self.shell.UpdateSliceTag(tag_id, '0') return 1 - - def reset_slice (self, slice_urn, slice_hrn, creds): - raise SfaNotImplemented ("reset_slice not available at this interface") - - # xxx this code is quite old and has not run for ages - # it is obviously totally broken and needs a rewrite - def get_ticket (self, slice_urn, slice_hrn, creds, rspec_string, options): - raise SfaNotImplemented,"PlDriver.get_ticket needs a rewrite" -# please keep this code for future reference -# slices = PlSlices(self) -# peer = slices.get_peer(slice_hrn) -# sfa_peer = slices.get_sfa_peer(slice_hrn) -# -# # get the slice record -# credential = api.getCredential() -# interface = api.registries[api.hrn] -# registry = api.server_proxy(interface, credential) -# records = registry.Resolve(xrn, credential) -# -# # make sure we get a local slice record -# record = None -# for tmp_record in records: -# if tmp_record['type'] == 'slice' and \ -# not tmp_record['peer_authority']: -# #Error (E0602, GetTicket): Undefined variable 'SliceRecord' -# slice_record = SliceRecord(dict=tmp_record) -# if not record: -# raise RecordNotFound(slice_hrn) -# -# # similar to CreateSliver, we must verify that the required records exist -# # at this aggregate before we can issue a ticket -# # parse rspec -# rspec = RSpec(rspec_string) -# requested_attributes = rspec.version.get_slice_attributes() -# -# # ensure site record exists -# site = slices.verify_site(slice_hrn, slice_record, peer, sfa_peer) -# # ensure slice record exists -# slice = slices.verify_slice(slice_hrn, slice_record, peer, sfa_peer) -# # ensure person records exists -# # xxx users is undefined in this context -# persons = slices.verify_persons(slice_hrn, slice, users, peer, sfa_peer) -# # ensure slice attributes exists -# slices.verify_slice_attributes(slice, requested_attributes) -# -# # get sliver info -# slivers = slices.get_slivers(slice_hrn) -# -# if not slivers: -# raise SliverDoesNotExist(slice_hrn) -# -# # get initscripts -# initscripts = [] -# data = { -# 'timestamp': int(time.time()), -# 'initscripts': initscripts, -# 'slivers': slivers -# } -# -# # create the ticket -# object_gid = record.get_gid_object() -# new_ticket = SfaTicket(subject = object_gid.get_subject()) -# new_ticket.set_gid_caller(api.auth.client_gid) -# new_ticket.set_gid_object(object_gid) -# new_ticket.set_issuer(key=api.key, subject=self.hrn) -# new_ticket.set_pubkey(object_gid.get_pubkey()) -# new_ticket.set_attributes(data) -# new_ticket.set_rspec(rspec) -# #new_ticket.set_parent(api.auth.hierarchy.get_auth_ticket(auth_hrn)) -# new_ticket.encode() -# new_ticket.sign() -# -# return new_ticket.save_to_string(save_parents=True) diff --git a/sfa/planetlab/plslices.py b/sfa/planetlab/plslices.py index 0d76b593..351335e2 100644 --- a/sfa/planetlab/plslices.py +++ b/sfa/planetlab/plslices.py @@ -4,13 +4,12 @@ from collections import defaultdict from sfa.util.sfatime import utcparse, datetime_to_epoch from sfa.util.sfalogging import logger from sfa.util.xrn import Xrn, get_leaf, get_authority, urn_to_hrn - from sfa.rspecs.rspec import RSpec - from sfa.planetlab.vlink import VLink -from sfa.planetlab.plxrn import PlXrn, hrn_to_pl_slicename, xrn_to_hostname - -import time +from sfa.planetlab.topology import Topology +from sfa.planetlab.plxrn import PlXrn, hrn_to_pl_slicename +from sfa.storage.model import SliverAllocation +from sfa.storage.alchemy import dbsession MAXINT = 2L**31-1 @@ -216,7 +215,19 @@ class PlSlices: return leases - def verify_slice_nodes(self, slice, slivers, peer): + def verify_slice_nodes(self, slice_urn, slice, rspec_nodes, peer): + + slivers = {} + for node in rspec_nodes: + hostname = node.get('component_name') + client_id = node.get('client_id') + component_id = node.get('component_id').strip() + if hostname: + hostname = hostname.strip() + elif component_id: + hostname = xrn_to_hostname(component_id) + if hostname: + slivers[hostname] = {'client_id': client_id, 'component_id': component_id} nodes = self.driver.shell.GetNodes(slice['node_ids'], ['node_id', 'hostname', 'interface_ids']) current_slivers = [node['hostname'] for node in nodes] @@ -238,10 +249,10 @@ class PlSlices: requested_slivers.append(hostname) # remove nodes not in rspec - deleted_nodes = list(set(current_slivers).difference(requested_slivers)) + deleted_nodes = list(set(current_slivers).difference(slivers.keys())) # add nodes from rspec - added_nodes = list(set(requested_slivers).difference(current_slivers)) + added_nodes = list(set(slivers.keys()).difference(current_slivers)) try: if peer: @@ -252,13 +263,21 @@ class PlSlices: except: logger.log_exc('Failed to add/remove slice from nodes') - # add tags - for tag in tags: - try: - self.driver.shell.AddSliceTag(tag['slicename'], tag['tagname'], tag['value'], tag['node']) - except: - logger.log_exc('Failed to add slice tag') - return nodes + slices = self.driver.shell.GetSlices(slice['name'], ['node_ids']) + resulting_nodes = self.driver.shell.GetNodes(slices[0]['node_ids']) + + # update sliver allocations + for node in resulting_nodes: + client_id = slivers[node['hostname']]['client_id'] + component_id = slivers[node['hostname']]['component_id'] + sliver_hrn = '%s.%s-%s' % (self.driver.hrn, slice['slice_id'], node['node_id']) + sliver_id = Xrn(sliver_hrn, type='sliver').urn + record = SliverAllocation(sliver_id=sliver_id, client_id=client_id, + component_id=component_id, + slice_urn = slice_urn, + allocation_state='geni_allocated') + record.sync() + return resulting_nodes def free_egre_key(self): used = set() @@ -275,10 +294,15 @@ class PlSlices: return str(key) def verify_slice_links(self, slice, requested_links, nodes): - # nodes is undefined here + if not requested_links: return - + + # exit if links are not supported here + topology = Topology() + if not topology: + return + # build dict of nodes nodes_dict = {} interface_ids = [] @@ -306,7 +330,11 @@ class PlSlices: for link in requested_links: # get the ip address of the first node in the link ifname1 = Xrn(link['interface1']['component_id']).get_leaf() - (node_raw, device) = ifname1.split(':') + ifname_parts = ifname1.split(':') + node_raw = ifname_parts[0] + device = None + if len(ifname_parts) > 1: + device = ifname_parts[1] node_id = int(node_raw.replace('node', '')) node = nodes_dict[node_id] if1 = interfaces_dict[node['interface_ids'][0]] @@ -395,37 +423,34 @@ class PlSlices: return site - def verify_slice(self, slice_hrn, slice_record, peer, sfa_peer, options={}): + def verify_slice(self, slice_hrn, slice_record, peer, sfa_peer, expiration, options={}): slicename = hrn_to_pl_slicename(slice_hrn) parts = slicename.split("_") login_base = parts[0] slices = self.driver.shell.GetSlices([slicename]) + expires = int(datetime_to_epoch(utcparse(expiration))) if not slices: slice = {'name': slicename, - 'url': slice_record.get('url', slice_hrn), - 'description': slice_record.get('description', slice_hrn)} + 'url': 'No Url', + 'description': 'No Description'} # add the slice slice['slice_id'] = self.driver.shell.AddSlice(slice) slice['node_ids'] = [] slice['person_ids'] = [] - if peer: - slice['peer_slice_id'] = slice_record.get('slice_id', None) - # mark this slice as an sfa peer record -# if sfa_peer: -# peer_dict = {'type': 'slice', 'hrn': slice_hrn, -# 'peer_authority': sfa_peer, 'pointer': slice['slice_id']} -# self.registry.register_peer_object(self.credential, peer_dict) + if peer and slice_record: + slice['peer_slice_id'] = slice_record.get('slice_id', None) + # set the expiration + self.driver.shell.UpdateSlice(slice['slice_id'], {'expires': expires}) else: slice = slices[0] - if peer: + if peer and slice_record: slice['peer_slice_id'] = slice_record.get('slice_id', None) # unbind from peer so we can modify if necessary. Will bind back later self.driver.shell.UnBindObjectFromPeer('slice', slice['slice_id'], peer['shortname']) - #Update existing record (e.g. expires field) it with the latest info. - if slice_record.get('expires'): - requested_expires = int(datetime_to_epoch(utcparse(slice_record['expires']))) - if requested_expires and slice['expires'] != requested_expires: - self.driver.shell.UpdateSlice( slice['slice_id'], {'expires' : requested_expires}) + + #Update expiration if necessary + if slice['expires'] != expires: + self.driver.shell.UpdateSlice( slice['slice_id'], {'expires' : expires}) return slice diff --git a/sfa/rspecs/elements/login.py b/sfa/rspecs/elements/login.py index 99dc5c3b..51741a9b 100644 --- a/sfa/rspecs/elements/login.py +++ b/sfa/rspecs/elements/login.py @@ -5,5 +5,5 @@ class Login(Element): 'authentication', 'hostname', 'port', - 'username' + 'username', ] diff --git a/sfa/rspecs/elements/node.py b/sfa/rspecs/elements/node.py index 7c467f4c..e0f65e40 100644 --- a/sfa/rspecs/elements/node.py +++ b/sfa/rspecs/elements/node.py @@ -1,6 +1,6 @@ from sfa.rspecs.elements.element import Element -class Node(Element): +class NodeElement(Element): fields = [ 'client_id', diff --git a/sfa/rspecs/elements/services.py b/sfa/rspecs/elements/services.py index df0546d4..e159d70c 100644 --- a/sfa/rspecs/elements/services.py +++ b/sfa/rspecs/elements/services.py @@ -1,10 +1,11 @@ from sfa.rspecs.elements.element import Element -class Services(Element): +class ServicesElement(Element): fields = [ 'install', 'execute', 'login', + 'services_user', ] diff --git a/sfa/rspecs/elements/v3/__init__.py b/sfa/rspecs/elements/v3/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sfa/rspecs/elements/v3/node.py b/sfa/rspecs/elements/v3/node.py new file mode 100644 index 00000000..1ddaa737 --- /dev/null +++ b/sfa/rspecs/elements/v3/node.py @@ -0,0 +1,145 @@ +from sfa.util.xrn import Xrn +from sfa.util.xml import XpathFilter + +from sfa.rspecs.elements.node import NodeElement +from sfa.rspecs.elements.sliver import Sliver +from sfa.rspecs.elements.location import Location +from sfa.rspecs.elements.hardware_type import HardwareType +from sfa.rspecs.elements.disk_image import DiskImage +from sfa.rspecs.elements.interface import Interface +from sfa.rspecs.elements.bwlimit import BWlimit +from sfa.rspecs.elements.pltag import PLTag +from sfa.rspecs.elements.v3.services import Services +from sfa.rspecs.elements.versions.pgv2SliverType import PGv2SliverType +from sfa.rspecs.elements.versions.pgv2Interface import PGv2Interface + +from sfa.planetlab.plxrn import xrn_to_hostname + +class Node: + @staticmethod + def add_nodes(xml, nodes): + node_elems = [] + for node in nodes: + node_fields = ['component_manager_id', 'component_id', 'client_id', 'sliver_id', 'exclusive'] + node_elem = xml.add_instance('node', node, node_fields) + node_elems.append(node_elem) + # set component name + if node.get('component_id'): + component_name = xrn_to_hostname(node['component_id']) + node_elem.set('component_name', component_name) + # set hardware types + if node.get('hardware_types'): + for hardware_type in node.get('hardware_types', []): + node_elem.add_instance('hardware_type', hardware_type, HardwareType.fields) + # set location + if node.get('location'): + node_elem.add_instance('location', node['location'], Location.fields) + # set interfaces + PGv2Interface.add_interfaces(node_elem, node.get('interfaces')) + #if node.get('interfaces'): + # for interface in node.get('interfaces', []): + # node_elem.add_instance('interface', interface, ['component_id', 'client_id']) + # set available element + if node.get('available'): + available_elem = node_elem.add_element('available', now=node['available']) + # add services + Services.add_services(node_elem, node.get('services', [])) + # add slivers + slivers = node.get('slivers', []) + if not slivers: + # we must still advertise the available sliver types + slivers = Sliver({'type': 'plab-vserver'}) + # we must also advertise the available initscripts + slivers['tags'] = [] + if node.get('pl_initscripts'): + for initscript in node.get('pl_initscripts', []): + slivers['tags'].append({'name': 'initscript', 'value': initscript['name']}) + PGv2SliverType.add_slivers(node_elem, slivers) + + return node_elems + + @staticmethod + def get_nodes(xml, filter={}): + xpath = '//node%s | //default:node%s' % (XpathFilter.xpath(filter), XpathFilter.xpath(filter)) + node_elems = xml.xpath(xpath) + return Node.get_node_objs(node_elems) + + @staticmethod + def get_nodes_with_slivers(xml, filter={}): + xpath = '//node[count(sliver_type)>0] | //default:node[count(default:sliver_type) > 0]' + node_elems = xml.xpath(xpath) + return Node.get_node_objs(node_elems) + + @staticmethod + def get_node_objs(node_elems): + nodes = [] + for node_elem in node_elems: + node = NodeElement(node_elem.attrib, node_elem) + nodes.append(node) + if 'component_id' in node_elem.attrib: + node['authority_id'] = Xrn(node_elem.attrib['component_id']).get_authority_urn() + + # get hardware types + hardware_type_elems = node_elem.xpath('./default:hardware_type | ./hardware_type') + node['hardware_types'] = [hw_type.get_instance(HardwareType) for hw_type in hardware_type_elems] + + # get location + location_elems = node_elem.xpath('./default:location | ./location') + locations = [location_elem.get_instance(Location) for location_elem in location_elems] + if len(locations) > 0: + node['location'] = locations[0] + + # get interfaces + iface_elems = node_elem.xpath('./default:interface | ./interface') + node['interfaces'] = [iface_elem.get_instance(Interface) for iface_elem in iface_elems] + + # get services + node['services'] = Services.get_services(node_elem) + + # get slivers + node['slivers'] = PGv2SliverType.get_slivers(node_elem) + available_elems = node_elem.xpath('./default:available | ./available') + if len(available_elems) > 0 and 'name' in available_elems[0].attrib: + if available_elems[0].attrib.get('now', '').lower() == 'true': + node['boot_state'] = 'boot' + else: + node['boot_state'] = 'disabled' + return nodes + + + @staticmethod + def add_slivers(xml, slivers): + component_ids = [] + for sliver in slivers: + filter = {} + if isinstance(sliver, str): + filter['component_id'] = '*%s*' % sliver + sliver = {} + elif 'component_id' in sliver and sliver['component_id']: + filter['component_id'] = '*%s*' % sliver['component_id'] + if not filter: + continue + nodes = Node.get_nodes(xml, filter) + if not nodes: + continue + node = nodes[0] + PGv2SliverType.add_slivers(node, sliver) + + @staticmethod + def remove_slivers(xml, hostnames): + for hostname in hostnames: + nodes = Node.get_nodes(xml, {'component_id': '*%s*' % hostname}) + for node in nodes: + slivers = PGv2SliverType.get_slivers(node.element) + for sliver in slivers: + node.element.remove(sliver.element) +if __name__ == '__main__': + from sfa.rspecs.rspec import RSpec + import pdb + r = RSpec('/tmp/emulab.rspec') + r2 = RSpec(version = 'GENI') + nodes = Node.get_nodes(r.xml) + Node.add_nodes(r2.xml.root, nodes) + #pdb.set_trace() + + diff --git a/sfa/rspecs/elements/v3/services.py b/sfa/rspecs/elements/v3/services.py new file mode 100644 index 00000000..72111085 --- /dev/null +++ b/sfa/rspecs/elements/v3/services.py @@ -0,0 +1,60 @@ +from sfa.rspecs.elements.element import Element +from sfa.rspecs.elements.execute import Execute +from sfa.rspecs.elements.install import Install +from sfa.rspecs.elements.services import ServicesElement +from sfa.rspecs.elements.login import Login + +class Services: + @staticmethod + def add_services(xml, services): + if not services: + return + for service in services: + service_elem = xml.add_element('services') + child_elements = {'install': Install.fields, + 'execute': Execute.fields, + 'login': Login.fields} + for (name, fields) in child_elements.items(): + child = service.get(name) + if not child: + continue + if isinstance(child, dict): + service_elem.add_instance(name, child, fields) + elif isinstance(child, list): + for obj in child: + service_elem.add_instance(name, obj, fields) + + # add ssh_users + if service['services_user']: + for ssh_user in service['services_user']: + ssh_user_elem = service_elem.add_element('{%s}services_user' % xml.namespaces['ssh-user'], + login=ssh_user['login'], + user_urn=ssh_user['user_urn']) + for key in ssh_user['keys']: + pkey_elem = ssh_user_elem.add_element('{%s}public_key' % xml.namespaces['ssh-user']) + pkey_elem.element.text=key + + + @staticmethod + def get_services(xml): + services = [] + for services_elem in xml.xpath('./default:services | ./services'): + service = ServicesElement(services_elem.attrib, services_elem) + # get install + install_elems = xml.xpath('./default:install | ./install') + service['install'] = [install_elem.get_instance(Install) for install_elem in install_elems] + # get execute + execute_elems = xml.xpath('./default:execute | ./execute') + service['execute'] = [execute_elem.get_instance(Execute) for execute_elem in execute_elems] + # get login + login_elems = xml.xpath('./default:login | ./login') + service['login'] = [login_elem.get_instance(Login) for login_elem in login_elems] + + ssh_user_elems = xml.xpath('./ssh-user:service_user | ./service_user') + services_user = [] + for ssh_user_elem in ssh_user_elems: + services_user = ssh_user_elem.get_instance(None, fields=['login', 'user_urn']) + service['services_user'] = services_user + services.append(service) + return services + diff --git a/sfa/rspecs/elements/versions/pgv2Node.py b/sfa/rspecs/elements/versions/pgv2Node.py index 4b424038..fb9a9ac5 100644 --- a/sfa/rspecs/elements/versions/pgv2Node.py +++ b/sfa/rspecs/elements/versions/pgv2Node.py @@ -1,7 +1,7 @@ from sfa.util.xrn import Xrn from sfa.util.xml import XpathFilter -from sfa.rspecs.elements.node import Node +from sfa.rspecs.elements.node import NodeElement from sfa.rspecs.elements.sliver import Sliver from sfa.rspecs.elements.location import Location from sfa.rspecs.elements.hardware_type import HardwareType @@ -46,11 +46,8 @@ class PGv2Node: # for interface in node.get('interfaces', []): # node_elem.add_instance('interface', interface, ['component_id', 'client_id']) # set available element - if node.get('boot_state'): - if node.get('boot_state').lower() == 'boot': - available_elem = node_elem.add_element('available', now='true') - else: - available_elem = node_elem.add_element('available', now='false') + if node.get('available'): + available_elem = node_elem.add_element('available', now=node['available']) # add services PGv2Services.add_services(node_elem, node.get('services', [])) # add slivers @@ -82,7 +79,7 @@ class PGv2Node: def get_node_objs(node_elems): nodes = [] for node_elem in node_elems: - node = Node(node_elem.attrib, node_elem) + node = NodeElement(node_elem.attrib, node_elem) nodes.append(node) if 'component_id' in node_elem.attrib: node['authority_id'] = Xrn(node_elem.attrib['component_id']).get_authority_urn() diff --git a/sfa/rspecs/elements/versions/pgv2Services.py b/sfa/rspecs/elements/versions/pgv2Services.py index be1d618e..ff9e9d13 100644 --- a/sfa/rspecs/elements/versions/pgv2Services.py +++ b/sfa/rspecs/elements/versions/pgv2Services.py @@ -1,7 +1,7 @@ from sfa.rspecs.elements.element import Element from sfa.rspecs.elements.execute import Execute from sfa.rspecs.elements.install import Install -from sfa.rspecs.elements.services import Services +from sfa.rspecs.elements.services import ServicesElement from sfa.rspecs.elements.login import Login class PGv2Services: @@ -28,7 +28,7 @@ class PGv2Services: def get_services(xml): services = [] for services_elem in xml.xpath('./default:services | ./services'): - service = Services(services_elem.attrib, services_elem) + service = ServicesElement(services_elem.attrib, services_elem) # get install install_elems = xml.xpath('./default:install | ./install') service['install'] = [install_elem.get_instance(Install) for install_elem in install_elems] diff --git a/sfa/rspecs/elements/versions/sfav1Lease.py b/sfa/rspecs/elements/versions/sfav1Lease.py index 03a43422..d4639042 100644 --- a/sfa/rspecs/elements/versions/sfav1Lease.py +++ b/sfa/rspecs/elements/versions/sfav1Lease.py @@ -3,7 +3,7 @@ from sfa.util.xml import XpathFilter from sfa.util.xrn import Xrn from sfa.rspecs.elements.element import Element -from sfa.rspecs.elements.node import Node +from sfa.rspecs.elements.node import NodeElement from sfa.rspecs.elements.sliver import Sliver from sfa.rspecs.elements.location import Location from sfa.rspecs.elements.hardware_type import HardwareType diff --git a/sfa/rspecs/elements/versions/sfav1Node.py b/sfa/rspecs/elements/versions/sfav1Node.py index 1b509cc0..997f395a 100644 --- a/sfa/rspecs/elements/versions/sfav1Node.py +++ b/sfa/rspecs/elements/versions/sfav1Node.py @@ -3,7 +3,7 @@ from sfa.util.xml import XpathFilter from sfa.util.xrn import Xrn from sfa.rspecs.elements.element import Element -from sfa.rspecs.elements.node import Node +from sfa.rspecs.elements.node import NodeElement from sfa.rspecs.elements.sliver import Sliver from sfa.rspecs.elements.location import Location from sfa.rspecs.elements.hardware_type import HardwareType @@ -138,7 +138,7 @@ class SFAv1Node: def get_node_objs(node_elems): nodes = [] for node_elem in node_elems: - node = Node(node_elem.attrib, node_elem) + node = NodeElement(node_elem.attrib, node_elem) if 'site_id' in node_elem.attrib: node['authority_id'] = node_elem.attrib['site_id'] # get location @@ -160,7 +160,7 @@ class SFAv1Node: # get slivers node['slivers'] = SFAv1Sliver.get_slivers(node_elem) # get tags - node['tags'] = SFAv1PLTag.get_pl_tags(node_elem, ignore=Node.fields+["hardware_type"]) + node['tags'] = SFAv1PLTag.get_pl_tags(node_elem, ignore=NodeElement.fields+["hardware_type"]) # get hardware types hardware_type_elems = node_elem.xpath('./default:hardware_type | ./hardware_type') node['hardware_types'] = [hw_type.get_instance(HardwareType) for hw_type in hardware_type_elems] diff --git a/sfa/rspecs/rspec.py b/sfa/rspecs/rspec.py index e58096a7..83647885 100755 --- a/sfa/rspecs/rspec.py +++ b/sfa/rspecs/rspec.py @@ -95,12 +95,12 @@ class RSpec: def filter(self, filter): if 'component_manager_id' in filter: - nodes = self.version.get_node_elements() + nodes = self.version.get_nodes() for node in nodes: if 'component_manager_id' not in node.attrib or \ node.attrib['component_manager_id'] != filter['component_manager_id']: parent = node.getparent() - parent.remove(node) + parent.remove(node.element) def toxml(self, header=True): diff --git a/sfa/rspecs/version_manager.py b/sfa/rspecs/version_manager.py index 28e98d09..4e60bacb 100644 --- a/sfa/rspecs/version_manager.py +++ b/sfa/rspecs/version_manager.py @@ -28,7 +28,7 @@ class VersionManager: retval = None for version in self.versions: if type is None or type.lower() == version.type.lower(): - if version_num is None or str(version_num) == version.version: + if version_num is None or str(float(version_num)) == str(float(version.version)): if content_type is None or content_type.lower() == version.content_type.lower() \ or version.content_type == '*': retval = version @@ -55,6 +55,8 @@ class VersionManager: retval = self._get_version(type, version_num, content_type) elif isinstance(version, RSpecVersion): retval = version + elif not version: + retval = self.versions[0] else: raise UnsupportedRSpecVersion("No such version: %s "% str(version)) diff --git a/sfa/rspecs/versions/federica.py b/sfa/rspecs/versions/federica.py index 798fc7df..8ff9f5e8 100644 --- a/sfa/rspecs/versions/federica.py +++ b/sfa/rspecs/versions/federica.py @@ -1,17 +1,23 @@ -from sfa.rspecs.versions.pgv2 import PGv2Ad, PGv2Request, PGv2Manifest +from sfa.rspecs.versions.pgv2 import PGv2 -class FedericaAd (PGv2Ad): +class FedericaAd (PGv2): enabled = True + type = 'Fedrica' + content_type = 'ad' schema = 'http://sorch.netmode.ntua.gr/ws/RSpec/ad.xsd' namespace = 'http://sorch.netmode.ntua.gr/ws/RSpec' -class FedericaRequest (PGv2Request): +class FedericaRequest (PGv2): enabled = True + type = 'Fedrica' + content_type = 'request' schema = 'http://sorch.netmode.ntua.gr/ws/RSpec/request.xsd' namespace = 'http://sorch.netmode.ntua.gr/ws/RSpec' -class FedericaManifest (PGv2Manifest): +class FedericaManifest (PGv2): enabled = True + type = 'Fedrica' + content_type = 'manifest' schema = 'http://sorch.netmode.ntua.gr/ws/RSpec/manifest.xsd' namespace = 'http://sorch.netmode.ntua.gr/ws/RSpec' diff --git a/sfa/rspecs/versions/pgv3.py b/sfa/rspecs/versions/pgv3.py index bb036df9..a4413c16 100644 --- a/sfa/rspecs/versions/pgv3.py +++ b/sfa/rspecs/versions/pgv3.py @@ -1,4 +1,5 @@ from sfa.rspecs.versions.pgv2 import PGv2 +from sfa.rspecs.elements.v3.node import Node class GENIv3(PGv2): type = 'GENI' @@ -19,7 +20,27 @@ class GENIv3Ad(GENIv3): enabled = True content_type = 'ad' schema = 'http://www.geni.net/resources/rspec/3/ad.xsd' - template = '' + template = """ + + + + + + Boot the node + + VMs begin powered down or inactive. They + must be explicitly booted before use. + + + The node is up and ready to use. + + + The node has failed and requires administrator + intervention before it can be used. Please contact support + for assistance. + + +""" class GENIv3Request(GENIv3): enabled = True @@ -31,5 +52,11 @@ class GENIv2Manifest(GENIv3): enabled = True content_type = 'manifest' schema = 'http://www.geni.net/resources/rspec/3/manifest.xsd' - template = '' - + template = '' + + + def add_nodes(self, nodes, check_for_dupes=False): + return Node.add_nodes(self.xml, nodes) + + def get_nodes(self, filter=None): + return Node.get_nodes(self.xml, filter) diff --git a/sfa/rspecs/versions/sfav1.py b/sfa/rspecs/versions/sfav1.py index fd2e0313..bdfeb47d 100644 --- a/sfa/rspecs/versions/sfav1.py +++ b/sfa/rspecs/versions/sfav1.py @@ -15,8 +15,8 @@ class SFAv1(RSpecVersion): type = 'SFA' content_type = '*' version = '1' - schema = None - namespace = None + schema = '' + namespace = '' extensions = {} namespaces = None template = '' % type diff --git a/sfa/server/sfaapi.py b/sfa/server/sfaapi.py index 48bd21c4..898fb66b 100644 --- a/sfa/server/sfaapi.py +++ b/sfa/server/sfaapi.py @@ -10,9 +10,8 @@ from sfa.trust.auth import Auth from sfa.trust.certificate import Keypair, Certificate from sfa.trust.credential import Credential from sfa.trust.rights import determine_rights - +from sfa.util.version import version_core from sfa.server.xmlrpcapi import XmlrpcApi - from sfa.client.return_value import ReturnValue @@ -129,7 +128,7 @@ class SfaApi (XmlrpcApi): delegated_cred = None for cred in creds: - if hierarchy.auth_exists(Credential(string=cred).get_gid_caller().get_hrn()): + if hierarchy.auth_exists(Credential(cred=cred).get_gid_caller().get_hrn()): delegated_cred = cred break return delegated_cred @@ -231,9 +230,10 @@ class SfaApi (XmlrpcApi): output = result.faultString return output - def prepare_response_v2_am(self, result): + def prepare_response_am(self, result): + version = version_core() response = { - 'geni_api': 2, + 'geni_api': 3, 'code': self.get_geni_code(result), 'value': self.get_geni_value(result), 'output': self.get_geni_output(result), @@ -247,6 +247,6 @@ class SfaApi (XmlrpcApi): """ # as of dec 13 2011 we only support API v2 if self.interface.lower() in ['aggregate', 'slicemgr']: - result = self.prepare_response_v2_am(result) + result = self.prepare_response_am(result) return XmlrpcApi.prepare_response(self, result, method) diff --git a/sfa/storage/migrations/versions/003_sliver_allocations.py b/sfa/storage/migrations/versions/003_sliver_allocations.py new file mode 100644 index 00000000..21c8511d --- /dev/null +++ b/sfa/storage/migrations/versions/003_sliver_allocations.py @@ -0,0 +1,13 @@ +from sqlalchemy import Table, MetaData, Column, ForeignKey +from sqlalchemy import Integer, String +from sfa.storage.model import SliverAllocation + +metadata=MetaData() + +def upgrade(migrate_engine): + metadata.bind = migrate_engine + SliverAllocation.create() + +def downgrade(migrate_engine): + metadata.bind = migrate_engine + SliverAllocation.drop() diff --git a/sfa/storage/model.py b/sfa/storage/model.py index 16338095..b0950429 100644 --- a/sfa/storage/model.py +++ b/sfa/storage/model.py @@ -1,7 +1,8 @@ from types import StringTypes from datetime import datetime -from sqlalchemy import Integer, String, DateTime +from sqlalchemy import or_, and_ +from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Table, Column, MetaData, join, ForeignKey from sqlalchemy.orm import relationship, backref from sqlalchemy.orm import column_property @@ -311,6 +312,91 @@ class RegKey (Base): result += ">" return result +class SliverAllocation(Base,AlchemyObj): + __tablename__ = 'sliver_allocation' + sliver_id = Column(String, primary_key=True) + client_id = Column(String) + component_id = Column(String) + slice_urn = Column(String) + allocation_state = Column(String) + + def __init__(self, **kwds): + if 'sliver_id' in kwds: + self.sliver_id = kwds['sliver_id'] + if 'client_id' in kwds: + self.client_id = kwds['client_id'] + if 'component_id' in kwds: + self.component_id = kwds['component_id'] + if 'slice_urn' in kwds: + self.slice_urn = kwds['slice_urn'] + if 'allocation_state' in kwds: + self.allocation_state = kwds['allocation_state'] + + def __repr__(self): + result = "