API Reference¶
haka_mqtt.cycle_iter module¶
haka_mqtt.dns_async module¶
-
class
haka_mqtt.dns_async.
AsyncFutureDnsResolver
(thread_pool_size=1)[source]¶ Bases:
object
An executor that spawns a small thread pool for performing DNS lookups. DNS lookup task sare submitted by using calling this object as a function and those tasks will be completed asynchronously by threads in a thread pool. The completed tasks are posted back to an internal queue and the
poll()
method gets the completed tasks from the queue and notifies their subscribers of completion. Tasks with done callback methods will be called by poll on the same threat that poll is called on.>>> resolver = AsyncFutureDnsResolver() >>> >>> lookup_result = None >>> >>> def lookup_finished(future): ... global lookup_result ... lookup_result = future.result() ... >>> future = resolver('localhost', 80) >>> future.add_done_callback(lookup_finished) >>> >>> while not future.done(): ... # Calling poll services callbacks and sets future to done. ... resolver.poll() ... sleep(0.1) ... >>> assert future.done() >>> >>> # Only timeout=0 presently supported. >>> assert not future.exception(timeout=0) >>> assert not future.cancelled() >>> # future.result(0) contains outcome of asynchronous call >>> # to socket.getaddrinfo. Note that at this time only timeout=0 >>> # is supported by this limited api.
haka_mqtt.dns_sync module¶
-
class
haka_mqtt.dns_sync.
SynchronousFuture
(result=None, exception=None)[source]¶ Bases:
object
-
add_done_callback
(fn)[source]¶ Attaches the callable fn to the future. fn will be called, with the future as its only argument, when the future is cancelled or finishes running.
Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.
If the future has already completed or been cancelled, fn will be called immediately.
-
cancel
()[source]¶ Always returns False since this future is finished the instant it is created.
Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.
Returns: Return type: bool
-
cancelled
()[source]¶ Always returns False since this future is finished the instant it is created.
Return True if the call was successfully cancelled.
Returns: Return type: bool
-
done
()[source]¶ Always returns true since this future is finished the instant it is created.
Return True if the call was successfully cancelled or finished running.
Returns: Return type: bool
-
exception
(timeout=None)[source]¶ Immediately returns the exception raised by the call or None if the call completed without raising.
Return the exception raised by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a concurrent.futures.TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
If the future is cancelled before completing then CancelledError will be raised.
If the call completed without raising, None is returned.
-
result
(timeout=None)[source]¶ Immediately returns the call resultor None if the call raised an exception.
Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a concurrent.futures.TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
If the future is cancelled before completing then CancelledError will be raised.
If the call raised, this method will raise the same exception.
-
haka_mqtt.exception module¶
-
exception
haka_mqtt.exception.
ReactorException
[source]¶ Bases:
exceptions.Exception
haka_mqtt.on_str module¶
haka_mqtt.reactor module¶
The reactor module provides an MQTT reactor class suitable for use
with select
and select-like interfaces like epoll. An adapter is
available to make it conveniently usable in a poll environment
(haka_mqtt.frontends.poll
).
-
class
haka_mqtt.reactor.
AddressReactorError
(gaierror)[source]¶ Bases:
haka_mqtt.reactor.ReactorError
Failed to lookup a valid address.
Parameters: gaierror (socket.gaierror) – -
gaierror
¶ socket.gaierror – Addressing error.
-
-
class
haka_mqtt.reactor.
ConnectReactorError
(result)[source]¶ Bases:
haka_mqtt.reactor.ReactorError
Error that occurs when the server sends a connack fail in response to an initial connect packet.
Parameters: result (ConnackResult) – Asserted not to be ConnackResult.accepted. -
result
¶ ConnackResult – guaranteed that value is not ConnackResult.accepted.
-
-
class
haka_mqtt.reactor.
DecodeReactorError
(description)[source]¶ Bases:
haka_mqtt.reactor.ReactorError
Server wrote a sequence of bytes that could not be interpreted as an MQTT packet.
-
class
haka_mqtt.reactor.
MqttState
[source]¶ Bases:
enum.IntEnum
Inactive states are those where there are no active deadlines, the socket is closed and there is no active I/O. Active states are those where any of these characteristics is not met.
Active States:
Inactive States:
-
connack
= 0¶
-
connected
= 1¶
-
mute
= 2¶
-
stopped
= 3¶
-
-
class
haka_mqtt.reactor.
MutePeerReactorError
[source]¶ Bases:
haka_mqtt.reactor.ReactorError
Error that occurs when the server closes its write stream unexpectedly.
-
class
haka_mqtt.reactor.
ProtocolReactorError
(description)[source]¶ Bases:
haka_mqtt.reactor.ReactorError
Server send an inappropriate MQTT packet to the client.
-
class
haka_mqtt.reactor.
Reactor
(properties, log='haka')[source]¶ Bases:
object
Parameters: - properties (ReactorProperties) –
- log (str or logging.Logger or None) – If str then the result of logging.getLogger(log) is used as a logger; otherwise assumes that is a logging.Logger-like object and asserts that it has debug, info, warning, error, and critical methods. If log is None then logging is disabled.
-
clean_session
¶ bool – Clean session flag is true/false.
-
client_id
¶ str – Client id.
-
error
¶ ReactorError or None – When self.state is ReactorState.error returns a subclass of ReactorError otherwise returns None.
-
is_active
()[source]¶ True
when reactor is active;False
otherwise.An “active” reactor implies that there are outstanding scheduler deadlines active, possibly open sockets, or possibly outstanding DNS lookup futures. This method would return
True
for this case.An inactive reactor guarantees that there are no oustanding scheduler deadlines, DNS lookups, or open sockets. An inactive reactor will never change state unless a method like
start()
is called to start the reactor.New in version 0.3.5.
Returns: Return type: bool
-
keepalive_period
¶ int – If this period elapses without the client sending a control packet to the server then it will generate a pingreq packet and send it to the server. Will return zero if pingreq requests are not generated.
-
mqtt_state
¶ MqttState – Current state of mqtt protocol handshake.
-
on_connack
(reactor, connack)[source]¶ Called immediately upon receiving a MqttConnack packet from the remote. The reactor.state will be ReactorState.started or ReactorState.stopping if the reactor is shutting down.
Parameters: - reactor (Reactor) –
- connack (
mqtt_codec.packet.MqttConnack
) –
-
on_puback
(reactor, puback)[source]¶ Called immediately upon receiving a MqttPuback packet from the remote. This method is part of the QoS=1 message send path.
Parameters: - reactor (Reactor) –
- puback (
mqtt_codec.packet.MqttPuback
) –
-
on_pubcomp
(reactor, pubcomp)[source]¶ Called immediately upon receiving a MqttPubcomp packet from the remote. This is part of the QoS=2 message send path.
Parameters: - reactor (Reactor) –
- pubcomp (
mqtt_codec.packet.MqttPubcomp
) –
-
on_publish
(reactor, publish)[source]¶ Called immediately upon receiving a MqttSuback packet from the remote. This is part of the QoS=0, 1, and 2 message receive paths.
Parameters: - reactor (Reactor) –
- publish (
mqtt_codec.packet.MqttPublish
) –
-
on_pubrec
(reactor, pubrec)[source]¶ Called immediately upon receiving a MqttPubrec packet from the remote. This is part of the QoS=2 message send path.
Parameters: - reactor (Reactor) –
- pubrec (
mqtt_codec.packet.MqttPubrec
) –
-
on_pubrel
(reactor, pubrel)[source]¶ Called immediately upon receiving a MqttPubrel packet from the remote. This is part of the QoS=2 message receive path.
Parameters: - reactor (Reactor) –
- pubrel (
mqtt_codec.packet.MqttPubrel
) –
-
on_suback
(reactor, suback)[source]¶ Called immediately upon receiving a MqttSuback packet from the remote.
Parameters: - reactor (Reactor) –
- suback (
mqtt_codec.packet.MqttSuback
) –
-
on_unsuback
(reactor, unsuback)[source]¶ Called immediately upon receiving a MqttUnsuback packet from the remote.
Parameters: - reactor (Reactor) –
- unsuback (
mqtt_codec.packet.MqttUnsuback
) –
-
publish
(topic, payload, qos, retain=False)[source]¶ Places a publish packet on the preflight queue. Messages in the preflight queue are fair-queued and launched to the server. The reactor certainly will try to place as many messages in-flight as it is able to. If you want to limit the number of messages in-flight then a queue should be maintained outside of the core reactor.
QoS 0 messages are placed in the pre-flight buffer and are eligable for delivery as fast as the socket allows. If the reactor encounters an error or stops and is subsequently started then any QoS=0 messages in the preflight queue are discarded. QoS 0 messages are considered delivered as soon as one of their bytes is placed in the socket write buffer regardless of whether the network successfully delivers them to their destination.
QoS 1 messages are placed in the pre-flight buffer and are eligable for delivery as fast as the socket allows. They are placed in the in-flight queue as soon as the first byte of the packet is placed in the socket write buffer. If the reactor encounters an error or stops and is subsequently started then any QoS=1 messages in the preflight queue maintain their positions. Any messages in the in-flight queue are placed in the front of the preflight queue as
publish
packets with their dupe flags set toTrue
.QoS 2 messages are placed in the pre-flight buffer and are eligable for delivery as fast as the socket allows. They are placed in the in-flight queue as soon as the first byte of the packet is placed in the socket write buffer. If the reactor encounters an error or stops and is subsequently started then any QoS=2 messages in the preflight queue maintain their positions. Any messages in the in-flight queue awaiting
pubrec
acknowledgements are placed in the front of the preflight queue as publish packets with their dupe flags set toTrue
. Any messages in the in-flight queue awaitingpubcomp
acknowledgements are placed in the front of the preflight queue aspubrel
packets.Parameters: Raises: haka_mqtt.exception.PacketIdReactorException
– Raised when there are no free packet ids to create a MqttPublish packet with.Returns: A publish ticket. The returned object will satisfy ticket.status is MqttPublishStatus.preflight.
Return type: MqttPublishTicket
-
read
()[source]¶ Calls recv on underlying socket exactly once and returns the number of bytes read. If the underlying socket does not return any bytes due to an error or exception then zero is returned and the reactor state is set to error.
This method may be called at any time in any state and if self is not prepared for a read at that point then no action will be taken.
The socket.settimeout can be used to perform a blocking read with a timeout on the underlying socket.
Returns: number of bytes read from socket. Return type: int
-
recv_idle_abort_period
¶ int – Connection will be closed if bytes have not been received from remote in this many seconds. Typically this is 1.5x the self.keepalive_period.
-
recv_idle_ping_period
¶ int – 0 <=
self.recv_idle_ping_period
; sends amqtt_codec.packet.MqttPingreq
packet to the server after this many seconds without receiving and bytes on the socket. If zero then ping messages are not sent when receive stream is idle.
-
sock_state
¶ SocketState – Current state of the socket connection.
-
start
()[source]¶ Attempts to connect with remote if in one of the inactive states
ReactorState.init
,ReactorState.stopped
,ReactorState.error
. The method has no effect if already in an active state.
-
state
¶ ReactorState – Current reactor state.
-
subscribe
(topics)[source]¶ Places a
subscribe
packet on the preflight queue. Messages in the preflight queue will be placed in-flight as soon as the socket allows. Multiple messages may be placed in-flight at the same time.If the reactor encounters an error or stops then unacknowledged
subscribe
packets will be dropped whether they are in the preflight or the in-flight queues.Parameters: topics (iterable of MqttTopic) – Raises: haka_mqtt.exception.PacketIdReactorException
– Raised when there are no free packet ids to create a MqttSubscribe packet with.Returns: Return type: MqttSubscribeTicket
-
terminate
()[source]¶ When in an active state immediately shuts down any socket reading and writing, closes the socket, cancels all outstanding scheduler deadlines, puts the reactor into state ReactorState.stopped, then calls self.on_connect_fail (if in a connect/connack state) or alternatively self.on_disconnect if in some other active state. When reactor is not in an inactive state this method has no effect.
-
unsubscribe
(topics)[source]¶ Places an
unsubscribe
packet on the preflight queue. Messages in the preflight queue will be placed in-flight as soon as the socket allows. Multiple messages may be placed in-flight at the same time.If the reactor encounters an error or stops then unacknowledged
unsubscribe
packets will be dropped whether they are in the preflight or the in-flight queues.Parameters: topics (iterable of str) – Raises: haka_mqtt.exception.PacketIdReactorException
– Raised when there are no free packet ids to create a MqttUnsubscribe packet with.Returns: Return type: MqttUnsubscribeTicket
-
want_read
()[source]¶ True if the reactor is ready to process incoming socket data; False otherwise.
Returns: Return type: bool
-
want_write
()[source]¶ True if the reactor is ready write data to the socket; False otherwise.
Returns: Return type: bool
-
will
¶ mqtt_codec.packet.MqttWill
orNone
– Last will and testament.
-
write
()[source]¶ If there is any data queued to be written to the underlying socket then a single call to socket send will be made to try and flush it to the socket write buffer.
This method may be called at any time in any state and if self is not prepared for a write at that point then no action will be taken.
The socket.settimeout can be used to perform a blocking write with a timeout on the underlying socket.
-
class
haka_mqtt.reactor.
ReactorProperties
[source]¶ Bases:
object
-
socket_factory
¶ haka_mqtt.socket_factory.SocketFactory
-
name_resolver
¶ callable – DNS resolver.
-
scheduler
¶ TODO
-
selector
¶ Selector
-
client_id
¶ str
-
endpoint
¶ tuple – 2-tuple of (host: str, port: int). The port value is constrainted such that 0 <= port <= 2**16-1.
-
keepalive_period
¶ int – 0 <= keepalive_period <= 2*16-1; zero disables keepalive. Sends a
mqtt_codec.packet.MqttPingreq
packet to the server after this many seconds without sending and data over the socket. The server will disconnect the client as if there has been a network error after 1.5x``self.keepalive_period`` seconds without receiving any bytes [MQTT-3.1.2-24].
-
recv_idle_ping_period
¶ int – 0 < recv_idle_ping_period; sends a
mqtt_codec.packet.MqttPingreq
packet to the server after this many seconds without receiving and bytes on the socket.
-
recv_idle_abort_period
¶ int – 0 < recv_idle_abort_period; aborts connection after this time without receiving any bytes from remote (typically set to 1.5x
self.recv_idle_ping_period
).
-
clean_session
¶ bool – With clean session set to True reactor will clear all message buffers on disconnect without regard to QoS; otherwise unacknowledged messages will be retransmitted after a re-connect.
-
address_family
¶ int – Address family; one of the socket.AF_* constants (eg.
socket.AF_UNSPEC
for any family,socket.AF_INET
for IP4socket.AF_INET6
for IP6). Set tosocket.AF_UNSPEC
by default.
-
username
¶ str optional
-
password
¶ str optional
-
-
class
haka_mqtt.reactor.
ReactorState
[source]¶ Bases:
enum.IntEnum
Inactive states are those where there are no active deadlines, the socket is closed and there is no active I/O. Active states are those where any of these characteristics is not met.
Active States:
Inactive States:
ReactorState.connecting
ReactorState.handshake
ReactorState.connack
ReactorState.connected
-
error
= 5¶
-
init
= 0¶
-
started
= 2¶
-
starting
= 1¶
-
stopped
= 4¶
-
stopping
= 3¶
-
class
haka_mqtt.reactor.
RecvTimeoutReactorError
[source]¶ Bases:
haka_mqtt.reactor.ReactorError
Server fails to respond in a timely fashion.
-
class
haka_mqtt.reactor.
SocketReactorError
(errno_val)[source]¶ Bases:
haka_mqtt.reactor.ReactorError
A socket.error exception was raised by the socket subsystem and it the error code was self.errno. If this errno is in the errno.errorcode lookup table then repr will show the description.
Parameters: errno_val (int) – -
errno
¶ int – value in errno.errorcode.
-
-
class
haka_mqtt.reactor.
SocketState
[source]¶ Bases:
enum.IntEnum
Inactive states are those where there are no active deadlines, the socket is closed and there is no active I/O. Active states are those where any of these characteristics is not met.
Active States:
SocketState.name_resolution
SocketState.connecting
SocketState.handshake
SocketState.connected
SocketState.deaf
SocketState.mute
Inactive States:
-
connected
= 5¶
-
connecting
= 2¶
-
deaf
= 6¶
-
handshake
= 3¶
-
mute
= 7¶
-
name_resolution
= 1¶
-
stopped
= 8¶
-
class
haka_mqtt.reactor.
SslReactorError
(ssl_error)[source]¶ Bases:
haka_mqtt.reactor.ReactorError
A socket error-code in errno.errorcode.
Parameters: ssl_error (ssl.SSLError) – -
error
¶ ssl.SSLError – error value.
-
haka_mqtt.frontends.poll module¶
-
class
haka_mqtt.frontends.poll.
BlockingMqttClient
(properties, log='haka')[source]¶ Bases:
haka_mqtt.reactor.Reactor
A client that employs socket.settimeout to use blocking operations on sockets. Although in general the socket timeout is respected by the operating system, this method uses a synchronous DNS lookup and this lookup does not have any timeout.
Parameters: properties (MqttPollClientProperties) –
-
class
haka_mqtt.frontends.poll.
MqttPollClient
(properties, log='haka')[source]¶ Bases:
haka_mqtt.reactor.Reactor
Parameters: properties (MqttPollClientProperties) –
-
class
haka_mqtt.frontends.poll.
MqttPollClientProperties
[source]¶ Bases:
object
-
client_id
¶ str or None – The MQTT client id to pass to the MQTT server. If None then a client-id will be generated with A client id will be randomly generated based on
generate_client_id()
.
-
address_family
¶ int – Address family; one of the socket.AF_* constants (eg.
socket.AF_UNSPEC
for any family,socket.AF_INET
for IP4socket.AF_INET6
for IP6). By default this will besocket.AF_UNSPEC
.
-
host
¶ str – IP address or host name.
-
port
¶ int – Integer such that 0 <= port <= 2**16-1.
-
keepalive_period
¶ str – 0 <= keepalive_period <= 2*16-1; zero disables keepalive. Sends a
MqttPingreq
packet to the server after this many seconds without sending and data over the socket. The server will disconnect the client as if there has been a network error after 1.5x``self.keepalive_period`` seconds without receiving any bytes [MQTT-3.1.2-24].
-
recv_idle_ping_period
¶ int – 0 < recv_idle_ping_period; sends a
MqttPingreq
packet to the server after this many seconds without receiving and bytes on the socket.
-
recv_idle_abort_period
¶ int – 0 < recv_idle_abort_period; aborts connection after this time without receiving any bytes from remote (typically set to 1.5x
self.recv_idle_ping_period
).
-
ssl
¶ bool or SSLContext – When True connects to server using a default SSL socket context created with
ssl.create_default_context()
.If
ssl
has a callablewrap_socket
method then it is assumed thatssl
is a SSLContext to be used for securing sockets.
-
haka_mqtt.scheduler module¶
-
class
haka_mqtt.scheduler.
Deadline
(deadline_entry)[source]¶ Bases:
object
haka_mqtt.selector module¶
-
class
haka_mqtt.selector.
Selector
[source]¶ Bases:
object
-
add_read
(fd, reactor)[source]¶ Parameters: - fd (file descriptor) – File-like object.
- reactor (haka_mqtt.reactor.Reactor) –
-
add_write
(f, reactor)[source]¶ Parameters: - fd (file descriptor) – File-like object.
- reactor (haka_mqtt.reactor.Reactor) –
-
del_read
(fd, reactor)[source]¶ Parameters: - fd (file descriptor) – File-like object.
- reactor (haka_mqtt.reactor.Reactor) –
-
del_write
(fd, reactor)[source]¶ Parameters: - fd (file descriptor) – File-like object.
- reactor (haka_mqtt.reactor.Reactor) –
-