import json
import time
from copy import deepcopy
import requests
import six
import syncano
from syncano.exceptions import RevisionMismatchException, SyncanoRequestError, SyncanoValueError
if six.PY3:
from urllib.parse import urljoin
else:
from urlparse import urljoin
__all__ = ['Connection', 'ConnectionMixin']
def is_success(code):
"""Checks if response code is successful."""
return 200 <= code <= 299
def is_client_error(code):
"""Checks if response code has client error."""
return 400 <= code <= 499
def is_server_error(code):
"""Checks if response code has server error."""
return 500 <= code <= 599
class DefaultConnection(object):
"""Singleton class which holds default connection."""
def __init__(self):
self._connection = None
def __call__(self):
if not self._connection:
raise SyncanoValueError('Please open new connection.')
return self._connection
def open(self, *args, **kwargs):
connection = Connection(*args, **kwargs)
if not self._connection:
self._connection = connection
return connection
[docs]class Connection(object):
"""Base connection class.
:ivar host: Syncano API host
:ivar email: Your Syncano email address
:ivar password: Your Syncano password
:ivar api_key: Your Syncano ``Account Key`` or instance ``Api Key``
:ivar user_key: Your Syncano ``User Key``
:ivar instance_name: Your Syncano ``Instance Name``
:ivar logger: Python logger instance
:ivar timeout: Default request timeout
:ivar verify_ssl: Verify SSL certificate
"""
CONTENT_TYPE = 'application/json'
AUTH_SUFFIX = 'v1.1/account/auth'
ACCOUNT_SUFFIX = 'v1.1/account/'
SOCIAL_AUTH_SUFFIX = AUTH_SUFFIX + '/{social_backend}/'
USER_AUTH_SUFFIX = 'v1.1/instances/{name}/user/auth/'
USER_INFO_SUFFIX = 'v1.1/instances/{name}/user/'
REGISTER_SUFFIX = 'v1.1/account/register/'
LOGIN_PARAMS = {'email',
'password'}
ALT_LOGIN_PARAMS = {'api_key'}
USER_LOGIN_PARAMS = {'username',
'password',
'api_key',
'instance_name'}
USER_ALT_LOGIN_PARAMS = {'user_key',
'api_key',
'instance_name'}
SOCIAL_LOGIN_PARAMS = {'token',
'social_backend'}
def __init__(self, host=None, **kwargs):
self.host = host or syncano.API_ROOT
self.logger = kwargs.get('logger', syncano.logger)
self.timeout = kwargs.get('timeout', 30)
# We don't need to check SSL cert in DEBUG mode
self.verify_ssl = kwargs.pop('verify_ssl', True)
self._init_login_params(kwargs)
if self.is_user:
self.AUTH_SUFFIX = self.USER_AUTH_SUFFIX.format(name=self.instance_name)
self.auth_method = self.authenticate_user
else:
if self.is_social:
self.AUTH_SUFFIX = self.SOCIAL_AUTH_SUFFIX.format(social_backend=self.social_backend)
self.auth_method = self.authenticate_admin
self.session = requests.Session()
def _init_login_params(self, login_kwargs):
for param in self.LOGIN_PARAMS.union(self.ALT_LOGIN_PARAMS,
self.USER_LOGIN_PARAMS,
self.USER_ALT_LOGIN_PARAMS,
self.SOCIAL_LOGIN_PARAMS):
def_name = param.replace('_', '').upper()
value = login_kwargs.get(param, getattr(syncano, def_name, None))
setattr(self, param, value)
def _are_params_ok(self, params):
return all(getattr(self, p) for p in params)
@property
[docs] def is_user(self):
login_params_ok = self._are_params_ok(self.USER_LOGIN_PARAMS)
alt_login_params_ok = self._are_params_ok(self.USER_ALT_LOGIN_PARAMS)
return login_params_ok or alt_login_params_ok
@property
[docs] def is_social(self):
return self._are_params_ok(self.SOCIAL_LOGIN_PARAMS)
@property
[docs] def is_alt_login(self):
if self.is_user:
return self._are_params_ok(self.USER_ALT_LOGIN_PARAMS)
return self._are_params_ok(self.ALT_LOGIN_PARAMS)
@property
[docs] def auth_key(self):
if self.is_user:
return self.user_key
return self.api_key
[docs] def build_params(self, params):
"""
:type params: dict
:param params: Params which will be passed to request
:rtype: dict
:return: Request params
"""
params = deepcopy(params)
params['timeout'] = params.get('timeout', self.timeout)
params['headers'] = params.get('headers', {})
params['verify'] = self.verify_ssl
if 'content-type' not in params['headers']:
params['headers']['content-type'] = self.CONTENT_TYPE
if self.is_user:
params['headers'].update({
'X-USER-KEY': self.user_key,
'X-API-KEY': self.api_key
})
elif self.api_key and 'Authorization' not in params['headers']:
params['headers']['Authorization'] = 'token {}'.format(self.api_key)
# We don't need to check SSL cert in DEBUG mode
if syncano.DEBUG or not self.verify_ssl:
params['verify'] = False
return params
[docs] def build_url(self, path):
"""Ensures proper format for provided path.
:type path: string
:param path: Request path
:rtype: string
:return: Request URL
"""
if not isinstance(path, six.string_types):
raise SyncanoValueError('"path" should be a string.')
query = None
if path.startswith(self.host):
return path
if '?' in path:
path, query = path.split('?', 1)
if not path.endswith('/'):
path += '/'
if path.startswith('/'):
path = path[1:]
if query:
path = '{0}?{1}'.format(path, query)
return urljoin(self.host, path)
[docs] def request(self, method_name, path, **kwargs):
"""Simple wrapper around :func:`~syncano.connection.Connection.make_request` which
will ensure that request is authenticated.
:type method_name: string
:param method_name: HTTP request method e.g: GET
:type path: string
:param path: Request path or full URL
:rtype: dict
:return: JSON response
"""
is_auth = self.is_authenticated()
if not is_auth:
self.authenticate()
return self.make_request(method_name, path, **kwargs)
[docs] def make_request(self, method_name, path, **kwargs):
"""
:type method_name: string
:param method_name: HTTP request method e.g: GET
:type path: string
:param path: Request path or full URL
:rtype: dict
:return: JSON response
:raises SyncanoValueError: if invalid request method was chosen
:raises SyncanoRequestError: if something went wrong during the request
"""
data = kwargs.get('data', {})
files = data.pop('files', None)
self._check_batch_files(data)
if files is None:
files = {k: v for k, v in six.iteritems(data) if hasattr(v, 'read')}
if data:
kwargs['data'] = {k: v for k, v in six.iteritems(data) if k not in files}
params = self.build_params(kwargs)
method = getattr(self.session, method_name.lower(), None)
# JSON dump can be expensive
if syncano.DEBUG:
debug_params = params.copy()
debug_params.update({'files': [f for f in files]}) # show files in debug info;
formatted_params = json.dumps(
debug_params,
sort_keys=True,
indent=2,
separators=(',', ': ')
)
self.logger.debug('API Root: %s', self.host)
self.logger.debug('Request: %s %s\n%s', method_name, path, formatted_params)
if method is None:
raise SyncanoValueError('Invalid request method: {0}.'.format(method_name))
# Encode request payload
if 'data' in params and not isinstance(params['data'], six.string_types):
params['data'] = json.dumps(params['data'])
url = self.build_url(path)
response = method(url, **params)
while response.status_code == 429: # throttling;
retry_after = response.headers.get('retry-after', 1)
time.sleep(float(retry_after))
response = method(url, **params)
content = self.get_response_content(url, response)
if files:
# remove 'data' and 'content-type' to avoid "ValueError: Data must not be a string."
params.pop('data')
params['headers'].pop('content-type')
params['files'] = self._process_apns_cert_files(files)
if response.status_code == 201:
url = '{}{}/'.format(url, content['id'])
patch = getattr(self.session, 'patch')
# second request is needed to upload a file
response = patch(url, **params)
content = self.get_response_content(url, response)
return content
[docs] def get_response_content(self, url, response):
try:
content = response.json()
except ValueError:
content = response.text
if is_server_error(response.status_code):
raise SyncanoRequestError(response.status_code, 'Server error.')
# Validation error
if is_client_error(response.status_code):
if response.status_code == 400 and 'expected_revision' in content:
raise RevisionMismatchException(response.status_code, content)
raise SyncanoRequestError(response.status_code, content)
# Other errors
if not is_success(response.status_code):
self.logger.debug('Request Error: %s', url)
self.logger.debug('Status code: %d', response.status_code)
self.logger.debug('Response: %s', content)
raise SyncanoRequestError(response.status_code, content)
return content
[docs] def is_authenticated(self):
"""Checks if current session is authenticated.
:rtype: boolean
:return: Session authentication state
"""
if self.is_user:
return self.user_key is not None
return self.api_key is not None
[docs] def authenticate(self, **kwargs):
"""
:type email: string
:param email: Your Syncano account email address
:type password: string
:param password: Your Syncano password
:type api_key: string
:param api_key: Your Syncano api_key for instance
:rtype: string
:return: Your ``Account Key``
"""
is_auth = self.is_authenticated()
if is_auth:
msg = 'Connection already authenticated: {}'
else:
msg = 'Authentication successful: {}'
self.logger.debug('Authenticating')
self.auth_method(**kwargs)
key = self.auth_key
self.logger.debug(msg.format(key))
return key
[docs] def validate_params(self, kwargs, params):
for k in params:
kwargs[k] = kwargs.get(k, getattr(self, k))
if kwargs[k] is None:
raise SyncanoValueError('"{}" is required.'.format(k))
return kwargs
[docs] def authenticate_admin(self, **kwargs):
if self.is_alt_login:
request_args = self.validate_params(kwargs,
self.ALT_LOGIN_PARAMS)
else:
if self.is_social:
request_args = self.validate_params(kwargs,
self.SOCIAL_LOGIN_PARAMS)
request_args['access_token'] = request_args.pop('token') # core expects a access_token field;
else:
request_args = self.validate_params(kwargs,
self.LOGIN_PARAMS)
response = self.make_request('POST', self.AUTH_SUFFIX, data=request_args)
self.api_key = response.get('account_key')
return self.api_key
[docs] def authenticate_user(self, **kwargs):
if self.is_alt_login:
request_args = self.validate_params(kwargs,
self.USER_ALT_LOGIN_PARAMS)
else:
request_args = self.validate_params(kwargs,
self.USER_LOGIN_PARAMS)
headers = {
'content-type': self.CONTENT_TYPE,
'X-API-KEY': request_args.pop('api_key')
}
response = self.make_request('POST', self.AUTH_SUFFIX, data=request_args, headers=headers)
self.user_key = response.get('user_key')
return self.user_key
[docs] def get_account_info(self, api_key=None):
self.api_key = api_key or self.api_key
if not self.api_key:
raise SyncanoValueError('api_key is required.')
return self.make_request('GET', self.ACCOUNT_SUFFIX, headers={'X-API-KEY': self.api_key})
[docs] def get_user_info(self, api_key=None, user_key=None):
self.api_key = api_key or self.api_key
self.user_key = user_key or self.user_key
for attribute_name in ('api_key', 'user_key', 'instance_name'):
if not getattr(self, attribute_name, None):
raise SyncanoValueError('{attribute_name} is required.'.format(attribute_name=attribute_name))
return self.make_request('GET', self.USER_INFO_SUFFIX.format(name=self.instance_name), headers={
'X-API-KEY': self.api_key, 'X-USER-KEY': self.user_key})
@classmethod
def _check_batch_files(cls, data):
if 'requests' in data: # batch requests
for request in data['requests']:
per_request_files = request.get('body', {}).get('files', {})
if per_request_files:
raise SyncanoValueError('Batch do not support files upload.')
def _process_apns_cert_files(self, files):
files = files.copy()
for key in [file_name for file_name in files.keys()]:
# remove certificates files (which are bool - True if cert exist, False otherwise)
value = files[key]
if isinstance(value, bool):
files.pop(key)
continue
if key in ['production_certificate', 'development_certificate']:
value = (value.name, value, 'application/x-pkcs12', {'Expires': '0'})
files[key] = value
return files
[docs] def register(self, email, password, first_name=None, last_name=None, invitation_key=None):
register_data = {
'email': email,
'password': password,
}
for name, value in zip(['first_name', 'last_name', 'invitation_key'],
[first_name, last_name, invitation_key]):
if value:
register_data.update({name: value})
response = self.make_request('POST', self.REGISTER_SUFFIX, data=register_data)
self.api_key = response['account_key']
return self.api_key
[docs]class ConnectionMixin(object):
"""Injects connection attribute with support of basic validation."""
def __init__(self, *args, **kwargs):
self._connection = None
super(ConnectionMixin, self).__init__(*args, **kwargs)
@property
def connection(self):
# Sometimes someone will not use super
from syncano.models.registry import registry # TODO: refactor this;
return getattr(self, '_connection', None) or registry.connection()
@connection.setter
def connection(self, value):
if not isinstance(value, Connection):
raise SyncanoValueError('"connection" needs to be a Syncano Connection instance.')
self._connection = value
@connection.deleter
[docs] def connection(self):
self._connection = None