Source code for syncano.models.channels

from threading import Thread

import six
from requests import Timeout
from syncano import logger

from . import fields
from .base import Model
from .instances import Instance


class PollThread(Thread):
    """Thread that repeatedly issues GET requests against a poll endpoint.

    Each successful response is wrapped in a ``Message`` and handed to
    ``callback``; a request timeout is reported as ``callback(None)``.
    The loop keeps running for as long as the callback returns a truthy
    value and :meth:`stop` has not been called.

    :param connection: object exposing ``request(method, endpoint, **kwargs)``.
    :param endpoint: path passed unchanged to ``connection.request``.
    :param callback: callable invoked with a ``Message`` or ``None`` (timeout);
        a falsy return value stops the polling loop.
    :param error: optional callable invoked with the exception when a request
        fails with anything other than ``Timeout``; the loop then ends.
    :param timeout: keyword-only; request timeout forwarded to the connection,
        defaults to ``60 * 5`` when missing or falsy (presumably seconds --
        confirm against the connection implementation).
    :param last_id: keyword-only; initial ``last_id`` query parameter.
    :param room: keyword-only; ``room`` query parameter.

    Remaining positional and keyword arguments are forwarded to ``Thread``.
    """

    def __init__(self, connection, endpoint, callback, error=None, *args, **kwargs):
        self.connection = connection
        self.endpoint = endpoint
        self.callback = callback
        self.error = error
        self.abort = False
        # Pop our own options so only Thread-specific kwargs reach Thread.__init__.
        self.timeout = kwargs.pop('timeout', None) or 60 * 5
        self.last_id = kwargs.pop('last_id', None)
        self.room = kwargs.pop('room', None)
        super(PollThread, self).__init__(*args, **kwargs)
        logger.debug('%s created.', self)

    def __str__(self):
        # ``name`` is the attribute behind the deprecated ``getName()``.
        return '<PollThread: %s>' % self.name

    def __unicode__(self):
        return six.u(str(self))

    def request(self):
        """Perform one poll request and return the connection's response."""
        kwargs = {
            'timeout': self.timeout,
            'params': {'last_id': self.last_id, 'room': self.room}
        }
        return self.connection.request('GET', self.endpoint, **kwargs)

    def run(self):
        """Poll until stopped, the callback returns falsy, or a request fails.

        ``stop()`` may be called from another thread while a request is in
        flight; in that case the pending result is discarded instead of being
        dispatched to a callback that has already been cleared.
        """
        while not self.abort:
            try:
                response = self.request()
            except Timeout:
                if self.abort:
                    break
                logger.debug('%s Timeout.', self)
                self._dispatch(None)
            except Exception as e:
                logger.error('%s Error "%s"', self, e)
                # Snapshot: stop() may clear self.error concurrently.
                error = self.error
                if error:
                    error(e)
                return
            else:
                if self.abort:
                    break
                logger.debug('%s Message "%s"', self, response['id'])
                self.last_id = response['id']
                self._dispatch(Message(**response))

    def _dispatch(self, message):
        """Invoke the callback with ``message``; stop when it is missing or returns falsy."""
        # Snapshot: stop() may set self.callback to None between check and call.
        callback = self.callback
        if callback is None or not callback(message):
            self.stop()

    def stop(self):
        """Request loop termination and drop references to the handlers."""
        self.abort = True
        self.callback = None
        self.error = None
class Channel(Model):
    """
    .. _long polling: http://en.wikipedia.org/wiki/Push_technology#Long_polling

    Object-oriented wrapper around channels, see the
    `realtime communication docs <http://docs.syncano.io/docs/realtime-communication>`_.

    :ivar name: :class:`~syncano.models.fields.StringField`
    :ivar type: :class:`~syncano.models.fields.ChoiceField`
    :ivar group: :class:`~syncano.models.fields.IntegerField`
    :ivar group_permissions: :class:`~syncano.models.fields.ChoiceField`
    :ivar other_permissions: :class:`~syncano.models.fields.ChoiceField`
    :ivar custom_publish: :class:`~syncano.models.fields.BooleanField`

    .. note::
        **Channel** offers two special methods, ``publish`` and ``poll``.
        ``publish`` sends a message to the channel::

            >>> channel = Channel.please.get('instance-name', 'channel-name')
            >>> channel.publish({"x": 1})

        ``poll`` opens a `long polling`_ connection that listens for messages::

            >>> def callback(message=None):
            ...    print(message)
            ...    return True

            >>> channel = Channel.please.get('instance-name', 'channel-name')
            >>> channel.poll(callback=callback)
    """

    TYPE_CHOICES = (
        {'display_name': 'Default', 'value': 'default'},
        {'display_name': 'Separate rooms', 'value': 'separate_rooms'},
    )

    PERMISSIONS_CHOICES = (
        {'display_name': 'None', 'value': 'none'},
        {'display_name': 'Subscribe', 'value': 'subscribe'},
        {'display_name': 'Publish', 'value': 'publish'},
    )

    name = fields.StringField(max_length=64, primary_key=True)
    type = fields.ChoiceField(choices=TYPE_CHOICES, required=False, default='default')
    group = fields.IntegerField(label='group id', required=False)
    group_permissions = fields.ChoiceField(choices=PERMISSIONS_CHOICES, default='none')
    other_permissions = fields.ChoiceField(choices=PERMISSIONS_CHOICES, default='none')
    custom_publish = fields.BooleanField(default=False, required=False)
    links = fields.LinksField()

    class Meta:
        parent = Instance
        endpoints = {
            'detail': {
                'methods': ['put', 'get', 'patch', 'delete'],
                'path': '/channels/{name}/',
            },
            'list': {
                'methods': ['post', 'get'],
                'path': '/channels/',
            },
            'poll': {
                'methods': ['get'],
                'path': '/channels/{name}/poll/',
            },
            'publish': {
                'methods': ['post'],
                'path': '/channels/{name}/publish/',
            },
            'history': {
                'methods': ['get'],
                'path': '/channels/{name}/history/',
            },
        }

    def poll(self, room=None, last_id=None, callback=None, error=None, timeout=None):
        """Start a ``PollThread`` on this channel's poll endpoint.

        :param room: forwarded to the thread as its ``room`` option.
        :param last_id: forwarded to the thread as its ``last_id`` option.
        :param callback: handler passed to the thread for incoming messages.
        :param error: handler passed to the thread for request errors.
        :param timeout: forwarded to the thread as its ``timeout`` option.
        :return: the started thread's bound ``stop`` method.
        """
        endpoint_data = self.get_endpoint_data()
        poll_path = self._meta.resolve_endpoint('poll', endpoint_data, http_method='GET')
        conn = self._get_connection()

        thread_options = {
            'timeout': timeout,
            'last_id': last_id,
            'room': room,
            'name': 'poll_%s' % self.name,
        }
        poller = PollThread(conn, poll_path, callback, error, **thread_options)
        poller.start()
        return poller.stop

    def publish(self, payload, room=None):
        """POST ``payload`` to this channel's publish endpoint.

        :param payload: value stored as the outgoing message's ``payload``.
        :param room: optional value stored as the outgoing message's ``room``.
        :return: a ``Message`` built from the connection's response.
        """
        verb = 'POST'
        endpoint_data = self.get_endpoint_data()
        publish_path = self._meta.resolve_endpoint('publish', endpoint_data, verb)
        conn = self._get_connection()

        outgoing = Message(payload=payload, room=room)
        reply = conn.request(verb, publish_path, data=outgoing.to_native())
        return Message(**reply)
class Message(Model):
    """
    Object-oriented wrapper around channel history, see the
    `realtime communication docs <http://docs.syncano.io/docs/realtime-communication>`_.

    :ivar room: :class:`~syncano.models.fields.StringField`
    :ivar action: :class:`~syncano.models.fields.ChoiceField`
    :ivar author: :class:`~syncano.models.fields.JSONField`
    :ivar metadata: :class:`~syncano.models.fields.JSONField`
    :ivar payload: :class:`~syncano.models.fields.JSONField`
    :ivar created_at: :class:`~syncano.models.fields.DateTimeField`
    """

    # Each action's numeric value is its position in this label sequence (0..3).
    ACTION_CHOICES = tuple(
        {'display_name': label, 'value': code}
        for code, label in enumerate(('custom', 'create', 'update', 'delete'))
    )

    room = fields.StringField(max_length=50, required=False)
    action = fields.ChoiceField(choices=ACTION_CHOICES, read_only=True)
    author = fields.JSONField(required=False, read_only=True)
    metadata = fields.JSONField(required=False, read_only=True)
    payload = fields.JSONField()
    created_at = fields.DateTimeField(required=False, read_only=True)

    class Meta:
        parent = Channel
        endpoints = {
            'detail': {
                'methods': ['get'],
                'path': '/history/{pk}/',
            },
            'list': {
                'methods': ['get', 'post'],
                'path': '/history/',
            },
        }