API Reference

haka_mqtt.clock module

class haka_mqtt.clock.SettableClock[source]

Bases: object

add_time(duration)[source]
set_time(t)[source]
time()[source]
class haka_mqtt.clock.SystemClock[source]

Bases: object

time()[source]

haka_mqtt.cycle_iter module

class haka_mqtt.cycle_iter.IntegralCycleIter(start, end)[source]

Bases: object

Parameters:
  • start (int) –
  • end (int) –
next()[source]

Returns the next integer in the sequence.

Returns:
Return type:int
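
The reference does not say how the cycle wraps, so the following is only a sketch of one plausible reading: integers are produced from start up to, but not including, end, and then the sequence restarts at start. The class name CycleCounter and the half-open range are assumptions made for illustration, not the library's implementation.

```python
class CycleCounter(object):
    """Hypothetical stand-in that cycles through start, start+1, ..., end-1."""

    def __init__(self, start, end):
        assert start < end
        self._start = start
        self._end = end
        self._next = start

    def __iter__(self):
        return self

    def __next__(self):
        value = self._next
        self._next += 1
        if self._next == self._end:
            # Assumed wrap-around point; the real class may treat end differently.
            self._next = self._start
        return value

    next = __next__  # Python 2 style alias, matching the documented method name


counter = CycleCounter(0, 3)
first_five = [counter.next() for _ in range(5)]
```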

haka_mqtt.dns_async module

class haka_mqtt.dns_async.AsyncFutureDnsResolver(thread_pool_size=1)[source]

Bases: object

An executor that spawns a small thread pool for performing DNS lookups. DNS lookup tasks are submitted by calling this object as a function, and those tasks are completed asynchronously by threads in the pool. Completed tasks are posted back to an internal queue; the poll() method takes completed tasks from the queue and notifies their subscribers of completion. Done callbacks are invoked by poll() on the same thread that poll() is called on.

>>> from time import sleep
>>> from haka_mqtt.dns_async import AsyncFutureDnsResolver
>>>
>>> resolver = AsyncFutureDnsResolver()
>>>
>>> lookup_result = None
>>>
>>> def lookup_finished(future):
...   global lookup_result
...   lookup_result = future.result()
...
>>> future = resolver('localhost', 80)
>>> future.add_done_callback(lookup_finished)
>>>
>>> while not future.done():
...   # Calling poll services callbacks and sets future to done.
...   resolver.poll()
...   sleep(0.1)
...
>>> assert future.done()
>>>
>>> # Only timeout=0 presently supported.
>>> assert not future.exception(timeout=0)
>>> assert not future.cancelled()
>>> # future.result(0) contains outcome of asynchronous call
>>> # to socket.getaddrinfo.  Note that at this time only timeout=0
>>> # is supported by this limited api.
close()[source]

Closes resolver by completing all tasks in queue and joining with worker threads. New dns resolutions cannot be scheduled after this method begins executing (calling the resolver will result in an assertion failure).

closed()[source]

bool: True if the object has been closed; False otherwise.

poll()[source]

Calls done callbacks of any newly completed futures.

read_fd()[source]

int: fileno
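
The queue-and-poll pattern described for AsyncFutureDnsResolver can be sketched with the standard library alone. This is a minimal illustration, not the library's code: the class name PollingResolver is hypothetical, and it uses concurrent.futures.Future where the real resolver returns its own future type. Worker threads only enqueue outcomes; futures are completed inside poll(), so done callbacks run on whichever thread calls poll().

```python
import queue
import socket
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class PollingResolver(object):
    """Hypothetical stand-in; not the haka_mqtt implementation."""

    def __init__(self, thread_pool_size=1):
        self._pool = ThreadPoolExecutor(max_workers=thread_pool_size)
        self._completed = queue.Queue()

    def __call__(self, host, port):
        public = Future()  # handed to the caller; completed only by poll()

        def lookup():
            try:
                outcome = (socket.getaddrinfo(host, port), None)
            except Exception as e:
                outcome = (None, e)
            # The worker thread never touches the future directly.
            self._completed.put((public, outcome))

        self._pool.submit(lookup)
        return public

    def poll(self):
        """Completes finished lookups on the calling thread."""
        while True:
            try:
                public, (result, exc) = self._completed.get_nowait()
            except queue.Empty:
                return
            if exc is None:
                public.set_result(result)  # runs done callbacks here
            else:
                public.set_exception(exc)

    def close(self):
        self._pool.shutdown(wait=True)  # join worker threads
        self.poll()                     # deliver anything still queued


caller = threading.current_thread()
callback_threads = []

resolver = PollingResolver()
future = resolver('127.0.0.1', 80)  # numeric address, so no network is needed
future.add_done_callback(
    lambda f: callback_threads.append(threading.current_thread()))
resolver.close()
```

Because completion is deferred to poll(), callback code never has to be thread-safe with respect to the caller's own state, which appears to be the motivation for the design described above.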

haka_mqtt.dns_sync module

class haka_mqtt.dns_sync.SynchronousFuture(result=None, exception=None)[source]

Bases: object

add_done_callback(fn)[source]

Attaches the callable fn to the future. fn will be called, with the future as its only argument, when the future is cancelled or finishes running.

Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them. If the callable raises an Exception subclass, it will be logged and ignored. If the callable raises a BaseException subclass, the behavior is undefined.

If the future has already completed or been cancelled, fn will be called immediately.

cancel()[source]

Always returns False since this future is finished the instant it is created.

Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Returns:
Return type:bool
cancelled()[source]

Always returns False since this future is finished the instant it is created.

Return True if the call was successfully cancelled.

Returns:
Return type:bool
done()[source]

Always returns True since this future is finished the instant it is created.

Return True if the call was successfully cancelled or finished running.

Returns:
Return type:bool
exception(timeout=None)[source]

Immediately returns the exception raised by the call or None if the call completed without raising.

Return the exception raised by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a concurrent.futures.TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

If the future is cancelled before completing then CancelledError will be raised.

If the call completed without raising, None is returned.

result(timeout=None)[source]

Immediately returns the call result or None if the call raised an exception.

Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a concurrent.futures.TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

If the future is cancelled before completing then CancelledError will be raised.

If the call raised, this method will raise the same exception.
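
The behaviour of a future that is finished from the moment it exists can be observed with the standard library's concurrent.futures.Future once a result has been set on it. The sketch below uses that class purely as a stand-in for SynchronousFuture; it assumes the two behave alike for the methods documented here and does not exercise haka_mqtt itself.

```python
from concurrent.futures import Future

# Stand-in for an already-completed future.
future = Future()
future.set_result('resolved')

calls = []
# The future is already done, so the callback runs immediately.
future.add_done_callback(calls.append)

was_cancelled = future.cancel()  # a finished future cannot be cancelled
```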

class haka_mqtt.dns_sync.SynchronousFutureDnsResolver[source]

Bases: object

haka_mqtt.exception module

exception haka_mqtt.exception.PacketIdReactorException[source]

Bases: haka_mqtt.exception.ReactorException

exception haka_mqtt.exception.ReactorException[source]

Bases: exceptions.Exception

haka_mqtt.on_str module

class haka_mqtt.on_str.HexOnStr(buf)[source]

Bases: object

class haka_mqtt.on_str.ReprOnStr(o)[source]

Bases: object

haka_mqtt.reactor module

The reactor module provides an MQTT reactor class suitable for use with select and select-like interfaces like epoll. An adapter is available to make it conveniently usable in a poll environment (haka_mqtt.frontends.poll).

class haka_mqtt.reactor.AddressReactorError(gaierror)[source]

Bases: haka_mqtt.reactor.ReactorError

Failed to lookup a valid address.

Parameters:gaierror (socket.gaierror) –
gaierror

socket.gaierror – Addressing error.

class haka_mqtt.reactor.ConnectReactorError(result)[source]

Bases: haka_mqtt.reactor.ReactorError

Error that occurs when the server sends a connack fail in response to an initial connect packet.

Parameters:result (ConnackResult) – Asserted not to be ConnackResult.accepted.
result

ConnackResult – guaranteed that value is not ConnackResult.accepted.

class haka_mqtt.reactor.DecodeReactorError(description)[source]

Bases: haka_mqtt.reactor.ReactorError

Server wrote a sequence of bytes that could not be interpreted as an MQTT packet.

class haka_mqtt.reactor.MqttState[source]

Bases: enum.IntEnum

Inactive states are those where there are no active deadlines, the socket is closed and there is no active I/O. Active states are those where any of these characteristics is not met.

Active States:

Inactive States:

connack = 0
connected = 1
mute = 2
stopped = 3
class haka_mqtt.reactor.MutePeerReactorError[source]

Bases: haka_mqtt.reactor.ReactorError

Error that occurs when the server closes its write stream unexpectedly.

class haka_mqtt.reactor.ProtocolReactorError(description)[source]

Bases: haka_mqtt.reactor.ReactorError

Server sent an inappropriate MQTT packet to the client.

class haka_mqtt.reactor.Reactor(properties, log='haka')[source]

Bases: object

Parameters:
  • properties (ReactorProperties) –
  • log (str or logging.Logger or None) – If str then the result of logging.getLogger(log) is used as a logger; otherwise log is assumed to be a logging.Logger-like object and is asserted to have debug, info, warning, error, and critical methods. If log is None then logging is disabled.
clean_session

bool – Clean session flag is true/false.

client_id

str – Client id.

error

ReactorError or None – When self.state is ReactorState.error returns a subclass of ReactorError otherwise returns None.

in_flight_packets()[source]
is_active()[source]

True when reactor is active; False otherwise.

An “active” reactor implies that there are outstanding scheduler deadlines active, possibly open sockets, or possibly outstanding DNS lookup futures. This method would return True for this case.

An inactive reactor guarantees that there are no outstanding scheduler deadlines, DNS lookups, or open sockets. An inactive reactor will never change state unless a method like start() is called to start the reactor.

New in version 0.3.5.

Returns:
Return type:bool
keepalive_period

int – If this period elapses without the client sending a control packet to the server then it will generate a pingreq packet and send it to the server. Will return zero if pingreq requests are not generated.

mqtt_state

MqttState – Current state of mqtt protocol handshake.

on_connack(reactor, connack)[source]

Called immediately upon receiving a MqttConnack packet from the remote. The reactor.state will be ReactorState.started or ReactorState.stopping if the reactor is shutting down.

Parameters:
on_connect_fail(reactor)[source]
Parameters:reactor (Reactor) –
on_disconnect(reactor)[source]
Parameters:reactor (Reactor) –
on_puback(reactor, puback)[source]

Called immediately upon receiving a MqttPuback packet from the remote. This method is part of the QoS=1 message send path.

Parameters:
on_pubcomp(reactor, pubcomp)[source]

Called immediately upon receiving a MqttPubcomp packet from the remote. This is part of the QoS=2 message send path.

Parameters:
on_publish(reactor, publish)[source]

Called immediately upon receiving a MqttPublish packet from the remote. This is part of the QoS=0, 1, and 2 message receive paths.

Parameters:
on_pubrec(reactor, pubrec)[source]

Called immediately upon receiving a MqttPubrec packet from the remote. This is part of the QoS=2 message send path.

Parameters:
on_pubrel(reactor, pubrel)[source]

Called immediately upon receiving a MqttPubrel packet from the remote. This is part of the QoS=2 message receive path.

Parameters:
on_suback(reactor, suback)[source]

Called immediately upon receiving a MqttSuback packet from the remote.

Parameters:
on_unsuback(reactor, unsuback)[source]

Called immediately upon receiving a MqttUnsuback packet from the remote.

Parameters:
preflight_packets()[source]
publish(topic, payload, qos, retain=False)[source]

Places a publish packet on the preflight queue. Messages in the preflight queue are fair-queued and launched to the server. The reactor certainly will try to place as many messages in-flight as it is able to. If you want to limit the number of messages in-flight then a queue should be maintained outside of the core reactor.

QoS 0 messages are placed in the pre-flight buffer and are eligible for delivery as fast as the socket allows. If the reactor encounters an error or stops and is subsequently started then any QoS=0 messages in the preflight queue are discarded. QoS 0 messages are considered delivered as soon as one of their bytes is placed in the socket write buffer regardless of whether the network successfully delivers them to their destination.

QoS 1 messages are placed in the pre-flight buffer and are eligible for delivery as fast as the socket allows. They are placed in the in-flight queue as soon as the first byte of the packet is placed in the socket write buffer. If the reactor encounters an error or stops and is subsequently started then any QoS=1 messages in the preflight queue maintain their positions. Any messages in the in-flight queue are placed in the front of the preflight queue as publish packets with their dupe flags set to True.

QoS 2 messages are placed in the pre-flight buffer and are eligible for delivery as fast as the socket allows. They are placed in the in-flight queue as soon as the first byte of the packet is placed in the socket write buffer. If the reactor encounters an error or stops and is subsequently started then any QoS=2 messages in the preflight queue maintain their positions. Any messages in the in-flight queue awaiting pubrec acknowledgements are placed in the front of the preflight queue as publish packets with their dupe flags set to True. Any messages in the in-flight queue awaiting pubcomp acknowledgements are placed in the front of the preflight queue as pubrel packets.

Parameters:
  • topic (str) –
  • payload (bytes) –
  • qos (int) – 0 <= qos <= 2
  • retain (bool) –
Raises:

haka_mqtt.exception.PacketIdReactorException – Raised when there are no free packet ids to create a MqttPublish packet with.

Returns:

A publish ticket. The returned object will satisfy ticket.status is MqttPublishStatus.preflight.

Return type:

MqttPublishTicket
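
The requeueing rules described for publish() can be modelled in a few lines. This is a simplified sketch, not the reactor's code: the Publish record and requeue_after_restart function are hypothetical names, clean_session is assumed to be False, and QoS 2 messages awaiting pubcomp (which would be requeued as pubrel packets) are left out for brevity.

```python
from collections import deque, namedtuple

# Hypothetical record type; the real reactor queues mqtt_codec packets.
Publish = namedtuple('Publish', ['topic', 'qos', 'dupe'])


def requeue_after_restart(preflight, in_flight):
    """Return the preflight queue as it would look after a restart."""
    # Unacknowledged in-flight publishes go to the front, marked as duplicates.
    resent = [p._replace(dupe=True) for p in in_flight]
    # QoS 0 messages still waiting in preflight are discarded;
    # QoS 1 and 2 messages keep their relative positions.
    kept = [p for p in preflight if p.qos > 0]
    return deque(resent + kept)


preflight = deque([Publish('a', 0, False), Publish('b', 1, False)])
in_flight = deque([Publish('c', 2, False)])
restarted = requeue_after_restart(preflight, in_flight)
```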

read()[source]

Calls recv on underlying socket exactly once and returns the number of bytes read. If the underlying socket does not return any bytes due to an error or exception then zero is returned and the reactor state is set to error.

This method may be called at any time in any state and if self is not prepared for a read at that point then no action will be taken.

The socket.settimeout can be used to perform a blocking read with a timeout on the underlying socket.

Returns:number of bytes read from socket.
Return type:int
recv_idle_abort_period

int – Connection will be closed if bytes have not been received from remote in this many seconds. Typically this is 1.5x the self.keepalive_period.

recv_idle_ping_period

int – 0 <= self.recv_idle_ping_period; sends a mqtt_codec.packet.MqttPingreq packet to the server after this many seconds without receiving any bytes on the socket. If zero then ping messages are not sent when the receive stream is idle.

send_packet_ids()[source]
Returns:A set of active send-path packet ids.
Return type:set[int]
sock_state

SocketState – Current state of the socket connection.

start()[source]

Attempts to connect with remote if in one of the inactive states ReactorState.init, ReactorState.stopped, ReactorState.error. The method has no effect if already in an active state.

state

ReactorState – Current reactor state.

stop()[source]
subscribe(topics)[source]

Places a subscribe packet on the preflight queue. Messages in the preflight queue will be placed in-flight as soon as the socket allows. Multiple messages may be placed in-flight at the same time.

If the reactor encounters an error or stops then unacknowledged subscribe packets will be dropped whether they are in the preflight or the in-flight queues.

Parameters:topics (iterable of MqttTopic) –
Raises:haka_mqtt.exception.PacketIdReactorException – Raised when there are no free packet ids to create a MqttSubscribe packet with.
Returns:
Return type:MqttSubscribeTicket
terminate()[source]

When in an active state immediately shuts down any socket reading and writing, closes the socket, cancels all outstanding scheduler deadlines, puts the reactor into state ReactorState.stopped, then calls self.on_connect_fail (if in a connect/connack state) or alternatively self.on_disconnect if in some other active state. When the reactor is in an inactive state this method has no effect.

unsubscribe(topics)[source]

Places an unsubscribe packet on the preflight queue. Messages in the preflight queue will be placed in-flight as soon as the socket allows. Multiple messages may be placed in-flight at the same time.

If the reactor encounters an error or stops then unacknowledged unsubscribe packets will be dropped whether they are in the preflight or the in-flight queues.

Parameters:topics (iterable of str) –
Raises:haka_mqtt.exception.PacketIdReactorException – Raised when there are no free packet ids to create a MqttUnsubscribe packet with.
Returns:
Return type:MqttUnsubscribeTicket
want_read()[source]

True if the reactor is ready to process incoming socket data; False otherwise.

Returns:
Return type:bool
want_write()[source]

True if the reactor is ready to write data to the socket; False otherwise.

Returns:
Return type:bool
will

mqtt_codec.packet.MqttWill or None – Last will and testament.

write()[source]

If there is any data queued to be written to the underlying socket then a single call to socket send will be made to try and flush it to the socket write buffer.

This method may be called at any time in any state and if self is not prepared for a write at that point then no action will be taken.

The socket.settimeout can be used to perform a blocking write with a timeout on the underlying socket.
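
The want_read(), want_write(), read() and write() methods are shaped for a select-style loop. The sketch below shows one way such a loop could be driven. It does not use the Reactor class: Endpoint is a hypothetical stand-in that exposes the same four method names over a plain socket pair, and service() is an assumed helper, not part of haka_mqtt.

```python
import select
import socket


class Endpoint(object):
    """Hypothetical stand-in with the reactor's read/write method shape."""

    def __init__(self, sock, outgoing=b''):
        self.sock = sock
        self._out = outgoing
        self.received = b''

    def want_read(self):
        return True

    def want_write(self):
        return bool(self._out)

    def read(self):
        data = self.sock.recv(4096)  # exactly one recv per call
        self.received += data
        return len(data)

    def write(self):
        if self._out:
            sent = self.sock.send(self._out)  # exactly one send per call
            self._out = self._out[sent:]


def service(endpoints, timeout):
    """Runs one select pass over the endpoints."""
    rlist = [e.sock for e in endpoints if e.want_read()]
    wlist = [e.sock for e in endpoints if e.want_write()]
    readable, writable, _ = select.select(rlist, wlist, [], timeout)
    for e in endpoints:
        if e.sock in writable:
            e.write()
        if e.sock in readable:
            e.read()


a_sock, b_sock = socket.socketpair()
sender = Endpoint(a_sock, b'ping')
receiver = Endpoint(b_sock)
for _ in range(5):
    service([sender, receiver], timeout=0.1)
    if receiver.received == b'ping':
        break
a_sock.close()
b_sock.close()
```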

class haka_mqtt.reactor.ReactorError[source]

Bases: object

class haka_mqtt.reactor.ReactorProperties[source]

Bases: object

socket_factory

haka_mqtt.socket_factory.SocketFactory

name_resolver

callable – DNS resolver.

scheduler

TODO

selector

Selector

client_id

str

endpoint

tuple – 2-tuple of (host: str, port: int). The port value is constrained such that 0 <= port <= 2**16-1.

keepalive_period

int – 0 <= keepalive_period <= 2**16-1; zero disables keepalive. Sends a mqtt_codec.packet.MqttPingreq packet to the server after this many seconds without sending any data over the socket. The server will disconnect the client as if there has been a network error after 1.5x self.keepalive_period seconds without receiving any bytes [MQTT-3.1.2-24].

recv_idle_ping_period

int – 0 < recv_idle_ping_period; sends a mqtt_codec.packet.MqttPingreq packet to the server after this many seconds without receiving any bytes on the socket.

recv_idle_abort_period

int – 0 < recv_idle_abort_period; aborts connection after this time without receiving any bytes from remote (typically set to 1.5x self.recv_idle_ping_period).

clean_session

bool – With clean session set to True reactor will clear all message buffers on disconnect without regard to QoS; otherwise unacknowledged messages will be retransmitted after a re-connect.

address_family

int – Address family; one of the socket.AF_* constants (e.g. socket.AF_UNSPEC for any family, socket.AF_INET for IPv4, socket.AF_INET6 for IPv6). Set to socket.AF_UNSPEC by default.

username

str optional

password

str optional
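
The numeric constraints on the timing properties above can be checked up front. The helpers below are an illustrative sketch only: validate_periods and suggested_abort_period are hypothetical names, and rounding the 1.5x figure up to a whole second is an assumption, since the reference does not say how fractional values are handled.

```python
MAX_U16 = 2 ** 16 - 1


def validate_periods(keepalive_period, recv_idle_ping_period,
                     recv_idle_abort_period):
    """Raises ValueError when a period falls outside its documented range."""
    if not 0 <= keepalive_period <= MAX_U16:
        raise ValueError('keepalive_period must satisfy 0 <= p <= 2**16-1')
    if not 0 < recv_idle_ping_period:
        raise ValueError('recv_idle_ping_period must be positive')
    if not 0 < recv_idle_abort_period:
        raise ValueError('recv_idle_abort_period must be positive')


def suggested_abort_period(recv_idle_ping_period):
    """Returns 1.5x the ping period, rounded up to a whole second."""
    return (3 * recv_idle_ping_period + 1) // 2


abort_period = suggested_abort_period(60)
validate_periods(60, 60, abort_period)
```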

class haka_mqtt.reactor.ReactorState[source]

Bases: enum.IntEnum

Inactive states are those where there are no active deadlines, the socket is closed and there is no active I/O. Active states are those where any of these characteristics is not met.

Active States:

Inactive States:

  • ReactorState.connecting
  • ReactorState.handshake
  • ReactorState.connack
  • ReactorState.connected
error = 5
init = 0
started = 2
starting = 1
stopped = 4
stopping = 3
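
The start() description names init, stopped and error as the inactive states, which would leave starting, started and stopping as the active ones. A minimal sketch of that classification follows, assuming the start() description is the authoritative one. ReactorStateSketch mirrors the member values listed above and is_inactive is a hypothetical helper, not a library function.

```python
from enum import IntEnum


class ReactorStateSketch(IntEnum):
    """Mirror of the documented member values, for illustration only."""
    init = 0
    starting = 1
    started = 2
    stopping = 3
    stopped = 4
    error = 5


# Assumed from the start() description: no deadlines, sockets, or I/O here.
INACTIVE_STATES = frozenset([
    ReactorStateSketch.init,
    ReactorStateSketch.stopped,
    ReactorStateSketch.error,
])


def is_inactive(state):
    return state in INACTIVE_STATES


active_names = [s.name for s in ReactorStateSketch if not is_inactive(s)]
```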
class haka_mqtt.reactor.RecvTimeoutReactorError[source]

Bases: haka_mqtt.reactor.ReactorError

Server fails to respond in a timely fashion.

class haka_mqtt.reactor.SocketReactorError(errno_val)[source]

Bases: haka_mqtt.reactor.ReactorError

A socket.error exception was raised by the socket subsystem and the error code was self.errno. If this errno is in the errno.errorcode lookup table then repr will show the description.

Parameters:errno_val (int) –
errno

int – value in errno.errorcode.
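
The errno.errorcode table mentioned above maps numeric codes to their symbolic names. A small sketch of such a lookup is shown here; describe_errno is a hypothetical helper and the fallback text is an assumption, not what the library's repr produces.

```python
import errno


def describe_errno(errno_val):
    """Returns the symbolic name for errno_val when the table knows it."""
    return errno.errorcode.get(errno_val, 'unknown errno {}'.format(errno_val))


name = describe_errno(errno.ECONNREFUSED)
```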

class haka_mqtt.reactor.SocketState[source]

Bases: enum.IntEnum

Inactive states are those where there are no active deadlines, the socket is closed and there is no active I/O. Active states are those where any of these characteristics is not met.

Active States:

Inactive States:

connected = 5
connecting = 2
deaf = 6
handshake = 3
mute = 7
name_resolution = 1
stopped = 8
class haka_mqtt.reactor.SslReactorError(ssl_error)[source]

Bases: haka_mqtt.reactor.ReactorError

A socket error-code in errno.errorcode.

Parameters:ssl_error (ssl.SSLError) –
error

ssl.SSLError – error value.

haka_mqtt.frontends.poll module

class haka_mqtt.frontends.poll.BlockingMqttClient(properties, log='haka')[source]

Bases: haka_mqtt.reactor.Reactor

A client that employs socket.settimeout to use blocking operations on sockets. Although in general the socket timeout is respected by the operating system, this method uses a synchronous DNS lookup and this lookup does not have any timeout.

Parameters:properties (MqttPollClientProperties) –
poll(period=0.0)[source]
class haka_mqtt.frontends.poll.MqttPollClient(properties, log='haka')[source]

Bases: haka_mqtt.reactor.Reactor

Parameters:properties (MqttPollClientProperties) –
poll(period=0.0)[source]
class haka_mqtt.frontends.poll.MqttPollClientProperties[source]

Bases: object

client_id

str or None – The MQTT client id to pass to the MQTT server. If None then a client id will be generated by generate_client_id().

address_family

int – Address family; one of the socket.AF_* constants (e.g. socket.AF_UNSPEC for any family, socket.AF_INET for IPv4, socket.AF_INET6 for IPv6). By default this will be socket.AF_UNSPEC.

host

str – IP address or host name.

port

int – Integer such that 0 <= port <= 2**16-1.

keepalive_period

int – 0 <= keepalive_period <= 2**16-1; zero disables keepalive. Sends a MqttPingreq packet to the server after this many seconds without sending any data over the socket. The server will disconnect the client as if there has been a network error after 1.5x self.keepalive_period seconds without receiving any bytes [MQTT-3.1.2-24].

recv_idle_ping_period

int – 0 < recv_idle_ping_period; sends a MqttPingreq packet to the server after this many seconds without receiving any bytes on the socket.

recv_idle_abort_period

int – 0 < recv_idle_abort_period; aborts connection after this time without receiving any bytes from remote (typically set to 1.5x self.recv_idle_ping_period).

ssl

bool or SSLContext – When True connects to server using a default SSL socket context created with ssl.create_default_context().

If ssl has a callable wrap_socket method then it is assumed that ssl is a SSLContext to be used for securing sockets.
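
The two accepted forms of the ssl property suggest a dispatch along the following lines. This is a sketch under stated assumptions: resolve_ssl_context is a hypothetical helper, and returning None for any other value (such as False) is an assumed convention for "no SSL".

```python
import ssl


def resolve_ssl_context(ssl_option):
    """Maps the ssl property to a context object, or None for plain sockets."""
    if ssl_option is True:
        return ssl.create_default_context()
    if callable(getattr(ssl_option, 'wrap_socket', None)):
        # Anything with a callable wrap_socket is treated as a context.
        return ssl_option
    return None


own_context = ssl.create_default_context()
from_true = resolve_ssl_context(True)
from_context = resolve_ssl_context(own_context)
from_false = resolve_ssl_context(False)
```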

haka_mqtt.frontends.poll.generate_client_id()[source]

Generates a client id based on current time, hostname, and process-id.

Returns:
Return type:str
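
As an illustration of combining the three ingredients named above, the sketch below builds an id from the hostname, process id and current time. The function name make_client_id and the exact layout of the string are assumptions; the format actually produced by generate_client_id() is not documented here.

```python
import os
import socket
import time


def make_client_id():
    """Builds an id from hostname, process id and the current time."""
    return 'client-{}-{}-{}'.format(
        socket.gethostname(), os.getpid(), int(time.time()))


client_id = make_client_id()
```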

haka_mqtt.scheduler module

class haka_mqtt.scheduler.ClockScheduler(clock)[source]

Bases: haka_mqtt.scheduler.Scheduler

instant()[source]

Current clock instant.

Returns:Current clock instant.
Return type:int
poll()[source]

Calls all callbacks awaiting execution to this point.

class haka_mqtt.scheduler.Deadline(deadline_entry)[source]

Bases: object

cancel()[source]

Stops a scheduled callback from being made; has no effect if cancel is called after the callback has already been made.

expired()[source]

bool: True if callback has already been called; False otherwise.

class haka_mqtt.scheduler.DurationScheduler[source]

Bases: haka_mqtt.scheduler.Scheduler

instant()[source]

Returns the current tick.

Returns:
Return type:int
poll(duration)[source]

Adds duration to self.instant() and calls all scheduled callbacks.

Parameters:duration (int) –
class haka_mqtt.scheduler.Scheduler[source]

Bases: object

add(duration, cb)[source]

Schedules cb to be called once duration ticks have elapsed from self.instant().

Parameters:
  • duration (int) – Number of ticks passed.
  • cb (callable()) – No calling with so
Returns:

Return type:

Deadline

instant()[source]

Returns the current tick.

Returns:
Return type:int
remaining()[source]

Duration remaining to next scheduled callback.

Returns:
Return type:int or None
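
The interplay of add(), poll(duration), instant() and remaining() can be sketched with a heap of deadlines. This is a minimal model built on assumptions, not the library's scheduler: TickScheduler is a hypothetical name, add() returns a plain list entry rather than a Deadline object, and cancellation is exposed as a cancel(entry) method for brevity.

```python
import heapq
import itertools


class TickScheduler(object):
    """Hypothetical stand-in modelled on the DurationScheduler description."""

    def __init__(self):
        self._instant = 0
        self._heap = []
        self._seq = itertools.count()  # tie-breaker keeps insertion order

    def instant(self):
        return self._instant

    def add(self, duration, cb):
        entry = [self._instant + duration, next(self._seq), cb]
        heapq.heappush(self._heap, entry)
        return entry

    def cancel(self, entry):
        entry[2] = None  # stays in the heap and is skipped when reached

    def remaining(self):
        live = [when for when, _, cb in self._heap if cb is not None]
        return min(live) - self._instant if live else None

    def poll(self, duration):
        # Advance time, then run every callback whose deadline has passed.
        self._instant += duration
        while self._heap and self._heap[0][0] <= self._instant:
            _, _, cb = heapq.heappop(self._heap)
            if cb is not None:
                cb()


fired = []
scheduler = TickScheduler()
scheduler.add(5, lambda: fired.append('a'))
late = scheduler.add(10, lambda: fired.append('b'))
scheduler.cancel(late)

scheduler.poll(3)
remaining_after_first_poll = scheduler.remaining()
scheduler.poll(7)
```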

haka_mqtt.selector module

class haka_mqtt.selector.Selector[source]

Bases: object

add_read(fd, reactor)[source]
Parameters:
add_write(f, reactor)[source]
Parameters:
del_read(fd, reactor)[source]
Parameters:
del_write(fd, reactor)[source]
Parameters:

haka_mqtt.socket_factory module

class haka_mqtt.socket_factory.BlockingSocketFactory[source]

Bases: object

class haka_mqtt.socket_factory.BlockingSslSocketFactory(context)[source]

Bases: object

class haka_mqtt.socket_factory.SocketFactory[source]

Bases: object

class haka_mqtt.socket_factory.SslSocketFactory(context)[source]

Bases: object