4 Define some common methods for the Syndicate observer.
19 from Crypto.Hash import SHA256 as HashAlg
20 from Crypto.PublicKey import RSA as CryptoKey
21 from Crypto import Random
22 from Crypto.Signature import PKCS1_PSS as CryptoSigner
25 from logging import Logger
26 logging.basicConfig( format='[%(levelname)s] [%(module)s:%(lineno)d] %(message)s' )
27 logger = logging.getLogger()
28 logger.setLevel( logging.INFO )
31 import syndicatelib_config.config as CONFIG
33 # get the Syndicate modules
36 import syndicate.client.bin.syntool as syntool
37 import syndicate.client.common.msconfig as msconfig
38 import syndicate.client.common.api as api
39 import syndicate.util.storage as syndicate_storage
40 import syndicate.util.watchdog as syndicate_watchdog
41 import syndicate.util.daemonize as syndicate_daemon
42 import syndicate.util.crypto as syndicate_crypto
43 import syndicate.util.provisioning as syndicate_provisioning
44 import syndicate.syndicate as c_syndicate
class FakeObject(object):
    """
    Empty attribute bag.  Used by the functional tests (and by the fake
    'models' package built when Django imports fail) to stand in for model
    instances: callers simply assign whatever attributes they need.
    """
    pass
52 if os.getenv("OPENCLOUD_PYTHONPATH") is not None:
53 sys.path.insert(0, os.getenv("OPENCLOUD_PYTHONPATH"))
55 logger.warning("No OPENCLOUD_PYTHONPATH set. Assuming Syndicate models are in your PYTHONPATH")
58 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
61 import syndicate_storage.models as models
63 # get OpenCloud models
64 from core.models import Slice,Sliver
66 from django.core.exceptions import ObjectDoesNotExist
67 from django.db import IntegrityError
69 except ImportError, ie:
70 logger.warning("Failed to import models; some tests will not work")
72 # create a fake "models" package that has exactly the members we need for testing.
74 models.Volume = FakeObject()
75 models.Volume.CAP_READ_DATA = 1
76 models.Volume.CAP_WRITE_DATA = 2
77 models.Volume.CAP_HOST_DATA = 4
82 #-------------------------------
class SyndicateObserverError( Exception ):
    """
    Base exception type for errors raised by the Syndicate observer.
    """
    pass
86 #-------------------------------
89 Return the imported config
94 #-------------------------------
def make_openid_url( email ):
    """
    Generate an OpenID identity URL from an email address.
    The identity lives under the configured trust root, at id/<email>.
    """
    trust_root = CONFIG.SYNDICATE_OPENID_TRUSTROOT
    return os.path.join( trust_root, "id", email )
102 #-------------------------------
103 def connect_syndicate( username=CONFIG.SYNDICATE_OPENCLOUD_USER, password=CONFIG.SYNDICATE_OPENCLOUD_PASSWORD, user_pkey_pem=CONFIG.SYNDICATE_OPENCLOUD_PKEY ):
105 Connect to the OpenCloud Syndicate SMI, using the OpenCloud user credentials.
108 if hasattr(CONFIG, "DEBUG"):
111 client = syntool.Client( username, CONFIG.SYNDICATE_SMI_URL,
113 user_pkey_pem=user_pkey_pem,
119 #-------------------------------
def opencloud_caps_to_syndicate_caps( cap_read, cap_write, cap_host ):
    """
    Convert OpenCloud capability bits from the UI into Syndicate's
    capability bits.

    Arguments:
        cap_read  -- truthy if the gateway may read volume data
        cap_write -- truthy if the gateway may write volume data
        cap_host  -- truthy if the gateway may coordinate (host) data

    Returns the bitwise OR of the corresponding msconfig.GATEWAY_CAP_*
    flags.  Reading or writing data implies the matching metadata
    capability as well.
    """
    syn_caps = 0
    if cap_read:
        # data access requires metadata access too
        syn_caps |= (msconfig.GATEWAY_CAP_READ_DATA | msconfig.GATEWAY_CAP_READ_METADATA)
    if cap_write:
        syn_caps |= (msconfig.GATEWAY_CAP_WRITE_DATA | msconfig.GATEWAY_CAP_WRITE_METADATA)
    if cap_host:
        syn_caps |= (msconfig.GATEWAY_CAP_COORDINATE)
    return syn_caps
135 #-------------------------------
def ensure_user_exists( user_email, **user_kw ):
    """
    Given an OpenCloud user, ensure that the corresponding Syndicate user
    exists on the MS.  This method does NOT create any OpenCloud-specific
    data.

    Returns (created, user), where created == True if the user was
    created and created == False if the user was read.
    Raises an exception on error.
    """
    ms_client = connect_syndicate()
    openid_url = make_openid_url( user_email )
    return syndicate_provisioning.ensure_user_exists( ms_client, user_email, openid_url, **user_kw )
153 #-------------------------------
def ensure_user_absent( user_email ):
    """
    Ensure that a given OpenCloud user's associated Syndicate user record
    has been deleted.  This method does NOT delete any OpenCloud-specific
    data.

    Returns True on success; raises an exception on error.
    """
    return connect_syndicate().delete_user( user_email )
168 #-------------------------------
def make_volume_principal_id( user_email, volume_name ):
    """
    Create a principal id for a Volume owner.
    The volume name is URL-quoted so the id is URL-safe.
    """
    return "volume_%s.%s" % (urllib.quote( volume_name ), user_email)
179 #-------------------------------
def make_slice_principal_id( user_email, slice_name ):
    """
    Create a principal id for a slice owner.
    The slice name is URL-quoted (matching make_volume_principal_id)
    so the id is URL-safe.
    """
    slice_name_safe = urllib.quote( slice_name )
    # BUG FIX: format the quoted name.  The original interpolated the raw
    # slice_name, leaving slice_name_safe computed but unused.
    return "slice_%s.%s" % (slice_name_safe, user_email)
190 #-------------------------------
191 def ensure_principal_exists( user_email, observer_secret, **user_kw ):
193 Ensure that a Syndicate user exists, as well as its OpenCloud-specific data.
195 Return (True, (None OR user)) on success. Returns a user if the user was created.
196 Return (False, None) on error
200 created, new_user = ensure_user_exists( user_email, **user_kw )
202 traceback.print_exc()
203 logger.error("Failed to ensure user '%s' exists" % user_email )
206 # if we created a new user, then save its (sealed) credentials to the Django DB
209 rc = put_principal_data( user_email, observer_secret, new_user['signing_public_key'], new_user['signing_private_key'] )
210 assert rc == True, "Failed to save SyndicatePrincipal"
212 traceback.print_exc()
213 logger.error("Failed to save private key for principal %s" % (user_email))
216 return (True, new_user)
220 #-------------------------------
221 def ensure_principal_absent( user_email ):
223 Ensure that a Syndicate user does not exists, and remove the OpenCloud-specific data.
225 Return True on success.
228 ensure_user_absent( user_email )
229 delete_principal_data( user_email )
232 #-------------------------------
233 def ensure_volume_exists( user_email, opencloud_volume, user=None ):
235 Given the email address of a user, ensure that the given
236 Volume exists and is owned by that user.
237 Do not try to ensure that the user exists.
239 Return the Volume if we created it, or return None if we did not.
240 Raise an exception on error.
242 client = connect_syndicate()
245 volume = client.read_volume( opencloud_volume.name )
252 # the volume does not exist....try to create it
253 vol_name = opencloud_volume.name
254 vol_blocksize = opencloud_volume.blocksize
255 vol_description = opencloud_volume.description
256 vol_private = opencloud_volume.private
257 vol_archive = opencloud_volume.archive
258 vol_default_gateway_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data, opencloud_volume.cap_write_data, opencloud_volume.cap_host_data )
261 vol_info = client.create_volume( user_email, vol_name, vol_description, vol_blocksize,
265 default_gateway_caps=vol_default_gateway_caps,
266 store_private_key=False,
267 metadata_private_key="MAKE_METADATA_KEY" )
275 # successfully created the volume!
280 # volume already exists. Verify its owned by this user.
283 user = client.read_user( volume['owner_id'] )
285 # transport error, or user doesn't exist (either is unacceptable)
289 if user is None or user['email'] != user_email:
290 raise Exception("Volume '%s' already exists, but is NOT owned by '%s'" % (opencloud_volume.name, user_email) )
296 #-------------------------------
def ensure_volume_absent( volume_name ):
    """
    Given an OpenCloud volume name, ensure that the corresponding
    Syndicate Volume does not exist.

    delete_volume is idempotent, so this returns True even if the Volume
    was already absent.
    """
    syndicate_client = connect_syndicate()
    return syndicate_client.delete_volume( volume_name )
309 #-------------------------------
310 def update_volume( opencloud_volume ):
312 Update a Syndicate Volume from an OpenCloud Volume model.
313 Fails if the Volume does not exist in Syndicate.
316 client = connect_syndicate()
318 vol_name = opencloud_volume.name
319 vol_description = opencloud_volume.description
320 vol_private = opencloud_volume.private
321 vol_archive = opencloud_volume.archive
322 vol_default_gateway_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data, opencloud_volume.cap_write_data, opencloud_volume.cap_host_data )
325 rc = client.update_volume( vol_name,
326 description=vol_description,
329 default_gateway_caps=vol_default_gateway_caps )
332 raise Exception("update_volume(%s) failed!" % vol_name )
335 # transort or method error
343 #-------------------------------
def ensure_volume_access_right_exists( user_email, volume_name, caps, allowed_gateways=None ):
    """
    Ensure that a particular user has particular access to a particular
    volume.  Does NOT ensure that the user or volume exist.

    Arguments:
        user_email       -- principal to grant access to
        volume_name      -- volume to grant access on
        caps             -- Syndicate capability bits to grant
        allowed_gateways -- gateway types the user may create; defaults
                            to [msconfig.GATEWAY_TYPE_UG]
    """
    # use a None sentinel instead of a mutable default argument, so the
    # default list cannot be shared (and accidentally mutated) across calls
    if allowed_gateways is None:
        allowed_gateways = [msconfig.GATEWAY_TYPE_UG]
    client = connect_syndicate()
    return syndicate_provisioning.ensure_volume_access_right_exists( client, user_email, volume_name, caps, allowed_gateways )
352 #-------------------------------
def ensure_volume_access_right_absent( user_email, volume_name ):
    """
    Ensure that access to a particular volume is revoked for a user.
    """
    return syndicate_provisioning.ensure_volume_access_right_absent( connect_syndicate(), user_email, volume_name )
361 #-------------------------------
362 def setup_volume_access( user_email, volume_name, caps, RG_port, slice_secret, RG_closure=None ):
364 Set up the Volume to allow the slice to provision UGs in it, and to fire up RGs.
365 * create the Volume Access Right for the user, so (s)he can create Gateways.
366 * provision a single Replica Gateway, serving on localhost.
368 client = connect_syndicate()
371 rc = ensure_volume_access_right_exists( user_email, volume_name, caps )
372 assert rc is True, "Failed to create access right for %s in %s" % (user_email, volume_name)
378 RG_name = syndicate_provisioning.make_gateway_name( "OpenCloud", "RG", volume_name, "localhost" )
379 RG_key_password = syndicate_provisioning.make_gateway_private_key_password( RG_name, slice_secret )
382 rc = syndicate_provisioning.ensure_RG_exists( client, user_email, volume_name, RG_name, "localhost", RG_port, RG_key_password, closure=RG_closure )
390 #-------------------------------
391 def teardown_volume_access( user_email, volume_name ):
393 Revoke access to a Volume for a User.
394 * remove the user's Volume Access Right
395 * remove the use'rs gateways
397 client = connect_syndicate()
399 # block the user from creating more gateways, and delete the gateways
401 rc = client.remove_user_from_volume( user_email, volume_name )
402 assert rc is True, "Failed to remove access right for %s in %s" % (user_email, volume_name)
411 #-------------------------------
412 def create_sealed_and_signed_blob( private_key_pem, secret, data ):
414 Create a sealed and signed message.
417 # seal it with the password
418 logger.info("Sealing credential data")
420 rc, sealed_data = c_syndicate.password_seal( data, secret )
422 logger.error("Failed to seal data with the secret, rc = %s" % rc)
425 msg = syndicate_crypto.sign_and_serialize_json( private_key_pem, sealed_data )
427 logger.error("Failed to sign credential")
433 #-------------------------------
434 def verify_and_unseal_blob( public_key_pem, secret, blob_data ):
436 verify and unseal a serialized string of JSON
440 rc, sealed_data = syndicate_crypto.verify_and_parse_json( public_key_pem, blob_data )
442 logger.error("Failed to verify and parse blob, rc = %s" % rc)
445 logger.info("Unsealing credential data")
447 rc, data = c_syndicate.password_unseal( sealed_data, secret )
449 logger.error("Failed to unseal blob, rc = %s" % rc )
455 #-------------------------------
456 def create_volume_list_blob( private_key_pem, slice_secret, volume_list ):
458 Create a sealed volume list, signed with the private key.
461 "volumes": volume_list
464 list_data_str = json.dumps( list_data )
466 msg = create_sealed_and_signed_blob( private_key_pem, slice_secret, list_data_str )
468 logger.error("Failed to seal volume list")
474 #-------------------------------
475 def create_slice_credential_blob( private_key_pem, slice_name, slice_secret, syndicate_url, volume_name, volume_owner, UG_port, user_pkey_pem ):
477 Create a sealed, signed, encoded slice credentials blob.
480 # create and serialize the data
482 "syndicate_url": syndicate_url,
483 "volume_name": volume_name,
484 "volume_owner": volume_owner,
485 "slice_name": slice_name,
486 "slice_UG_port": UG_port,
487 "principal_pkey_pem": user_pkey_pem,
490 cred_data_str = json.dumps( cred_data )
492 msg = create_sealed_and_signed_blob( private_key_pem, slice_secret, cred_data_str )
494 logger.error("Failed to seal volume list")
500 #-------------------------------
501 def put_principal_data( user_email, observer_secret, public_key_pem, private_key_pem ):
503 Seal and store the principal's private key into the database, in a SyndicatePrincipal object,
504 so the sliver-side Syndicate daemon syndicated.py can get them later.
505 Overwrite an existing principal if one exists.
508 sealed_private_key = create_sealed_and_signed_blob( private_key_pem, observer_secret, private_key_pem )
509 if sealed_private_key is None:
513 sp = models.SyndicatePrincipal( sealed_private_key=sealed_private_key, public_key_pem=public_key_pem, principal_id=user_email )
515 except IntegrityError, e:
516 logger.error("WARN: overwriting existing principal %s" % user_email)
523 #-------------------------------
524 def delete_principal_data( user_email ):
526 Delete an OpenCloud SyndicatePrincipal object.
529 sp = get_principal_data( user_email )
536 #-------------------------------
537 def get_principal_data( user_email ):
539 Get a SyndicatePrincipal record from the database
543 sp = models.SyndicatePrincipal.objects.get( principal_id=user_email )
545 except ObjectDoesNotExist:
546 logger.error("No SyndicatePrincipal record for %s" % user_email)
551 #-------------------------------
552 def get_principal_pkey( user_email, observer_secret ):
554 Fetch and unseal the private key of a SyndicatePrincipal.
557 sp = get_principal_data( user_email )
559 logger.error("Failed to find private key for principal %s" % user_email )
562 public_key_pem = sp.public_key_pem
563 sealed_private_key_pem = sp.sealed_private_key
566 private_key_pem = verify_and_unseal_blob(public_key_pem, observer_secret, sealed_private_key_pem)
567 if private_key_pem is None:
568 logger.error("Failed to unseal private key")
570 return private_key_pem
573 #-------------------------------
574 def get_private_key_pem( pkey_path ):
576 Get a private key from storage, PEM-encoded.
579 # get the OpenCloud private key
580 observer_pkey = syndicate_storage.read_private_key( pkey_path )
581 if observer_pkey is None:
582 logger.error("Failed to load Observer private key")
585 observer_pkey_pem = observer_pkey.exportKey()
587 return observer_pkey_pem
590 #-------------------------------
591 def encrypt_slice_secret( observer_pkey_pem, slice_secret ):
593 Encrypt and serialize the slice secret with the Observer private key
598 observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
601 logger.error("Failed to derive public key from private key")
605 rc, sealed_slice_secret = c_syndicate.encrypt_data( observer_pkey_pem, observer_pubkey_pem, slice_secret )
608 logger.error("Failed to encrypt slice secret")
611 sealed_slice_secret_b64 = base64.b64encode( sealed_slice_secret )
613 return sealed_slice_secret_b64
616 #-------------------------------
617 def decrypt_slice_secret( observer_pkey_pem, sealed_slice_secret_b64 ):
619 Unserialize and decrypt a slice secret
624 observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
627 logger.error("Failed to derive public key from private key")
630 sealed_slice_secret = base64.b64decode( sealed_slice_secret_b64 )
633 rc, slice_secret = c_syndicate.decrypt_data( observer_pubkey_pem, observer_pkey_pem, sealed_slice_secret )
636 logger.error("Failed to decrypt '%s', rc = %d" % (sealed_slice_secret_b64, rc))
642 #--------------------------------
643 def get_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
645 Get the shared secret for a slice.
650 # get the sealed slice secret from Django
652 if slice_fk is not None:
653 ss = models.SliceSecret.objects.get( slice_id=slice_fk )
655 ss = models.SliceSecret.objects.get( slice_id__name=slice_name )
656 except ObjectDoesNotExist, e:
657 logger.error("Failed to load slice secret for (%s, %s)" % (slice_fk, slice_name) )
663 #-------------------------------
664 def put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=None, opencloud_slice=None ):
666 Put the shared secret for a slice, encrypting it first.
671 if opencloud_slice is None:
675 opencloud_slice = models.Slice.objects.get( name=slice_name )
677 opencloud_slice = models.Slice.objects.get( id=slice_fk.id )
680 logger.error("Failed to load slice (%s, %s)" % (slice_fk, slice_name) )
683 ss = models.SliceSecret( slice_id=opencloud_slice, secret=slice_secret )
690 #-------------------------------
691 def get_or_create_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
693 Get a slice secret if it already exists, or generate a slice secret if one does not.
696 slice_secret = get_slice_secret( observer_pkey_pem, slice_name, slice_fk=slice_fk )
697 if slice_secret is None or len(slice_secret) == 0:
699 # generate a slice secret
700 slice_secret = "".join( random.sample("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", 32) )
703 rc = put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=slice_fk )
706 raise SyndicateObserverError("Failed to create slice secret for (%s, %s)" % (slice_fk, slice_name))
711 #-------------------------------
712 def generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ):
714 Generate and return the set of credentials to be sent off to the slice VMs.
715 exisitng_user is a Syndicate user, as a dictionary.
717 Return None on failure
720 # get the user's private key
721 logger.info("Obtaining private key for %s" % user_email)
723 # it might be in the existing_user...
725 if existing_user is not None:
726 user_pkey_pem = existing_user.get('signing_private_key', None)
729 if user_pkey_pem is None:
731 # get it from Django DB
732 user_pkey_pem = get_principal_pkey( user_email, observer_secret )
733 assert user_pkey_pem is not None, "No private key for %s" % user_email
736 traceback.print_exc()
737 logger.error("Failed to get private key; cannot generate credentials for %s in %s" % (user_email, volume_name) )
740 # generate a credetials blob
741 logger.info("Generating credentials for %s's slice" % (user_email))
743 creds = create_slice_credential_blob( observer_pkey_pem, slice_name, slice_secret, syndicate_url, volume_name, user_email, UG_port, user_pkey_pem )
744 assert creds is not None, "Failed to create credentials for %s" % user_email
747 traceback.print_exc()
748 logger.error("Failed to generate credentials for %s in %s" % (user_email, volume_name))
754 #-------------------------------
755 def save_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ):
757 Create and save a credentials blob to a VolumeSlice.
758 Return the creds on success.
759 Return None on failure
762 creds = generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=existing_user )
765 if creds is not None:
767 vs = get_volumeslice( volume_name, slice_name )
770 vs.credentials_blob = creds
776 logger.error("Failed to look up VolumeSlice(%s, %s)" % (volume_name, slice_name))
779 logger.error("Failed to generate credentials for %s, %s" % (volume_name, slice_name))
784 #-------------------------------
785 def get_volumeslice_volume_names( slice_name ):
787 Get the list of Volume names from the datastore.
790 all_vs = models.VolumeSlice.objects.filter( slice_id__name = slice_name )
793 volume_names.append( vs.volume_id.name )
798 logger.error("Failed to query datastore for volumes mounted in %s" % slice_name)
802 #-------------------------------
803 def get_volumeslice( volume_name, slice_name ):
805 Get a volumeslice record from the datastore.
808 vs = models.VolumeSlice.objects.get( volume_id__name = volume_name, slice_id__name = slice_name )
812 logger.error("Failed to query datastore for volumes (mounted in %s)" % (slice_name if (slice_name is not None or len(slice_name) > 0) else "UNKNOWN"))
816 #-------------------------------
817 def do_push( sliver_hosts, portnum, payload ):
819 Push a payload to a list of slivers.
820 NOTE: this has to be done in one go, since we can't import grequests
821 into the global namespace (without wrecking havoc on the credential server),
822 but it has to stick around for the push to work.
825 global TESTING, CONFIG
827 from gevent import monkey
833 # make gevents runnabale from multiple threads (or Django will complain)
834 monkey.patch_all(socket=True, dns=True, time=True, select=True, thread=False, os=True, ssl=True, httplib=False, aggressive=True)
840 for sh in sliver_hosts:
841 rs = grequests.post( "http://" + sh + ":" + str(portnum), data={"observer_message": payload}, timeout=getattr(CONFIG, "SYNDICATE_HTTP_PUSH_TIMEOUT", 60) )
842 requests.append( rs )
845 responses = grequests.map( requests )
847 assert len(responses) == len(requests), "grequests error: len(responses) != len(requests)"
849 for i in xrange(0,len(requests)):
854 logger.error("Failed to connect to %s" % (req.url))
857 # verify they all worked
858 if resp.status_code != 200:
859 logger.error("Failed to POST to %s, status code = %s" % (resp.url, resp.status_code))
865 #-------------------------------
866 def get_slice_hostnames( slice_name ):
868 Query the Django DB and get the list of hosts running in a slice.
871 openstack_slice = Slice.objects.get( name=slice_name )
872 if openstack_slice is None:
873 logger.error("No such slice '%s'" % slice_name)
876 hostnames = [s.node.name for s in openstack_slice.slivers.all()]
881 #-------------------------------
def push_credentials_to_slice( slice_name, payload ):
    """
    Push a credentials payload to the VMs in a slice, on the configured
    sliver port.
    """
    slice_hosts = get_slice_hostnames( slice_name )
    return do_push( slice_hosts, CONFIG.SYNDICATE_SLIVER_PORT, payload )
890 #-------------------------------
891 class CredentialServerHandler( BaseHTTPServer.BaseHTTPRequestHandler ):
893 HTTP server handler that allows syndicated.py instances to poll
896 NOTE: this is a fall-back mechanism. The observer should push new
897 volume state to the slices' slivers. However, if that fails, the
898 slivers are configured to poll for volume state periodically. This
899 server allows them to do just that.
902 GET /<slicename> -- Reply with the signed sealed list of volume names, encrypted by the slice secret
903 GET /<slicename>/<volumename> -- Reply with the signed sealed volume access credentials, encrypted by the slice secret
906 GET /<slicename>/SYNDICATE_SLICE_SECRET -- Reply with the slice secret (TEMPORARY)
909 NOTE: We want to limit who can learn which Volumes a slice can access, so we'll seal its slivers'
910 credentials with the SliceSecret secret. The slivers (which have the slice-wide secret) can then decrypt it.
911 However, sealing the listing is a time-consuming process (on the order of 10s), so we only want
912 to do it when we have to. Since *anyone* can ask for the ciphertext of the volume list,
913 we will cache the list ciphertext for each slice for a long-ish amount of time, so we don't
914 accidentally DDoS this server. This necessarily means that the sliver might see a stale
915 volume listing, but that's okay, since the Observer is eventually consistent anyway.
918 cached_volumes_json = {} # map slice_name --> (volume name, timeout)
919 cached_volumes_json_lock = threading.Lock()
921 CACHED_VOLUMES_JSON_LIFETIME = 3600 # one hour
923 SLICE_SECRET_NAME = "SYNDICATE_SLICE_SECRET"
925 def parse_request_path( self, path ):
927 Parse the URL path into a slice name and (possibly) a volume name or SLICE_SECRET_NAME
929 path_parts = path.strip("/").split("/")
931 if len(path_parts) == 0:
935 if len(path_parts) > 2:
939 slice_name = path_parts[0]
940 if len(slice_name) == 0:
941 # empty string is invalid
946 if len(path_parts) > 1:
947 volume_name = path_parts[1]
949 return slice_name, volume_name
952 def reply_data( self, data, datatype="application/json" ):
954 Give back a 200 response with data.
956 self.send_response( 200 )
957 self.send_header( "Content-Type", datatype )
958 self.send_header( "Content-Length", len(data) )
961 self.wfile.write( data )
965 def get_volumes_message( self, private_key_pem, observer_secret, slice_name ):
967 Get the json-ized list of volumes this slice is attached to.
968 Check the cache, evict stale data if necessary, and on miss,
969 regenerate the slice volume list.
973 # NOTE: don't release the lock until we've generated credentials.
974 # Chances are, there's a thundering herd of slivers coming online.
975 # Block them all until we've generated their slice's credentials,
976 # and then serve them the cached one.
978 self.cached_volumes_json_lock.acquire()
981 volume_list_json, cache_timeout = self.cached_volumes_json.get( slice_name, (None, None) )
983 if (cache_timeout is not None) and cache_timeout < time.time():
985 volume_list_json = None
987 if volume_list_json is None:
988 # generate a new list and cache it.
990 volume_names = get_volumeslice_volume_names( slice_name )
991 if volume_names is None:
996 # get the slice secret
997 slice_secret = get_slice_secret( private_key_pem, slice_name )
999 if slice_secret is None:
1001 logger.error("No slice secret for %s" % slice_name)
1006 ret = create_volume_list_blob( private_key_pem, slice_secret, volume_names )
1010 self.cached_volumes_json[ slice_name ] = (ret, time.time() + self.CACHED_VOLUMES_JSON_LIFETIME )
1014 ret = volume_list_json
1016 self.cached_volumes_json_lock.release()
1025 slice_name, volume_name = self.parse_request_path( self.path )
1028 if volume_name is None and slice_name is None:
1029 self.send_error( 400 )
1031 # slice secret request?
1032 elif volume_name == self.SLICE_SECRET_NAME and slice_name is not None:
1034 # get the slice secret
1035 ret = get_slice_secret( self.server.private_key_pem, slice_name )
1038 self.reply_data( ret )
1041 self.send_error( 404 )
1043 # volume list request?
1044 elif volume_name is None and slice_name is not None:
1046 # get the list of volumes for this slice
1047 ret = self.get_volumes_message( self.server.private_key_pem, self.server.observer_secret, slice_name )
1050 self.reply_data( ret )
1053 self.send_error( 404 )
1055 # volume credential request?
1056 elif volume_name is not None and slice_name is not None:
1058 # get the VolumeSlice record
1059 vs = get_volumeslice( volume_name, slice_name )
1062 self.send_error( 404 )
1066 ret = vs.credentials_blob
1068 self.reply_data( vs.credentials_blob )
1074 self.send_error( 503 )
1078 # shouldn't get here...
1079 self.send_error( 500 )
1083 #-------------------------------
class CredentialServer( BaseHTTPServer.HTTPServer ):
    """
    HTTP server that carries the Observer's private key and secret, so
    request handlers can reach them via self.server.private_key_pem and
    self.server.observer_secret.
    """

    def __init__(self, private_key_pem, observer_secret, server, req_handler ):
        # stash credentials before the base class starts serving
        self.observer_secret = observer_secret
        self.private_key_pem = private_key_pem
        BaseHTTPServer.HTTPServer.__init__( self, server, req_handler )
1092 #-------------------------------
1093 def credential_server_spawn( old_exit_status ):
1095 Start our credential server (i.e. in a separate process, started by the watchdog)
1098 setproctitle.setproctitle( "syndicate-credential-server" )
1100 private_key = syndicate_storage.read_private_key( CONFIG.SYNDICATE_PRIVATE_KEY )
1101 if private_key is None:
1102 # exit code 255 will be ignored...
1103 logger.error("Cannot load private key. Exiting...")
1106 logger.info("Starting Syndicate Observer credential server on port %s" % CONFIG.SYNDICATE_HTTP_PORT)
1108 srv = CredentialServer( private_key.exportKey(), CONFIG.SYNDICATE_OPENCLOUD_SECRET, ('', CONFIG.SYNDICATE_HTTP_PORT), CredentialServerHandler)
1112 #-------------------------------
1113 def ensure_credential_server_running( foreground=False, run_once=False ):
1115 Instantiate our credential server and keep it running.
1118 # is the watchdog running?
1119 pids = syndicate_watchdog.find_by_attrs( "syndicate-credential-server-watchdog", {} )
1128 return credential_server_spawn( 0 )
1131 return syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) )
1134 # not running, and not foregrounding. fork a new one
1136 watchdog_pid = os.fork()
1138 logger.error("Failed to fork, errno = %s" % oe.errno)
1141 if watchdog_pid != 0:
1143 # child--become watchdog
1144 setproctitle.setproctitle( "syndicate-credential-server-watchdog" )
1147 syndicate_daemon.daemonize( lambda: credential_server_spawn(0), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
1150 syndicate_daemon.daemonize( lambda: syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) ), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
1153 #-------------------------------
1154 # Begin functional tests.
1155 # Any method starting with ft_ is a functional test.
1156 #-------------------------------
1158 #-------------------------------
1159 def ft_syndicate_access():
1161 Functional tests for ensuring objects exist and don't exist in Syndicate.
1164 fake_user = FakeObject()
1165 fake_user.email = "fakeuser@opencloud.us"
1167 print "\nensure_user_exists(%s)\n" % fake_user.email
1168 ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
1170 print "\nensure_user_exists(%s)\n" % fake_user.email
1171 ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
1173 fake_volume = FakeObject()
1174 fake_volume.name = "fakevolume"
1175 fake_volume.description = "This is a fake volume, created for funtional testing"
1176 fake_volume.blocksize = 1024
1177 fake_volume.cap_read_data = True
1178 fake_volume.cap_write_data = True
1179 fake_volume.cap_host_data = False
1180 fake_volume.archive = False
1181 fake_volume.private = True
1184 print "\nensure_volume_exists(%s)\n" % fake_volume.name
1185 ensure_volume_exists( fake_user.email, fake_volume )
1187 print "\nensure_volume_exists(%s)\n" % fake_volume.name
1188 ensure_volume_exists( fake_user.email, fake_volume )
1190 print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
1191 ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
1193 print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
1194 ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
1196 print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
1197 ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
1199 print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
1200 ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
1202 print "\nensure_volume_absent(%s)\n" % fake_volume.name
1203 ensure_volume_absent( fake_volume.name )
1205 print "\nensure_volume_absent(%s)\n" % fake_volume.name
1206 ensure_volume_absent( fake_volume.name )
1208 print "\nensure_user_absent(%s)\n" % fake_user.email
1209 ensure_user_absent( fake_user.email )
1211 print "\nensure_user_absent(%s)\n" % fake_user.email
1212 ensure_user_absent( fake_user.email )
1217 print "\nensure_principal_exists(%s)\n" % fake_user.email
1218 ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
1220 print "\nensure_principal_exists(%s)\n" % fake_user.email
1221 ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
1223 print "\nensure_volume_exists(%s)\n" % fake_volume.name
1224 ensure_volume_exists( fake_user.email, fake_volume )
1226 print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
1227 setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
1229 print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
1230 setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
1232 print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
1233 teardown_volume_access( fake_user.email, fake_volume.name )
1235 print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
1236 teardown_volume_access( fake_user.email, fake_volume.name )
1238 print "\nensure_volume_absent(%s)\n" % fake_volume.name
1239 ensure_volume_absent( fake_volume.name )
1241 print "\nensure_principal_absent(%s)\n" % fake_user.email
1242 ensure_principal_absent( fake_user.email )
1246 #-------------------------------
1247 def ft_volumeslice( slice_name ):
1249 Functional tests for reading VolumeSlice information
1251 print "slice: %s" % slice_name
1253 volumes = get_volumeslice_volume_names( slice_name )
1255 print "volumes mounted in slice %s:" % slice_name
1259 vs = get_volumeslice( v, slice_name )
1261 print " %s" % dir(vs)
1264 #-------------------------------
1265 def ft_get_slice_hostnames( slice_name ):
1267 Functional tests for getting slice hostnames
1270 print "Get slice hostnames for %s" % slice_name
1272 hostnames = get_slice_hostnames( slice_name )
1275 pp = pprint.PrettyPrinter()
1277 pp.pprint( hostnames )
1280 #-------------------------------
1281 def ft_syndicate_principal():
1283 Functional tests for creating, reading, and deleting SyndicatePrincipals.
1285 print "generating key pair"
1286 pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
1288 user_email = "fakeuser@opencloud.us"
1290 print "saving principal"
1291 put_principal_data( user_email, "asdf", pubkey_pem, privkey_pem )
1293 print "fetching principal private key"
1294 saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
1296 assert saved_privkey_pem is not None, "Could not fetch saved private key"
1297 assert saved_privkey_pem == privkey_pem, "Saved private key does not match actual private key"
1299 print "delete principal"
1301 delete_principal_data( user_email )
1303 print "make sure its deleted..."
1305 saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
1307 assert saved_privkey_pem is None, "Principal key not deleted"
1310 #-------------------------------
def ft_credential_server():
    """
    Functional test for the credential server: run one iteration in the
    foreground.
    """
    ensure_credential_server_running( foreground=True, run_once=True )
1318 #-------------------------------
1319 def ft_seal_and_unseal():
1321 Functional test for sealing/unsealing data
1323 print "generating key pair"
1324 pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
1326 sealed_buf = create_sealed_and_signed_blob( privkey_pem, "foo", "hello world")
1327 print "sealed data is:\n\n%s\n\n" % sealed_buf
1329 buf = verify_and_unseal_blob( pubkey_pem, "foo", sealed_buf )
1330 print "unsealed data is: \n\n%s\n\n" % buf
1333 # run functional tests
1334 if __name__ == "__main__":
1335 sys.path.append("/opt/planetstack")
1336 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
1338 if len(sys.argv) < 2:
1339 print "Usage: %s testname [args]" % sys.argv[0]
1341 # call a method starting with ft_, and then pass the rest of argv as its arguments
1342 testname = sys.argv[1]
1343 ft_testname = "ft_%s" % testname
1345 test_call = "%s(%s)" % (ft_testname, ",".join(sys.argv[2:]))
1347 print "calling %s" % test_call
1349 rc = eval( test_call )
1351 print "result = %s" % rc