Source code for haka_mqtt.frontends.poll

import os
import socket
import ssl
from datetime import datetime
from select import select
from time import sleep

from haka_mqtt.clock import SystemClock
from haka_mqtt.dns_async import AsyncFutureDnsResolver
from haka_mqtt.dns_sync import SynchronousFutureDnsResolver
from haka_mqtt.reactor import ReactorProperties, Reactor, ACTIVE_STATES
from haka_mqtt.scheduler import ClockScheduler
from haka_mqtt.socket_factory import SslSocketFactory, SocketFactory, BlockingSocketFactory, BlockingSslSocketFactory


[docs]def generate_client_id(): """Generates a client id based on current time, hostname, and process-id. Returns ------- str """ return 'client-{}-{}-{}'.format( datetime.now().strftime('%Y-%m-%dT%H:%M:%S'), socket.gethostname(), os.getpid())
class _PollClientSelector(object): def __init__(self, async_dns_resolver): self.__rmap = {async_dns_resolver.read_fd(): async_dns_resolver.poll} self.__wmap = {} def add_read(self, fd, reactor): """ Parameters ---------- fd: file descriptor File-like object. reactor: haka_mqtt.reactor.Reactor """ self.__rmap[fd] = reactor.read def del_read(self, fd, reactor): """ Parameters ---------- fd: file descriptor File-like object. reactor: haka_mqtt.reactor.Reactor """ del self.__rmap[fd] def add_write(self, fd, reactor): """ Parameters ---------- fd: file descriptor File-like object. reactor: haka_mqtt.reactor.Reactor """ self.__wmap[fd] = reactor.write def del_write(self, fd, reactor): """ Parameters ---------- fd: file descriptor File-like object. reactor: haka_mqtt.reactor.Reactor """ del self.__wmap[fd] def select(self, select_timeout=None): rlist, wlist, xlist = select(self.__rmap.keys(), self.__wmap.keys(), [], select_timeout) for fd in rlist: self.__rmap[fd]() for fd in wlist: self.__wmap[fd]()
[docs]class MqttPollClientProperties(object): """ Attributes ---------- client_id: str or None The MQTT client id to pass to the MQTT server. If `None` then a client-id will be generated with A client id will be randomly generated based on :meth:`generate_client_id`. address_family: int Address family; one of the socket.AF_* constants (eg. :data:`socket.AF_UNSPEC` for any family, :data:`socket.AF_INET` for IP4 :data:`socket.AF_INET6` for IP6). By default this will be :data:`socket.AF_UNSPEC`. host: str IP address or host name. port: int Integer such that 0 <= port <= 2**16-1. keepalive_period: str 0 <= keepalive_period <= 2*16-1; zero disables keepalive. Sends a ``MqttPingreq`` packet to the server after this many seconds without sending and data over the socket. The server will disconnect the client as if there has been a network error after 1.5x``self.keepalive_period`` seconds without receiving any bytes [MQTT-3.1.2-24]. recv_idle_ping_period: int 0 < recv_idle_ping_period; sends a ``MqttPingreq`` packet to the server after this many seconds without receiving and bytes on the socket. recv_idle_abort_period: int 0 < recv_idle_abort_period; aborts connection after this time without receiving any bytes from remote (typically set to 1.5x ``self.recv_idle_ping_period``). ssl: bool or SSLContext When `True` connects to server using a default SSL socket context created with :func:`ssl.create_default_context`. If ``ssl`` has a callable ``wrap_socket`` method then it is assumed that ``ssl`` is a SSLContext to be used for securing sockets. """ def __init__(self): self.host = None self.port = None self.client_id = None self.keepalive_period = 0 self.recv_idle_ping_period = 60 self.recv_idle_abort_period = 2 * self.recv_idle_ping_period self.ssl = True self.address_family = socket.AF_UNSPEC
[docs]class MqttPollClient(Reactor): """ Parameters ---------- properties: MqttPollClientProperties """ def __init__(self, properties, log='haka'): self._clock = SystemClock() self._scheduler = ClockScheduler(self._clock) self._async_name_resolver = AsyncFutureDnsResolver() self._selector = _PollClientSelector(self._async_name_resolver) endpoint = (properties.host, properties.port) p = ReactorProperties() if hasattr(properties.ssl, 'wrap_socket') and callable(properties.ssl.wrap_socket): p.socket_factory = SslSocketFactory(properties.ssl) elif properties.ssl: ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) p.socket_factory = SslSocketFactory(ssl_context) else: p.socket_factory = SocketFactory() p.endpoint = endpoint p.keepalive_period = properties.keepalive_period p.recv_idle_ping_period = properties.recv_idle_ping_period p.recv_idle_abort_period = properties.recv_idle_abort_period if properties.client_id is None: p.client_id = generate_client_id() else: p.client_id = properties.client_id p.scheduler = self._scheduler p.name_resolver = self._async_name_resolver p.selector = self._selector p.address_family = properties.address_family Reactor.__init__(self, p, log=log)
[docs] def poll(self, period=0.): poll_end_time = self._clock.time() + period while True: select_timeout = self._scheduler.remaining() if select_timeout is None or self._clock.time() + select_timeout > poll_end_time: select_timeout = poll_end_time - self._clock.time() if select_timeout < 0.: select_timeout = 0 self._selector.select(select_timeout) self._scheduler.poll() if self._clock.time() > poll_end_time or self.state not in ACTIVE_STATES: break
[docs]class BlockingMqttClient(Reactor): """A client that employs socket.settimeout to use blocking operations on sockets. Although in general the socket timeout is respected by the operating system, this method uses a synchronous DNS lookup and this lookup does not have any timeout. Parameters ---------- properties: MqttPollClientProperties """ def __init__(self, properties, log='haka'): self._clock = SystemClock() self._scheduler = ClockScheduler(self._clock) endpoint = (properties.host, properties.port) p = ReactorProperties() if hasattr(properties.ssl, 'wrap_socket') and callable(properties.ssl.wrap_socket): p.socket_factory = BlockingSslSocketFactory(properties.ssl) elif properties.ssl: ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) p.socket_factory = BlockingSslSocketFactory(ssl_context) else: p.socket_factory = BlockingSocketFactory() p.endpoint = endpoint p.keepalive_period = properties.keepalive_period if properties.client_id is None: p.client_id = generate_client_id() else: p.client_id = properties.client_id p.scheduler = self._scheduler p.name_resolver = SynchronousFutureDnsResolver() p.address_family = properties.address_family Reactor.__init__(self, p, log=log)
[docs] def poll(self, period=0.): poll_end_time = self._clock.time() + period while True: select_timeout = self._scheduler.remaining() if select_timeout is None or self._clock.time() + select_timeout > poll_end_time: select_timeout = poll_end_time - self._clock.time() if select_timeout <= 0.: select_timeout = 0.001 if self.socket is not None: self.socket.settimeout(select_timeout) if self.want_write(): self.write() elif self.want_read(): self.read() else: sleep(select_timeout) self._scheduler.poll() if self._clock.time() > poll_end_time or self.state not in ACTIVE_STATES: break