Merge branch 'master' of ssh://git.planet-lab.org/git/plstackapi
[plstackapi.git] / planetstack / syndicate_observer / syndicatelib.py
1 #!/usr/bin/python
2
3 """
4 Define some common methods for the Syndicate observer.
5 """
6 import os
7 import sys
8 import random
9 import json
10 import time
11 import requests
12 import traceback
13 import base64
14 import BaseHTTPServer
15 import setproctitle
16 import threading
17 import urllib
18
19 from Crypto.Hash import SHA256 as HashAlg
20 from Crypto.PublicKey import RSA as CryptoKey
21 from Crypto import Random
22 from Crypto.Signature import PKCS1_PSS as CryptoSigner
23
24 import logging
25 from logging import Logger
26 logging.basicConfig( format='[%(levelname)s] [%(module)s:%(lineno)d] %(message)s' )
27 logger = logging.getLogger()
28 logger.setLevel( logging.INFO )
29
30 # get config package 
31 import syndicatelib_config.config as CONFIG
32
33 # get the Syndicate modules
34 import syndicate
35
36 import syndicate.client.bin.syntool as syntool
37 import syndicate.client.common.msconfig as msconfig
38 import syndicate.client.common.api as api
39 import syndicate.util.storage as syndicate_storage
40 import syndicate.util.watchdog as syndicate_watchdog
41 import syndicate.util.daemonize as syndicate_daemon
42 import syndicate.util.crypto as syndicate_crypto
43 import syndicate.util.provisioning as syndicate_provisioning
44 import syndicate.syndicate as c_syndicate
45
46 # for testing 
47 TESTING = False
48 class FakeObject(object):
49    def __init__(self):
50        pass
51
52 if os.getenv("OPENCLOUD_PYTHONPATH") is not None:
53    sys.path.insert(0, os.getenv("OPENCLOUD_PYTHONPATH"))
54 else:
55    logger.warning("No OPENCLOUD_PYTHONPATH set.  Assuming Syndicate models are in your PYTHONPATH")
56
57 try:
58    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
59
60    # get our models
61    import syndicate_storage.models as models
62
63    # get OpenCloud models 
64    from core.models import Slice,Sliver
65    
66    from django.core.exceptions import ObjectDoesNotExist
67    from django.db import IntegrityError
68
69 except ImportError, ie:
70    logger.warning("Failed to import models; some tests will not work")
71
72    # create a fake "models" package that has exactly the members we need for testing.
73    models = FakeObject()
74    models.Volume = FakeObject()
75    models.Volume.CAP_READ_DATA = 1
76    models.Volume.CAP_WRITE_DATA = 2
77    models.Volume.CAP_HOST_DATA = 4
78    
79    TESTING = True
80
81
82 #-------------------------------
83 class SyndicateObserverError( Exception ):
84     pass
85
86 #-------------------------------
87 def get_config():
88     """
89     Return the imported config
90     """
91     return CONFIG
92
93
94 #-------------------------------
95 def make_openid_url( email ):
96     """
97     Generate an OpenID identity URL from an email address.
98     """
99     return os.path.join( CONFIG.SYNDICATE_OPENID_TRUSTROOT, "id", email )
100
101
102 #-------------------------------
103 def connect_syndicate( username=CONFIG.SYNDICATE_OPENCLOUD_USER, password=CONFIG.SYNDICATE_OPENCLOUD_PASSWORD, user_pkey_pem=CONFIG.SYNDICATE_OPENCLOUD_PKEY ):
104     """
105     Connect to the OpenCloud Syndicate SMI, using the OpenCloud user credentials.
106     """
107     debug = True 
108     if hasattr(CONFIG, "DEBUG"):
109        debug = CONFIG.DEBUG
110        
111     client = syntool.Client( username, CONFIG.SYNDICATE_SMI_URL,
112                              password=password,
113                              user_pkey_pem=user_pkey_pem,
114                              debug=debug )
115
116     return client
117
118
119 #-------------------------------
120 def opencloud_caps_to_syndicate_caps( cap_read, cap_write, cap_host ):
121     """
122     Convert OpenCloud capability bits from the UI into Syndicate's capability bits.
123     """
124     syn_caps = 0
125     
126     if cap_read:
127         syn_caps |= (msconfig.GATEWAY_CAP_READ_DATA | msconfig.GATEWAY_CAP_READ_METADATA)
128     if cap_write:
129         syn_caps |= (msconfig.GATEWAY_CAP_WRITE_DATA | msconfig.GATEWAY_CAP_WRITE_METADATA)
130     if cap_host:
131         syn_caps |= (msconfig.GATEWAY_CAP_COORDINATE)
132
133     return syn_caps
134
135 #-------------------------------
136 def ensure_user_exists( user_email, **user_kw ):
137     """
138     Given an OpenCloud user, ensure that the corresponding
139     Syndicate user exists on the MS.  This method does NOT 
140     create any OpenCloud-specific data.
141
142     Return the (created, user), where created==True if the user 
143     was created and created==False if the user was read.
144     Raise an exception on error.
145     """
146     
147     client = connect_syndicate()
148     user_openid_url = make_openid_url( user_email )
149     
150     return syndicate_provisioning.ensure_user_exists( client, user_email, user_openid_url, **user_kw )
151
152
153 #-------------------------------
154 def ensure_user_absent( user_email ):
155     """
156     Ensure that a given OpenCloud user's associated Syndicate user record
157     has been deleted.  This method does NOT delete any OpenCloud-specific data.
158
159     Returns True on success
160     Raises an exception on error
161     """
162
163     client = connect_syndicate()
164
165     return client.delete_user( user_email )
166  
167
168 #-------------------------------
169 def make_volume_principal_id( user_email, volume_name ):
170     """
171     Create a principal id for a Volume owner.
172     """
173     
174     volume_name_safe = urllib.quote( volume_name )
175     
176     return "volume_%s.%s" % (volume_name_safe, user_email)
177  
178  
179 #-------------------------------
180 def make_slice_principal_id( user_email, slice_name ):
181     """
182     Create a principal id for a slice owner.
183     """
184     
185     slice_name_safe = urllib.quote( slice_name )
186     
187     return "slice_%s.%s" % (slice_name, user_email)
188  
189
190 #-------------------------------
191 def ensure_principal_exists( user_email, observer_secret, **user_kw ):
192     """ 
193     Ensure that a Syndicate user exists, as well as its OpenCloud-specific data.
194     
195     Return (True, (None OR user)) on success.  Returns a user if the user was created.
196     Return (False, None) on error
197     """
198     
199     try:
200          created, new_user = ensure_user_exists( user_email, **user_kw )
201     except Exception, e:
202          traceback.print_exc()
203          logger.error("Failed to ensure user '%s' exists" % user_email )
204          return (False, None)
205       
206     # if we created a new user, then save its (sealed) credentials to the Django DB
207     if created:
208          try:
209             rc = put_principal_data( user_email, observer_secret, new_user['signing_public_key'], new_user['signing_private_key'] )
210             assert rc == True, "Failed to save SyndicatePrincipal"
211          except Exception, e:
212             traceback.print_exc()
213             logger.error("Failed to save private key for principal %s" % (user_email))
214             return (False, None)
215
216     return (True, new_user)
217
218
219
220 #-------------------------------
221 def ensure_principal_absent( user_email ):
222     """
223     Ensure that a Syndicate user does not exists, and remove the OpenCloud-specific data.
224     
225     Return True on success.
226     """
227     
228     ensure_user_absent( user_email )
229     delete_principal_data( user_email )
230     return True
231
232 #-------------------------------
233 def ensure_volume_exists( user_email, opencloud_volume, user=None ):
234     """
235     Given the email address of a user, ensure that the given
236     Volume exists and is owned by that user.
237     Do not try to ensure that the user exists.
238
239     Return the Volume if we created it, or return None if we did not.
240     Raise an exception on error.
241     """
242     client = connect_syndicate()
243
244     try:
245         volume = client.read_volume( opencloud_volume.name )
246     except Exception, e:
247         # transport error 
248         logger.exception(e)
249         raise e
250
251     if volume is None:
252         # the volume does not exist....try to create it 
253         vol_name = opencloud_volume.name
254         vol_blocksize = opencloud_volume.blocksize
255         vol_description = opencloud_volume.description
256         vol_private = opencloud_volume.private
257         vol_archive = opencloud_volume.archive 
258         vol_default_gateway_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data, opencloud_volume.cap_write_data, opencloud_volume.cap_host_data )
259
260         try:
261             vol_info = client.create_volume( user_email, vol_name, vol_description, vol_blocksize,
262                                              private=vol_private,
263                                              archive=vol_archive,
264                                              active=True,
265                                              default_gateway_caps=vol_default_gateway_caps,
266                                              store_private_key=False,
267                                              metadata_private_key="MAKE_METADATA_KEY" )
268
269         except Exception, e:
270             # transport error
271             logger.exception(e)
272             raise e
273
274         else:
275             # successfully created the volume!
276             return vol_info
277
278     else:
279         
280         # volume already exists.  Verify its owned by this user.
281         if user is None:
282            try:
283                user = client.read_user( volume['owner_id'] )
284            except Exception, e:
285                # transport error, or user doesn't exist (either is unacceptable)
286                logger.exception(e)
287                raise e
288
289         if user is None or user['email'] != user_email:
290             raise Exception("Volume '%s' already exists, but is NOT owned by '%s'" % (opencloud_volume.name, user_email) )
291
292         # we're good!
293         return None
294
295
296 #-------------------------------
297 def ensure_volume_absent( volume_name ):
298     """
299     Given an OpenCloud volume, ensure that the corresponding Syndicate
300     Volume does not exist.
301     """
302
303     client = connect_syndicate()
304
305     # this is idempotent, and returns True even if the Volume doesn't exist
306     return client.delete_volume( volume_name )
307     
308     
309 #-------------------------------
310 def update_volume( opencloud_volume ):
311     """
312     Update a Syndicate Volume from an OpenCloud Volume model.
313     Fails if the Volume does not exist in Syndicate.
314     """
315
316     client = connect_syndicate()
317
318     vol_name = opencloud_volume.name
319     vol_description = opencloud_volume.description
320     vol_private = opencloud_volume.private
321     vol_archive = opencloud_volume.archive
322     vol_default_gateway_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data, opencloud_volume.cap_write_data, opencloud_volume.cap_host_data )
323
324     try:
325         rc = client.update_volume( vol_name,
326                                    description=vol_description,
327                                    private=vol_private,
328                                    archive=vol_archive,
329                                    default_gateway_caps=vol_default_gateway_caps )
330
331         if not rc:
332             raise Exception("update_volume(%s) failed!" % vol_name )
333
334     except Exception, e:
335         # transort or method error 
336         logger.exception(e)
337         return False
338
339     else:
340         return True
341
342
343 #-------------------------------
344 def ensure_volume_access_right_exists( user_email, volume_name, caps, allowed_gateways=[msconfig.GATEWAY_TYPE_UG] ):
345     """
346     Ensure that a particular user has particular access to a particular volume.
347     Do not try to ensure that the user or volume exist, however!
348     """
349     client = connect_syndicate()
350     return syndicate_provisioning.ensure_volume_access_right_exists( client, user_email, volume_name, caps, allowed_gateways )
351
352 #-------------------------------
353 def ensure_volume_access_right_absent( user_email, volume_name ):
354     """
355     Ensure that acess to a particular volume is revoked.
356     """
357     client = connect_syndicate()
358     return syndicate_provisioning.ensure_volume_access_right_absent( client, user_email, volume_name )
359     
360
361 #-------------------------------
362 def setup_volume_access( user_email, volume_name, caps, RG_port, slice_secret, RG_closure=None ):
363     """
364     Set up the Volume to allow the slice to provision UGs in it, and to fire up RGs.
365        * create the Volume Access Right for the user, so (s)he can create Gateways.
366        * provision a single Replica Gateway, serving on localhost.
367     """
368     client = connect_syndicate()
369     
370     try:
371        rc = ensure_volume_access_right_exists( user_email, volume_name, caps )
372        assert rc is True, "Failed to create access right for %s in %s" % (user_email, volume_name)
373        
374     except Exception, e:
375        logger.exception(e)
376        return False
377     
378     RG_name = syndicate_provisioning.make_gateway_name( "OpenCloud", "RG", volume_name, "localhost" )
379     RG_key_password = syndicate_provisioning.make_gateway_private_key_password( RG_name, slice_secret )
380     
381     try:
382        rc = syndicate_provisioning.ensure_RG_exists( client, user_email, volume_name, RG_name, "localhost", RG_port, RG_key_password, closure=RG_closure )
383     except Exception, e:
384        logger.exception(e)
385        return False
386     
387     return True
388        
389
390 #-------------------------------
391 def teardown_volume_access( user_email, volume_name ):
392     """
393     Revoke access to a Volume for a User.
394       * remove the user's Volume Access Right
395       * remove the use'rs gateways
396     """
397     client = connect_syndicate()
398     
399     # block the user from creating more gateways, and delete the gateways
400     try:
401        rc = client.remove_user_from_volume( user_email, volume_name )
402        assert rc is True, "Failed to remove access right for %s in %s" % (user_email, volume_name)
403        
404     except Exception, e:
405        logger.exception(e)
406        return False
407     
408     return True
409     
410
411 #-------------------------------
412 def create_sealed_and_signed_blob( private_key_pem, secret, data ):
413     """
414     Create a sealed and signed message.
415     """
416     
417     # seal it with the password 
418     logger.info("Sealing credential data")
419     
420     rc, sealed_data = c_syndicate.password_seal( data, secret )
421     if rc != 0:
422        logger.error("Failed to seal data with the secret, rc = %s" % rc)
423        return None
424     
425     msg = syndicate_crypto.sign_and_serialize_json( private_key_pem, sealed_data )
426     if msg is None:
427        logger.error("Failed to sign credential")
428        return None 
429     
430     return msg 
431
432
433 #-------------------------------
434 def verify_and_unseal_blob( public_key_pem, secret, blob_data ):
435     """
436     verify and unseal a serialized string of JSON
437     """
438
439     # verify it 
440     rc, sealed_data = syndicate_crypto.verify_and_parse_json( public_key_pem, blob_data )
441     if rc != 0:
442         logger.error("Failed to verify and parse blob, rc = %s" % rc)
443         return None
444
445     logger.info("Unsealing credential data")
446
447     rc, data = c_syndicate.password_unseal( sealed_data, secret )
448     if rc != 0:
449         logger.error("Failed to unseal blob, rc = %s" % rc )
450         return None
451
452     return data
453
454
455 #-------------------------------
456 def create_volume_list_blob( private_key_pem, slice_secret, volume_list ):
457     """
458     Create a sealed volume list, signed with the private key.
459     """
460     list_data = {
461        "volumes": volume_list
462     }
463     
464     list_data_str = json.dumps( list_data )
465     
466     msg = create_sealed_and_signed_blob( private_key_pem, slice_secret, list_data_str )
467     if msg is None:
468        logger.error("Failed to seal volume list")
469        return None 
470     
471     return msg
472  
473
474 #-------------------------------
475 def create_slice_credential_blob( private_key_pem, slice_name, slice_secret, syndicate_url, volume_name, volume_owner, UG_port, user_pkey_pem ):
476     """
477     Create a sealed, signed, encoded slice credentials blob.
478     """
479     
480     # create and serialize the data 
481     cred_data = {
482        "syndicate_url":   syndicate_url,
483        "volume_name":     volume_name,
484        "volume_owner":    volume_owner,
485        "slice_name":      slice_name,
486        "slice_UG_port":   UG_port,
487        "principal_pkey_pem": user_pkey_pem,
488     }
489     
490     cred_data_str = json.dumps( cred_data )
491     
492     msg = create_sealed_and_signed_blob( private_key_pem, slice_secret, cred_data_str )
493     if msg is None:
494        logger.error("Failed to seal volume list")
495        return None 
496     
497     return msg 
498
499
500 #-------------------------------
501 def put_principal_data( user_email, observer_secret, public_key_pem, private_key_pem ):
502     """
503     Seal and store the principal's private key into the database, in a SyndicatePrincipal object,
504     so the sliver-side Syndicate daemon syndicated.py can get them later.
505     Overwrite an existing principal if one exists.
506     """
507     
508     sealed_private_key = create_sealed_and_signed_blob( private_key_pem, observer_secret, private_key_pem )
509     if sealed_private_key is None:
510         return False
511
512     try:
513        sp = models.SyndicatePrincipal( sealed_private_key=sealed_private_key, public_key_pem=public_key_pem, principal_id=user_email )
514        sp.save()
515     except IntegrityError, e:
516        logger.error("WARN: overwriting existing principal %s" % user_email)
517        sp.delete()
518        sp.save()
519     
520     return True
521
522
523 #-------------------------------
524 def delete_principal_data( user_email ):
525     """
526     Delete an OpenCloud SyndicatePrincipal object.
527     """
528     
529     sp = get_principal_data( user_email )
530     if sp is not None:
531       sp.delete()
532     
533     return True
534
535
536 #-------------------------------
537 def get_principal_data( user_email ):
538     """
539     Get a SyndicatePrincipal record from the database 
540     """
541     
542     try:
543         sp = models.SyndicatePrincipal.objects.get( principal_id=user_email )
544         return sp
545     except ObjectDoesNotExist:
546         logger.error("No SyndicatePrincipal record for %s" % user_email)
547         return None
548     
549
550
551 #-------------------------------
552 def get_principal_pkey( user_email, observer_secret ):
553     """
554     Fetch and unseal the private key of a SyndicatePrincipal.
555     """
556     
557     sp = get_principal_data( user_email )
558     if sp is None:
559         logger.error("Failed to find private key for principal %s" % user_email )
560         return None 
561      
562     public_key_pem = sp.public_key_pem
563     sealed_private_key_pem = sp.sealed_private_key
564
565     # unseal
566     private_key_pem = verify_and_unseal_blob(public_key_pem, observer_secret, sealed_private_key_pem)
567     if private_key_pem is None:
568         logger.error("Failed to unseal private key")
569
570     return private_key_pem
571
572
573 #-------------------------------
574 def get_private_key_pem( pkey_path ):
575     """
576     Get a private key from storage, PEM-encoded.
577     """
578     
579     # get the OpenCloud private key 
580     observer_pkey = syndicate_storage.read_private_key( pkey_path )
581     if observer_pkey is None:
582        logger.error("Failed to load Observer private key")
583        return None
584     
585     observer_pkey_pem = observer_pkey.exportKey()
586     
587     return observer_pkey_pem
588
589
590 #-------------------------------
591 def encrypt_slice_secret( observer_pkey_pem, slice_secret ):
592     """
593     Encrypt and serialize the slice secret with the Observer private key
594     """
595     
596     # get the public key
597     try:
598        observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
599     except Exception, e:
600        logger.exception(e)
601        logger.error("Failed to derive public key from private key")
602        return None 
603     
604     # encrypt the data 
605     rc, sealed_slice_secret = c_syndicate.encrypt_data( observer_pkey_pem, observer_pubkey_pem, slice_secret )
606     
607     if rc != 0:
608        logger.error("Failed to encrypt slice secret")
609        return None 
610     
611     sealed_slice_secret_b64 = base64.b64encode( sealed_slice_secret )
612     
613     return sealed_slice_secret_b64
614     
615
616 #-------------------------------
617 def decrypt_slice_secret( observer_pkey_pem, sealed_slice_secret_b64 ):
618     """
619     Unserialize and decrypt a slice secret
620     """
621         
622     # get the public key
623     try:
624        observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
625     except Exception, e:
626        logger.exception(e)
627        logger.error("Failed to derive public key from private key")
628        return None 
629     
630     sealed_slice_secret = base64.b64decode( sealed_slice_secret_b64 )
631     
632     # decrypt it 
633     rc, slice_secret = c_syndicate.decrypt_data( observer_pubkey_pem, observer_pkey_pem, sealed_slice_secret )
634     
635     if rc != 0:
636        logger.error("Failed to decrypt '%s', rc = %d" % (sealed_slice_secret_b64, rc))
637        return None
638     
639     return slice_secret
640  
641
642 #--------------------------------
643 def get_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
644     """
645     Get the shared secret for a slice.
646     """
647     
648     ss = None 
649     
650     # get the sealed slice secret from Django
651     try:
652        if slice_fk is not None:
653           ss = models.SliceSecret.objects.get( slice_id=slice_fk )
654        else:
655           ss = models.SliceSecret.objects.get( slice_id__name=slice_name )
656     except ObjectDoesNotExist, e:
657        logger.error("Failed to load slice secret for (%s, %s)" % (slice_fk, slice_name) )
658        return None 
659
660     return ss.secret 
661  
662
663 #-------------------------------
664 def put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=None, opencloud_slice=None ):
665     """
666     Put the shared secret for a slice, encrypting it first.
667     """
668     
669     ss = None 
670     
671     if opencloud_slice is None:
672        # look up the slice 
673        try:
674           if slice_fk is None:
675              opencloud_slice = models.Slice.objects.get( name=slice_name )
676           else:
677              opencloud_slice = models.Slice.objects.get( id=slice_fk.id )
678        except Exception, e:
679           logger.exception(e)
680           logger.error("Failed to load slice (%s, %s)" % (slice_fk, slice_name) )
681           return False 
682     
683     ss = models.SliceSecret( slice_id=opencloud_slice, secret=slice_secret )
684     
685     ss.save()
686     
687     return True
688
689
690 #-------------------------------
691 def get_or_create_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
692    """
693    Get a slice secret if it already exists, or generate a slice secret if one does not.
694    """
695    
696    slice_secret = get_slice_secret( observer_pkey_pem, slice_name, slice_fk=slice_fk )
697    if slice_secret is None or len(slice_secret) == 0:
698       
699       # generate a slice secret 
700       slice_secret = "".join( random.sample("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", 32) )
701       
702       # store it 
703       rc = put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=slice_fk )
704       
705       if not rc:
706          raise SyndicateObserverError("Failed to create slice secret for (%s, %s)" % (slice_fk, slice_name))
707       
708    return slice_secret
709
710
711 #-------------------------------
712 def generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ):
713     """
714     Generate and return the set of credentials to be sent off to the slice VMs.
715     exisitng_user is a Syndicate user, as a dictionary.
716     
717     Return None on failure
718     """
719     
720     # get the user's private key 
721     logger.info("Obtaining private key for %s" % user_email)
722     
723     # it might be in the existing_user...
724     user_pkey_pem = None
725     if existing_user is not None:
726        user_pkey_pem = existing_user.get('signing_private_key', None)
727        
728     # no luck?
729     if user_pkey_pem is None:
730       try:
731          # get it from Django DB
732          user_pkey_pem = get_principal_pkey( user_email, observer_secret )
733          assert user_pkey_pem is not None, "No private key for %s" % user_email
734          
735       except:
736          traceback.print_exc()
737          logger.error("Failed to get private key; cannot generate credentials for %s in %s" % (user_email, volume_name) )
738          return None
739     
740     # generate a credetials blob 
741     logger.info("Generating credentials for %s's slice" % (user_email))
742     try:
743        creds = create_slice_credential_blob( observer_pkey_pem, slice_name, slice_secret, syndicate_url, volume_name, user_email, UG_port, user_pkey_pem )
744        assert creds is not None, "Failed to create credentials for %s" % user_email 
745     
746     except:
747        traceback.print_exc()
748        logger.error("Failed to generate credentials for %s in %s" % (user_email, volume_name))
749        return None
750     
751     return creds
752
753
754 #-------------------------------
755 def save_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ): 
756     """
757     Create and save a credentials blob to a VolumeSlice.
758     Return the creds on success.
759     Return None on failure
760     """
761     
762     creds = generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=existing_user )
763     ret = None
764     
765     if creds is not None:
766        # save it 
767        vs = get_volumeslice( volume_name, slice_name )
768        
769        if vs is not None:
770           vs.credentials_blob = creds
771           vs.save()
772           
773           # success!
774           ret = creds
775        else:
776           logger.error("Failed to look up VolumeSlice(%s, %s)" % (volume_name, slice_name))
777        
778     else:
779        logger.error("Failed to generate credentials for %s, %s" % (volume_name, slice_name))
780        
781     return ret
782
783
784 #-------------------------------
785 def get_volumeslice_volume_names( slice_name ):
786     """
787     Get the list of Volume names from the datastore.
788     """
789     try:
790         all_vs = models.VolumeSlice.objects.filter( slice_id__name = slice_name )
791         volume_names = []
792         for vs in all_vs:
793            volume_names.append( vs.volume_id.name )
794            
795         return volume_names
796     except Exception, e:
797         logger.exception(e)
798         logger.error("Failed to query datastore for volumes mounted in %s" % slice_name)
799         return None 
800  
801
802 #-------------------------------
803 def get_volumeslice( volume_name, slice_name ):
804     """
805     Get a volumeslice record from the datastore.
806     """
807     try:
808         vs = models.VolumeSlice.objects.get( volume_id__name = volume_name, slice_id__name = slice_name )
809         return vs
810     except Exception, e:
811         logger.exception(e)
812         logger.error("Failed to query datastore for volumes (mounted in %s)" % (slice_name if (slice_name is not None or len(slice_name) > 0) else "UNKNOWN"))
813         return None 
814
815
816 #-------------------------------
817 def do_push( sliver_hosts, portnum, payload ):
818     """
819     Push a payload to a list of slivers.
820     NOTE: this has to be done in one go, since we can't import grequests
821     into the global namespace (without wrecking havoc on the credential server),
822     but it has to stick around for the push to work.
823     """
824     
825     global TESTING, CONFIG
826     
827     from gevent import monkey
828     
829     if TESTING:
830        monkey.patch_all()
831     
832     else:
833        # make gevents runnabale from multiple threads (or Django will complain)
834        monkey.patch_all(socket=True, dns=True, time=True, select=True, thread=False, os=True, ssl=True, httplib=False, aggressive=True)
835     
836     import grequests
837     
838     # fan-out 
839     requests = []
840     for sh in sliver_hosts:
841       rs = grequests.post( "http://" + sh + ":" + str(portnum), data={"observer_message": payload}, timeout=getattr(CONFIG, "SYNDICATE_HTTP_PUSH_TIMEOUT", 60) )
842       requests.append( rs )
843       
844     # fan-in
845     responses = grequests.map( requests )
846     
847     assert len(responses) == len(requests), "grequests error: len(responses) != len(requests)"
848     
849     for i in xrange(0,len(requests)):
850        resp = responses[i]
851        req = requests[i]
852        
853        if resp is None:
854           logger.error("Failed to connect to %s" % (req.url))
855           continue 
856        
857        # verify they all worked 
858        if resp.status_code != 200:
859           logger.error("Failed to POST to %s, status code = %s" % (resp.url, resp.status_code))
860           continue
861           
862     return True
863    
864
865 #-------------------------------
866 def get_slice_hostnames( slice_name ):
867    """
868    Query the Django DB and get the list of hosts running in a slice.
869    """
870
871    openstack_slice = Slice.objects.get( name=slice_name )
872    if openstack_slice is None:
873        logger.error("No such slice '%s'" % slice_name)
874        return None
875
876    hostnames = [s.node.name for s in openstack_slice.slivers.all()]
877
878    return hostnames
879
880    
881 #-------------------------------
882 def push_credentials_to_slice( slice_name, payload ):
883    """
884    Push a credentials payload to the VMs in a slice.
885    """
886    hostnames = get_slice_hostnames( slice_name )
887    return do_push( hostnames, CONFIG.SYNDICATE_SLIVER_PORT, payload )
888
889    
890 #-------------------------------
891 class CredentialServerHandler( BaseHTTPServer.BaseHTTPRequestHandler ):
892    """
893    HTTP server handler that allows syndicated.py instances to poll
894    for volume state.
895    
896    NOTE: this is a fall-back mechanism.  The observer should push new 
897    volume state to the slices' slivers.  However, if that fails, the 
898    slivers are configured to poll for volume state periodically.  This 
899    server allows them to do just that.
900    
901    Responses:
902       GET /<slicename>              -- Reply with the signed sealed list of volume names, encrypted by the slice secret
903       GET /<slicename>/<volumename> -- Reply with the signed sealed volume access credentials, encrypted by the slice secret
904       
905       !!! TEMPORARY !!!
906       GET /<slicename>/SYNDICATE_SLICE_SECRET    -- Reply with the slice secret (TEMPORARY)
907    
908    
909    NOTE: We want to limit who can learn which Volumes a slice can access, so we'll seal its slivers'
910    credentials with the SliceSecret secret.  The slivers (which have the slice-wide secret) can then decrypt it.
911    However, sealing the listing is a time-consuming process (on the order of 10s), so we only want 
912    to do it when we have to.  Since *anyone* can ask for the ciphertext of the volume list,
913    we will cache the list ciphertext for each slice for a long-ish amount of time, so we don't
914    accidentally DDoS this server.  This necessarily means that the sliver might see a stale
915    volume listing, but that's okay, since the Observer is eventually consistent anyway.
916    """
917    
918    cached_volumes_json = {}             # map slice_name --> (volume name, timeout)
919    cached_volumes_json_lock = threading.Lock()
920    
921    CACHED_VOLUMES_JSON_LIFETIME = 3600          # one hour
922    
923    SLICE_SECRET_NAME = "SYNDICATE_SLICE_SECRET"
924    
925    def parse_request_path( self, path ):
926       """
927       Parse the URL path into a slice name and (possibly) a volume name or SLICE_SECRET_NAME
928       """
929       path_parts = path.strip("/").split("/")
930       
931       if len(path_parts) == 0:
932          # invalid 
933          return (None, None)
934       
935       if len(path_parts) > 2:
936          # invalid
937          return (None, None)
938       
939       slice_name = path_parts[0]
940       if len(slice_name) == 0:
941          # empty string is invalid 
942          return (None, None)
943       
944       volume_name = None
945       
946       if len(path_parts) > 1:
947          volume_name = path_parts[1]
948          
949       return slice_name, volume_name
950    
951    
952    def reply_data( self, data, datatype="application/json" ):
953       """
954       Give back a 200 response with data.
955       """
956       self.send_response( 200 )
957       self.send_header( "Content-Type", datatype )
958       self.send_header( "Content-Length", len(data) )
959       self.end_headers()
960       
961       self.wfile.write( data )
962       return 
963    
964    
965    def get_volumes_message( self, private_key_pem, observer_secret, slice_name ):
966       """
967       Get the json-ized list of volumes this slice is attached to.
968       Check the cache, evict stale data if necessary, and on miss, 
969       regenerate the slice volume list.
970       """
971       
972       # block the cache.
973       # NOTE: don't release the lock until we've generated credentials.
974       # Chances are, there's a thundering herd of slivers coming online.
975       # Block them all until we've generated their slice's credentials,
976       # and then serve them the cached one.
977       
978       self.cached_volumes_json_lock.acquire()
979       
980       ret = None
981       volume_list_json, cache_timeout = self.cached_volumes_json.get( slice_name, (None, None) )
982       
983       if (cache_timeout is not None) and cache_timeout < time.time():
984          # expired
985          volume_list_json = None
986       
987       if volume_list_json is None:
988          # generate a new list and cache it.
989          
990          volume_names = get_volumeslice_volume_names( slice_name )
991          if volume_names is None:
992             # nothing to do...
993             ret = None
994          
995          else:
996             # get the slice secret 
997             slice_secret = get_slice_secret( private_key_pem, slice_name )
998             
999             if slice_secret is None:
1000                # no such slice 
1001                logger.error("No slice secret for %s" % slice_name)
1002                ret = None
1003             
1004             else:
1005                # seal and sign 
1006                ret = create_volume_list_blob( private_key_pem, slice_secret, volume_names )
1007          
1008          # cache this 
1009          if ret is not None:
1010             self.cached_volumes_json[ slice_name ] = (ret, time.time() + self.CACHED_VOLUMES_JSON_LIFETIME )
1011       
1012       else:
1013          # hit the cache
1014          ret = volume_list_json
1015       
1016       self.cached_volumes_json_lock.release()
1017       
1018       return ret
1019       
1020    
1021    def do_GET( self ):
1022       """
1023       Handle one GET
1024       """
1025       slice_name, volume_name = self.parse_request_path( self.path )
1026       
1027       # valid request?
1028       if volume_name is None and slice_name is None:
1029          self.send_error( 400 )
1030       
1031       # slice secret request?
1032       elif volume_name == self.SLICE_SECRET_NAME and slice_name is not None:
1033          
1034          # get the slice secret 
1035          ret = get_slice_secret( self.server.private_key_pem, slice_name )
1036          
1037          if ret is not None:
1038             self.reply_data( ret )
1039             return 
1040          else:
1041             self.send_error( 404 )
1042       
1043       # volume list request?
1044       elif volume_name is None and slice_name is not None:
1045          
1046          # get the list of volumes for this slice
1047          ret = self.get_volumes_message( self.server.private_key_pem, self.server.observer_secret, slice_name )
1048          
1049          if ret is not None:
1050             self.reply_data( ret )
1051             return
1052          else:
1053             self.send_error( 404 )
1054       
1055       # volume credential request?
1056       elif volume_name is not None and slice_name is not None:
1057          
1058          # get the VolumeSlice record
1059          vs = get_volumeslice( volume_name, slice_name )
1060          if vs is None:
1061             # not found
1062             self.send_error( 404 )
1063             return
1064          
1065          else:
1066             ret = vs.credentials_blob 
1067             if ret is not None:
1068                self.reply_data( vs.credentials_blob )
1069             else:
1070                # not generated???
1071                print ""
1072                print vs
1073                print ""
1074                self.send_error( 503 )
1075             return
1076          
1077       else:
1078          # shouldn't get here...
1079          self.send_error( 500 )
1080          return 
1081    
1082    
1083 #-------------------------------
1084 class CredentialServer( BaseHTTPServer.HTTPServer ):
1085    
1086    def __init__(self, private_key_pem, observer_secret, server, req_handler ):
1087       self.private_key_pem = private_key_pem
1088       self.observer_secret = observer_secret
1089       BaseHTTPServer.HTTPServer.__init__( self, server, req_handler )
1090
1091
1092 #-------------------------------
1093 def credential_server_spawn( old_exit_status ):
1094    """
1095    Start our credential server (i.e. in a separate process, started by the watchdog)
1096    """
1097    
1098    setproctitle.setproctitle( "syndicate-credential-server" )
1099    
1100    private_key = syndicate_storage.read_private_key( CONFIG.SYNDICATE_PRIVATE_KEY )
1101    if private_key is None:
1102       # exit code 255 will be ignored...
1103       logger.error("Cannot load private key.  Exiting...")
1104       sys.exit(255)
1105    
1106    logger.info("Starting Syndicate Observer credential server on port %s" % CONFIG.SYNDICATE_HTTP_PORT)
1107                
1108    srv = CredentialServer( private_key.exportKey(), CONFIG.SYNDICATE_OPENCLOUD_SECRET, ('', CONFIG.SYNDICATE_HTTP_PORT), CredentialServerHandler)
1109    srv.serve_forever()
1110
1111
1112 #-------------------------------
1113 def ensure_credential_server_running( foreground=False, run_once=False ):
1114    """
1115    Instantiate our credential server and keep it running.
1116    """
1117    
1118    # is the watchdog running?
1119    pids = syndicate_watchdog.find_by_attrs( "syndicate-credential-server-watchdog", {} )
1120    if len(pids) > 0:
1121       # it's running
1122       return True
1123    
1124    if foreground:
1125       # run in foreground 
1126       
1127       if run_once:
1128          return credential_server_spawn( 0 )
1129       
1130       else:
1131          return syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) )
1132       
1133    
1134    # not running, and not foregrounding.  fork a new one
1135    try:
1136       watchdog_pid = os.fork()
1137    except OSError, oe:
1138       logger.error("Failed to fork, errno = %s" % oe.errno)
1139       return False
1140    
1141    if watchdog_pid != 0:
1142       
1143       # child--become watchdog
1144       setproctitle.setproctitle( "syndicate-credential-server-watchdog" )
1145       
1146       if run_once:
1147          syndicate_daemon.daemonize( lambda: credential_server_spawn(0), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
1148       
1149       else:
1150          syndicate_daemon.daemonize( lambda: syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) ), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
1151
1152
1153 #-------------------------------
1154 # Begin functional tests.
1155 # Any method starting with ft_ is a functional test.
1156 #-------------------------------
1157   
1158 #-------------------------------
1159 def ft_syndicate_access():
1160     """
1161     Functional tests for ensuring objects exist and don't exist in Syndicate.
1162     """
1163     
1164     fake_user = FakeObject()
1165     fake_user.email = "fakeuser@opencloud.us"
1166
1167     print "\nensure_user_exists(%s)\n" % fake_user.email
1168     ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
1169
1170     print "\nensure_user_exists(%s)\n" % fake_user.email
1171     ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
1172
1173     fake_volume = FakeObject()
1174     fake_volume.name = "fakevolume"
1175     fake_volume.description = "This is a fake volume, created for funtional testing"
1176     fake_volume.blocksize = 1024
1177     fake_volume.cap_read_data = True 
1178     fake_volume.cap_write_data = True 
1179     fake_volume.cap_host_data = False
1180     fake_volume.archive = False
1181     fake_volume.private = True
1182     
1183     # test idempotency
1184     print "\nensure_volume_exists(%s)\n" % fake_volume.name
1185     ensure_volume_exists( fake_user.email, fake_volume )
1186
1187     print "\nensure_volume_exists(%s)\n" % fake_volume.name
1188     ensure_volume_exists( fake_user.email, fake_volume )
1189     
1190     print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
1191     ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
1192     
1193     print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
1194     ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
1195     
1196     print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
1197     ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
1198     
1199     print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
1200     ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
1201  
1202     print "\nensure_volume_absent(%s)\n" % fake_volume.name
1203     ensure_volume_absent( fake_volume.name )
1204
1205     print "\nensure_volume_absent(%s)\n" % fake_volume.name
1206     ensure_volume_absent( fake_volume.name )
1207
1208     print "\nensure_user_absent(%s)\n" % fake_user.email
1209     ensure_user_absent( fake_user.email )
1210
1211     print "\nensure_user_absent(%s)\n" % fake_user.email
1212     ensure_user_absent( fake_user.email )
1213     
1214     
1215     
1216     
1217     print "\nensure_principal_exists(%s)\n" % fake_user.email
1218     ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
1219     
1220     print "\nensure_principal_exists(%s)\n" % fake_user.email
1221     ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
1222
1223     print "\nensure_volume_exists(%s)\n" % fake_volume.name
1224     ensure_volume_exists( fake_user.email, fake_volume )
1225
1226     print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
1227     setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
1228     
1229     print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
1230     setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
1231     
1232     print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
1233     teardown_volume_access( fake_user.email, fake_volume.name )
1234     
1235     print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
1236     teardown_volume_access( fake_user.email, fake_volume.name )
1237     
1238     print "\nensure_volume_absent(%s)\n" % fake_volume.name
1239     ensure_volume_absent( fake_volume.name )
1240
1241     print "\nensure_principal_absent(%s)\n" % fake_user.email
1242     ensure_principal_absent( fake_user.email )
1243     
1244
1245
1246 #-------------------------------
1247 def ft_volumeslice( slice_name ):
1248     """
1249     Functional tests for reading VolumeSlice information
1250     """
1251     print "slice: %s" % slice_name
1252     
1253     volumes = get_volumeslice_volume_names( slice_name )
1254     
1255     print "volumes mounted in slice %s:" % slice_name
1256     for v in volumes:
1257        print "   %s:" % v
1258       
1259        vs = get_volumeslice( v, slice_name )
1260        
1261        print "      %s" % dir(vs)
1262           
1263
1264 #-------------------------------
1265 def ft_get_slice_hostnames( slice_name ):
1266    """
1267    Functional tests for getting slice hostnames
1268    """
1269    
1270    print "Get slice hostnames for %s" % slice_name
1271    
1272    hostnames = get_slice_hostnames( slice_name )
1273    import pprint 
1274    
1275    pp = pprint.PrettyPrinter()
1276    
1277    pp.pprint( hostnames )
1278
1279
1280 #-------------------------------
1281 def ft_syndicate_principal():
1282    """
1283    Functional tests for creating, reading, and deleting SyndicatePrincipals.
1284    """
1285    print "generating key pair"
1286    pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
1287    
1288    user_email = "fakeuser@opencloud.us"
1289    
1290    print "saving principal"
1291    put_principal_data( user_email, "asdf", pubkey_pem, privkey_pem )
1292    
1293    print "fetching principal private key"
1294    saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
1295    
1296    assert saved_privkey_pem is not None, "Could not fetch saved private key"
1297    assert saved_privkey_pem == privkey_pem, "Saved private key does not match actual private key"
1298    
1299    print "delete principal"
1300    
1301    delete_principal_data( user_email )
1302    
1303    print "make sure its deleted..."
1304    
1305    saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
1306    
1307    assert saved_privkey_pem is None, "Principal key not deleted"
1308    
1309
1310 #-------------------------------
1311 def ft_credential_server():
1312    """
1313    Functional test for the credential server
1314    """
1315    ensure_credential_server_running( run_once=True, foreground=True )
1316
1317
1318 #-------------------------------
1319 def ft_seal_and_unseal():
1320     """
1321     Functional test for sealing/unsealing data
1322     """
1323     print "generating key pair"
1324     pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
1325     
1326     sealed_buf = create_sealed_and_signed_blob( privkey_pem, "foo", "hello world")
1327     print "sealed data is:\n\n%s\n\n" % sealed_buf
1328
1329     buf = verify_and_unseal_blob( pubkey_pem, "foo", sealed_buf )
1330     print "unsealed data is: \n\n%s\n\n" % buf
1331     
1332
1333 # run functional tests
1334 if __name__ == "__main__":
1335     sys.path.append("/opt/planetstack")
1336     os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
1337
1338     if len(sys.argv) < 2:
1339       print "Usage: %s testname [args]" % sys.argv[0]
1340       
1341     # call a method starting with ft_, and then pass the rest of argv as its arguments
1342     testname = sys.argv[1]
1343     ft_testname = "ft_%s" % testname
1344     
1345     test_call = "%s(%s)" % (ft_testname, ",".join(sys.argv[2:]))
1346    
1347     print "calling %s" % test_call
1348    
1349     rc = eval( test_call )
1350    
1351     print "result = %s" % rc
1352       
1353