import inspect
import six
from syncano.exceptions import SyncanoDoesNotExist, SyncanoValidationError
from . import fields
from .manager import Manager
from .options import Options
from .registry import registry
class ModelMetaclass(type):
    """Metaclass for all models."""
    # NOTE(review): the body of this metaclass is not visible in this excerpt.
    # The definitions that follow take ``self`` and reference ``Model``, so they
    # presumably belong to the model base class -- confirm against the full file.
def __init__(self, **kwargs):
self.is_lazy = kwargs.pop('is_lazy', False)
self._raw_data = {}
self.to_python(kwargs)
def __repr__(self):
"""Displays current instance class name and pk.
"""
return '<{0}: {1}>'.format(
self.__class__.__name__,
self.pk
)
def __str__(self):
"""Wrapper around ```repr`` method.
"""
return repr(self)
def __unicode__(self):
    """Wrapper around the ``repr`` function, passed through ``six.u``."""
    text = repr(self)
    return six.u(text)
def __eq__(self, other):
    """Compare two models by primary key.

    :param other: Object to compare with.
    :return: ``self.pk == other.pk`` when ``other`` is a ``Model``,
             ``NotImplemented`` otherwise.
    """
    # NOTE(review): no ``__hash__`` is defined next to this method in the
    # visible code; on Python 3 that makes instances unhashable -- confirm
    # whether that is intended.
    if not isinstance(other, Model):
        return NotImplemented
    return self.pk == other.pk
def _get_connection(self, **kwargs):
connection = kwargs.pop('connection', None)
return connection or self._meta.connection
def save(self, **kwargs):
    """Creates or updates the current instance.

    Override this in a subclass if you want to control the saving process.

    :param connection: Optional connection, forwarded to ``_get_connection``.
    :param expected_revision: Optional value added to the request data under
                              the ``expected_revision`` key.
    :return: ``self`` after the response has been loaded, or the result of
             ``batch_object`` when the instance is lazy (no request is sent).
    """
    self.validate()
    payload = self.to_native()
    connection = self._get_connection(**kwargs)
    endpoint_data = self.get_endpoint_data()

    if self.is_new():
        # New objects are POSTed to the list endpoint.
        endpoint_name, method = 'list', 'POST'
    else:
        # Existing objects go to the detail endpoint; PUT is used only when
        # the endpoint declares it, otherwise the method stays POST.
        endpoint_name = 'detail'
        allowed = self._meta.get_endpoint_methods(endpoint_name)
        method = 'PUT' if 'put' in allowed else 'POST'

    endpoint = self._meta.resolve_endpoint(endpoint_name, endpoint_data, method)

    if 'expected_revision' in kwargs:
        payload.update({'expected_revision': kwargs['expected_revision']})

    if self.is_lazy:
        # Lazy instances only describe the request so it can be batched.
        return self.batch_object(method=method, path=endpoint, body=payload, properties=payload)

    response = connection.request(method, endpoint, data=payload)
    self.to_python(response)
    return self
@classmethod
def batch_object(cls, method, path, body, properties=None):
    """Build the dictionary that describes one request of a batch call.

    :param method: HTTP method name stored under ``body.method``.
    :param path: Endpoint path stored under ``body.path``.
    :param body: Request payload stored under ``body.body``.
    :param properties: Optional mapping stored under ``meta.properties``;
                       an empty dict is used when it is omitted or falsy.
    :rtype: dict
    :return: ``{'body': {...}, 'meta': {'model': cls, 'properties': ...}}``
    """
    return {
        'body': {
            'method': method,
            'path': path,
            'body': body,
        },
        'meta': {
            'model': cls,
            'properties': properties or {},
        },
    }
def mark_for_batch(self):
    """Flag the instance as lazy.

    Once flagged, ``save`` returns a batch description instead of sending
    the request.
    """
    self.is_lazy = True
def delete(self, **kwargs):
    """Removes the current instance.

    :param connection: Optional connection, forwarded to ``_get_connection``.
    :raises SyncanoValidationError: When the instance is new.
    """
    if self.is_new():
        raise SyncanoValidationError('Method allowed only on existing model.')

    verb = 'DELETE'
    endpoint_data = self.get_endpoint_data()
    endpoint = self._meta.resolve_endpoint('detail', endpoint_data, verb)
    self._get_connection(**kwargs).request(verb, endpoint)

    # The class is matched by name to avoid a circular import.
    if self.__class__.__name__ == 'Instance':
        registry.clear_used_instance()

    # Drop the locally held data of the removed object.
    self._raw_data = {}
def reload(self, **kwargs):
    """Reloads the current instance.

    Requests the ``detail`` endpoint with ``GET`` and passes the response
    to ``to_python``.

    :param connection: Optional connection, forwarded to ``_get_connection``.
    :raises SyncanoValidationError: When the instance is new.
    """
    if self.is_new():
        raise SyncanoValidationError('Method allowed only on existing model.')

    verb = 'GET'
    endpoint_data = self.get_endpoint_data()
    endpoint = self._meta.resolve_endpoint('detail', endpoint_data, verb)
    fresh = self._get_connection(**kwargs).request(verb, endpoint)
    self.to_python(fresh)
def validate(self):
    """
    Validates the current instance.

    Every field that is not read-only validates its current value.

    :raises: SyncanoValidationError, SyncanoFieldError
    """
    writable_fields = (f for f in self._meta.fields if not f.read_only)
    for field in writable_fields:
        current = getattr(self, field.name)
        field.validate(current, self)
def is_valid(self):
    """Tell whether the current instance passes ``validate``.

    :rtype: bool
    :return: ``False`` when ``validate`` raises ``SyncanoValidationError``,
             ``True`` otherwise. Other exceptions propagate.
    """
    try:
        self.validate()
    except SyncanoValidationError:
        return False
    return True
def is_new(self):
    """Tell whether the instance has not been stored yet.

    When the model declares a ``links`` field, the instance is new while
    ``links`` is falsy. Otherwise it is new when the primary-key field is
    read-only and ``pk`` is falsy.

    :rtype: bool
    """
    meta = self._meta
    if 'links' in meta.field_names:
        return not self.links
    return bool(meta.pk.read_only and not self.pk)
def to_python(self, data):
    """
    Loads raw data onto the instance, one field at a time.

    Values are assigned with ``setattr``; any conversion to Python types
    presumably happens in the field's attribute setter.

    :type data: dict
    :param data: Raw data
    """
    for field in self._meta.fields:
        # Data coming from the Syncano Platform carries the mapped key (for
        # example 'class'), which has to land on the mapped field
        # ('class_name'). An object built directly through the library, e.g.
        # DataEndpoint(class_name='some_class'), only has the 'class_name'
        # key; the switch back to 'class' is made later in to_native.
        use_mapping = (
            field.mapping is not None
            and field.mapping in data
            and self.is_new()
        )
        source_key = field.mapping if use_mapping else field.name

        if source_key in data:
            setattr(self, field.name, data[source_key])

        # NOTE(review): indentation is lost in this excerpt; this branch is
        # assumed to sit at loop level (run for every field) -- confirm.
        if isinstance(field, fields.RelationField):
            accessor_name = "{}_set".format(source_key)
            setattr(self, accessor_name, field(instance=self, field_name=source_key))
def to_native(self):
    """Converts the current instance to raw data which
    can be serialized to JSON and sent to the API.

    Read-only fields and fields without data are left out, and so are blank
    fields whose value is ``None``.

    :rtype: dict
    """
    payload = {}
    for field in self._meta.fields:
        if field.read_only or not field.has_data:
            continue

        value = getattr(self, field.name)
        if value is None and field.blank:
            continue

        if field.mapping:
            # Mapped fields are sent under their mapped key.
            payload[field.mapping] = field.to_native(value)
            continue

        key = getattr(field, 'param_name', field.name)
        if key == 'files' and key in payload:
            # Merge into the 'files' entry an earlier field already created.
            payload[key].update(field.to_native(value))
        else:
            payload[key] = field.to_native(value)
    return payload
def get_endpoint_data(self):
    """Collect the field values used to resolve the instance's endpoints.

    :rtype: dict
    :return: ``{field name: current value}`` for every field whose
             ``has_endpoint_data`` flag is truthy.
    """
    return {
        field.name: getattr(self, field.name)
        for field in self._meta.fields
        if field.has_endpoint_data
    }