Source code for examples.frontend_poll

"""An MQTT client that demonstrates a select-based polling interface.

A loopback client that subscribes to a topic that it publishes to.
Every time a message is published it should be echoed back to the
client by the remote MQTT broker.


from __future__ import print_function
import logging
import socket
import sys
from Queue import Empty, Queue
from argparse import ArgumentParser, ArgumentTypeError

# 3rd Party Imports
from mqtt_codec.packet import (
from haka_mqtt.frontends.event_queue import MqttEventEnqueue
from haka_mqtt.frontends.poll import MqttPollClientProperties, MqttPollClient

[docs]class UnexpectedMqttEventError(Exception): def __init__(self, e): """Raised when an unexpected MQTT event occurs. Parameters ---------- e: mqtt_codec.packet.MqttPacketBody or haka_mqtt.frontends.event_queue.MqttConnectionEvent """ self.event = e def __str__(self): return repr(self) def __repr__(self): return '{}({})'.format(self.__class__.__name__, repr(self.event))
[docs]class ExampleMqttClient(MqttEventEnqueue, MqttPollClient): """A helper class for polling mqtt events. It is critical that the order of inheritance is correct for this class to work correctly. The :class:`MqttEventEnqueue` class *must* appear before :class:`MqttPollClient` so that its methods are called first. """ def __init__(self, endpoint): properties = MqttPollClientProperties() properties.keepalive_period = 100 properties.ssl = False, properties.port = endpoint properties.address_family = socket.AF_UNSPEC self.__q = Queue() MqttPollClient.__init__(self, properties) MqttEventEnqueue.__init__(self, self.__q)
[docs] def poll_until_event(self, timeout=None): """Polls connection until an event occurs then returns it. If timeout passes without an event occurring returns None. Parameters ---------- timeout: float or None Maximum amount of time to wait for an event. If None then waits forever. Must satisfy condition ``timeout >= 0``. Returns ------- mqtt_codec.packet.MqttPacketBody or MqttConnectionEvent or None None is returned if no event occurs. """ if timeout is None: poll_end_time = None elif timeout >= 0: poll_end_time = self._clock.time() + timeout else: assert timeout < 0 raise ValueError(timeout) e = None while poll_end_time is None or self._clock.time() < poll_end_time: try: reactor, event = self.__q.get_nowait() e = event break except Empty: pass select_timeout = self._scheduler.remaining() if poll_end_time is not None: if select_timeout is None or self._clock.time() + select_timeout > poll_end_time: select_timeout = poll_end_time - self._clock.time() if select_timeout < 0.: select_timeout = 0 self._scheduler.poll() return e
[docs] def expect_event(self, predicate, timeout=None): """Waits any event to occur and returns it if ``predicate(event)`` returns ``True``; otherwise raises an exception. If `timeout` expires before any event is received then returns `None`. Parameters ---------- predicate: callable A callable that will be passed a single argument that will be of type :class:`mqtt_codec.packet.MqttPacketBody` or :class:`haka_mqtt.frontends.event_queue.MqttConnectionEvent`. timeout: float or None Maximum amount of time to wait for an event. If None then waits forever. Raises ------ UnexpectedMqttEventError The first even that occurs ``predicate(event)`` returns False. Returns ------- mqtt_codec.packet.MqttPacketBody or haka_mqtt.frontends.event_queue.MqttConnectionEvent or None Returns an event matching predicate(e) or None if no such event occurred before timeout. """ while True: e = self.poll_until_event(timeout=timeout) if e is None or predicate(e): return e else: raise UnexpectedMqttEventError(e)
[docs]def argparse_endpoint(s): """Splits an incoming string into host and port components. >>> argparse_endpoint('localhost:1883') ('localhost', 1883) Parameters ---------- s: str Raises ------- ArgumentTypeError Raised when port number is out of range 1 <= port <= 65535, when port is not an integer, or when there is more than one colon in the string. Returns ------- (str, int) hostname, port tuple. """ words = s.split(':') if len(words) != 2: raise ArgumentTypeError('Format of endpoint must be hostname:port.') host, port = words try: port = int(port) if not 1 <= port <= 2**16-1: raise ArgumentTypeError('Port must be in the range 1 <= port <= 65535.') except ValueError: raise ArgumentTypeError('Format of endpoint must be hostname:port.') return host, port
[docs]def create_parser(): """Creates a command-line argument parser used by the program main method. Returns ------- ArgumentParser """ parser = ArgumentParser() parser.add_argument("endpoint", type=argparse_endpoint) return parser
[docs]def run(client): """ Raises ------- UnexpectedMqttEventError Parameters ---------- client: ExampleMqttClient """ client.start() topic = client.client_id client.expect_event(lambda e: e == MqttConnack(False, ConnackResult.accepted)) subscribe = client.subscribe([MqttTopic(topic, 1)]) client.expect_event(lambda e: e == MqttSuback(subscribe.packet_id, [SubscribeResult.qos1])) UTF8 = 'utf-8' seq_num = 0 while True: payload = str(seq_num).encode(UTF8) publish = client.publish(topic, payload, 1) e = client.poll_until_event() # It isn't clear whether the publish or puback should return # first; need to be prepared for either case. # if isinstance(e, MqttPublish) and e.payload == payload: client.expect_event(lambda e: e == MqttPuback(publish.packet_id)) elif e == MqttPuback(publish.packet_id): client.expect_event(lambda e: isinstance(e, MqttPublish) and e.payload == payload) else: raise UnexpectedMqttEventError(e) seq_num += 1 client.poll(1)
[docs]def main(args=sys.argv[1:]): """Parses arguments and passes them to :func:`run` method. Returns one when an error occurs. Returns -------- int """ logging.basicConfig(format='%(asctime)-15s %(name)s %(levelname)s %(message)s', level=logging.DEBUG, stream=sys.stdout) parser = create_parser() ns = parser.parse_args(args) try: run(ExampleMqttClient(ns.endpoint)) except Exception as e: logging.critical('Panicked because of exception.', exc_info=True) return 1
if __name__ == '__main__': sys.exit(main(sys.argv[1:]))