Source code for devourer.async_api

"""
.. module:: async_api
    :platform: Unix, Windows
    :synopsis: This module extends basic api with gevent-based async capabilities.

"""
from functools import partial

from concurrent.futures import ThreadPoolExecutor
from six import with_metaclass

from .api import GenericAPIBase
from .api import GenericAPICreator


# Default time (seconds) to wait for thread before timing out.
DEFAULT_ASYNC_TIMEOUT = 5.0

# Default executor class.
DEFAULT_EXECUTOR = ThreadPoolExecutor  # pylint: disable=invalid-name

# Default number of executors for an API instance.
DEFAULT_EXECUTORS = 2


class AsyncAPIBase(GenericAPIBase):
    """This is the async API representation class without declarative syntax.

    Requires GenericAPICreator metaclass to work.

    :type _methods: dict
    """
    _methods = None

    def __init__(self, *args, **kwargs):
        """
        Add async settings and invoke base initializer.
        :param args:
        :param executors: number of concurrent executor workers.
        :param executor_class: executor class.
        :param executor: executor instance. Takes priority over executor_class.
        :param kwargs:
        """
        executor = kwargs.pop('executor', None)
        executors = kwargs.pop('executors', DEFAULT_EXECUTORS)
        executor_class = kwargs.pop('executor_class', DEFAULT_EXECUTOR)
        if executor:
            self._executor = executor
        else:
            self._executor = executor_class(max_workers=executors)
        super(AsyncAPIBase, self).__init__(*args, **kwargs)

    def call(self, name, *args, **kwargs):
        """
        This function invokes the API method from the class declaration
        according to the name parameter along with all the hooks.

        :param name: name of method to call.
        :param args: non-keyword arguments of API method call.
        :param kwargs: keyword arguments of API method call.
        :returns: Result of finalize_method call, by default content of API's response.
        """
        prepared = getattr(self, 'prepare_{}'.format(name))(name, *args, **kwargs)
        method_partial = partial(prepared.call, self, *prepared.args, **prepared.kwargs)
        callback_partial = partial(
            getattr(self, 'finalize_{}'.format(name)),
            name,
            *prepared.args,
            **prepared.kwargs
        )
        future = self._executor.submit(lambda c, m: c(m()), callback_partial, method_partial)
        return future


[docs]class AsyncAPI(with_metaclass(GenericAPICreator, AsyncAPIBase)): """This is the async API representation class. You can build a concrete API by declaring methods while creating the class, ie.: >>> class MyAPI(AsyncAPI): >>> method1 = APIMethod('get', 'people/') >>> method2 = APIMethod('post', 'my/news/items/') Hooks can be overridden globally: >>> def prepare(self, name, **args, **kwargs): >>> return PrepareCallArgs(call=self._methods[name], >>> args=args, >>> kwargs=kwargs) As well as for particular methods only: >>> def prepare_method1(self, name, *args, **kwargs): >>> return PrepareCallArgs(call=self._methods[name], >>> args=args, >>> kwargs=kwargs) >>> def call_method1(self, name, *args, **kwargs): >>> prepared = getattr(self, 'prepare_{}'.format(name)) >>> prepared = prepared(name, *args, **kwargs) >>> callback = getattr(self, 'finalize_{}'.format(name)) >>> return callback(name, >>> prepared.call(*prepared.args, >>> **prepared.kwargs), >>> *prepared.args, >>> **prepared.kwargs) >>> def finalize_method2(self, name, future, *args, **kwargs): >>> result = future.result() >>> if self.throw_on_error and result.status_code >= 400: >>> error_msg = "Error when invoking {} with parameters {} {}: {}" >>> params = (name, args, kwargs, result.__dict__) >>> raise APIError(error_msg.format(*params)) >>> if self.load_json: >>> return json.loads(result.content) >>> return result.content """