Source code for haka_mqtt.dns_async

import errno
import fcntl
import os
import socket
import threading
from Queue import Queue, Empty
from time import sleep


class _Future(object):
    def __init__(self, call, *args, **kwargs):
        assert callable(call)
        self.__cancelled = False
        self.__done = False
        self.__result = None
        self.__exception = None
        self.__notified = False
        self.__callbacks = []
        self.__callable = call
        self.__args = args
        self.__kwargs = kwargs

    def _work(self):
        try:
            self.__result = self.__callable(*self.__args, **self.__kwargs)
        except Exception as e:
            self.__exception = e
        self.__done = True

    def _notify(self):
        if not self.__notified:
            self.__notified = True
            for cb in self.__callbacks:
                cb(self)

    def cancel(self):
        if self.done():
            rv = False
        else:
            self.__cancelled = True
            self.__done = True
            self._notify()
            rv = True

        return rv

    def cancelled(self):
        return self.__cancelled

    def done(self):
        return self.__done

    def result(self, timeout=None):
        if timeout != 0:
            raise NotImplementedError('Blocking, waiting for results is not implemented.  timeout must be zero.')

        return self.__result

    def exception(self, timeout=None):
        if timeout != 0:
            raise NotImplementedError('Blocking, waiting for results is not implemented.  timeout must be zero.')

        return self.__exception

    def add_done_callback(self, fn):
        if self.done():
            fn(self)
        else:
            self.__callbacks.append(fn)


# Sentinel placed on the work queue to tell a worker thread to leave its
# processing loop; it is compared by identity in _worker_task.
_Poison = object()


def _worker_task(work_queue, wd, done_queue):
    """Listens for tasks on work_queue, completes them, then places the
    completed tasks on the done_queue.

    After each completed task a single byte is written to `wd` so that
    whoever watches the other end of the pipe learns that a completed
    future is waiting on `done_queue`.  The loop ends when the
    `_Poison` sentinel is taken from `work_queue`.

    Parameters
    ----------
    work_queue: Queue
        Queue of _Future objects to work, terminated by `_Poison`.
    wd: fileno
        Write end of the notification pipe.  The resolver puts this
        descriptor in non-blocking mode, so a full pipe is reported as
        an `OSError` with errno EAGAIN rather than a zero-byte write.
    done_queue: Queue of _Future
        Receives each future after it has been worked.

    Raises
    ------
    OSError
        If writing to `wd` fails for a reason other than the pipe
        being temporarily full or the call being interrupted.
    """

    # errno values that mean "nothing written this time; try again".
    retry_errnos = (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR)

    while True:
        future = work_queue.get()
        if future is _Poison:
            break

        future._work()
        done_queue.put(future)

        while True:
            try:
                num_bytes_written = os.write(wd, b'x')
            except OSError as e:
                if e.errno not in retry_errnos:
                    raise
                # Pipe is full (or the call was interrupted); treat it
                # like a zero-byte write so the wake-up byte is retried
                # instead of killing the worker thread.
                num_bytes_written = 0

            if num_bytes_written == 1:
                break
            elif num_bytes_written == 0:
                sleep(0.01)
            else:
                raise NotImplementedError(num_bytes_written)


class AsyncFutureDnsResolver(object):
    """An executor that spawns a small thread pool for performing DNS
    lookups.

    DNS lookup tasks are submitted by calling this object as a function
    and those tasks will be completed asynchronously by threads in a
    thread pool.  The completed tasks are posted back to an internal
    queue and the :meth:`poll` method gets the completed tasks from the
    queue and notifies their subscribers of completion.  Tasks with
    done callback methods will be called by poll on the same thread
    that poll is called on.

    >>> resolver = AsyncFutureDnsResolver()
    >>>
    >>> lookup_result = None
    >>>
    >>> def lookup_finished(future):
    ...     global lookup_result
    ...     lookup_result = future.result(0)
    ...
    >>> future = resolver('localhost', 80)
    >>> future.add_done_callback(lookup_finished)
    >>>
    >>> while not future.done():
    ...     # Calling poll services callbacks and sets future to done.
    ...     resolver.poll()
    ...     sleep(0.1)
    ...
    >>> assert future.done()
    >>>
    >>> # Only timeout=0 presently supported.
    >>> assert not future.exception(timeout=0)
    >>> assert not future.cancelled()
    >>> # future.result(0) contains outcome of asynchronous call
    >>> # to socket.getaddrinfo.  Note that at this time only timeout=0
    >>> # is supported by this limited api.

    .. aafig::

                                   |                  Worker Threads
                                   |                   +------------+
                                   |               +-->| Worker 0   |-->+
                                   |               |   +------------+   |
                                   |               |                    |
                                   |               |   +------------+   |
                                   |               +-->| Worker 1   |-->+
                                   |               |   +------------+   |
                                   |               |                    |
                                   |               |   +------------+   |
       resolver('google.com', 80)  |--> task queue-+-->| Worker ... |-->+
                                   |               |   +------------+   |
                                   |               |                    |
          AsyncFutureDnsResolver   |               |   +------------+   |
                  Thread           |               +-->| Worker n   |-->+
                                   |                   +------------+   |
                                   |                                    |
                                   |                                    |
                 resolver.poll()   |<-- completion queue<---------------+
                                   |

    Parameters
    ----------
    thread_pool_size: int
        Number of daemon worker threads to start.
    """

    def __init__(self, thread_pool_size=1):
        self.__closed = False
        self.__work_queue = Queue()
        self.__done_queue = Queue()
        self.__threads = []

        # Workers write one byte to the pipe per completed future so
        # that the read end can be watched for readiness.
        self.__rd, self.__wd = os.pipe()

        # Both ends are non-blocking so that neither poll() nor a
        # worker can stall on the pipe.
        flags = fcntl.fcntl(self.__rd, fcntl.F_GETFL)
        fcntl.fcntl(self.__rd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        flags = fcntl.fcntl(self.__wd, fcntl.F_GETFL)
        fcntl.fcntl(self.__wd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        for _ in range(0, thread_pool_size):
            t = threading.Thread(target=_worker_task,
                                 args=(self.__work_queue, self.__wd, self.__done_queue))
            t.daemon = True
            self.__threads.append(t)
            t.start()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False

    def closed(self):
        """bool: True if the object has been closed; False otherwise."""
        return self.__closed

    def close(self):
        """Closes resolver by completing all tasks in queue and joining
        with worker threads.  New dns resolutions cannot be scheduled
        after this method begins executing (calling the resolver will
        result in an assertion failure).

        Done callbacks of every future that completed but has not yet
        been serviced by :meth:`poll` are called before the
        notification pipe is closed.  Calling close on an already
        closed resolver does nothing."""
        if self.closed():
            return

        self.__closed = True

        # One poison pill per worker so that every thread exits.
        for _ in self.__threads:
            self.__work_queue.put(_Poison)

        for thread in self.__threads:
            thread.join()

        try:
            # All workers have exited so the done queue is final;
            # drain it completely rather than servicing one future.
            while True:
                try:
                    future = self.__done_queue.get_nowait()
                except Empty:
                    break
                future._notify()
        finally:
            # Release the pipe even if a done callback raised.
            os.close(self.__wd)
            os.close(self.__rd)

    def __call__(self, host, port, family=0, socktype=0, proto=0, flags=0):
        """Queues an asynchronous DNS resolution task.

        Parameters
        ----------
        host: str or None
            A host `str` must contain either a domain name for lookup
            or a string representation of an IPv4/v6 address.
        port: str or int or None
            A string service name such as 'http', a numeric port number,
            or None.
        family: int
            One of the socket.AF_* constants.
        socktype: int
            One of the socket.SOCK_* constants.
        proto: int
            socket.IPPROTO_TCP
        flags: int
            One of several of the AI_* constants; default is zero.

        Returns
        -------
        Future
            If the DNS lookup succeeds then `future.result(0)` will
            immediately return the value of `socket.getaddrinfo`: a
            list of 5-tuples with a structure like (family, socktype,
            proto, canonname, sockaddr).  On failure
            `future.exception(0)` will immediately return a
            `socket.gaierror`.

            In these tuples, family, socktype, proto are all integers
            and are meant to be passed to the socket() function.
            canonname will be a string representing the canonical name
            of the host if AI_CANONNAME is part of the flags argument;
            else canonname will be empty.  sockaddr is a tuple
            describing a socket address, whose format depends on the
            returned family (a (address, port) 2-tuple for AF_INET, a
            (address, port, flow info, scope id) 4-tuple for AF_INET6),
            and is meant to be passed to the socket.connect() method.

        Raises
        ------
        AssertionError
            If the resolver has already been closed.
        """
        # Raised explicitly (instead of an assert statement) so that
        # the check still runs when Python is started with -O.
        if self.__closed:
            raise AssertionError('Async dns lookup after resolver closed.')

        getaddrinfo_params = (host, port, family, socktype, proto, flags)
        future = _Future(socket.getaddrinfo, *getaddrinfo_params)
        self.__work_queue.put(future)

        return future

    def read_fd(self):
        """int: fileno of the read end of the notification pipe; one
        byte is written to the pipe for each completed future."""
        return self.__rd

    def poll(self):
        """Calls done callbacks of a newly completed future.

        At most one completed future is serviced per call; does
        nothing when no completion is pending or when the resolver
        has been closed.

        Raises
        ------
        OSError
            If reading the notification pipe fails for a reason other
            than no data being available.
        """
        if self.__closed:
            # The pipe has been (or is being) closed; reading the
            # descriptor number could hit an unrelated, reused fd.
            return

        try:
            rc = os.read(self.__rd, 1)
        except OSError as e:
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                # No data available in pipe.
                return
            # Unknown error; crash!
            raise

        if rc:
            # A byte was read so a worker has queued a finished future.
            try:
                future = self.__done_queue.get_nowait()
            except Empty:
                pass
            else:
                future._notify()