# Copyright (C) 2006 The Trustees of Princeton University
#
-from types import StringTypes
from PLC.Faults import *
from PLC.Parameter import Parameter
from PLC.Filter import Filter
if isinstance(address_type_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in address_type_filter if isinstance(x, int)]
- strs = [x for x in address_type_filter if isinstance(x, StringTypes)]
+ strs = [x for x in address_type_filter if isinstance(x, str)]
address_type_filter = Filter(AddressType.fields, {'address_type_id': ints, 'name': strs})
sql += " AND (%s) %s" % address_type_filter.sql(api, "OR")
elif isinstance(address_type_filter, dict):
elif isinstance(address_type_filter, int):
address_type_filter = Filter(AddressType.fields, {'address_type_id': address_type_filter})
sql += " AND (%s) %s" % address_type_filter.sql(api, "AND")
- elif isinstance(address_type_filter, StringTypes):
+ elif isinstance(address_type_filter, str):
address_type_filter = Filter(AddressType.fields, {'name': address_type_filter})
sql += " AND (%s) %s" % address_type_filter.sql(api, "AND")
else:
#
# Thierry Parmentelat - INRIA
#
-from types import StringTypes
import time
from PLC.Faults import *
* similarly the two special keys below allow to change the semantics of multi-keys filters
* '-AND' : select rows that match ALL the criteria (default)
* '-OR' : select rows that match ANY criteria
- The value attached to these keys is ignored.
+ The value attached to these keys is ignored.
Please note however that because a Filter is a dict, you cannot provide two criteria on a given key.
-
+
Here are a few realistic examples
if value is None:
operator = "IS"
value = "NULL"
- elif isinstance(value, StringTypes) and \
+ elif isinstance(value, str) and \
(value.find("*") > -1 or value.find("%") > -1):
operator = "ILIKE"
# insert *** in pattern instead of either * or %
import os
import xmlrpc.client
import shutil
-from types import StringTypes
from io import StringIO
from subprocess import Popen, PIPE, call
from tempfile import NamedTemporaryFile, mkdtemp
xml = xmlrpc.client.dumps(args, methodname, methodresponse, encoding = 'utf-8', allow_none = 1)
dom = etree.fromstring(xml)
canonical=etree.tostring(dom)
- # pre-f20 version was using Canonicalize from PyXML
+ # pre-f20 version was using Canonicalize from PyXML
# from xml.dom.ext import Canonicalize
# Canonicalize(), though it claims to, does not encode unicode
# nodes to UTF-8 properly and throws an exception unless you write
"""
# Accept either an opaque string blob or a Python tuple
- if isinstance(args, StringTypes):
+ if isinstance(args, str):
message = args
elif isinstance(args, tuple):
message = canonicalize(args, methodname, methodresponse)
"""
# Accept either an opaque string blob or a Python tuple
- if isinstance(args, StringTypes):
+ if isinstance(args, str):
message = args
else:
message = canonicalize(args, methodname, methodresponse)
# Copyright (C) 2006 The Trustees of Princeton University
#
-from types import StringTypes
from PLC.Faults import *
from PLC.Parameter import Parameter
from PLC.Filter import Filter
if isinstance(initscript_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in initscript_filter if isinstance(x, int)]
- strs = [x for x in initscript_filter if isinstance(x, StringTypes)]
+ strs = [x for x in initscript_filter if isinstance(x, str)]
initscript_filter = Filter(InitScript.fields, {'initscript_id': ints, 'name': strs })
sql += " AND (%s) %s" % initscript_filter.sql(api, "OR")
elif isinstance(initscript_filter, dict):
elif isinstance(initscript_filter, int):
initscript_filter = Filter(InitScript.fields, {'initscript_id': initscript_filter})
sql += " AND (%s) %s" % initscript_filter.sql(api, "AND")
- elif isinstance(initscript_filter, StringTypes):
+ elif isinstance(initscript_filter, str):
initscript_filter = Filter(InitScript.fields, {'name': initscript_filter})
sql += " AND (%s) %s" % initscript_filter.sql(api, "AND")
else:
# Copyright (C) 2006 The Trustees of Princeton University
#
-from types import StringTypes
import socket
import struct
ip = socket.inet_ntop(socket.AF_INET6, socket.inet_pton(socket.AF_INET6, ip))
return True
except socket.error:
- return False
+ return False
def valid_ip(ip):
return valid_ipv4(ip) or valid_ipv6(ip)
def in_same_network(address1, address2, netmask):
return in_same_network_ipv4(address1, address2, netmask) or \
- in_same_network_ipv6(address1, address2, netmask)
+ in_same_network_ipv6(address1, address2, netmask)
class Interface(Row):
"""
### need to cleanup ilinks
self.api.db.do("DELETE FROM ilink WHERE src_interface_id=%d OR dst_interface_id=%d" % \
(self['interface_id'],self['interface_id']))
-
+
Row.delete(self)
class Interfaces(Table):
if isinstance(interface_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in interface_filter if isinstance(x, int)]
- strs = [x for x in interface_filter if isinstance(x, StringTypes)]
+ strs = [x for x in interface_filter if isinstance(x, str)]
interface_filter = Filter(Interface.fields, {'interface_id': ints, 'ip': strs})
sql += " AND (%s) %s" % interface_filter.sql(api, "OR")
elif isinstance(interface_filter, dict):
elif isinstance(interface_filter, int):
interface_filter = Filter(Interface.fields, {'interface_id': [interface_filter]})
sql += " AND (%s) %s" % interface_filter.sql(api)
- elif isinstance (interface_filter, StringTypes):
+ elif isinstance (interface_filter, str):
interface_filter = Filter(Interface.fields, {'ip':[interface_filter]})
sql += " AND (%s) %s" % interface_filter.sql(api, "AND")
else:
import time
import calendar
-from types import StringTypes
from PLC.Faults import *
from PLC.Filter import Filter
from PLC.Parameter import Parameter, Mixed
# hooks for the local fields
def sql_alive(self, alive):
- if isinstance(alive, int) or isinstance(alive, StringTypes):
+ if isinstance(alive, int) or isinstance(alive, str):
# the lease is alive at that time if from <= alive <= until
alive = LeaseFilter.quote(alive)
return LeaseFilter.sql_time_in_range(alive, 't_from', 't_until')
.format(alive))
def sql_clip(self, clip):
- if isinstance(clip, int) or isinstance(clip, StringTypes):
+ if isinstance(clip, int) or isinstance(clip, str):
start = LeaseFilter.quote(clip)
return LeaseFilter.sql_timeslot_after('t_from', 't_until', start)
elif isinstance(clip, tuple):
raise PLCInvalidArgument("LeaseFilter: clip field {}"
.format(clip))
- # the whole key to implementing day is to compute today's beginning
+ # the whole key to implementing day is to compute today's beginning
def today_start(self):
# a struct_time
st = time.localtime()
else:
self['clip'] = (today, today + nb_days * 24 * 3600)
del self['day']
-
+
# preserve locally what belongs to us, hide it from the superclass
# self.local is a dict local_key : user_value
# self.negation is a dict local_key : string
import time
import pprint
-from types import StringTypes
-
from PLC.Faults import *
from PLC.Parameter import Parameter, Mixed, python_type, xmlrpc_type
from PLC.Auth import Auth
self.name = self.__class__.__name__
self.api = api
- if caller:
+ if caller:
# let a method call another one by propagating its caller
self.caller=caller
else:
# Auth may set this to a Person instance (if an anonymous
# method, will remain None).
self.caller = None
-
+
# API may set this to a (addr, port) tuple if known
self.source = None
# Strings are a special case. Accept either unicode or str
# types if a string is expected.
- if expected_type in StringTypes and isinstance(value, StringTypes):
+ if expected_type is str and isinstance(value, str):
pass
# Integers and long integers are also special types. Accept
name)
# If a minimum or maximum (length, value) has been specified
- if expected_type in StringTypes:
+ if expected_type is str:
if min is not None and \
len(value.encode(self.api.encoding)) < min:
raise PLCInvalidArgument("%s must be at least %d bytes long" % (name, min))
import time
import urllib.request, urllib.parse, urllib.error
-from types import StringTypes
-
from PLC.Logger import logger
from PLC.Faults import *
from PLC.Method import Method
def call(self, auth, person_id_or_email, verification_key = None, verification_expires = None):
# Get account information
# we need to search in local objects only
- if isinstance (person_id_or_email,StringTypes):
- filter={'email':person_id_or_email}
+ if isinstance (person_id_or_email, str):
+ filter = {'email': person_id_or_email}
else:
- filter={'person_id':person_id_or_email}
+ filter = {'person_id': person_id_or_email}
filter['peer_id']=None
persons = Persons(self.api, filter)
if not persons:
-from types import StringTypes
-
from PLC.Method import Method
from PLC.Parameter import Parameter, Mixed
from PLC.Filter import Filter
from PLC.Auth import Auth
-from PLC.Slices import Slice, Slices
+from PLC.Slices import Slice, Slices
from PLC.Persons import Person, Persons
from PLC.Keys import Key, Keys
from functools import reduce
This method exposes the public ssh keys for people in a slice
It expects a slice name or id, and returns a dictionary on emails.
This method is designed to help third-party software authenticate
- users (e.g. the OMF Experiment Controller).
+ users (e.g. the OMF Experiment Controller).
For this reason it is accessible with anonymous authentication.
"""
# the people in the slice
slice=Slices (self.api, slice_id_or_name, ['person_ids'])[0]
slice_person_ids = slice['person_ids']
-
+
# if caller has not specified person_id, use slice_person_ids
if 'person_id' not in person_filter:
person_filter['person_id']=slice_person_ids
if not isinstance (caller_provided,list):
caller_provided = [ caller_provided, ]
person_filter['person_id'] = list ( set(caller_provided).intersection(slice_person_ids) )
-
+
def merge (l1,l2): return l1+l2
persons = Persons (self.api, person_filter, ['email','key_ids'] )
key_id_to_email_hash = \
- dict ( reduce ( merge , [ [ (kid,p['email']) for kid in p['key_ids']] for p in persons ] ) )
-
+ dict ( reduce ( merge , [ [ (kid,p['email']) for kid in p['key_ids']] for p in persons ] ) )
+
all_key_ids = reduce (merge, [ p['key_ids'] for p in persons ] )
all_keys = Keys (self.api, all_key_ids)
-
+
result={}
for key in all_keys:
key_id=key['key_id']
-from types import StringTypes
-
from PLC.Method import Method
from PLC.Parameter import Parameter, Mixed
from PLC.Filter import Filter
from PLC.Auth import Auth
from PLC.Nodes import Node, Nodes
from PLC.SliceTags import SliceTag, SliceTags
-from PLC.Slices import Slice, Slices
+from PLC.Slices import Slice, Slices
class RetrieveSliceSliverKeys(Method):
"""
This method exposes the public ssh keys for a slice's slivers.
It expects a slice name or id, and returns a dictionary on hostnames.
This method is designed to help third-party software authenticate
- slivers (e.g. the OMF Experiment Controller).
+ slivers (e.g. the OMF Experiment Controller).
For this reason it is accessible with anonymous authentication.
"""
returns = Parameter (dict, " ssh keys hashed on hostnames")
def call(self, auth, slice_id_or_name, node_filter=None):
-
+
filter={}
if isinstance(slice_id_or_name,int):
- filter['slice_id']=slice_id_or_name
- elif isinstance(slice_id_or_name,StringTypes):
- filter['name']=slice_id_or_name
- filter['tagname']='ssh_key'
+ filter['slice_id'] = slice_id_or_name
+ elif isinstance(slice_id_or_name, str):
+ filter['name'] = slice_id_or_name
+ filter['tagname'] = 'ssh_key'
# retrieve only sliver tags
- filter['~node_id']=None
+ filter['~node_id'] = None
if node_filter:
# make sure we only deal with local nodes
- node_filter['peer_id']=None
+ node_filter['peer_id'] = None
nodes = Nodes(self.api, node_filter, ['node_id'])
- node_ids = [ node ['node_id'] for node in nodes ]
- filter['node_id']=node_ids
-
+ node_ids = [node['node_id'] for node in nodes]
+ filter['node_id'] = node_ids
+
# slice_tags don't expose hostname, sigh..
- slice_tags=SliceTags(self.api,filter,['node_id','tagname','value'])
+ slice_tags = SliceTags(self.api, filter, ['node_id', 'tagname', 'value'])
node_ids = [st['node_id'] for st in slice_tags]
# fetch nodes
- nodes=Nodes(self.api,node_ids,['node_id','hostname'])
+ nodes = Nodes(self.api, node_ids, ['node_id', 'hostname'])
# hash on node_id
- nodes_hash=dict( [ (n['node_id'],n['hostname']) for n in nodes])
+ nodes_hash = {n['node_id']: n['hostname'] for n in nodes}
# return values hashed on hostname
- return dict([ (nodes_hash[st['node_id']],st['value']) for st in slice_tags])
+ return {nodes_hash[st['node_id']]: st['value'] for st in slice_tags}
# Copyright (C) 2006 The Trustees of Princeton University
#
-from types import StringTypes
-
from PLC.Faults import *
from PLC.Parameter import Parameter, Mixed
from PLC.Filter import Filter
if isinstance(nodegroup_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in nodegroup_filter if isinstance(x, int)]
- strs = [x for x in nodegroup_filter if isinstance(x, StringTypes)]
+ strs = [x for x in nodegroup_filter if isinstance(x, str)]
nodegroup_filter = Filter(NodeGroup.fields, {'nodegroup_id': ints, 'groupname': strs})
sql += " AND (%s) %s" % nodegroup_filter.sql(api, "OR")
elif isinstance(nodegroup_filter, dict):
elif isinstance(nodegroup_filter, int):
nodegroup_filter = Filter(NodeGroup.fields, {'nodegroup_id': nodegroup_filter})
sql += " AND (%s) %s" % nodegroup_filter.sql(api, "AND")
- elif isinstance(nodegroup_filter, StringTypes):
+ elif isinstance(nodegroup_filter, str):
nodegroup_filter = Filter(NodeGroup.fields, {'groupname': nodegroup_filter})
sql += " AND (%s) %s" % nodegroup_filter.sql(api, "AND")
else:
# Copyright (C) 2006 The Trustees of Princeton University
#
-from types import StringTypes
import re
from PLC.Faults import *
if isinstance(node_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in node_filter if isinstance(x, int)]
- strs = [x for x in node_filter if isinstance(x, StringTypes)]
+ strs = [x for x in node_filter if isinstance(x, str)]
node_filter = Filter(Node.fields, {'node_id': ints, 'hostname': strs})
sql += " AND (%s) %s" % node_filter.sql(api, "OR")
elif isinstance(node_filter, dict):
allowed_fields=dict(list(Node.fields.items())+list(Node.tags.items()))
node_filter = Filter(allowed_fields, node_filter)
sql += " AND (%s) %s" % node_filter.sql(api, "AND")
- elif isinstance (node_filter, StringTypes):
+ elif isinstance (node_filter, str):
node_filter = Filter(Node.fields, {'hostname':node_filter})
sql += " AND (%s) %s" % node_filter.sql(api, "AND")
elif isinstance (node_filter, int):
# Copyright (C) 2006 The Trustees of Princeton University
#
-from types import StringTypes
-
from PLC.Faults import *
from PLC.Parameter import Parameter
from PLC.Table import Row, Table
if isinstance(pcu_type_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in pcu_type_filter if isinstance(x, int)]
- strs = [x for x in pcu_type_filter if isinstance(x, StringTypes)]
+ strs = [x for x in pcu_type_filter if isinstance(x, str)]
pcu_type_filter = Filter(PCUType.fields, {'pcu_type_id': ints, 'model': strs})
sql += " AND (%s) %s" % pcu_type_filter.sql(api, "OR")
elif isinstance(pcu_type_filter, dict):
pcu_type_filter = Filter(PCUType.fields, pcu_type_filter)
sql += " AND (%s) %s" % pcu_type_filter.sql(api, "AND")
- elif isinstance (pcu_type_filter, StringTypes):
+ elif isinstance (pcu_type_filter, str):
pcu_type_filter = Filter(PCUType.fields, {'model':pcu_type_filter})
sql += " AND (%s) %s" % pcu_type_filter.sql(api, "AND")
elif isinstance (pcu_type_filter, int):
# Copyright (C) 2006 The Trustees of Princeton University
#
-from types import *
+# from types import *
from PLC.Faults import *
class Parameter:
arg_type = python_type(arg)
- if arg_type == NoneType:
+ if arg_type is type(None):
return "nil"
- elif arg_type == IntType or arg_type == LongType:
+ elif arg_type is int:
return "int"
- elif arg_type == bool:
+ elif arg_type is bool:
return "boolean"
- elif arg_type == FloatType:
+ elif arg_type is float:
return "double"
- elif arg_type in StringTypes:
+ elif arg_type is str:
return "string"
- elif arg_type == ListType or arg_type == TupleType:
+ elif arg_type in (list, tuple):
return "array"
- elif arg_type == DictType:
+ elif arg_type is dict:
return "struct"
elif arg_type == Mixed:
# Not really an XML-RPC type but return "mixed" for
#
import re
-from types import StringTypes
import traceback
from urllib.parse import urlparse
if isinstance(peer_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in peer_filter if isinstance(x, int)]
- strs = [x for x in peer_filter if isinstance(x, StringTypes)]
+ strs = [x for x in peer_filter if isinstance(x, str)]
peer_filter = Filter(Peer.fields, {'peer_id': ints, 'peername': strs})
sql += " AND (%s) %s" % peer_filter.sql(api, "OR")
elif isinstance(peer_filter, dict):
elif isinstance(peer_filter, int):
peer_filter = Filter(Peer.fields, {'peer_id': peer_filter})
sql += " AND (%s) %s" % peer_filter.sql(api, "AND")
- elif isinstance(peer_filter, StringTypes):
+ elif isinstance(peer_filter, str):
peer_filter = Filter(Peer.fields, {'peername': peer_filter})
sql += " AND (%s) %s" % peer_filter.sql(api, "AND")
else:
# Copyright (C) 2006 The Trustees of Princeton University
#
-from types import StringTypes
try:
from hashlib import md5
except ImportError:
if isinstance(person_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in person_filter if isinstance(x, int)]
- strs = [x for x in person_filter if isinstance(x, StringTypes)]
+ strs = [x for x in person_filter if isinstance(x, str)]
person_filter = Filter(Person.fields, {'person_id': ints, 'email': strs})
sql += " AND (%s) %s" % person_filter.sql(api, "OR")
elif isinstance(person_filter, dict):
allowed_fields=dict(list(Person.fields.items())+list(Person.tags.items()))
person_filter = Filter(allowed_fields, person_filter)
sql += " AND (%s) %s" % person_filter.sql(api, "AND")
- elif isinstance (person_filter, StringTypes):
+ elif isinstance (person_filter, str):
person_filter = Filter(Person.fields, {'email':person_filter})
sql += " AND (%s) %s" % person_filter.sql(api, "AND")
elif isinstance (person_filter, int):
#
-# PostgreSQL database interface.
+# PostgreSQL database interface.
# Sort of like DBI(3) (Database independent interface for Perl).
#
# Mark Huang <mlhuang@cs.princeton.edu>
psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY)
import types
-from types import StringTypes, NoneType
import traceback
import subprocess
import re
x = str(x)
elif isinstance(x, str):
x = x.encode( 'utf-8' )
-
+
if isinstance(x, bytes):
x = "'%s'" % str(x).replace("\\", "\\\\").replace("'", "''")
elif isinstance(x, (int, float)):
@classmethod
def param(self, name, value):
# None is converted to the unquoted string NULL
- if isinstance(value, NoneType):
+ if isinstance(value, type(None)):
conversion = "s"
# True and False are also converted to unquoted strings
elif isinstance(value, bool):
conversion = "s"
elif isinstance(value, float):
conversion = "f"
- elif not isinstance(value, StringTypes):
+ elif not isinstance(value, str):
conversion = "d"
else:
conversion = "s"
# Copyright (C) 2006 The Trustees of Princeton University
#
-from types import StringTypes
from PLC.Faults import *
from PLC.Parameter import Parameter
from PLC.Filter import Filter
if isinstance(role_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in role_filter if isinstance(x, int)]
- strs = [x for x in role_filter if isinstance(x, StringTypes)]
+ strs = [x for x in role_filter if isinstance(x, str)]
role_filter = Filter(Role.fields, {'role_id': ints, 'name': strs})
sql += " AND (%s) %s" % role_filter.sql(api, "OR")
elif isinstance(role_filter, dict):
elif isinstance(role_filter, int):
role_filter = Filter(Role.fields, {'role_id': role_filter})
sql += " AND (%s) %s" % role_filter.sql(api, "AND")
- elif isinstance(role_filter, StringTypes):
+ elif isinstance(role_filter, str):
role_filter = Filter(Role.fields, {'name': role_filter})
sql += " AND (%s) %s" % role_filter.sql(api, "AND")
else:
-from types import StringTypes
import random
import base64
import time
if isinstance(session_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in session_filter if isinstance(x, int)]
- strs = [x for x in session_filter if isinstance(x, StringTypes)]
+ strs = [x for x in session_filter if isinstance(x, str)]
session_filter = Filter(Session.fields, {'person_id': ints, 'session_id': strs})
sql += " AND (%s) %s" % session_filter.sql(api, "OR")
elif isinstance(session_filter, dict):
elif isinstance(session_filter, int):
session_filter = Filter(Session.fields, {'person_id': session_filter})
sql += " AND (%s) %s" % session_filter.sql(api, "AND")
- elif isinstance(session_filter, StringTypes):
+ elif isinstance(session_filter, str):
session_filter = Filter(Session.fields, {'session_id': session_filter})
sql += " AND (%s) %s" % session_filter.sql(api, "AND")
else:
-from types import StringTypes
import string
from PLC.Faults import *
if isinstance(site_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in site_filter if isinstance(x, int)]
- strs = [x for x in site_filter if isinstance(x, StringTypes)]
+ strs = [x for x in site_filter if isinstance(x, str)]
site_filter = Filter(Site.fields, {'site_id': ints, 'login_base': strs})
sql += " AND (%s) %s" % site_filter.sql(api, "OR")
elif isinstance(site_filter, dict):
allowed_fields=dict(list(Site.fields.items())+list(Site.tags.items()))
site_filter = Filter(allowed_fields, site_filter)
sql += " AND (%s) %s" % site_filter.sql(api, "AND")
- elif isinstance (site_filter, StringTypes):
+ elif isinstance (site_filter, str):
site_filter = Filter(Site.fields, {'login_base':site_filter})
sql += " AND (%s) %s" % site_filter.sql(api, "AND")
elif isinstance (site_filter, int):
-from types import StringTypes
import time
import re
if isinstance(slice_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in slice_filter if isinstance(x, int)]
- strs = [x for x in slice_filter if isinstance(x, StringTypes)]
+ strs = [x for x in slice_filter if isinstance(x, str)]
slice_filter = Filter(Slice.fields, {'slice_id': ints, 'name': strs})
sql += " AND (%s) %s" % slice_filter.sql(api, "OR")
elif isinstance(slice_filter, dict):
allowed_fields=dict(list(Slice.fields.items())+list(Slice.tags.items()))
slice_filter = Filter(allowed_fields, slice_filter)
sql += " AND (%s) %s" % slice_filter.sql(api, "AND")
- elif isinstance (slice_filter, StringTypes):
+ elif isinstance (slice_filter, str):
slice_filter = Filter(Slice.fields, {'name':slice_filter})
sql += " AND (%s) %s" % slice_filter.sql(api, "AND")
elif isinstance (slice_filter, int):
-from types import StringTypes, IntType, LongType
import time
import calendar
if isinstance(items, (list, tuple, set)):
ints = [x for x in items if isinstance(x, int)]
- strs = [x for x in items if isinstance(x, StringTypes)]
+ strs = [x for x in items if isinstance(x, str)]
dicts = [x for x in items if isinstance(x, dict)]
return (ints, strs, dicts)
else:
insert is True:
# If primary key id is a serial int and it isnt included, get next id
- if self.fields[self.primary_key].type in (IntType, LongType) and \
- self.primary_key not in self:
+ if (self.fields[self.primary_key].type is int
+ and self.primary_key not in self):
pk_id = self.api.db.next_id(self.table_name, self.primary_key)
self[self.primary_key] = pk_id
db_fields[self.primary_key] = pk_id
.format(self.table_name,
", ".join(columns),
self.primary_key,
- self.api.db.param(self.primary_key, self[self.primary_key]))
+ self.api.db.param(self.primary_key, self[self.primary_key]))
self.api.db.do(sql, db_fields)
#
# Thierry Parmentelat - INRIA
#
-from types import StringTypes
-
from PLC.Faults import *
from PLC.Parameter import Parameter
from PLC.Filter import Filter
if isinstance(tag_type_filter, (list, tuple, set)):
# Separate the list into integers and strings
ints = [x for x in tag_type_filter if isinstance(x, int)]
- strs = [x for x in tag_type_filter if isinstance(x, StringTypes)]
+ strs = [x for x in tag_type_filter if isinstance(x, str)]
tag_type_filter = Filter(TagType.fields, {'tag_type_id': ints, 'tagname': strs})
sql += " AND (%s) %s" % tag_type_filter.sql(api, "OR")
elif isinstance(tag_type_filter, dict):
elif isinstance(tag_type_filter, int):
tag_type_filter = Filter(TagType.fields, {'tag_type_id':tag_type_filter})
sql += " AND (%s) %s" % tag_type_filter.sql(api, "AND")
- elif isinstance(tag_type_filter, StringTypes):
+ elif isinstance(tag_type_filter, str):
tag_type_filter = Filter(TagType.fields, {'tagname':tag_type_filter})
sql += " AND (%s) %s" % tag_type_filter.sql(api, "AND")
else:
# natively marshalled over xmlrpc
#
-from types import StringTypes
import time, calendar
import datetime
Returns a GMT timestamp string suitable to feed SQL.
"""
- if not timezone: output_format = Timestamp.sql_format
- else: output_format = Timestamp.sql_format_utc
+ output_format = (Timestamp.sql_format_utc if timezone
+ else Timestamp.sql_format)
- if Timestamp.debug: print('sql_validate, in:',input, end=' ')
- if isinstance(input, StringTypes):
- sql=''
+ if Timestamp.debug:
+ print('sql_validate, in:', input, end=' ')
+ if isinstance(input, str):
+ sql = ''
# calendar.timegm() is the inverse of time.gmtime()
for time_format in Timestamp.input_formats:
try:
sql = time.strftime(output_format, time.gmtime(timestamp))
break
# wrong format: ignore
- except ValueError: pass
+ except ValueError:
+ pass
# could not parse it
if not sql:
raise PLCInvalidArgument("Cannot parse timestamp %r - not in any of %r formats"%(input,Timestamp.input_formats))
- elif isinstance (input,(int,float)):
+ elif isinstance(input, (int, float)):
try:
timestamp = int(input)
sql = time.strftime(output_format, time.gmtime(timestamp))
@staticmethod
- def cast_long (input):
+ def cast_long(input):
"""
Translates input timestamp as a unix timestamp.
00:00:00 GMT), a string (in one of the supported input formats above).
"""
- if Timestamp.debug: print('cast_long, in:',input, end=' ')
- if isinstance(input, StringTypes):
- timestamp=0
+ if Timestamp.debug:
+ print('cast_long, in:', input, end=' ')
+ if isinstance(input, str):
+ timestamp = 0
for time_format in Timestamp.input_formats:
try:
- result=calendar.timegm(time.strptime(input, time_format))
- if Timestamp.debug: print('out:',result)
+ result = calendar.timegm(time.strptime(input, time_format))
+ if Timestamp.debug:
+ print('out:', result)
return result
# wrong format: ignore
- except ValueError: pass
- raise PLCInvalidArgument("Cannot parse timestamp %r - not in any of %r formats"%(input,Timestamp.input_formats))
- elif isinstance (input,(int,float)):
- result=int(input)
- if Timestamp.debug: print('out:',result)
+ except ValueError:
+ pass
+ raise PLCInvalidArgument("Cannot parse timestamp %r - not in any of %r formats"%(input, Timestamp.input_formats))
+ elif isinstance(input, (int, float)):
+ result = int(input)
+ if Timestamp.debug:
+ print('out:',result)
return result
else:
raise PLCInvalidArgument("Timestamp %r - unsupported type %r"%(input,type(input)))
@staticmethod
def to_string(duration):
result=[]
- left=duration
- (days,left) = divmod(left,Duration.DAY)
- if days: result.append("%d d)"%td.days)
- (hours,left) = divmod (left,Duration.HOUR)
- if hours: result.append("%d h"%hours)
+ left = duration
+ (days, left) = divmod(left, Duration.DAY)
+ if days:
+ result.append("%d d)"%td.days)
+ (hours, left) = divmod (left, Duration.HOUR)
+ if hours:
+ result.append("%d h"%hours)
(minutes, seconds) = divmod (left, Duration.MINUTE)
if minutes: result.append("%d m"%minutes)
if seconds: result.append("%d s"%seconds)
def validate (duration):
# support seconds only for now, works for int/long/str
try:
- return int (duration)
+ return int(duration)
except:
raise PLCInvalidArgument("Could not parse duration %r"%duration)