+# focusing on pylint, ignoring flake8
+# flake8: noqa
+
+# pylint: disable=invalid-name
+# pylint: disable=redefined-builtin
+# pylint: disable=unused-argument
+# pylint: disable=missing-function-docstring
+# pylint: disable=missing-module-docstring
+# pylint: disable=consider-using-f-string
+
import time
import re
-from PLC.Faults import *
+from PLC.Faults import PLCInvalidArgument
from PLC.Parameter import Parameter, Mixed
from PLC.Filter import Filter
-from PLC.Debug import profile
+# from PLC.Debug import profile
from PLC.Table import Row, Table
-from PLC.SliceInstantiations import SliceInstantiation, SliceInstantiations
-from PLC.Nodes import Node
+from PLC.SliceInstantiations import SliceInstantiations # , SliceInstantiation
+from PLC.Nodes import Node, Nodes
from PLC.Persons import Person, Persons
-from PLC.SliceTags import SliceTag
+# from PLC.SliceTags import SliceTag
from PLC.Timestamp import Timestamp
+
class Slice(Row):
"""
Representation of a row in the slices table. To use, optionally
table_name = 'slices'
primary_key = 'slice_id'
- join_tables = ['slice_node', 'slice_person', 'slice_tag', 'peer_slice', 'node_slice_whitelist', 'leases', ]
+ join_tables = ['slice_node', 'slice_person', 'slice_tag', 'peer_slice', 'node_slice_whitelist', 'leases']
fields = {
'slice_id': Parameter(int, "Slice identifier"),
'site_id': Parameter(int, "Identifier of the site to which this slice belongs"),
'description': Parameter(str, "Slice description", max = 2048, nullok = True),
'max_nodes': Parameter(int, "Maximum number of nodes that can be assigned to this slice"),
'creator_person_id': Parameter(int, "Identifier of the account that created this slice"),
- 'created': Parameter(int, "Date and time when slice was created, in seconds since UNIX epoch", ro = True),
+ 'created': Parameter(int,
+ "Date and time when slice was created, in seconds since UNIX epoch", ro = True),
'expires': Parameter(int, "Date and time when slice expires, in seconds since UNIX epoch"),
'node_ids': Parameter([int], "List of nodes in this slice", ro = True),
'person_ids': Parameter([int], "List of accounts that can use this slice", ro = True),
Parameter(str, "Fully qualified hostname"))]
}
- view_tags_name="view_slice_tags"
+ view_tags_name = "view_slice_tags"
tags = {}
def validate_name(self, name):
# N.B.: Responsibility of the caller to ensure that expires is
# not too far into the future.
check_future = not ('is_deleted' in self and self['is_deleted'])
- return Timestamp.sql_validate( expires, check_future = check_future)
+ return Timestamp.sql_validate(expires, check_future=check_future)
add_person = Row.add_object(Person, 'slice_person')
remove_person = Row.remove_object(Person, 'slice_person')
Deletes persons not found in value list from this slice (using DeletePersonFromSlice).
"""
+ from PLC.Methods.AddPersonToSlice import AddPersonToSlice # pylint: disable=import-outside-toplevel
+ from PLC.Methods.DeletePersonFromSlice import DeletePersonFromSlice # pylint: disable=import-outside-toplevel
+
assert 'person_ids' in self
assert 'slice_id' in self
assert isinstance(value, list)
# Add new ids, remove stale ids
if self['person_ids'] != person_ids:
- from PLC.Methods.AddPersonToSlice import AddPersonToSlice
- from PLC.Methods.DeletePersonFromSlice import DeletePersonFromSlice
new_persons = set(person_ids).difference(self['person_ids'])
stale_persons = set(self['person_ids']).difference(person_ids)
for new_person in new_persons:
AddPersonToSlice.__call__(AddPersonToSlice(self.api), auth, new_person, self['slice_id'])
for stale_person in stale_persons:
- DeletePersonFromSlice.__call__(DeletePersonFromSlice(self.api), auth, stale_person, self['slice_id'])
+ DeletePersonFromSlice.__call__(
+ DeletePersonFromSlice(self.api), auth, stale_person, self['slice_id'])
def associate_nodes(self, auth, field, value):
"""
Deletes nodes not found in value list from this slice (using DeleteSliceFromNodes).
"""
- from PLC.Nodes import Nodes
+ from PLC.Methods.AddSliceToNodes import AddSliceToNodes # pylint: disable=import-outside-toplevel
+ from PLC.Methods.DeleteSliceFromNodes import DeleteSliceFromNodes # pylint: disable=import-outside-toplevel
assert 'node_ids' in self
assert 'slice_id' in self
# Add new ids, remove stale ids
if self['node_ids'] != node_ids:
- from PLC.Methods.AddSliceToNodes import AddSliceToNodes
- from PLC.Methods.DeleteSliceFromNodes import DeleteSliceFromNodes
new_nodes = set(node_ids).difference(self['node_ids'])
stale_nodes = set(self['node_ids']).difference(node_ids)
if new_nodes:
- AddSliceToNodes.__call__(AddSliceToNodes(self.api), auth, self['slice_id'], list(new_nodes))
+ AddSliceToNodes.__call__(
+ AddSliceToNodes(self.api), auth, self['slice_id'], list(new_nodes))
if stale_nodes:
- DeleteSliceFromNodes.__call__(DeleteSliceFromNodes(self.api), auth, self['slice_id'], list(stale_nodes))
+ DeleteSliceFromNodes.__call__(
+ DeleteSliceFromNodes(self.api), auth, self['slice_id'], list(stale_nodes))
+
def associate_slice_tags(self, auth, fields, value):
"""
Deletes slice_tag_ids not found in value list (using DeleteSliceTag).
Updates slice_tag if slice_fields w/ slice_id is found (using UpdateSlceiAttribute).
"""
+ from PLC.Methods.DeleteSliceTag import DeleteSliceTag # pylint: disable=import-outside-toplevel
+
assert 'slice_tag_ids' in self
assert isinstance(value, list)
- (attribute_ids, blank, attributes) = self.separate_types(value)
+ (attribute_ids, _, attributes) = self.separate_types(value)
# There is no way to add attributes by id. They are
# associated with a slice when they are created.
# So we are only looking to delete here
if self['slice_tag_ids'] != attribute_ids:
- from PLC.Methods.DeleteSliceTag import DeleteSliceTag
stale_attributes = set(self['slice_tag_ids']).difference(attribute_ids)
for stale_attribute in stale_attributes:
# If dictionary exists, we are either adding new
# attributes or updating existing ones.
if attributes:
- from PLC.Methods.AddSliceTag import AddSliceTag
- from PLC.Methods.UpdateSliceTag import UpdateSliceTag
-
+ from PLC.Methods.AddSliceTag import AddSliceTag # pylint: disable=import-outside-toplevel
+ from PLC.Methods.UpdateSliceTag import UpdateSliceTag # pylint: disable=import-outside-toplevel
added_attributes = [x for x in attributes if 'slice_tag_id' not in x]
updated_attributes = [x for x in attributes if 'slice_tag_id' in x]
else:
nodegroup_id = None
- AddSliceTag.__call__(AddSliceTag(self.api), auth, self['slice_id'], type, value, node_id, nodegroup_id)
+ AddSliceTag.__call__(
+ AddSliceTag(self.api), auth, self['slice_id'], type, value, node_id, nodegroup_id)
for updated_attribute in updated_attributes:
attribute_id = updated_attribute.pop('slice_tag_id')
if attribute_id not in self['slice_tag_ids']:
else:
UpdateSliceTag.__call__(UpdateSliceTag(self.api), auth, attribute_id, updated_attribute)
- def sync(self, commit = True):
+ def sync(self, commit = True): # pylint: disable=arguments-differ
"""
Add or update a slice.
"""
slice_filter = Filter(Slice.fields, {'slice_id': ints, 'name': strs})
sql += " AND (%s) %s" % slice_filter.sql(api, "OR")
elif isinstance(slice_filter, dict):
- allowed_fields=dict(list(Slice.fields.items())+list(Slice.tags.items()))
+ allowed_fields = dict(list(Slice.fields.items())+list(Slice.tags.items()))
slice_filter = Filter(allowed_fields, slice_filter)
sql += " AND (%s) %s" % slice_filter.sql(api, "AND")
- elif isinstance (slice_filter, str):
- slice_filter = Filter(Slice.fields, {'name':slice_filter})
+ elif isinstance(slice_filter, str):
+ slice_filter = Filter(Slice.fields, {'name': slice_filter})
sql += " AND (%s) %s" % slice_filter.sql(api, "AND")
- elif isinstance (slice_filter, int):
- slice_filter = Filter(Slice.fields, {'slice_id':slice_filter})
+ elif isinstance(slice_filter, int):
+ slice_filter = Filter(Slice.fields, {'slice_id': slice_filter})
sql += " AND (%s) %s" % slice_filter.sql(api, "AND")
else:
raise PLCInvalidArgument(f"Wrong slice filter {slice_filter!r}")