--- /dev/null
+from fnmatch import fnmatch
+
+class AccessControlList:
+ def __init__(self, aclText=None):
+ self.rules = []
+ if aclText:
+ self.import_text(aclText)
+
+ def import_text(self, aclText):
+ # allow either newline or ';' to separate rules
+ aclText = aclText.replace("\n", ";")
+ for line in aclText.split(";"):
+ line = line.strip()
+ if line.startswith("#"):
+ continue
+
+ if line=="":
+ continue
+
+ parts = line.split()
+
+ if len(parts)==2 and (parts[1]=="all"):
+ # "allow all" has no pattern
+ parts = (parts[0], parts[1], "")
+
+ if len(parts)!=3:
+ raise ACLValidationError(line)
+
+ (action, object, pattern) = parts
+
+ if action not in ["allow", "deny"]:
+ raise ACLValidationError(line)
+
+ if object not in ["site", "user", "all"]:
+ raise ACLValidationError(line)
+
+ self.rules.append( (action, object, pattern) )
+
+ def __str__(self):
+ lines = []
+ for rule in self.rules:
+ lines.append( " ".join(rule) )
+ return ";\n".join(lines)
+
+ def test(self, user):
+ for rule in self.rules:
+ if self.match_rule(rule, user):
+ return rule[0]
+ return "deny"
+
+ def match_rule(self, rule, user):
+ (action, object, pattern) = rule
+
+ if (object == "site"):
+ if fnmatch(user.site.name, pattern):
+ return True
+ elif (object == "user"):
+ if fnmatch(user.email, pattern):
+ return True
+ elif (object == "all"):
+ return True
+
+ return False
+
+
+if __name__ == '__main__':
+ class fakesite:
+ def __init__(self, siteName):
+ self.name = siteName
+
+ class fakeuser:
+ def __init__(self, email, siteName):
+ self.email = email
+ self.site = fakesite(siteName)
+
+ u_scott = fakeuser("scott@onlab.us", "ON.Lab")
+ u_bill = fakeuser("bill@onlab.us", "ON.Lab")
+ u_andy = fakeuser("acb@cs.princeton.edu", "Princeton")
+ u_john = fakeuser("jhh@cs.arizona.edu", "Arizona")
+ u_hacker = fakeuser("somehacker@foo.com", "Not A Real Site")
+
+ # check the "deny all" rule
+ acl = AccessControlList("deny all")
+ assert(acl.test(u_scott) == "deny")
+
+ # a blank ACL results in "deny all"
+ acl = AccessControlList("")
+ assert(acl.test(u_scott) == "deny")
+
+ # check the "allow all" rule
+ acl = AccessControlList("allow all")
+ assert(acl.test(u_scott) == "allow")
+
+ # allow only one site
+ acl = AccessControlList("allow site ON.Lab")
+ assert(acl.test(u_scott) == "allow")
+ assert(acl.test(u_andy) == "deny")
+
+ # some complicated ACL
+ acl = AccessControlList("""allow site Princeton
+ allow user *@cs.arizona.edu
+ deny site Arizona
+ deny user scott@onlab.us
+ allow site ON.Lab""")
+
+ assert(acl.test(u_scott) == "deny")
+ assert(acl.test(u_bill) == "allow")
+ assert(acl.test(u_andy) == "allow")
+ assert(acl.test(u_john) == "allow")
+ assert(acl.test(u_hacker) == "deny")
+
+ print acl
+
model = Deployment
def __init__(self, *args, **kwargs):
+ request = kwargs.pop('request', None)
super(DeploymentAdminForm, self).__init__(*args, **kwargs)
+ self.fields['accessControl'].initial = "allow site " + request.user.site.name
+
if self.instance and self.instance.pk:
self.fields['sites'].initial = [x.site for x in self.instance.sitedeployments_set.all()]
suit_classes = 'suit-tab suit-tab-sites'
class DeploymentAdmin(PlanetStackBaseAdmin):
- #form = DeploymentAdminForm
model = Deployment
- fieldList = ['name','sites']
+ fieldList = ['name','sites', 'accessControl']
fieldsets = [(None, {'fields': fieldList, 'classes':['suit-tab suit-tab-sites']})]
inlines = [DeploymentPrivilegeInline,NodeInline,TagInline]
kwargs["form"] = DeploymentAdminROForm
else:
kwargs["form"] = DeploymentAdminForm
- return super(DeploymentAdmin,self).get_form(request, obj, **kwargs)
-\r
+ adminForm = super(DeploymentAdmin,self).get_form(request, obj, **kwargs)
+
+ # from stackexchange: pass the request object into the form
+
+ class AdminFormMetaClass(adminForm):
+ def __new__(cls, *args, **kwargs):
+ kwargs['request'] = request
+ return adminForm(*args, **kwargs)
+
+ return AdminFormMetaClass
+
class ServiceAttrAsTabROInline(ReadOnlyTabularInline):
model = ServiceAttribute
fields = ['name','value']
from .service import ServiceAttribute
from .tag import Tag
from .role import Role
-#from .deployment import Deployment
from .site import Site,Deployment, DeploymentRole, DeploymentPrivilege, SiteDeployments
from .dashboard import DashboardView
from .user import User, UserDeployments, UserDashboardView
from .slice import SlicePrivilege
from .site import SiteRole
from .site import SitePrivilege
-#from .deployment import DeploymentRole
-#from .deployment import DeploymentPrivilege
from .planetstack import PlanetStackRole
from .planetstack import PlanetStackPrivilege
from .slicetag import SliceTag
+++ /dev/null
-import os
-from django.db import models
-from core.models import PlCoreBase
-from django.contrib.contenttypes import generic
-
-# Create your models here.
-
-class ManyToManyField_NoSyncdb(models.ManyToManyField):
- def __init__(self, *args, **kwargs):
- super(ManyToManyField_NoSyncdb, self).__init__(*args, **kwargs)
- self.creates_table = False
-
-class Deployment(PlCoreBase):
- name = models.CharField(max_length=200, unique=True, help_text="Name of the Deployment")
- admin_user = models.CharField(max_length=200, null=True, blank=True, help_text="Username of an admin user at this deployment")
- admin_password = models.CharField(max_length=200, null=True, blank=True, help_text="Password of theadmin user at this deployment")
- admin_tenant = models.CharField(max_length=200, null=True, blank=True, help_text="Name of the tenant the admin user belongs to")
- auth_url = models.CharField(max_length=200, null=True, blank=True, help_text="Auth url for the deployment")
-# sites = ManyToManyField_NoSyncdb('Site', through=Site.deployments.through, blank=True)
-
- def __unicode__(self): return u'%s' % (self.name)
-
-
-class DeploymentRole(PlCoreBase):
-
- ROLE_CHOICES = (('admin','Admin'),)
- role = models.CharField(choices=ROLE_CHOICES, unique=True, max_length=30)
-
- def __unicode__(self): return u'%s' % (self.role)
-
-class DeploymentPrivilege(PlCoreBase):
-
- user = models.ForeignKey('User', related_name='deployment_privileges')
- deployment = models.ForeignKey('Deployment', related_name='deployment_privileges')
- role = models.ForeignKey('DeploymentRole')
-
- def __unicode__(self): return u'%s %s %s' % (self.deployment, self.user, self.role)
-
-
- def can_update(self, user):
- if user.is_readonly:
- return False
- if user.is_admin:
- return True
- dprivs = DeploymentPrivilege.objects.filter(user=user)
- for dpriv in dprivs:
- if dpriv.role.role_type == 'admin':
- return True
- return False
-
- @staticmethod
- def select_by_user(user):
- if user.is_admin:
- qs = DeploymentPrivilege.objects.all()
- else:
- dpriv_ids = [dp.id for dp in DeploymentPrivilege.objects.filter(user=user)]
- qs = DeploymentPrivilege.objects.filter(id__in=dpriv_ids)
- return qs
import os
from django.db import models
from core.models import PlCoreBase
-#from core.models import Deployment
from core.models import Tag
from django.contrib.contenttypes import generic
from geoposition.fields import GeopositionField
+from core.acl import AccessControlList
class Site(PlCoreBase):
"""
class Deployment(PlCoreBase):
name = models.CharField(max_length=200, unique=True, help_text="Name of the Deployment")
- #sites = models.ManyToManyField('Site', through='SiteDeployments', blank=True)
+
+ # smbaker: the default of 'allow all' is intended for evolutions of existing
+ # deployments. When new deployments are created via the GUI, they are
+ # given a default of 'allow site <site_of_creator>'
+ accessControl = models.TextField(max_length=200, blank=False, null=False, default="allow all",
+ help_text="Access control list that specifies which sites/users may use nodes in this deployment")
+
+ def get_acl(self):
+ return AccessControlList(self.accessControl)
+
+ def test_acl(self, slice=None, user=None):
+ potential_users=[]
+
+ if user:
+ potential_users.append(user)
+
+ if slice:
+ potential_users.append(slice.creator)
+ for priv in slice.slice_privileges.all():
+ if priv.user not in potential_users:
+ potential_users.append(priv.user)
+
+ acl = self.get_acl()
+ for user in potential_users:
+ if acl.test(user) == "allow":
+ return True
+
+ return False
+
+ def select_by_acl(self, user):
+ acl = self.get_acl()
+ result = []
+ for deployment in Deployment.objects.all():
+ if acl.test(user):
+ result.append(deployment)
+ return result
def __unicode__(self): return u'%s' % (self.name)
else:
return u'unsaved-sliver'
-
def save(self, *args, **kwds):
if not self.name:
self.name = self.slice.name
if not self.creator and hasattr(self, 'caller'):
self.creator = self.caller
self.deploymentNetwork = self.node.deployment
+
+ if not self.deploymentNetwork.test_acl(slice=self.slice):
+ raise exceptions.ValidationError("Deployment %s's ACL does not allow any of this slice %s's users" % (self.deploymentNetwork.name, self.slice.name))
+
super(Sliver, self).save(*args, **kwds)
def can_update(self, user):