Source code for haka_mqtt.reactor

"""The reactor module provides an MQTT reactor class suitable for use
with :mod:`select` and select-like interfaces like epoll.  An adapter is
available to make it conveniently usable in a poll environment
(:mod:`haka_mqtt.frontends.poll`).
"""

import errno
import socket
import logging
import ssl
from collections import OrderedDict
from io import BytesIO
import os

from enum import (
    IntEnum,
    unique,
)

from haka_mqtt.null_log import NullLogger
from haka_mqtt.packet_ids import PacketIdGenerator
from haka_mqtt.selector import Selector
from mqtt_codec.io import (
    UnderflowDecodeError,
    DecodeError,
    BytesReader,
)
from mqtt_codec.packet import (
    MqttControlPacketType,
    MqttFixedHeader,
    MqttConnect,
    ConnackResult,
    MqttConnack,
    MqttSuback,
    MqttPublish,
    MqttPuback,
    MqttPubrec,
    MqttPubrel,
    MqttPubcomp,
    MqttPingreq,
    MqttPingresp,
    MqttDisconnect,
    MqttWill, MqttUnsuback)
from haka_mqtt.mqtt_request import (
    MqttSubscribeTicket,
    MqttUnsubscribeTicket,
    MqttPublishTicket,
    MqttPublishStatus,
    MqttSubscribeStatus,
)
from haka_mqtt.on_str import HexOnStr, ReprOnStr


class ReactorProperties(object):
    """Dependencies and tunable parameters consumed by :class:`Reactor`.

    Attributes
    ----------
    socket_factory: haka_mqtt.socket_factory.SocketFactory
    name_resolver: callable
        DNS resolver.
    scheduler: TODO
    selector: Selector
    client_id: str
    endpoint: tuple
        2-tuple of (host: `str`, port: `int`).  The `port` value is
        constrained such that 0 <= `port` <= 2**16-1.
    keepalive_period: int
        0 <= keepalive_period <= 2**16-1; zero disables keepalive.
        Sends a :class:`mqtt_codec.packet.MqttPingreq` packet to the
        server after this many seconds without sending any data over
        the socket.  The server will disconnect the client as if there
        has been a network error after 1.5x ``self.keepalive_period``
        seconds without receiving any bytes [MQTT-3.1.2-24].
    recv_idle_ping_period: int
        0 < recv_idle_ping_period; sends a
        :class:`mqtt_codec.packet.MqttPingreq` packet to the server
        after this many seconds without receiving any bytes on the
        socket.
    recv_idle_abort_period: int
        0 < recv_idle_abort_period; aborts connection after this time
        without receiving any bytes from remote (typically set to 1.5x
        ``self.recv_idle_ping_period``).
    clean_session: bool
        With clean session set to True reactor will clear all message
        buffers on disconnect without regard to QoS; otherwise
        unacknowledged messages will be retransmitted after a
        re-connect.
    address_family: int
        Address family; one of the socket.AF_* constants (eg.
        :data:`socket.AF_UNSPEC` for any family,
        :data:`socket.AF_INET` for IP4, :data:`socket.AF_INET6` for
        IP6).  Set to :data:`socket.AF_UNSPEC` by default.
    username: str optional
    password: str optional
    """

    def __init__(self):
        # Connection parameters; the caller is expected to fill in the
        # ones that default to None before building a reactor.
        self.client_id = None
        self.endpoint = None
        self.username = None
        self.password = None
        self.clean_session = True
        self.address_family = socket.AF_UNSPEC

        # Timing parameters, all expressed in seconds.
        self.keepalive_period = 10 * 60
        self.recv_idle_ping_period = 10 * 60
        self.recv_idle_abort_period = 15 * 60

        # Collaborating objects.
        self.socket_factory = None
        self.name_resolver = None
        self.scheduler = None
        self.selector = Selector()
@unique
class MqttState(IntEnum):
    """State of the MQTT protocol layer.

    An inactive state is one with no active deadlines, a closed socket
    and no active I/O.  A state failing any of those conditions is
    active.

    Active States:

    * :py:const:`MqttState.connack`
    * :py:const:`MqttState.connected`
    * :py:const:`MqttState.mute`

    Inactive States:

    * :py:const:`MqttState.stopped`
    """

    connack = 0
    connected = 1
    mute = 2
    stopped = 3
ACTIVE_MQTT_STATES = (
    MqttState.connack,
    MqttState.connected,
    MqttState.mute,
)
INACTIVE_MQTT_STATES = (MqttState.stopped,)

# Import-time check: the two tuples together must cover every MqttState.
assert set(ACTIVE_MQTT_STATES) | set(INACTIVE_MQTT_STATES) == set(MqttState)
@unique
class SocketState(IntEnum):
    """State of the underlying socket connection.

    An inactive state is one with no active deadlines, a closed socket
    and no active I/O.  A state failing any of those conditions is
    active.

    Active States:

    * :py:const:`SocketState.name_resolution`
    * :py:const:`SocketState.connecting`
    * :py:const:`SocketState.handshake`
    * :py:const:`SocketState.connected`
    * :py:const:`SocketState.deaf`
    * :py:const:`SocketState.mute`

    Inactive States:

    * :py:const:`SocketState.stopped`
    """

    # NOTE: the value 4 is intentionally left unassigned here to keep the
    # numeric values identical to the existing ones.
    name_resolution = 1
    connecting = 2
    handshake = 3
    connected = 5
    deaf = 6
    mute = 7
    stopped = 8
ACTIVE_SOCKET_STATES = (
    SocketState.name_resolution,
    SocketState.connecting,
    SocketState.handshake,
    SocketState.connected,
    SocketState.mute,
    SocketState.deaf,
)
INACTIVE_SOCK_STATES = (SocketState.stopped,)

# Import-time check: the two tuples together must cover every SocketState.
assert set(ACTIVE_SOCKET_STATES) | set(INACTIVE_SOCK_STATES) == set(SocketState)
@unique
class ReactorState(IntEnum):
    """Top-level state of the reactor.

    Inactive states are those where there are no active deadlines, the
    socket is closed and there is no active I/O.  Active states are
    those where any of these characteristics is not met.

    Inactive States:

    * :py:const:`ReactorState.init`
    * :py:const:`ReactorState.stopped`
    * :py:const:`ReactorState.error`

    Active States:

    * :py:const:`ReactorState.starting`
    * :py:const:`ReactorState.started`
    * :py:const:`ReactorState.stopping`
    """

    init = 0
    starting = 1
    started = 2
    stopping = 3
    stopped = 4
    error = 5
# Reactor states with no active deadlines, a closed socket and no active
# I/O.
INACTIVE_STATES = (
    ReactorState.init,
    ReactorState.stopped,
    ReactorState.error,
)

# Reactor states that may have active deadlines, open sockets, or pending
# I/O.
ACTIVE_STATES = (
    ReactorState.starting,
    ReactorState.started,
    ReactorState.stopping,
)

# Import-time check: the two tuples together must cover every ReactorState.
assert set(INACTIVE_STATES) | set(ACTIVE_STATES) == set(ReactorState)
class ReactorError(object):
    """Common base of the reactor error value classes."""

    def __repr__(self):
        # Uses the runtime class name so subclasses without their own
        # __repr__ print as ``SubclassName()``.
        class_name = self.__class__.__name__
        return '{}()'.format(class_name)
class MutePeerReactorError(ReactorError):
    """Error that occurs when the server closes its write stream
    unexpectedly."""
class ConnectReactorError(ReactorError):
    """Error that occurs when the server answers the initial connect
    packet with a failing connack.

    Parameters
    ----------
    result: ConnackResult
        Asserted not to be `ConnackResult.accepted`.
    """

    def __init__(self, result):
        assert result != ConnackResult.accepted
        self.__result = result

    @property
    def result(self):
        """ConnackResult: guaranteed that value is not
        `ConnackResult.accepted`."""
        return self.__result

    def __eq__(self, other):
        # Duck-typed: anything exposing an equal ``result`` compares equal.
        if not hasattr(other, 'result'):
            return False
        return self.result == other.result

    def __repr__(self):
        return '{}({!r})'.format(self.__class__.__name__, self.result)
class RecvTimeoutReactorError(ReactorError):
    """Server fails to respond in a timely fashion."""

    def __eq__(self, other):
        # Instances carry no state, so any two of them compare equal.
        same_kind = isinstance(other, RecvTimeoutReactorError)
        return same_kind
class SocketReactorError(ReactorError):
    """A socket.error exception was raised by the socket subsystem and
    its error code was `self.errno`.  If this errno is in the
    `errno.errorcode` lookup table then repr will show the symbolic
    name.

    Parameters
    ----------
    errno_val: int
    """

    def __init__(self, errno_val):
        self.__errno = errno_val

    @property
    def errno(self):
        """int: value in `errno.errorcode`."""
        return self.__errno

    def __repr__(self):
        # Inside a method body ``errno`` resolves to the module-level
        # import, not to the property of the same name.
        code = self.errno
        try:
            symbol = errno.errorcode[code]
        except KeyError:
            return 'SocketReactorError({})'.format(code)
        return 'SocketReactorError(<{}: {}>)'.format(symbol, code)

    def __eq__(self, other):
        # Duck-typed: anything exposing an equal ``errno`` compares equal.
        if not hasattr(other, 'errno'):
            return False
        return self.errno == other.errno
class SslReactorError(ReactorError):
    """Wraps an SSL error value.

    Parameters
    ----------
    ssl_error: ssl.SSLError
        Asserted not to be `None`.
    """

    def __init__(self, ssl_error):
        assert ssl_error is not None
        self.__error = ssl_error

    @property
    def error(self):
        """ssl.SSLError: error value."""
        return self.__error

    def __repr__(self):
        wrapped = self.error
        return 'SslReactorError({})'.format(wrapped)

    def __eq__(self, other):
        # Duck-typed: anything exposing an equal ``error`` compares equal.
        if not hasattr(other, 'error'):
            return False
        return self.error == other.error
class AddressReactorError(ReactorError):
    """Failed to lookup a valid address.

    Parameters
    ----------
    gaierror: socket.gaierror
    """

    def __init__(self, gaierror):
        assert isinstance(gaierror, socket.gaierror)
        self.__gaierror = gaierror

    @property
    def gaierror(self):
        """socket.gaierror: Addressing error."""
        return self.__gaierror

    def __eq__(self, other):
        # Equal when the other object exposes a ``gaierror`` whose errno
        # and strerror both match ours.
        if not hasattr(other, 'gaierror'):
            return False
        if self.gaierror.errno != other.gaierror.errno:
            return False
        return self.gaierror.strerror == other.gaierror.strerror

    def __repr__(self):
        return '{}({!r})'.format(self.__class__.__name__, self.gaierror)
class DecodeReactorError(ReactorError):
    """Server wrote a sequence of bytes that could not be interpreted as
    an MQTT packet.

    Parameters
    ----------
    description
        Stored as ``self.description`` and shown by ``repr``.
    """

    def __init__(self, description):
        self.description = description

    def __repr__(self):
        class_name = self.__class__.__name__
        return '{}({})'.format(class_name, self.description)
class ProtocolReactorError(ReactorError):
    """Server sent an inappropriate MQTT packet to the client.

    Parameters
    ----------
    description
        Stored as ``self.description`` and shown by ``repr``.
    """

    def __init__(self, description):
        self.description = description

    def __repr__(self):
        class_name = self.__class__.__name__
        return '{}({})'.format(class_name, self.description)
class _AssertSelectAdapter(object): def __init__(self, reactor, selector): self.__sock = None self.__reactor = reactor self.__selector = selector self.__want_read = False self.__want_write = False def assert_closed(self): assert self.__want_read is False assert self.__want_write is False def update(self, want_read, want_write, f): if self.__sock is not f: # Not permitted to switch to another file while signed up # for an existing read notification. self.assert_closed() self.__sock = f if self.__want_write != want_write: # There has been a change in want write. self.__want_write = want_write if want_write: self.__selector.add_write(f, self.__reactor) else: self.__selector.del_write(f, self.__reactor) if self.__want_read != want_read: # There has been a change in want read. self.__want_read = want_read if want_read: self.__selector.add_read(f, self.__reactor) else: self.__selector.del_read(f, self.__reactor)
class Reactor(object):
    """
    Parameters
    ----------
    properties: ReactorProperties
    log: str or logging.Logger or None
        If `str` then the result of logging.getLogger(log) is used as a
        logger; otherwise assumes that is a `logging.Logger`-like object
        and asserts that it has `debug`, `info`, `warning`, `error`, and
        `critical` methods.  If `log` is `None` then logging is
        disabled.
    """

    def __init__(self, properties, log='haka'):
        # Validate the supplied properties up front.
        assert properties.client_id is not None
        assert properties.socket_factory is not None
        assert properties.endpoint is not None
        assert properties.scheduler is not None
        assert 0 <= properties.keepalive_period <= 2**16-1
        assert isinstance(properties.keepalive_period, int)
        assert 0 < properties.recv_idle_abort_period
        assert isinstance(properties.recv_idle_abort_period, int)
        assert 0 <= properties.recv_idle_ping_period
        assert isinstance(properties.recv_idle_ping_period, int)
        assert isinstance(properties.clean_session, bool)
        assert callable(properties.name_resolver)
        host, port = properties.endpoint
        assert isinstance(host, str)
        assert 0 <= port <= 2**16-1
        assert isinstance(port, int)
        assert properties.selector is not None
        assert isinstance(properties.address_family, int)

        # ``unicode`` only exists on Python 2.  Referencing it directly
        # raises NameError on Python 3, which would break even the
        # default ``log='haka'`` argument.
        try:
            string_types = (str, unicode)  # noqa: F821
        except NameError:
            string_types = (str,)

        if log is None:
            self.__log = NullLogger()
        elif isinstance(log, string_types):
            self.__log = logging.getLogger(log)
        else:
            assert hasattr(log, 'debug')
            assert hasattr(log, 'info')
            assert hasattr(log, 'warning')
            assert hasattr(log, 'error')
            assert hasattr(log, 'critical')
            self.__log = log

        # Socket write and read buffers.
        self.__wbuf = bytearray()
        self.__rbuf = bytearray()

        self.__address_family = properties.address_family

        self.__ssl_want_read = False
        self.__ssl_want_write = False

        self.__client_id = properties.client_id
        self.__username = properties.username
        self.__password = properties.password

        # Each period is paired with a deadline slot that starts empty.
        self.__keepalive_period = properties.keepalive_period
        self.__keepalive_due_deadline = None
        self.__recv_idle_abort_period = properties.recv_idle_abort_period
        self.__recv_idle_abort_deadline = None
        self.__recv_idle_ping_period = properties.recv_idle_ping_period
        self.__recv_idle_ping_deadline = None

        self.__clean_session = properties.clean_session
        self.__name_resolver = properties.name_resolver
        self.__socket_factory = properties.socket_factory
        self.socket = None
        self.__host, self.__port = properties.endpoint
        # Arguments passed (unpacked) to the name resolver.
        self.__getaddrinfo_params = (
            self.__host,
            self.__port,
            self.__address_family,
            socket.SOCK_STREAM,
            socket.IPPROTO_TCP,
            0,
        )

        # A new reactor starts with every layer inactive.
        self.__state = ReactorState.init
        self.__mqtt_state = MqttState.stopped
        self.__sock_state = SocketState.stopped

        self.__error = None

        self.__send_packet_ids = set()
        self.__send_path_packet_ids = PacketIdGenerator()

        self.__preflight_queue = []
        # Ordering requirements noted by the original author:
        #
        # Publish packets must be ack'd in order of publishing
        # [MQTT-4.6.0-2], [MQTT-4.6.0-3].
        #
        # No specific requirement exists for subscribe suback ordering.
        #
        # It MUST send PUBREL packets in the order in which the
        # corresponding PUBREC packets were received (QoS 2 messages)
        # [MQTT-4.6.0-4].
        self.__inflight_queue = OrderedDict()

        self.__scheduler = properties.scheduler
        self.__will = None

        self.__name_resolution_future = None

        self.__pingreq_active = False
        self.__pingreq_due = False

        # Want read
        self.__selector = _AssertSelectAdapter(self, properties.selector)

    # Connection Callbacks
def on_connect_fail(self, reactor):
    """Connect-failure callback; the default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    """
    return None
def on_disconnect(self, reactor):
    """Disconnect callback; the default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    """
    return None
def on_connack(self, reactor, connack):
    """Called immediately upon receiving a `MqttConnack` packet from
    the remote.  The `reactor.state` will be `ReactorState.started`, or
    `ReactorState.stopping` if the reactor is shutting down.

    The default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    connack: :class:`mqtt_codec.packet.MqttConnack`
    """
    return None
# Send path
def on_pubrec(self, reactor, pubrec):
    """Called immediately upon receiving a `MqttPubrec` packet from the
    remote.  This is part of the QoS=2 message send path.

    The default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    pubrec: :class:`mqtt_codec.packet.MqttPubrec`
    """
    return None
def on_pubcomp(self, reactor, pubcomp):
    """Called immediately upon receiving a `MqttPubcomp` packet from
    the remote.  This is part of the QoS=2 message send path.

    The default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    pubcomp: :class:`mqtt_codec.packet.MqttPubcomp`
    """
    return None
def on_puback(self, reactor, puback):
    """Called immediately upon receiving a `MqttPuback` packet from the
    remote.  This method is part of the QoS=1 message send path.

    The default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    puback: :class:`mqtt_codec.packet.MqttPuback`
    """
    return None
# Subscribe path
def on_suback(self, reactor, suback):
    """Called immediately upon receiving a `MqttSuback` packet from the
    remote.

    The default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    suback: :class:`mqtt_codec.packet.MqttSuback`
    """
    return None
def on_unsuback(self, reactor, unsuback):
    """Called immediately upon receiving a `MqttUnsuback` packet from
    the remote.

    The default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    unsuback: :class:`mqtt_codec.packet.MqttUnsuback`
    """
    return None
# Receive path
def on_publish(self, reactor, publish):
    """Called immediately upon receiving a `MqttPublish` packet from
    the remote.  This is part of the QoS=0, 1, and 2 message receive
    paths.

    The default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    publish: :class:`mqtt_codec.packet.MqttPublish`
    """
    pass
def on_pubrel(self, reactor, pubrel):
    """Called immediately upon receiving a `MqttPubrel` packet from the
    remote.  This is part of the QoS=2 message receive path.

    The default implementation is a no-op.

    Parameters
    ----------
    reactor: Reactor
    pubrel: :class:`mqtt_codec.packet.MqttPubrel`
    """
    return None
@property
def clean_session(self):
    """bool: Clean session flag is true/false."""
    return self.__clean_session

@property
def client_id(self):
    """str: Client id."""
    return self.__client_id

@property
def keepalive_period(self):
    """int: If this period elapses without the client sending a control
    packet to the server then it will generate a pingreq packet and
    send it to the server.  Will return zero if pingreq requests are
    not generated."""
    return self.__keepalive_period

@property
def recv_idle_abort_period(self):
    """int: Connection will be closed if bytes have not been received
    from remote in this many seconds.  Typically this is 1.5x the
    self.keepalive_period.

    Background from the MQTT specification: It is the responsibility
    of the Client to ensure that the interval between Control Packets
    being sent does not exceed the Keep Alive value.  In the absence of
    sending any other Control Packets, the Client MUST send a PINGREQ
    Packet [MQTT-3.1.2-23].

    If the Keep Alive value is non-zero and the Server does not receive
    a Control Packet from the Client within one and a half times the
    Keep Alive time period, it MUST disconnect the Network Connection
    to the Client as if the network had failed.  [MQTT-3.1.2-24]"""
    return self.__recv_idle_abort_period

@property
def recv_idle_ping_period(self):
    """int: 0 <= ``self.recv_idle_ping_period``; sends a
    :class:`mqtt_codec.packet.MqttPingreq` packet to the server after
    this many seconds without receiving any bytes on the socket.  If
    zero then ping messages are not sent when receive stream is
    idle."""
    return self.__recv_idle_ping_period

@property
def error(self):
    """ReactorError or None: When `self.state` is `ReactorState.error`
    returns a subclass of `ReactorError` otherwise returns `None`."""
    return self.__error

@property
def state(self):
    """ReactorState: Current reactor state."""
    return self.__state

@property
def mqtt_state(self):
    """MqttState: Current state of mqtt protocol handshake."""
    return self.__mqtt_state

@property
def sock_state(self):
    """SocketState: Current state of the socket connection."""
    return self.__sock_state

@property
def will(self):
    """:class:`mqtt_codec.packet.MqttWill` or ``None``: Last will and
    testament."""
    return self.__will

@will.setter
def will(self, will):
    """
    Parameters
    ----------
    will: mqtt_codec.packet.MqttWill or None
        Last will and testament or None if no last will and testament
        is desired.

    Raises
    ------
    TypeError
        If `will` is neither ``None`` nor an `MqttWill`.
    """
    if will is None or isinstance(will, MqttWill):
        self.__will = will
    else:
        raise TypeError(
            'will must be an MqttWill or None; got {!r}.'.format(will))

def __get_packet_type(self, packet_id, packet_type):
    """Performs a `packet_id` lookup in `self.__inflight_queue` and
    returns the entry if its `packet_type` is `packet_type`.

    Parameters
    ----------
    packet_id: int
    packet_type: MqttControlPacketType

    Returns
    -------
    object or None
        In-flight entry with the given packet id and type.  `None` when
        no entry has that id, or when the entry exists but has a
        different type.
    """
    try:
        maybe_packet = self.__inflight_queue[packet_id]
    except KeyError:
        packet = None
    else:
        packet = maybe_packet if maybe_packet.packet_type is packet_type else None

    return packet

def __update_io_notification(self):
    """Pushes the current read/write interest to the selector adapter;
    does nothing while there is no socket."""
    if self.socket is not None:
        self.__selector.update(self.want_read(), self.want_write(), self.socket)

def __assert_state_rules(self):
    """Asserts the invariants tying together reactor, socket and mqtt
    states, deadlines and selector registration.

    Raises
    ------
    AssertionError
        If an invariant does not hold.
    NotImplementedError
        If `self.sock_state` is not covered by the receive-idle-abort
        rule.
    """
    # The three layers are inactive together or not at all.
    if self.mqtt_state in INACTIVE_MQTT_STATES or self.sock_state in INACTIVE_SOCK_STATES or self.state in INACTIVE_STATES:
        assert self.mqtt_state in INACTIVE_MQTT_STATES
        assert self.sock_state in INACTIVE_SOCK_STATES
        assert self.state in INACTIVE_STATES

    if self.sock_state in (SocketState.name_resolution,
                           SocketState.connecting,
                           SocketState.handshake):
        assert self.state is ReactorState.starting
        assert self.mqtt_state is MqttState.connack

    if self.want_write() or self.want_read():
        assert self.socket is not None

    if self.sock_state in (SocketState.name_resolution,
                           SocketState.connecting,
                           SocketState.handshake,
                           SocketState.stopped):
        assert self.__pingreq_active is False

    if self.sock_state not in (SocketState.connected, SocketState.deaf, SocketState.mute):
        assert self.__pingreq_active is False

    if self.sock_state in (SocketState.handshake,
                           SocketState.connected,
                           SocketState.mute,
                           SocketState.deaf):
        assert self.__recv_idle_abort_deadline is not None
    elif self.sock_state in (SocketState.name_resolution,
                             SocketState.connecting,
                             SocketState.stopped):
        assert self.__recv_idle_abort_deadline is None
    else:
        raise NotImplementedError(self.sock_state)

    if self.sock_state not in (SocketState.connected, SocketState.deaf):
        assert self.__keepalive_due_deadline is None

    if self.keepalive_period == 0:
        assert self.__keepalive_due_deadline is None

    if self.sock_state is not SocketState.connected:
        # if sock_state is SocketState.deaf,
        #   how can socket receive a ping reply?
        # if sock_state is SocketState.mute
        #   how can a ping be sent?
        # else other states:
        #   socket isn't connected so cannot send ping.
        assert self.__recv_idle_ping_deadline is None

    if self.sock_state in INACTIVE_SOCK_STATES:
        self.__selector.assert_closed()

    if self.state is ReactorState.error:
        assert self.error is not None

    if self.state in INACTIVE_STATES:
        assert self.__recv_idle_abort_deadline is None
        assert self.__keepalive_due_deadline is None
        assert self.__recv_idle_ping_deadline is None
def is_active(self):
    """``True`` when reactor is active; ``False`` otherwise.

    An "active" reactor implies that there are outstanding scheduler
    deadlines active, possibly open sockets, or possibly outstanding
    DNS lookup futures.

    An inactive reactor guarantees that there are no outstanding
    scheduler deadlines, DNS lookups, or open sockets.  An inactive
    reactor will never change state unless a method like :meth:`start`
    is called to start the reactor.

    .. versionadded:: 0.3.5

    Returns
    -------
    bool
    """
    current = self.state
    if current in ACTIVE_STATES:
        return True
    if current in INACTIVE_STATES:
        return False
    raise NotImplementedError(current)
def send_packet_ids(self):
    """
    Returns
    -------
    set[int]
        A new set holding the active send-path packet ids.
    """
    active_ids = set()
    active_ids.update(self.__send_path_packet_ids)
    return active_ids
def in_flight_packets(self):
    """
    Returns
    -------
    list
        A new list with the values of the in-flight queue, in queue
        order.
    """
    in_flight = []
    in_flight.extend(self.__inflight_queue.values())
    return in_flight
def preflight_packets(self):
    """
    Returns
    -------
    list
        A new list with the entries of the preflight queue, in queue
        order.
    """
    waiting = []
    waiting.extend(self.__preflight_queue)
    return waiting
def subscribe(self, topics):
    """Places a ``subscribe`` packet on the preflight queue.  Messages
    in the preflight queue will be placed in-flight as soon as the
    socket allows.  Multiple messages may be placed in-flight at the
    same time.

    If the reactor encounters an error or stops then unacknowledged
    ``subscribe`` packets will be dropped whether they are in the
    preflight or the in-flight queues.

    Parameters
    ----------
    topics: iterable of MqttTopic

    Raises
    ------
    haka_mqtt.exception.PacketIdReactorException
        Raised when there are no free packet ids to create a
        `MqttSubscribe` packet with.

    Returns
    --------
    MqttSubscribeTicket
    """
    self.__assert_state_rules()

    packet_id = self.__send_path_packet_ids.acquire()
    ticket = MqttSubscribeTicket(packet_id, topics)
    self.__preflight_queue.append(ticket)

    self.__assert_state_rules()
    # Queuing a packet may change whether the reactor wants to write.
    self.__update_io_notification()
    return ticket
def unsubscribe(self, topics):
    """Places an ``unsubscribe`` packet on the preflight queue.
    Messages in the preflight queue will be placed in-flight as soon as
    the socket allows.  Multiple messages may be placed in-flight at
    the same time.

    If the reactor encounters an error or stops then unacknowledged
    ``unsubscribe`` packets will be dropped whether they are in the
    preflight or the in-flight queues.

    Parameters
    ----------
    topics: iterable of str

    Raises
    ------
    haka_mqtt.exception.PacketIdReactorException
        Raised when there are no free packet ids to create a
        `MqttUnsubscribe` packet with.

    Returns
    --------
    MqttUnsubscribeTicket
    """
    self.__assert_state_rules()

    packet_id = self.__send_path_packet_ids.acquire()
    ticket = MqttUnsubscribeTicket(packet_id, topics)
    self.__preflight_queue.append(ticket)

    self.__assert_state_rules()
    # Queuing a packet may change whether the reactor wants to write.
    self.__update_io_notification()
    return ticket
def publish(self, topic, payload, qos, retain=False):
    """Places a publish packet on the preflight queue.  Messages in the
    preflight queue are fair-queued and launched to the server.  The
    reactor certainly will try to place as many messages in-flight as
    it is able to.  If you want to limit the number of messages
    in-flight then a queue should be maintained outside of the core
    reactor.

    QoS 0 messages are placed in the pre-flight buffer and are eligible
    for delivery as fast as the socket allows.  If the reactor
    encounters an error or stops and is subsequently started then any
    QoS=0 messages in the preflight queue are discarded.  QoS 0
    messages are considered delivered as soon as one of their bytes is
    placed in the socket write buffer regardless of whether the network
    successfully delivers them to their destination.

    QoS 1 messages are placed in the pre-flight buffer and are eligible
    for delivery as fast as the socket allows.  They are placed in the
    in-flight queue as soon as the first byte of the packet is placed
    in the socket write buffer.  If the reactor encounters an error or
    stops and is subsequently started then any QoS=1 messages in the
    preflight queue maintain their positions.  Any messages in the
    in-flight queue are placed in the front of the preflight queue as
    ``publish`` packets with their dupe flags set to ``True``.

    QoS 2 messages are placed in the pre-flight buffer and are eligible
    for delivery as fast as the socket allows.  They are placed in the
    in-flight queue as soon as the first byte of the packet is placed
    in the socket write buffer.  If the reactor encounters an error or
    stops and is subsequently started then any QoS=2 messages in the
    preflight queue maintain their positions.  Any messages in the
    in-flight queue awaiting ``pubrec`` acknowledgements are placed in
    the front of the preflight queue as publish packets with their dupe
    flags set to ``True``.  Any messages in the in-flight queue
    awaiting ``pubcomp`` acknowledgements are placed in the front of
    the preflight queue as ``pubrel`` packets.

    Parameters
    -----------
    topic: str
    payload: bytes
    qos: int
        0 <= qos <= 2
    retain: bool

    Raises
    ------
    haka_mqtt.exception.PacketIdReactorException
        Raised when there are no free packet ids to create a
        `MqttPublish` packet with.
    NotImplementedError
        If `qos` passes the range assertion but is not 0, 1 or 2.

    Return
    -------
    MqttPublishTicket
        A publish ticket.  The returned object will satisfy
        `ticket.status is MqttPublishStatus.preflight`.
    """
    self.__assert_state_rules()
    assert 0 <= qos <= 2
    assert isinstance(payload, bytes)

    # Compare by value.  The previous ``qos is 0`` relied on small-int
    # caching, and ``qos is 1 or 2`` was always truthy, so the fallback
    # branch could never run.
    if qos == 0:
        # QoS 0 publishes use packet id zero and do not acquire one.
        packet_id = 0
    elif qos in (1, 2):
        packet_id = self.__send_path_packet_ids.acquire()
    else:
        raise NotImplementedError(qos)

    req = MqttPublishTicket(packet_id, topic, payload, qos, retain)
    self.__preflight_queue.append(req)
    self.__assert_state_rules()
    # Queuing a packet may change whether the reactor wants to write.
    self.__update_io_notification()
    return req
def __start(self):
    """Resets per-connection state, rebuilds the preflight queue and
    begins name resolution of the endpoint host.

    Must only be called while reactor, socket and mqtt states are all
    inactive (asserted).  The new preflight queue consists of the
    publish entries of the old in-flight queue followed by the publish
    and pubrel entries of the old preflight queue; every other entry
    is dropped.
    """
    assert self.sock_state in INACTIVE_SOCK_STATES
    assert self.mqtt_state in INACTIVE_MQTT_STATES
    assert self.state in INACTIVE_STATES

    self.__log.info('Starting.')

    self.__error = None
    self.__ssl_want_write = False
    self.__ssl_want_read = False
    self.__pingreq_active = False
    self.__name_resolution_future = None

    preflight_queue = []
    for p in self.__inflight_queue.values():
        if p.packet_type is MqttControlPacketType.publish:
            # Publish packets in self.__inflight_queue will be
            # re-transmitted and the dupe flag must be set on the
            # re-transmitted packet.
            #
            # [MQTT-3.3.1.-1]
            #
            if p.qos == 1:
                assert p.status is MqttPublishStatus.puback, p.status
                p._set_dupe()
            elif p.qos == 2:
                # Only entries with status ``pubrec`` get the dupe flag.
                if p.status is MqttPublishStatus.pubrec:
                    p._set_dupe()
            else:
                raise NotImplementedError(p.qos)

            preflight_queue.append(p)

    # Previously queued packets follow the former in-flight ones; only
    # publish and pubrel entries are kept.
    for p in self.__preflight_queue:
        if p.packet_type in (MqttControlPacketType.publish, MqttControlPacketType.pubrel):
            preflight_queue.append(p)

    self.socket = None
    self.__inflight_queue = OrderedDict()
    self.__preflight_queue = preflight_queue
    self.__wbuf = bytearray()
    self.__rbuf = bytearray()

    self.__state = ReactorState.starting
    self.__sock_state = SocketState.name_resolution
    self.__mqtt_state = MqttState.connack

    self.__log.info('Looking up host %s:%d.', self.__host, self.__port)
    # The resolver returns a future; __on_name_resolution runs when it
    # completes.
    self.__name_resolution_future = self.__name_resolver(*self.__getaddrinfo_params)
    self.__name_resolution_future.add_done_callback(self.__on_name_resolution)

def __on_name_resolution(self, future):
    """Called when hostname resolution is complete.

    Does nothing if the future was cancelled.  Aborts with an
    `AddressReactorError` when the lookup produced no result or an
    empty result list; otherwise connects to the first entry.

    Parameters
    ----------
    future
        The future.results will be a 5-tuple of (family, socktype,
        proto, canonname, sockaddr) or None if there was no result.
    """
    assert self.sock_state is SocketState.name_resolution
    assert future.done()

    if not future.cancelled():
        results = future.result(timeout=0)
        if results is None:
            # No result; the future's exception describes the failure.
            e = future.exception(timeout=0)
            self.__log.error('%s (errno=%d). Aborting.', e.strerror, e.errno)
            self.__abort(AddressReactorError(e))
        else:
            if len(results) == 0:
                self.__log.error('No hostname entries found. Aborting.')
                self.__abort(AddressReactorError(socket.gaierror(socket.EAI_NONAME, 'Name or service not known')))
            elif len(results) > 0:
                # Log every entry; the first one is used to connect.
                self.__log_name_resolution(results[0], chosen=True)
                for result in results[1:]:
                    self.__log_name_resolution(result)
                self.__connect(results[0])
            else:
                raise NotImplementedError(len(results))

def __log_name_resolution(self, resolution, chosen=False):
    """Logs a single name resolution entry; at info level when
    `chosen`, at debug level otherwise.

    Parameters
    ----------
    resolution: 5-tuple
        A 5-tuple of (family, socktype, proto, canonname, sockaddr).
    chosen: bool
        True if this log entry is to be marked as a chosen sockaddr to
        connect to.

    Raises
    ------
    NotImplementedError
        If family is neither `socket.AF_INET` nor `socket.AF_INET6`.
    """
    family, socktype, proto, canonname, sockaddr = resolution

    # Unknown socket types and protocols fall back to their str() form.
    socktype_str = {
        socket.SOCK_DGRAM: 'sock_dgram',
        socket.SOCK_STREAM: 'sock_stream',
        socket.SOCK_RAW: 'sock_raw',
    }.get(socktype, str(socktype))

    proto_str = {
        socket.IPPROTO_TCP: 'tcp',
        socket.IPPROTO_IP: 'ip',
        socket.IPPROTO_UDP: 'udp',
    }.get(proto, str(proto))

    if chosen:
        chosen_postfix = ' (chosen)'
    else:
        chosen_postfix = ' '

    if family == socket.AF_INET:
        ip, port = sockaddr
        msg = "Found family=inet sock=%s proto=%s addr=%s:%d%s"
        args = (
            socktype_str,
            proto_str,
            ip,
            port,
            chosen_postfix
        )
    elif family == socket.AF_INET6:
        ip6, port, flow_info, scope_id = sockaddr
        msg = "Found family=inet6 sock=%s proto=%s addr=[%s]:%d%s"
        args = (
            socktype_str,
            proto_str,
            ip6,
            port,
            chosen_postfix
        )
    else:
        raise NotImplementedError(family)

    if chosen:
        self.__log.info(msg, *args)
    else:
        self.__log.debug(msg, *args)

def __connect(self, resolution):
    """Connect to the given resolved address.

    Creates the socket through the socket factory and issues
    ``connect``.  ``EINPROGRESS`` means the connection is still being
    established; any other socket error aborts the reactor; no error
    proceeds directly to ``__on_connect``.

    Parameters
    ----------
    resolution: 5-tuple
        A 5-tuple of (family, socktype, proto, canonname, sockaddr) as
        returned by `socket.getaddrinfo`."""
    assert self.sock_state is SocketState.name_resolution
    assert self.state is ReactorState.starting

    # Only sockaddr is used below.
    family, socktype, proto, canonname, sockaddr = resolution

    try:
        self.__sock_state = SocketState.connecting
        self.socket = self.__socket_factory(self.__getaddrinfo_params, sockaddr)
        self.socket.connect(sockaddr)
    except socket.error as e:
        if e.errno == errno.EINPROGRESS:
            # Connection in progress.
            self.__update_io_notification()
            self.__log.info("Connecting.")
        else:
            self.__abort_socket_error(SocketReactorError(e.errno))
    else:
        self.__on_connect()
def start(self):
    """Attempts to connect with remote if in one of the inactive states
    :py:const:`ReactorState.init`, :py:const:`ReactorState.stopped`,
    :py:const:`ReactorState.error`.  The method has no effect, other
    than logging a warning, if already in an active state.
    """
    self.__assert_state_rules()

    # Warning logged for each active state in which start is ignored.
    ignored_start_warnings = {
        ReactorState.starting: "Start while already starting; taking no additional action.",
        ReactorState.started: "Start while already started; taking no action.",
        ReactorState.stopping: "Start while already stopping; ignoring start and continuing to stop.",
    }

    current = self.state
    if current in INACTIVE_STATES:
        self.__start()
    else:
        message = ignored_start_warnings.get(current)
        if message is None:
            raise NotImplementedError(current)
        self.__log.warning(message)

    self.__assert_state_rules()
    self.__update_io_notification()
def stop(self):
    """Requests that the reactor stop.

    From ``init`` the reactor moves straight to ``stopped``.  While
    starting or started, the reactor terminates immediately if the
    socket is still resolving, connecting or handshaking; otherwise it
    enters ``stopping`` and queues an `MqttDisconnect` packet.  In any
    other state only a warning is logged.
    """
    self.__assert_state_rules()

    current = self.state
    if current is ReactorState.init:
        self.__log.info('Stopped.')
        self.__state = ReactorState.stopped
    elif current in (ReactorState.starting, ReactorState.started):
        self.__log.info('Stopping.')
        connection_pending = self.sock_state in (
            SocketState.name_resolution,
            SocketState.connecting,
            SocketState.handshake,
        )
        if connection_pending:
            self.__terminate(ReactorState.stopped, None)
        else:
            self.__state = ReactorState.stopping
            self.__preflight_queue.append(MqttDisconnect())
    elif current is ReactorState.stopping:
        self.__log.warning('Stop while already stopping.')
    elif current is ReactorState.stopped:
        self.__log.warning('Stop while already stopped.')
    elif current is ReactorState.error:
        self.__log.warning('Stop while reactor in error.')
    else:
        raise NotImplementedError(current)

    self.__update_io_notification()
    self.__assert_state_rules()
def terminate(self):
    """When in an active state immediately shuts down any socket
    reading and writing, closes the socket, cancels all outstanding
    scheduler deadlines, puts the reactor into state
    ReactorState.stopped, then calls self.on_connect_fail (if in a
    connect/connack state) or alternatively self.on_disconnect if in
    some other active state.

    When the reactor is already in an inactive state this method has
    no effect other than writing a log entry.
    """
    self.__assert_state_rules()

    # Logged unconditionally, even when there is nothing to terminate.
    self.__log.info('Terminating.')

    if self.state in ACTIVE_STATES:
        self.__terminate(ReactorState.stopped)
    elif self.state in INACTIVE_STATES:
        pass
    else:
        raise NotImplementedError(self.state)

    self.__update_io_notification()
    self.__assert_state_rules()
def want_read(self):
    """True if the reactor is ready to process incoming socket data;
    False otherwise.

    Returns
    -------
    bool
    """
    current = self.sock_state
    if current is SocketState.handshake:
        # During the handshake the SSL layer decides.
        return self.__ssl_want_read
    if current in (SocketState.connected, SocketState.mute):
        return True
    return False
def want_write(self):
    """True if the reactor is ready to write data to the socket; False
    otherwise.

    Returns
    -------
    bool

    Raises
    ------
    NotImplementedError
        If `self.sock_state` is not a known `SocketState`.
    """
    state = self.sock_state
    if state in (SocketState.stopped, SocketState.name_resolution, SocketState.mute):
        rv = False
    elif state is SocketState.connecting:
        # Writability signals that the connect attempt has completed.
        rv = True
    elif state is SocketState.handshake:
        rv = self.__ssl_want_write
    elif state in (SocketState.connected, SocketState.deaf):
        # ``deaf`` previously fell through to NotImplementedError.  The
        # state rules allow a keepalive deadline while deaf, and
        # want_read treats ``mute`` as still readable, so deaf is
        # handled like connected on the write side.
        # NOTE(review): confirm against the code that enters ``deaf``.
        rv = bool(self.__wbuf) or bool(self.__preflight_queue)
    else:
        raise NotImplementedError(state)

    return rv
def __decode_packet_body(self, header, num_header_bytes, packet_class):
    """Decode one complete packet from the front of the receive buffer
    and remove its bytes from the buffer.

    Parameters
    ----------
    header: MqttFixedHeader
        Fixed header already decoded from the front of the buffer.
    num_header_bytes: int
        Number of bytes occupied by the fixed header.
    packet_class
        Packet class whose ``decode_body`` decodes the bytes that
        follow the fixed header.

    Returns
    -------
    The packet object returned by ``packet_class.decode_body``.
    """
    num_packet_bytes = num_header_bytes + header.remaining_len

    body = self.__rbuf[num_header_bytes:num_packet_bytes]
    num_body_bytes_consumed, packet = packet_class.decode_body(header, BytesReader(body))
    assert num_packet_bytes == num_header_bytes + num_body_bytes_consumed
    self.__rbuf = bytearray(self.__rbuf[num_packet_bytes:])

    return packet

def __on_recv_bytes(self, new_bytes):
    """Buffer freshly received bytes and dispatch every complete packet
    now available in the receive buffer.

    The receive-idle deadlines are restarted first.  Decoding stops
    when the buffer holds only part of a packet; an incomplete fixed
    header surfaces as an ``UnderflowDecodeError`` raised by
    ``MqttFixedHeader.decode`` which the caller is expected to handle.

    Parameters
    ----------
    new_bytes: bytes
        Non-empty chunk of bytes just read from the socket.
    """
    assert self.sock_state in (SocketState.connected, SocketState.mute)
    assert len(new_bytes) > 0

    if self.sock_state is not SocketState.mute:
        if self.__recv_idle_ping_deadline is not None:
            self.__recv_idle_ping_deadline.cancel()
            self.__recv_idle_ping_deadline = None

        if self.recv_idle_ping_period > 0:
            self.__recv_idle_ping_deadline = self.__scheduler.add(self.recv_idle_ping_period,
                                                                  self.__recv_idle_ping_timeout)

    self.__recv_idle_abort_deadline.cancel()
    self.__recv_idle_abort_deadline = self.__scheduler.add(self.__recv_idle_abort_period,
                                                           self.__recv_idle_abort_timeout)

    self.__log.debug('recv %d bytes 0x%s', len(new_bytes), HexOnStr(new_bytes))
    self.__rbuf.extend(new_bytes)

    # packet type -> (class used to decode the body, handler method)
    dispatch = {
        MqttControlPacketType.connack: (MqttConnack, self.__on_connack),
        MqttControlPacketType.suback: (MqttSuback, self.__on_suback),
        MqttControlPacketType.unsuback: (MqttUnsuback, self.__on_unsuback),
        MqttControlPacketType.puback: (MqttPuback, self.__on_puback),
        MqttControlPacketType.publish: (MqttPublish, self.__on_publish),
        MqttControlPacketType.pingresp: (MqttPingresp, self.__on_pingresp),
        MqttControlPacketType.pubrel: (MqttPubrel, self.__on_pubrel),
        MqttControlPacketType.pubcomp: (MqttPubcomp, self.__on_pubcomp),
        MqttControlPacketType.pubrec: (MqttPubrec, self.__on_pubrec),
    }

    while True:
        num_header_bytes, header = MqttFixedHeader.decode(BytesReader(bytes(self.__rbuf)))
        num_packet_bytes = num_header_bytes + header.remaining_len
        if len(self.__rbuf) < num_packet_bytes:
            # The header is complete but the body has not fully arrived
            # yet.  Nothing in the buffer can change until the next
            # read, so stop here instead of re-decoding the same header
            # forever.
            break

        entry = dispatch.get(header.packet_type)
        if entry is None:
            m = 'Received unsupported message type {}.'.format(header.packet_type)
            self.__log.error(m)
            self.__abort(DecodeReactorError(m))
            # The reactor has been torn down; nothing left to decode.
            break

        packet_class, handler = entry
        handler(self.__decode_packet_body(header, num_header_bytes, packet_class))
def read(self):
    """Calls recv on underlying socket exactly once and returns the
    number of bytes read.

    Decode errors, SSL errors and socket errors other than a timeout
    or would-block condition abort the reactor.  A zero-length read
    (remote closed its sending side) is passed on to the
    muted-remote handler.

    This method may be called at any time in any state and if `self`
    is not prepared for a read at that point then no action will be
    taken.

    The `socket.settimeout` can be used to perform a blocking read
    with a timeout on the underlying socket.

    Returns
    -------
    int
        number of bytes read from socket.
    """
    self.__assert_state_rules()

    self.__ssl_want_write = False
    self.__ssl_want_read = False

    num_bytes_read = 0
    if self.sock_state in INACTIVE_SOCK_STATES:
        pass
    elif self.sock_state in (SocketState.name_resolution,
                             SocketState.connecting,
                             SocketState.deaf):
        pass
    elif self.sock_state is SocketState.handshake:
        self.__set_handshake()
    elif self.sock_state in (SocketState.connected, SocketState.mute):
        try:
            new_bytes = self.socket.recv(4096)
            num_bytes_read = len(new_bytes)
            if new_bytes:
                self.__on_recv_bytes(new_bytes)
            else:
                self.__on_muted_remote()
        except UnderflowDecodeError:
            # Not enough header bytes.
            pass
        except DecodeError as e:
            self.__log.error('Error decoding message (%s)', str(e))
            self.__abort(DecodeReactorError(str(e)))
        except ssl.SSLWantWriteError:
            self.__ssl_want_write = True
        except ssl.SSLWantReadError:
            self.__ssl_want_read = True
        except ssl.SSLError as e:
            # TODO #14
            #
            # Issue: https://github.com/kcallin/haka-mqtt/issues/14
            #
            # In blocking socket mode can't find a way to detect
            # a timeout other than a string comparison.  SUPER
            # brittle.  Don't like at all!
            #
            # to = ssl.SSLError('The read operation timed out')
            #
            # ``e.args`` is inspected rather than ``e.message``
            # because exceptions have no ``message`` attribute on
            # Python 3; reading it there would raise AttributeError
            # and mask the real SSL error.
            if 'The read operation timed out' in e.args:
                self.__ssl_want_read = True
            else:
                self.__log.error("SSLError while reading socket; %s.", ReprOnStr(e))
                self.__abort(SslReactorError(e))
        except socket.timeout:
            # See https://github.com/kcallin/haka-mqtt/issues/25
            pass
        except socket.error as e:
            if e.errno == errno.EWOULDBLOCK:
                # No data ready to be read.
                pass
            else:
                self.__abort_socket_error(SocketReactorError(e.errno))
    else:
        raise NotImplementedError(self.sock_state)

    self.__update_io_notification()
    self.__assert_state_rules()

    return num_bytes_read
def __on_connack_accepted(self, connack):
    """Handle a connack packet whose return code is accepted.

    Parameters
    ----------
    connack: MqttConnack
    """
    assert self.mqtt_state is MqttState.connack, self.mqtt_state

    if connack.session_present and self.clean_session:
        self.__abort_protocol_violation('Server indicates a session is present when none was requested'
                                        ' [MQTT-3.2.2-1].')
    else:
        if self.state is ReactorState.starting:
            self.__state = ReactorState.started
            if self.keepalive_period and self.__pingreq_due:
                # The keepalive period timed out while waiting for
                # connack.  Pingreq requests are not launched while
                # waiting for connack; it has been deferred.
                # Launch the deferred pingreq now.
                #
                # The call is kept outside the assert so that it still
                # happens when asserts are stripped (``python -O``).
                launched = self.__launch_pingreq_if_inactive()
                assert launched
        elif self.state is ReactorState.stopping:
            pass
        else:
            raise NotImplementedError(self.state)

        self.__mqtt_state = MqttState.connected
        self.on_connack(self, connack)

    self.__update_io_notification()

def __on_connack(self, connack):
    """Called once when a connack packet is received.

    Parameters
    ----------
    connack: MqttConnack
    """
    if self.mqtt_state is MqttState.connack:
        self.__log.info('Received %s.', repr(connack))

        # TODO: should not close incoming socket at this time;
        #       give server opportunity to close socket of its own
        #       accord then give timeout.
        #
        connect_failures = {
            ConnackResult.fail_bad_protocol_version: 'Connect failed: bad protocol version.',
            ConnackResult.fail_bad_client_id: 'Connect failed: bad client ID.',
            ConnackResult.fail_server_unavailable: 'Connect failed: server unavailable.',
            ConnackResult.fail_bad_username_or_password: 'Connect failed: bad username or password.',
            ConnackResult.fail_not_authorized: 'Connect failed: not authorized.',
        }

        if connack.return_code == ConnackResult.accepted:
            # The first packet sent from the Server to the Client MUST
            # be a CONNACK Packet [MQTT-3.2.0-1].
            self.__on_connack_accepted(connack)
        elif connack.return_code in connect_failures:
            self.__log.error(connect_failures[connack.return_code])
            self.__abort(ConnectReactorError(connack.return_code))
        else:
            raise NotImplementedError(connack.return_code)
    elif self.mqtt_state is MqttState.connected:
        self.__abort_protocol_violation('Received connack at an inappropriate time. [MQTT-3.2.0-1]')
    else:
        raise NotImplementedError(self.mqtt_state)

def __on_publish(self, publish):
    """Called when a publish packet is received from the remote.

    The ``on_publish`` callback is invoked and, unless the outgoing
    stream has already been shut down (socket mute), the matching
    puback (QoS 1) or pubrec (QoS 2) is queued.

    Parameters
    ----------
    publish: :class:`mqtt_codec.packet.MqttPublish`
    """
    if self.mqtt_state is MqttState.connack:
        self.__abort_early_packet(publish)
    elif self.mqtt_state is MqttState.connected:
        self.__log.info('Received %s.', repr(publish))
        self.on_publish(self, publish)

        if self.sock_state in (SocketState.connected, SocketState.deaf):
            if publish.qos == 0:
                pass
            elif publish.qos == 1:
                self.__preflight_queue.append(MqttPuback(publish.packet_id))
            elif publish.qos == 2:
                self.__preflight_queue.append(MqttPubrec(publish.packet_id))
            else:
                raise NotImplementedError(publish.qos)
        elif self.sock_state is SocketState.mute:
            if publish.qos == 0:
                pass
            elif publish.qos == 1:
                self.__log.info('No puback will be published because reactor is stopping.')
            elif publish.qos == 2:
                self.__log.info('No pubrec will be published because reactor is stopping.')
            else:
                raise NotImplementedError(publish.qos)
        else:
            raise NotImplementedError(self.sock_state)
    else:
        raise NotImplementedError(self.mqtt_state)

def __on_suback(self, suback):
    """Called when a suback packet is received from remote.

    Parameters
    ----------
    suback: MqttSuback
    """
    if self.mqtt_state is MqttState.connack:
        self.__abort_early_packet(suback)
    elif self.mqtt_state is MqttState.connected:
        subscribe = self.__get_packet_type(suback.packet_id, MqttControlPacketType.subscribe)
        if subscribe is None:
            self.__abort_protocol_violation('Received %s for a mid that is not in-flight; aborting.',
                                            repr(suback))
        else:
            if len(suback.results) == len(subscribe.topics):
                self.__log.info('Received %s.', repr(suback))
                subscribe._set_status(MqttSubscribeStatus.done)
                self.__send_path_packet_ids.release(subscribe.packet_id)
                del self.__inflight_queue[suback.packet_id]
                self.on_suback(self, suback)
            else:
                m = ('Received %s as a response to %s, but the number of subscription'
                     ' results does not equal the number of subscription requests; aborting.')
                self.__abort_protocol_violation(m,
                                                repr(suback),
                                                repr(subscribe.packet()))
    else:
        raise NotImplementedError(self.mqtt_state)

def __on_unsuback(self, unsuback):
    """Called when an unsuback packet is received from remote.

    Parameters
    ----------
    unsuback: mqtt_codec.packet.MqttUnsuback
    """
    if self.mqtt_state is MqttState.connack:
        self.__abort_early_packet(unsuback)
    elif self.mqtt_state is MqttState.connected:
        unsubscribe = self.__get_packet_type(unsuback.packet_id, MqttControlPacketType.unsubscribe)
        if unsubscribe is None:
            self.__abort_protocol_violation('Received %s for a mid that is not in-flight; aborting.',
                                            repr(unsuback))
        else:
            self.__log.info('Received %s.', repr(unsuback))
            unsubscribe._set_status(MqttSubscribeStatus.done)
            self.__send_path_packet_ids.release(unsubscribe.packet_id)
            del self.__inflight_queue[unsuback.packet_id]
            if self.on_unsuback is not None:
                self.on_unsuback(self, unsuback)
    else:
        raise NotImplementedError(self.mqtt_state)

def __on_puback(self, puback):
    """Called when a puback packet is received from the remote.

    The puback must acknowledge the oldest in-flight publish and that
    publish must be QoS 1; anything else aborts the reactor with a
    protocol violation.

    Parameters
    ----------
    puback: MqttPuback
    """
    if self.mqtt_state is MqttState.connack:
        self.__abort_early_packet(puback)
    elif self.mqtt_state is MqttState.connected:
        in_flight_packet_ids = [p.packet_id for p in self.__inflight_queue.values()
                                if p.packet_type is MqttControlPacketType.publish]
        # Oldest in-flight publish, if any.
        publish = self.__inflight_queue[in_flight_packet_ids[0]] if in_flight_packet_ids else None

        if publish and publish.packet_id == puback.packet_id:
            if publish.qos == 1:
                del self.__inflight_queue[puback.packet_id]
                self.__send_path_packet_ids.release(publish.packet_id)
                self.__log.info('Received %s.', repr(puback))
                publish._set_status(MqttPublishStatus.done)
                self.on_puback(self, puback)
            else:
                self.__abort_protocol_violation('Received %s, an inappropriate response to qos=%d %s; aborting.',
                                                ReprOnStr(puback),
                                                publish.qos,
                                                ReprOnStr(publish))
        elif publish and puback.packet_id in in_flight_packet_ids:
            m = 'Received %s instead of puback for next-in-flight packet_id=%d; aborting.'
            self.__abort_protocol_violation(m,
                                            ReprOnStr(puback),
                                            publish.packet_id)
        else:
            m = 'Received %s when packet_id=%d was not in-flight; aborting.'
            self.__abort_protocol_violation(m,
                                            ReprOnStr(puback),
                                            puback.packet_id)
    else:
        raise NotImplementedError(self.mqtt_state)

def __on_pubrec(self, pubrec):
    """Called when a pubrec packet is received from the remote; queues
    the pubrel that answers it.

    Parameters
    ----------
    pubrec: MqttPubrec
    """
    if self.mqtt_state is MqttState.connack:
        self.__abort_early_packet(pubrec)
    elif self.mqtt_state is MqttState.connected:
        in_flight_publish_ids = {p.packet_id: p for p in self.__inflight_queue.values()
                                 if p.packet_type is MqttControlPacketType.publish}
        if pubrec.packet_id in in_flight_publish_ids:
            publish_ticket = in_flight_publish_ids[pubrec.packet_id]
            if publish_ticket.qos == 2:
                del self.__inflight_queue[pubrec.packet_id]
                self.__log.info('Received %s.', repr(pubrec))

                # Remember the queue position before the callback runs
                # so the pubrel goes ahead of anything the callback
                # itself queues.
                insert_idx = len(self.__preflight_queue)
                self.on_pubrec(self, pubrec)
                self.__preflight_queue.insert(insert_idx, MqttPubrel(pubrec.packet_id))
            else:
                self.__abort_protocol_violation('Received unexpected %s in response to qos=%d publish %s; aborting.',
                                                ReprOnStr(pubrec),
                                                publish_ticket.qos,
                                                ReprOnStr(publish_ticket.packet()))
        else:
            m = 'Received unexpected %s when packet_id=%d was not in-flight; aborting.'
            self.__abort_protocol_violation(m,
                                            ReprOnStr(pubrec),
                                            pubrec.packet_id)
    else:
        raise NotImplementedError(self.mqtt_state)

def __on_pubcomp(self, pubcomp):
    """Called when a pubcomp packet is received from the remote.

    Parameters
    ----------
    pubcomp: MqttPubcomp
    """
    if self.mqtt_state is MqttState.connack:
        self.__abort_early_packet(pubcomp)
    elif self.mqtt_state is MqttState.connected:
        in_flight_pubrel_ids = {p.packet_id for p in self.__inflight_queue.values()
                                if p.packet_type is MqttControlPacketType.pubrel}
        if pubcomp.packet_id in in_flight_pubrel_ids:
            del self.__inflight_queue[pubcomp.packet_id]
            self.__log.info('Received %s.', repr(pubcomp))
            self.on_pubcomp(self, pubcomp)
        else:
            m = 'Received %s when no pubrel for packet_id=%d was in-flight; aborting.'
            self.__abort_protocol_violation(m,
                                            ReprOnStr(pubcomp),
                                            pubcomp.packet_id)
    else:
        raise NotImplementedError(self.mqtt_state)

def __on_pubrel(self, pubrel):
    """Part of QoS=2 receive path; queues the pubcomp that answers the
    received pubrel.

    Parameters
    ----------
    pubrel: MqttPubrel
    """
    if self.mqtt_state is MqttState.connack:
        self.__abort_early_packet(pubrel)
    elif self.mqtt_state is MqttState.connected:
        self.__log.info('Received %s.', repr(pubrel))
        self.on_pubrel(self, pubrel)
        self.__preflight_queue.append(MqttPubcomp(pubrel.packet_id))
    else:
        raise NotImplementedError(self.mqtt_state)

def __on_pingresp(self, pingresp):
    """Called when a pingresp packet is received from the remote.

    Parameters
    ----------
    pingresp: :class:`mqtt_codec.packet.MqttPingresp`
    """
    if self.mqtt_state is MqttState.connack:
        self.__abort_early_packet(pingresp)
    elif self.mqtt_state is MqttState.connected:
        if self.__pingreq_active:
            self.__log.info('Received %s.', repr(pingresp))
            self.__pingreq_active = False
        else:
            self.__log.warning('Received unsolicited %s.', repr(pingresp))

        if self.__pingreq_due:
            self.__launch_pingreq_if_inactive()
    else:
        raise NotImplementedError(self.mqtt_state)

def __on_muted_remote(self):
    """Called when a read returns zero bytes, i.e. the remote has
    closed its sending direction."""
    assert self.sock_state in (SocketState.handshake,
                               SocketState.connected,
                               SocketState.mute)

    if self.sock_state in (SocketState.handshake, SocketState.connected):
        self.__log.warning('Remote has unexpectedly closed remote->local writes; Aborting.')
        self.__abort(MutePeerReactorError())
    elif self.sock_state is SocketState.mute:
        # Socket sending already closed (socket is mute).
        # If socket receive is also closed (socket is deaf), then
        # it is time for the socket to be closed.
        self.__log.info('Remote has gracefully closed remote->local writes; Stopped.')
        self.__terminate(ReactorState.stopped, None)
    else:
        raise NotImplementedError(self.sock_state)

def __launch_packets(self):
    """Transfer as many bytes as possible to the network subsystem.  If
    bytes from a packet are successfully placed on-the-wire then that
    packet will be considered "in-flight".

    This method will make exactly one call to send.

    Returns
    -------
    int
        Returns number of bytes flushed to output buffers.
    """
    # Try to have at least as many bytes to send as there are in the
    # socket send buffer.
    min_buf_size = 2**12  # == 4096
    wbuf_size = len(self.__wbuf)

    # packet_end_offsets[0] is the number of bytes already pending in
    # the write buffer; each further entry is the buffer offset at
    # which one newly encoded preflight packet ends.
    #
    #    0 1 2 3 4 5 6 7 8
    #    -----------x|----x|----x|
    #
    packet_end_offsets = [wbuf_size]
    bio = BytesIO()
    for packet_record in self.__preflight_queue:
        wbuf_size += packet_record.encode(bio)
        packet_end_offsets.append(wbuf_size)

        if packet_record.packet_type is MqttControlPacketType.disconnect or wbuf_size >= min_buf_size:
            break

    # Write as many bytes as possible.
    self.__wbuf.extend(bio.getvalue())
    num_bytes_flushed = self.__flush()
    assert num_bytes_flushed <= len(self.__wbuf)

    # Mark launched messages as in-flight.  A message counts as
    # launched as soon as at least one of its bytes has been sent.
    num_messages_launched = 0
    for packet_end_offset in packet_end_offsets:
        if num_bytes_flushed > packet_end_offset:
            num_messages_launched += 1
        else:
            break

    launched_packets = self.__preflight_queue[0:num_messages_launched]
    del self.__preflight_queue[0:num_messages_launched]
    for packet_record in launched_packets:
        if packet_record.packet_type is MqttControlPacketType.connect:
            self.__log.info('Launching message %s.', packet_record.packet())
        else:
            self.__log.info('Launching message %s.', ReprOnStr(packet_record.packet()))

        # Packet types not listed below need no bookkeeping once
        # launched.
        if packet_record.packet_type is MqttControlPacketType.publish:
            if packet_record.qos == 0:
                packet_record._set_status(MqttPublishStatus.done)
            elif packet_record.qos == 1:
                packet_record._set_status(MqttPublishStatus.puback)
                assert packet_record.packet_id not in self.__inflight_queue
                self.__inflight_queue[packet_record.packet_id] = packet_record
            elif packet_record.qos == 2:
                packet_record._set_status(MqttPublishStatus.pubrec)
                assert packet_record.packet_id not in self.__inflight_queue
                self.__inflight_queue[packet_record.packet_id] = packet_record
            else:
                raise NotImplementedError(packet_record.qos)
        elif packet_record.packet_type is MqttControlPacketType.pubrel:
            assert packet_record.packet_id not in self.__inflight_queue
            self.__inflight_queue[packet_record.packet_id] = packet_record
        elif packet_record.packet_type is MqttControlPacketType.subscribe:
            packet_record._set_status(MqttSubscribeStatus.ack)
            assert packet_record.packet_id not in self.__inflight_queue
            self.__inflight_queue[packet_record.packet_id] = packet_record
        elif packet_record.packet_type is MqttControlPacketType.unsubscribe:
            assert packet_record.packet_id not in self.__inflight_queue
            self.__inflight_queue[packet_record.packet_id] = packet_record
        elif packet_record.packet_type is MqttControlPacketType.disconnect:
            assert self.state is ReactorState.stopping

            self.__log.info('Shutting down outgoing stream.')
            self.socket.shutdown(socket.SHUT_WR)
            self.__sock_state = SocketState.mute

            if self.__keepalive_due_deadline is not None:
                self.__keepalive_due_deadline.cancel()
                self.__keepalive_due_deadline = None

            if self.__recv_idle_ping_deadline is not None:
                self.__recv_idle_ping_deadline.cancel()
                self.__recv_idle_ping_deadline = None

            # The receive-abort deadline is left running.
            assert self.__recv_idle_abort_deadline is not None

    if num_bytes_flushed:
        self.__log.debug('send %d bytes 0x%s.', num_bytes_flushed, HexOnStr(self.__wbuf[0:num_bytes_flushed]))

    # Keep the unsent tail of the last launched packet; bytes of
    # packets that were not launched are dropped and will be encoded
    # again on the next call.
    self.__wbuf = self.__wbuf[num_bytes_flushed:packet_end_offsets[num_messages_launched]]

    return num_bytes_flushed

def __feed_wbuf(self):
    """Feeds the socket write buffer if the socket is in a state where
    packets can be sent.  Packets from the preflight queue are placed
    on the inflight queue as necessary to acquire the necessary bytes.

    Returns
    -------
    int
        Returns number of bytes flushed to output buffers.
    """
    if self.sock_state in (SocketState.connected, SocketState.deaf):
        num_bytes_flushed = self.__launch_packets()
    elif self.sock_state is SocketState.handshake:
        num_bytes_flushed = 0
    elif self.sock_state in (SocketState.stopped,
                             SocketState.mute,
                             SocketState.name_resolution,
                             SocketState.connecting):
        num_bytes_flushed = 0
    else:
        raise NotImplementedError(self.sock_state)

    return num_bytes_flushed

def __set_connack(self):
    """Mark the socket as connected and queue the MQTT connect packet
    at the head of the preflight queue."""
    assert self.sock_state in (SocketState.connecting, SocketState.handshake)
    assert self.mqtt_state is MqttState.connack
    assert self.state is ReactorState.starting
    assert not self.__inflight_queue
    assert not self.__wbuf

    self.__sock_state = SocketState.connected

    connect = MqttConnect(self.client_id,
                          self.clean_session,
                          self.keepalive_period,
                          will=self.will,
                          username=self.__username,
                          password=self.__password)
    self.__preflight_queue.insert(0, connect)
    self.__update_io_notification()

def __set_handshake(self):
    """Enter (or remain in) the handshake socket state and make one
    attempt at ``do_handshake`` on the socket.

    If the handshake needs more I/O the corresponding want-read or
    want-write flag is set; on failure the reactor is aborted; on
    success the reactor proceeds to send the MQTT connect packet.
    """
    assert self.sock_state in (SocketState.connecting, SocketState.handshake)
    assert self.state is ReactorState.starting

    self.__sock_state = SocketState.handshake
    self.__ssl_want_read = False
    self.__ssl_want_write = False

    try:
        self.socket.do_handshake()
    except ssl.SSLWantReadError:
        self.__ssl_want_read = True
        self.__update_io_notification()
    except ssl.SSLWantWriteError:
        self.__ssl_want_write = True
        self.__update_io_notification()
    except ssl.SSLError as e:
        self.__log.warning('SSL handshake failure: %s.', e)
        self.__abort(SslReactorError(e))
    except socket.error as e:
        self.__log.warning('SSL handshake failure: %s.', e)
        self.__abort(SocketReactorError(e.errno))
    else:
        self.__set_connack()

def __on_connect(self):
    """Called when a socket becomes connected; the socket must be in
    the `SocketState.connecting` state and the reactor in
    `ReactorState.starting`."""
    assert self.sock_state is SocketState.connecting
    assert self.state is ReactorState.starting
    assert self.__recv_idle_abort_deadline is None

    self.__log.info('Connected.')
    self.__recv_idle_abort_deadline = self.__scheduler.add(self.recv_idle_abort_period,
                                                           self.__recv_idle_abort_timeout)

    # Sockets that expose do_handshake need a handshake before any
    # MQTT traffic is sent.
    if hasattr(self.socket, 'do_handshake'):
        self.__set_handshake()
    else:
        self.__set_connack()

def __flush(self):
    """Calls send exactly once; returning the number of bytes written.

    When bytes were written while the socket is connected or deaf the
    keepalive deadline is restarted.

    Returns
    -------
    int
        Number of bytes written.
    """
    self.__ssl_want_read = False
    self.__ssl_want_write = False

    num_bytes_written = 0
    if self.__wbuf:
        try:
            num_bytes_written = self.socket.send(self.__wbuf)
        except ssl.SSLWantReadError:
            self.__ssl_want_read = True
        except ssl.SSLWantWriteError:
            self.__ssl_want_write = True
        except ssl.SSLError as e:
            self.__log.error("SSLError while writing to socket; %s.", ReprOnStr(e))
            self.__abort(SslReactorError(e))
        except socket.timeout:
            # See https://github.com/kcallin/haka-mqtt/issues/25
            pass
        except socket.error as e:
            if e.errno == errno.EWOULDBLOCK:
                # No write space ready.
                pass
            elif e.errno == errno.EPIPE:
                self.__log.error("Remote unexpectedly closed the connection (<%s: %d>); Aborting.",
                                 errno.errorcode[e.errno],
                                 e.errno)
                self.__abort(SocketReactorError(e.errno))
            else:
                self.__abort_socket_error(SocketReactorError(e.errno))

    if num_bytes_written > 0:
        if self.sock_state in (SocketState.connected, SocketState.deaf):
            if self.__keepalive_due_deadline is not None:
                self.__keepalive_due_deadline.cancel()
                self.__keepalive_due_deadline = None

            if self.keepalive_period:
                self.__keepalive_due_deadline = self.__scheduler.add(self.keepalive_period,
                                                                     self.__keepalive_due_timeout)

    return num_bytes_written

# https://eli.thegreenplace.net/2009/06/12/safely-using-destructors-in-python/
# https://www.electricmonk.nl/log/2008/07/07/python-destructor-and-garbage-collection-notes/
# https://docs.python.org/2/reference/datamodel.html#object.__del__
#
# The reactor contains circular references because (at least) the
# scheduler.
# The reactor has a reference to the scheduler, the
# scheduler grants ticket references to the reactor, and the tickets
# contain references to the scheduler.
#
# How to implement __del__?  Easiest just to call terminate which
# closes all resources.
#
# Implementing __del__ looks very tricky.
#
# def __del__(self):
#     pass
#

def __terminate_socket(self):
    """Cleans up all socket-related resources.

    * Cancels any name resolution future and sets it to None.
    * Removes any socket from the selector.
    * Shuts down reading and writing on socket.
    * Calls close on socket.
    * Sets sock_state to SocketState.stopped.
    """
    if self.__name_resolution_future is not None:
        self.__name_resolution_future.cancel()
        self.__name_resolution_future = None

    if self.socket is not None:
        self.__selector.update(False, False, self.socket)
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error as e:
            if e.errno == errno.ENOTCONN:
                # Socket was never connected or is already disconnected.
                pass
            else:
                raise NotImplementedError(e)
        self.socket.close()
        self.socket = None

    self.__update_io_notification()
    self.__sock_state = SocketState.stopped

def __terminate(self, state, error=None):
    """Tear down the socket and all MQTT session state, move to the
    given final state and notify the user through the appropriate
    callback.

    Parameters
    ----------
    state: ReactorState
        Inactive state the reactor ends up in.
    error: ReactorError
        Error recorded on the reactor; None when there is none.
    """
    assert state in INACTIVE_STATES

    # Clean up all socket-related resources.
    self.__terminate_socket()

    # Clean up all MQTT protocol related items.  The callback is
    # chosen from the MQTT state as it was before the teardown.
    if self.mqtt_state is MqttState.connack:
        on_disconnect_cb = self.on_connect_fail
    elif self.mqtt_state in (MqttState.connected, MqttState.mute):
        on_disconnect_cb = self.on_disconnect
    elif self.mqtt_state in INACTIVE_MQTT_STATES:
        on_disconnect_cb = None
    else:
        # Report the value that was actually unhandled.
        raise NotImplementedError(self.mqtt_state)

    self.__pingreq_active = False
    self.__wbuf = bytearray()
    self.__rbuf = bytearray()

    if self.__recv_idle_abort_deadline is not None:
        self.__recv_idle_abort_deadline.cancel()
        self.__recv_idle_abort_deadline = None

    if self.__recv_idle_ping_deadline is not None:
        self.__recv_idle_ping_deadline.cancel()
        self.__recv_idle_ping_deadline = None

    if self.__keepalive_due_deadline is not None:
        self.__keepalive_due_deadline.cancel()
        self.__keepalive_due_deadline = None

    self.__state = state
    self.__error = error
    self.__mqtt_state = MqttState.stopped

    # Callbacks run last so they observe the final state.
    if callable(on_disconnect_cb):
        on_disconnect_cb(self)

def __abort_socket_error(self, se):
    """Log a socket error and abort the reactor with it.

    Parameters
    ----------
    se: SocketReactorError
    """
    # ``errno.errorcode`` does not necessarily list every errno value;
    # a missing entry must not prevent the abort from happening.
    self.__log.error('%s (<%s: %d>).  Aborting.',
                     os.strerror(se.errno),
                     errno.errorcode.get(se.errno, 'UNKNOWN'),
                     se.errno)
    self.__abort(se)

def __abort_early_packet(self, p):
    """Abort because packet `p` was received before the connack."""
    self.__abort_protocol_violation('Received %s before connack. [MQTT-3.2.0-1]', repr(p))

def __abort_protocol_violation(self, m, *params):
    """Log a protocol violation and abort the reactor.

    Parameters
    ----------
    m: str
        %-style message format.
    params
        Values substituted into `m`.
    """
    self.__log.error(m, *params)
    self.__abort(ProtocolReactorError(m % params))

def __abort(self, e):
    """Immediately terminates all active resources and sets
    `self.state` to the final state `ReactorState.error`.

    Parameters
    ----------
    e: ReactorError
    """
    self.__terminate(ReactorState.error, e)

def __launch_pingreq_if_inactive(self):
    """Queue a pingreq if one is not already active.

    Returns
    -------
    bool
        True if a pingreq has been launched, false otherwise.
    """
    if not self.__pingreq_active:
        self.__pingreq_active = True
        self.__pingreq_due = False
        self.__preflight_queue.append(MqttPingreq())
        rv = True
    else:
        rv = False

    return rv

def __keepalive_due_timeout(self):
    """Scheduler callback fired when the keepalive deadline expires;
    see [MQTT-3.1.2-23].

    The pingreq is marked due and queued right away only when the MQTT
    session is connected.
    """
    self.__assert_state_rules()
    assert self.__keepalive_due_deadline is not None
    assert self.sock_state is SocketState.connected

    self.__keepalive_due_deadline.cancel()
    self.__keepalive_due_deadline = None

    self.__pingreq_due = True
    if self.mqtt_state is MqttState.connected:
        self.__launch_pingreq_if_inactive()

    self.__update_io_notification()
    self.__assert_state_rules()

def __recv_idle_ping_timeout(self):
    """Scheduler callback fired when the receive-idle ping deadline
    expires; see [MQTT-3.1.2-23]."""
    self.__assert_state_rules()

    self.__launch_pingreq_if_inactive()
    self.__recv_idle_ping_deadline.cancel()
    self.__recv_idle_ping_deadline = None

    self.__update_io_notification()
    self.__assert_state_rules()

def __recv_idle_abort_timeout(self):
    """Called when bytes have not been received from the server for at
    least ``self.recv_idle_abort_period`` seconds; aborts the
    reactor."""
    self.__assert_state_rules()
    assert self.__recv_idle_abort_deadline is not None

    if self.sock_state in (SocketState.handshake,
                           SocketState.connected,
                           SocketState.mute,
                           SocketState.deaf):
        msg = "More than abort period (%.01fs) has passed since last bytes received.  Aborting."
        self.__log.warning(msg, self.recv_idle_abort_period)
    else:
        raise NotImplementedError(self.sock_state)

    # The deadline has already fired; just drop the reference.
    self.__recv_idle_abort_deadline = None

    self.__abort(RecvTimeoutReactorError())

    self.__update_io_notification()
    self.__assert_state_rules()
def write(self):
    """Make at most one attempt to push pending data to the socket.

    While the socket is connecting, the pending socket error is
    checked: success completes the connect and starts sending, "in
    progress" is ignored, and any other error aborts the reactor.
    During a handshake, the handshake is advanced before sending.

    This method may be called at any time in any state and if `self`
    is not prepared for a write at that point then no action will be
    taken.

    The `socket.settimeout` can be used to perform a blocking write
    with a timeout on the underlying socket.
    """
    self.__assert_state_rules()

    sock_state = self.sock_state
    if sock_state is SocketState.connecting:
        so_error = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if so_error == 0:
            self.__on_connect()
            self.__feed_wbuf()
        elif so_error != errno.EINPROGRESS:
            self.__abort_socket_error(SocketReactorError(so_error))
    elif sock_state is SocketState.handshake:
        self.__set_handshake()
        self.__feed_wbuf()
    elif sock_state in (SocketState.connected, SocketState.deaf):
        self.__feed_wbuf()
    elif sock_state not in (SocketState.name_resolution,
                            SocketState.stopped,
                            SocketState.mute):
        raise NotImplementedError(sock_state)

    self.__update_io_notification()
    self.__assert_state_rules()