import json
from copy import deepcopy
import six
from syncano.connection import ConnectionMixin
from syncano.exceptions import SyncanoRequestError, SyncanoValidationError, SyncanoValueError
from syncano.models.bulk import ModelBulkCreate, ObjectBulkCreate
from syncano.models.manager_mixins import ArrayOperationsMixin, IncrementMixin, clone
from .registry import registry
# The maximum number of items to display in a Manager.__repr__
REPR_OUTPUT_SIZE = 20
class ManagerDescriptor(object):
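"""
Exposes a :class:`Manager` on the model class. Accessing it on an instance
raises ``AttributeError``; accessing it on the class returns ``manager.all()``.
"""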
def __init__(self, manager):
self.manager = manager
def __get__(self, instance, owner=None):
if instance is not None:
raise AttributeError("Manager isn't accessible via {0} instances.".format(owner.__name__))
return self.manager.all()
class Manager(ConnectionMixin):
"""Base class responsible for all ORM (``please``) actions."""
BATCH_URI = '/v1.1/instances/{name}/batch/'
def __init__(self):
self.name = None
self.model = None
self.endpoint = None
self.properties = {}
self.method = None
self.query = {}
self.data = {}
self.is_lazy = False
self._filter_kwargs = {}
self._limit = None
self._serialize = True
self._connection = None
self._template = None
def __repr__(self): # pragma: no cover
data = list(self[:REPR_OUTPUT_SIZE + 1])
if len(data) > REPR_OUTPUT_SIZE:
data[-1] = '...(remaining elements truncated)...'
return repr(data)
def __str__(self): # pragma: no cover
return '<Manager: {0}>'.format(self.model.__name__)
def __unicode__(self): # pragma: no cover
return six.u(str(self))
def __len__(self): # pragma: no cover
return sum(1 for _ in self.iterator())
def __iter__(self): # pragma: no cover
return iter(self.iterator())
def __nonzero__(self):
try:
self[0]
return True
except IndexError:
return False
def __bool__(self): # pragma: no cover
try:
self[0]
return True
except IndexError:
return False
def __getitem__(self, k):
"""
Retrieves an item or slice from the set of results.
"""
if not isinstance(k, (slice,) + six.integer_types):
raise TypeError
assert ((not isinstance(k, slice) and (k >= 0)) or
(isinstance(k, slice) and (k.start is None or k.start >= 0) and
(k.stop is None or k.stop >= 0))), \
"Negative indexing is not supported."
manager = self._clone()
if isinstance(k, slice):
if k.stop is not None:
manager.limit(int(k.stop) + 1)
return list(manager)[k.start:k.stop:k.step]
manager.limit(k + 1)
return list(manager)[k]
def _set_default_properties(self, endpoint_properties):
for field in self.model._meta.fields:
is_demanded = field.name in endpoint_properties
has_default = field.default is not None
if is_demanded and has_default:
self.properties[field.name] = field.default
def as_batch(self):
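"""
Switches the manager into lazy mode: the next ``create``, ``update`` or
``delete`` call returns a batch descriptor (method, path, body) instead of
issuing a request, so it can be passed to :meth:`batch`.
"""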
self.is_lazy = True
return self
def batch(self, *args):
"""
A convenience method for making a batch request. Only the create, update and delete manager methods are supported.
Batch requests are limited to 50 calls, so no more than 50 args should be passed.
Usage::
klass = instance.classes.get(name='some_class')
Object.please.batch(
klass.objects.as_batch().delete(id=652),
klass.objects.as_batch().delete(id=653),
...
)
and::
Object.please.batch(
klass.objects.as_batch().update(id=652, arg='some_b'),
klass.objects.as_batch().update(id=653, arg='some_b'),
...
)
and::
Object.please.batch(
klass.objects.as_batch().create(arg='some_c'),
klass.objects.as_batch().create(arg='some_c'),
...
)
and::
Object.please.batch(
klass.objects.as_batch().delete(id=653),
klass.objects.as_batch().update(id=652, arg='some_a'),
klass.objects.as_batch().create(arg='some_c'),
...
)
are possible.
But::
Object.please.batch(
klass.objects.as_batch().get_or_create(id=653, arg='some_a')
)
will not work as expected.
Some snippet for working with instance users::
instance = Instance.please.get(name='Nabuchodonozor')
model_users = instance.users.batch(
instance.users.as_batch().delete(id=7),
instance.users.as_batch().update(id=9, username='username_a'),
instance.users.as_batch().create(username='username_b', password='5432'),
...
)
And sample response will be::
[{u'code': 204}, <User: 9>, <User: 11>, ...]
:param args: each arg is one of: klass.objects.as_batch().create(...), klass.objects.as_batch().update(...),
klass.objects.as_batch().delete(...)
:return: a list of objects corresponding to the batch arguments; update and create return a populated Object,
while delete returns the raw response from the server (usually a dict: {'code': 204}, sometimes information
about a resource that could not be found);
"""
# first, turn off lazy mode:
self.is_lazy = False
meta_list = []
requests = []
for arg in args:
if isinstance(arg, list): # update now can return a list;
for nested_arg in arg:
meta_list.append(nested_arg['meta'])
requests.append(nested_arg['body'])
else:
meta_list.append(arg['meta'])
requests.append(arg['body'])
response = self.connection.request(
'POST',
self.BATCH_URI.format(name=registry.instance_name),
**{'data': {'requests': requests}}
)
populated_response = []
for meta, res in zip(meta_list, response):
if res['code'] in [200, 201]: # success response: update or create;
content = res['content']
model = meta['model']
properties = meta['properties']
content.update(properties)
populated_response.append(model(**content))
else:
populated_response.append(res)
return populated_response
# Object actions
def create(self, **kwargs):
"""
A convenience method for creating an object and saving it all in one step. Thus::
instance = Instance.please.create(name='test-one', description='description')
and::
instance = Instance(name='test-one', description='description')
instance.save()
are equivalent.
"""
data = self.properties.copy()
attrs = kwargs.copy()
data.update(attrs)
data.update({'is_lazy': self.is_lazy})
instance = self._get_instance(data)
if instance.__class__.__name__ == 'Instance':
registry.set_used_instance(instance.name)
saved_instance = instance.save()
if not self.is_lazy:
return instance
return saved_instance
def bulk_create(self, *objects):
"""
Creates many new instances based on provided list of objects.
Usage::
instance = Instance.please.get(name='instance_a')
instances = instance.users.bulk_create(
User(username='user_a', password='1234'),
User(username='user_b', password='4321')
)
Warning::
This method is limited to 50 objects at once.
"""
return ModelBulkCreate(objects, self).process()
@clone
def get(self, *args, **kwargs):
"""
Returns the object matching the given lookup parameters.
Usage::
instance = Instance.please.get('test-one')
instance = Instance.please.get(name='test-one')
"""
self.method = 'GET'
self.endpoint = 'detail'
self._filter(*args, **kwargs)
return self.request()
@clone
def in_bulk(self, object_ids_list, **kwargs):
"""
A method which allows objects to be fetched in bulk;
Use::
response = Classes.please.in_bulk(['test_class', ...])
response is:
> {'test_class': <Class: test_class>}
For objects:
res = Object.please.in_bulk([1, 2], class_name='test_class')
or
res = klass.objects.in_bulk([1, 2])
response is:
{1: <SyncanoTestClassObject: 1>, 2: {u'content': {u'detail': u'Not found.'}, u'code': 404}}
:param object_ids_list: a list of primary keys (the ``id`` in the API); names or ids can be used here;
:return: a dict whose keys are the object_ids_list elements and whose values are populated objects;
"""
self.properties.update(kwargs)
path, defaults = self._get_endpoint_properties()
requests = [
{'method': 'GET', 'path': '{path}{id}/'.format(path=path, id=object_id)} for object_id in object_ids_list
]
response = self.connection.request(
'POST',
self.BATCH_URI.format(name=registry.instance_name),
**{'data': {'requests': requests}}
)
bulk_response = {}
for object_id, object in zip(object_ids_list, response):
if object['code'] == 200:
data = object['content'].copy()
data.update(self.properties)
bulk_response[object_id] = self.model(**data)
else:
bulk_response[object_id] = object
return bulk_response
def detail(self, *args, **kwargs):
"""
Wrapper around ``get`` method.
Usage::
instance = Instance.please.detail('test-one')
instance = Instance.please.detail(name='test-one')
"""
return self.get(*args, **kwargs)
def get_or_create(self, **kwargs):
"""
A convenience method for looking up an object with the given
lookup parameters, creating one if necessary.
Returns a tuple of **(object, created)**, where **object** is the retrieved or
**created** object and created is a boolean specifying whether a new object was created.
This is meant as a shortcut to boilerplatish code. For example::
try:
instance = Instance.please.get(name='test-one')
except Instance.DoesNotExist:
instance = Instance(name='test-one', description='test')
instance.save()
The above example can be rewritten using **get_or_create()** like so::
instance, created = Instance.please.get_or_create(name='test-one', defaults={'description': 'test'})
"""
defaults = deepcopy(kwargs.pop('defaults', {}))
try:
instance = self.get(**kwargs)
except self.model.DoesNotExist:
defaults.update(kwargs)
instance = self.create(**defaults)
created = True
else:
created = False
return instance, created
@clone
def delete(self, *args, **kwargs):
"""
Removes single instance based on provided arguments.
Returns None if deletion went fine.
Usage::
Instance.please.delete('test-one')
Instance.please.delete(name='test-one')
"""
self.method = 'DELETE'
self.endpoint = 'detail'
self._filter(*args, **kwargs)
if not self.is_lazy:
return self.request()
path, defaults = self._get_endpoint_properties()
return self.model.batch_object(method=self.method, path=path, body=self.data, properties=defaults)
@clone
def filter(self, **kwargs):
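"""
Restricts the manager to objects whose endpoint properties match ``kwargs``;
only endpoint fields are accepted. Primarily used together with ``update``.
Usage (illustrative)::
    Instance.please.filter(name='test-one').update(description='new one')
"""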
endpoint_fields = [field.name for field in self.model._meta.fields if field.has_endpoint_data]
for kwarg_name in kwargs:
if kwarg_name not in endpoint_fields:
raise SyncanoValueError('Only endpoint properties can be used in filter: {}'.format(endpoint_fields))
self._filter_kwargs = kwargs
return self
@clone
def update(self, *args, **kwargs):
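"""
Dispatches to ``new_update`` when ``filter()`` was applied or a query is
present, otherwise falls back to the single-object ``old_update``.
"""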
if self._filter_kwargs or self.query: # means that .filter() was run;
return self.new_update(**kwargs)
return self.old_update(*args, **kwargs)
@clone
def new_update(self, **kwargs):
"""
Updates multiple instances based on provided arguments. There are two ways to do so:
1. Django-style update.
2. By specifying arguments.
Usage::
objects = Object.please.list(instance_name=INSTANCE_NAME,
class_name='someclass').filter(id=1).update(arg='103')
The return value is a list of objects;
"""
model_fields = [field.name for field in self.model._meta.fields if not field.has_endpoint_data]
for field_name in kwargs:
if field_name not in model_fields:
raise SyncanoValueError('This model has no field {}'.format(field_name))
self.endpoint = 'detail'
self.method = self.get_allowed_method('PATCH', 'PUT', 'POST')
self.data = kwargs.copy()
if self._filter_kwargs: # Manager context;
# do a single object update: Class, Instance for example;
self.data.update(self._filter_kwargs)
serialized = self._get_serialized_data()
self._filter(*(), **self.data) # sets the proper self.properties here
if not self.is_lazy:
return [self.serialize(self.request(), self.model)]
path, defaults = self._get_endpoint_properties()
return [self.model.batch_object(method=self.method, path=path, body=serialized, properties=defaults)]
instances = [] # ObjectManager context;
for obj in self:
self._filter(*(), **kwargs)
serialized = self._get_serialized_data()
self.properties.update({'id': obj.id})
path, defaults = self._get_endpoint_properties()
updated_instance = self.model.batch_object(method=self.method, path=path, body=serialized,
properties=defaults)
instances.append(updated_instance) # always a batch structure here;
if not self.is_lazy:
instances = self.batch(instances)
return instances
@clone
def old_update(self, *args, **kwargs):
"""
Updates single instance based on provided arguments. There are two ways to do so:
1. Django-style update.
2. By specifying **data** argument.
The **data** is a dictionary of (field, value) pairs used to update the object.
Usage::
instance = Instance.please.update('test-one', description='new one')
instance = Instance.please.update(name='test-one', description='new one')
instance = Instance.please.update('test-one', data={'description': 'new one'})
instance = Instance.please.update(name='test-one', data={'description': 'new one'})
"""
self.endpoint = 'detail'
self.method = self.get_allowed_method('PATCH', 'PUT', 'POST')
data = kwargs.pop('data', {})
self.data = kwargs.copy()
self.data.update(data)
model = self.serialize(self.data, self.model)
serialized = model.to_native()
serialized = {k: v for k, v in six.iteritems(serialized)
if k in self.data}
self.data.update(serialized)
self._filter(*args, **kwargs)
if not self.is_lazy:
return self.request()
path, defaults = self._get_endpoint_properties()
return self.model.batch_object(method=self.method, path=path, body=self.data, properties=defaults)
def update_or_create(self, defaults=None, **kwargs):
"""
A convenience method for updating an object with the given parameters, creating a new one if necessary.
The ``defaults`` is a dictionary of (field, value) pairs used to update the object.
Returns a tuple of **(object, created)**, where object is the created or updated object and created
is a boolean specifying whether a new object was created.
The **update_or_create** method tries to fetch an object from Syncano API based on the given kwargs.
If a match is found, it updates the fields passed in the defaults dictionary.
This is meant as a shortcut to boilerplatish code. For example::
try:
instance = Instance.please.update(name='test-one', data=updated_values)
except Instance.DoesNotExist:
updated_values.update({'name': 'test-one'})
instance = Instance(**updated_values)
instance.save()
This pattern gets quite unwieldy as the number of fields in a model goes up.
The above example can be rewritten using **update_or_create()** like so::
instance, created = Instance.please.update_or_create(name='test-one',
defaults=updated_values)
"""
defaults = deepcopy(defaults or {})
try:
instance = self.update(**kwargs)
except self.model.DoesNotExist:
defaults.update(kwargs)
instance = self.create(**defaults)
created = True
else:
created = False
return instance, created
# List actions
@clone
def all(self, *args, **kwargs):
"""
Returns a copy of the current ``Manager`` with limit removed.
Usage::
instances = Instance.please.all()
"""
self._limit = None
return self.list(*args, **kwargs)
@clone
def list(self, *args, **kwargs):
"""
Returns a copy of the current ``Manager`` containing objects that match the given lookup parameters.
Usage::
instance = Instance.please.list()
classes = Class.please.list(instance_name='test-one')
"""
self.method = 'GET'
self.endpoint = 'list'
self._filter(*args, **kwargs)
return self
@clone
def first(self, *args, **kwargs):
"""
Returns the first object matched by the lookup parameters or None, if there is no matching object.
Usage::
instance = Instance.please.first()
classes = Class.please.first(instance_name='test-one')
"""
try:
self._limit = 1
return self.list(*args, **kwargs)[0]
except IndexError:
return None
@clone
def page_size(self, value):
"""
Sets page size.
Usage::
instances = Instance.please.page_size(20).all()
"""
if not value or not isinstance(value, six.integer_types):
raise SyncanoValueError('page_size value needs to be an int.')
self.query['page_size'] = value
return self
@clone
def limit(self, value):
"""
Sets limit of returned objects.
Usage::
instances = Instance.please.list().limit(10)
classes = Class.please.list(instance_name='test-one').limit(10)
"""
if not value or not isinstance(value, six.integer_types):
raise SyncanoValueError('Limit value needs to be an int.')
self._limit = value
return self
@clone
def ordering(self, order='asc'):
"""
Sets order of returned objects.
Usage::
instances = Instance.please.ordering()
"""
if order not in ('asc', 'desc'):
raise SyncanoValueError('Invalid order value.')
self.query['ordering'] = order
return self
@clone
def raw(self):
"""
Disables serialization. ``request`` method will return raw Python types.
Usage::
>>> instances = Instance.please.list().raw()
>>> instances
[{'description': 'new one', 'name': 'test-one'...}...]
"""
self._serialize = False
return self
@clone
def template(self, name):
"""
Disables serialization. ``request`` method will return raw text.
Usage::
>>> instances = Instance.please.list().template('test')
>>> instances
u'text'
"""
self._serialize = False
self._template = name
return self
@clone
def using(self, connection):
"""
Returns a copy of the current ``Manager`` bound to the provided connection.
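Usage (illustrative; ``custom_connection`` stands for any already-validated connection object)::
    instances = Instance.please.list().using(custom_connection)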
"""
# ConnectionMixin will validate this
self.connection = connection
return self
# Other stuff
def contribute_to_class(self, model, name): # pragma: no cover
setattr(model, name, ManagerDescriptor(self))
self.model = model
if not self.name:
self.name = name
def _get_serialized_data(self):
model = self.serialize(self.data, self.model)
serialized = model.to_native()
serialized = {k: v for k, v in six.iteritems(serialized)
if k in self.data}
self.data.update(serialized)
return serialized
def _filter(self, *args, **kwargs):
properties = self.model._meta.get_endpoint_properties(self.endpoint)
self._set_default_properties(properties)
if args and self.endpoint:
# let user get object by 'id'
fewer_args_than_properties = len(args) < len(properties)
id_specified = 'id' in properties
if fewer_args_than_properties and id_specified:
properties = ['id']
mapped_args = {k: v for k, v in zip(properties, args)}
self.properties.update(mapped_args)
self.properties.update(kwargs)
def _clone(self):
# Maybe deepcopy ?
manager = self.__class__()
manager.name = self.name
manager.model = self.model
manager._connection = self._connection
manager._template = self._template
manager.endpoint = self.endpoint
manager.properties = deepcopy(self.properties)
manager._limit = self._limit
manager.method = self.method
manager.query = deepcopy(self.query)
manager._filter_kwargs = deepcopy(self._filter_kwargs)
manager.data = deepcopy(self.data)
manager._serialize = self._serialize
manager.is_lazy = self.is_lazy
return manager
def serialize(self, data, model=None):
"""Serializes passed data to related :class:`~syncano.models.base.Model` class."""
model = model or self.model
if data == '':
return
if isinstance(data, model):
return data
if not isinstance(data, dict):
raise SyncanoValueError('Unsupported data type.')
properties = deepcopy(self.properties)
properties.update(data)
return model(**properties) if self._serialize else data
def build_request(self, request):
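"""
Fills in ``params``, ``data`` and the template header on the outgoing request
dict without overriding values the caller has already provided.
"""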
if 'params' not in request and self.query:
request['params'] = self.query
if 'data' not in request and self.data:
request['data'] = self.data
if 'headers' not in request:
request['headers'] = {}
if self._template is not None and 'X-TEMPLATE-RESPONSE' not in request['headers']:
request['headers']['X-TEMPLATE-RESPONSE'] = self._template
def request(self, method=None, path=None, **request):
"""Internal method, which calls Syncano API and returns serialized data."""
meta = self.model._meta
method = method or self.method
allowed_methods = meta.get_endpoint_methods(self.endpoint)
if not path:
path, defaults = self._get_endpoint_properties()
if method.lower() not in allowed_methods:
methods = ', '.join(allowed_methods)
raise SyncanoValueError('Unsupported request method "{0}" allowed are {1}.'.format(method, methods))
self.build_request(request)
try:
response = self.connection.request(method, path, **request)
except SyncanoRequestError as e:
if e.status_code == 404:
obj_id = path.rsplit('/')[-2]
raise self.model.DoesNotExist("{} not found.".format(obj_id))
raise
if 'next' not in response and not self._template:
return self.serialize(response)
return response
def get_allowed_method(self, *methods):
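"""
Returns the first of ``methods`` supported by the current endpoint;
raises ``SyncanoValueError`` when none of them are allowed.
"""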
meta = self.model._meta
allowed_methods = meta.get_endpoint_methods(self.endpoint)
for method in methods:
if method.lower() in allowed_methods:
return method
methods = ', '.join(methods)
raise SyncanoValueError('Unsupported request methods {0}.'.format(methods))
def iterator(self):
"""Pagination handler"""
response = self._get_response()
results = 0
while True:
if self._template:
yield response
break
objects = response.get('objects')
next_url = response.get('next')
for o in objects:
if self._limit and results >= self._limit:
break
results += 1
yield self.serialize(o)
if not objects or not next_url or (self._limit and results >= self._limit):
break
response = self.request(path=next_url)
def _get_response(self):
return self.request()
def _get_instance(self, attrs):
return self.model(**attrs)
def _get_endpoint_properties(self):
defaults = {f.name: f.default for f in self.model._meta.fields if f.default is not None}
defaults.update(self.properties)
return self.model._meta.resolve_endpoint(self.endpoint, defaults), defaults
class ScriptManager(Manager):
"""
Custom :class:`~syncano.models.manager.Manager`
class for :class:`~syncano.models.base.Script` model.
"""
@clone
def run(self, *args, **kwargs):
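"""
Runs the Script with an optional ``payload`` (a dict or a JSON string) and
returns a ``ScriptTrace`` built from the raw response.
Usage (illustrative; assumes a Script with id 1 exists)::
    trace = Script.please.run(id=1, payload={'foo': 'bar'})
"""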
payload = kwargs.pop('payload', {})
if not isinstance(payload, six.string_types):
payload = json.dumps(payload)
self.method = 'POST'
self.endpoint = 'run'
self.data['payload'] = payload
self._filter(*args, **kwargs)
self._serialize = False
response = self.request()
return registry.ScriptTrace(**response)
class ScriptEndpointManager(Manager):
"""
Custom :class:`~syncano.models.manager.Manager`
class for :class:`~syncano.models.base.ScriptEndpoint` model.
"""
@clone
def run(self, *args, **kwargs):
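"""
Runs the ScriptEndpoint with an optional ``payload`` (a dict or a JSON string)
and returns a ``ScriptEndpointTrace`` built from the raw response.
Usage (illustrative; the endpoint is assumed to be identified by ``name``)::
    trace = ScriptEndpoint.please.run(name='endpoint-name', payload={'foo': 'bar'})
"""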
payload = kwargs.pop('payload', {})
if not isinstance(payload, six.string_types):
payload = json.dumps(payload)
self.method = 'POST'
self.endpoint = 'run'
self.data['payload'] = payload
self._filter(*args, **kwargs)
self._serialize = False
response = self.request()
# Workaround for circular import
return registry.ScriptEndpointTrace(**response)
class ObjectManager(IncrementMixin, ArrayOperationsMixin, Manager):
"""
Custom :class:`~syncano.models.manager.Manager`
class for :class:`~syncano.models.base.Object` model.
"""
LOOKUP_SEPARATOR = '__'
ALLOWED_LOOKUPS = [
'gt', 'gte', 'lt', 'lte',
'eq', 'neq', 'exists', 'in', 'nin',
'near', 'is', 'contains',
'startswith', 'endswith',
'istartswith', 'iendswith',
'icontains', 'ieq',
]
def __init__(self):
super(ObjectManager, self).__init__()
self._initial_response = None
def serialize(self, data, model=None):
model = model or self.model.get_subclass_model(**self.properties)
return super(ObjectManager, self).serialize(data, model)
@clone
def count(self):
"""
Return the queryset count;
Usage::
Object.please.list(instance_name='raptor', class_name='some_class').filter(id__gt=600).count()
Object.please.list(instance_name='raptor', class_name='some_class').count()
Object.please.all(instance_name='raptor', class_name='some_class').count()
:return: The count of the returned objects: count = DataObjects.please.list(...).count();
"""
self.method = 'GET'
self.query.update({
'include_count': True,
'page_size': 0,
})
response = self.request()
return response['objects_count']
@clone
def with_count(self, page_size=20):
"""
Return the queryset with count;
Usage::
Object.please.list(instance_name='raptor', class_name='some_class').filter(id__gt=600).with_count()
Object.please.list(instance_name='raptor', class_name='some_class').with_count(page_size=30)
Object.please.all(instance_name='raptor', class_name='some_class').with_count()
:param page_size: The page size used for pagination; defaults to 20;
:return: The tuple with objects and the count: objects, count = DataObjects.please.list(...).with_count();
"""
query_data = {
'include_count': True,
'page_size': page_size,
}
self.method = 'GET'
self.query.update(query_data)
response = self.request()
self._initial_response = response
return self, self._initial_response['objects_count']
@clone
def filter(self, **kwargs):
"""
Special method just for data object :class:`~syncano.models.base.Object` model.
Usage::
objects = Object.please.list('instance-name', 'class-name').filter(henryk__gte='hello')
"""
query = self._build_query(query_data=kwargs)
self.query['query'] = json.dumps(query)
self.method = 'GET'
self.endpoint = 'list'
return self
def _build_query(self, query_data, **kwargs):
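"""
Translates Django-style lookups (``field__lookup=value``, or
``relation__field__lookup=value`` for relation fields) into the nested query
dict expected by the API, roughly ``{'field': {'_lookup': <value>}}``.
"""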
query = {}
self.properties.update(**kwargs)
model = self.model.get_subclass_model(**self.properties)
for field_name, value in six.iteritems(query_data):
lookup = 'eq'
model_name = None
if self.LOOKUP_SEPARATOR in field_name:
model_name, field_name, lookup = self._get_lookup_attributes(field_name)
# if filter is made on relation field: relation__name__eq='test';
if model_name:
for field in model._meta.fields:
if field.name == model_name:
break
# if filter is made on normal field: name__eq='test';
else:
for field in model._meta.fields:
if field.name == field_name:
break
self._validate_lookup(model, model_name, field_name, lookup, field)
query_main_lookup, query_main_field = self._get_main_lookup(model_name, field_name, lookup)
query.setdefault(query_main_field, {})
query[query_main_field]['_{0}'.format(query_main_lookup)] = field.to_query(
value,
query_main_lookup,
related_field_name=field_name,
related_field_lookup=lookup,
)
return query
def _get_lookup_attributes(self, field_name):
try:
model_name, field_name, lookup = field_name.split(self.LOOKUP_SEPARATOR, 2)
except ValueError:
model_name = None
field_name, lookup = field_name.split(self.LOOKUP_SEPARATOR, 1)
return model_name, field_name, lookup
def _validate_lookup(self, model, model_name, field_name, lookup, field):
if not model_name and field_name not in model._meta.field_names:
allowed = ', '.join(model._meta.field_names)
raise SyncanoValueError('Invalid field name "{0}" allowed are {1}.'.format(field_name, allowed))
if lookup not in self.ALLOWED_LOOKUPS:
allowed = ', '.join(self.ALLOWED_LOOKUPS)
raise SyncanoValueError('Invalid lookup type "{0}" allowed are {1}.'.format(lookup, allowed))
if model_name and field.__class__.__name__ != 'RelationField':
raise SyncanoValueError('Lookup supported only for RelationField.')
@classmethod
def _get_main_lookup(cls, model_name, field_name, lookup):
if model_name:
return 'is', model_name
else:
return lookup, field_name
def bulk_create(self, *objects):
"""
Creates many new objects.
Usage::
created_objects = Object.please.bulk_create(
Object(instance_name='instance_a', class_name='some_class', title='one'),
Object(instance_name='instance_a', class_name='some_class', title='two'),
Object(instance_name='instance_a', class_name='some_class', title='three')
)
:param objects: a list of data object instances to be created;
:return: a list of created and populated objects; when an error occurs, a plain dict is returned in its place;
"""
return ObjectBulkCreate(objects, self).process()
def _get_response(self):
return self._initial_response or self.request()
def _get_instance(self, attrs):
return self.model.get_subclass_model(**attrs)(**attrs)
def _get_model_field_names(self):
object_fields = [f.name for f in self.model._meta.fields]
schema = self.model.get_class_schema(**self.properties)
return object_fields + [i['name'] for i in schema.schema]
def _validate_fields(self, model_fields, args):
for arg in args:
if arg not in model_fields:
msg = 'Field "{0}" does not exist in class {1}.'
raise SyncanoValidationError(
msg.format(arg, self.properties['class_name']))
@clone
def fields(self, *args):
"""
Special method just for data object :class:`~syncano.models.base.Object` model.
Usage::
objects = Object.please.list('instance-name', 'class-name').fields('name', 'id')
"""
model_fields = self._get_model_field_names()
self._validate_fields(model_fields, args)
self.query['fields'] = ','.join(args)
self.method = 'GET'
self.endpoint = 'list'
return self
@clone
def exclude(self, *args):
"""
Special method just for data object :class:`~syncano.models.base.Object` model.
Usage::
objects = Object.please.list('instance-name', 'class-name').exclude('avatar')
"""
model_fields = self._get_model_field_names()
self._validate_fields(model_fields, args)
fields = [f for f in model_fields if f not in args]
self.query['fields'] = ','.join(fields)
self.method = 'GET'
self.endpoint = 'list'
return self
def ordering(self, order=None):
raise AttributeError('Ordering not implemented. Use order_by instead.')
@clone
def order_by(self, field):
"""
Sets ordering field of returned objects.
Usage::
# ASC order
instances = Object.please.order_by('name')
# DESC order
instances = Object.please.order_by('-name')
"""
if not field or not isinstance(field, six.string_types):
raise SyncanoValueError('Order by field needs to be a string.')
self.query['order_by'] = field
return self
def _clone(self):
manager = super(ObjectManager, self)._clone()
manager._initial_response = self._initial_response
return manager
class SchemaManager(object):
"""
Custom :class:`~syncano.models.manager.Manager`
class for :class:`~syncano.models.fields.SchemaField`.
"""
def __init__(self, schema=None):
self.schema = schema or []
def __eq__(self, other):
if isinstance(other, SchemaManager):
return self.schema == other.schema
return NotImplemented
def __str__(self): # pragma: no cover
return str(self.schema)
def __repr__(self): # pragma: no cover
return '<SchemaManager>'
def __getitem__(self, key):
if isinstance(key, int):
return self.schema[key]
if isinstance(key, six.string_types):
for v in self.schema:
if v['name'] == key:
return v
raise KeyError
def __setitem__(self, key, value):
value = deepcopy(value)
value['name'] = key
self.remove(key)
self.add(value)
def __delitem__(self, key):
self.remove(key)
def __iter__(self):
return iter(self.schema)
def __contains__(self, item):
if not self.schema:
return False
return item in self.schema
def set(self, value):
"""Sets schema value."""
self.schema = value
def add(self, *objects):
"""Adds multiple objects to schema."""
self.schema.extend(objects)
def remove(self, *names):
"""Removes selected objects based on their names."""
values = [v for v in self.schema if v['name'] not in names]
self.set(values)
def clear(self):
"""Sets empty schema."""
self.set([])
def set_index(self, field, order=False, filter=False):
"""Sets index on selected field.
:type field: string
:param field: Name of schema field
:type filter: bool
:param filter: Sets filter index on selected field
:type order: bool
:param order: Sets order index on selected field
"""
if not order and not filter:
raise ValueError('Choose at least one index.')
if order:
self[field]['order_index'] = True
if filter:
self[field]['filter_index'] = True
def set_order_index(self, field):
"""Shortcut for ``set_index(field, order=True)``."""
self.set_index(field, order=True)
def set_filter_index(self, field):
"""Shortcut for ``set_index(field, filter=True)``."""
self.set_index(field, filter=True)
def remove_index(self, field, order=False, filter=False):
"""Removes index from selected field.
:type field: string
:param field: Name of schema field
:type filter: bool
:param filter: Removes filter index from selected field
:type order: bool
:param order: Removes order index from selected field
"""
if not order and not filter:
raise ValueError('Choose at least one index.')
if order and 'order_index' in self[field]:
del self[field]['order_index']
if filter and 'filter_index' in self[field]:
del self[field]['filter_index']
def remove_order_index(self, field):
"""Shortcut for ``remove_index(field, order=True)``."""
self.remove_index(field, order=True)
def remove_filter_index(self, field):
"""Shortcut for ``remove_index(field, filter=True)``."""
self.remove_index(field, filter=True)