ohmqtt.connection package

Submodules

ohmqtt.connection.address module

class ohmqtt.connection.address.Address(address: str = '')

Bases: object

family: AddressFamily
host: str
is_default_port() bool

Check if the port is the default for the scheme.

is_websocket() bool

Check if the address uses WebSocket.

password: str | None
path: str
port: int
scheme: str
property use_tls: bool

Check if the address uses TLS.

username: str | None
ohmqtt.connection.address.is_ipv6(hostname: str) bool

Check if the hostname is an IPv6 address.

ohmqtt.connection.decoder module

exception ohmqtt.connection.decoder.ClosedSocketError

Bases: Exception

Exception raised when the socket is closed.

class ohmqtt.connection.decoder.IncrementalDecoder

Bases: object

Incremental decoder for MQTT messages coming in from a socket.

data: bytearray

The remaining data of the packet.

decode(sock: socket.socket | ssl.SSLSocket) ohmqtt.packet.base.MQTTPacket | None

Decode a single packet straight from the socket.

Returns:

None if the socket doesn’t have enough data for us to decode a packet.

Raises:

ClosedSocketError – The socket is closed.

head: int

The first byte of the packet.

length: VarintDecodeResult

Variable integer decoding state of the packet length.

reset() None

Reset the decoder state.

class ohmqtt.connection.decoder.VarintDecodeResult(value: int, multiplier: int, complete: bool)

Bases: NamedTuple

Result of decoding a variable length integer, in part or whole.

This state can be used to resume decoding if the socket doesn’t have enough data.

complete: bool

Alias for field number 2

multiplier: int

Alias for field number 1

value: int

Alias for field number 0

exception ohmqtt.connection.decoder.WantReadError

Bases: Exception

Indicates that the socket is not ready for reading.

ohmqtt.connection.fsm module

class ohmqtt.connection.fsm.FSM(env: StateEnvironment, init_state: type[ohmqtt.connection.states.base.FSMState], error_state: type[ohmqtt.connection.states.base.FSMState])

Bases: object

Threadsafe Finite State Machine.

change_state(state: type[ohmqtt.connection.states.base.FSMState]) None

Change to a new state.

This method must only be called from within a state.

cond: Condition
env: StateEnvironment
error_state: type[ohmqtt.connection.states.base.FSMState]
get_state() type[ohmqtt.connection.states.base.FSMState]

Get the current state.

lock: RLock
loop_once(max_wait: float | None = 0.0) bool

Do the current state.

State transition will be run if needed.

Returns:

True if the state is finished and the calling thread should wait for a change.

loop_until_state(targets: Sequence[type[ohmqtt.connection.states.base.FSMState]], timeout: float | None = None) bool

Run the state machine until a specific state(s) has been entered.

Returns:

True if a target state is reached, False if another final state was finished or timeout reached.

params: ConnectParams
previous_state: type[ohmqtt.connection.states.base.FSMState]
request_state(state: type[ohmqtt.connection.states.base.FSMState]) None

Request a state change from outside the FSM.

requested_state: type[ohmqtt.connection.states.base.FSMState]
selector: InterruptibleSelector
set_params(params: ConnectParams) None

Set the connection parameters.

state: type[ohmqtt.connection.states.base.FSMState]
wait_for_state(states: Sequence[type[ohmqtt.connection.states.base.FSMState]], timeout: float | None = None) bool

Wait for a specific state to be reached.

Returns:

True if the state is reached, False if the timeout is reached.

exception ohmqtt.connection.fsm.InvalidStateError

Bases: Exception

Exception raised when an operation is performed in an invalid state.

ohmqtt.connection.handlers module

class ohmqtt.connection.handlers.MessageHandlers

Bases: object

Container for intentional registration of message handlers for MQTT packets.

Handlers may only be registered within the context manager.

The context manager may only be entered once.

get_handlers(packet_type: type[PacketT]) list[Callable[[PacketT], NoneType]]

Get the handlers for a given packet type.

Raises:

RuntimeError – Message handlers registration has not been completed.

handle(packet: ohmqtt.packet.connect.MQTTConnAckPacket | ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubAckPacket | ohmqtt.packet.subscribe.MQTTUnsubAckPacket | ohmqtt.packet.auth.MQTTAuthPacket | ohmqtt.packet.connect.MQTTDisconnectPacket) list[Exception]

Handle a packet by calling the appropriate handlers.

Guarantees that all handlers are called in the order they were registered.

Guarantees that all handlers are run regardless of Exceptions.

Returns:

A list of Exceptions raised by handlers.

register(packet_type: type[PacketT], handler: Callable[[PacketT], None]) None

Register a handler for a given packet type.

Raises:

RuntimeError – Not in the context manager.

ohmqtt.connection.keepalive module

The Keep Alive is a Two Byte Integer (a required field in CONNECT packets) which is a time interval measured in seconds.

It is the maximum time interval that is permitted to elapse between the point at which the Client finishes transmitting one MQTT Control Packet and the point it starts sending the next.

It is the responsibility of the Client to ensure that the interval between MQTT Control Packets being sent does not exceed the Keep Alive value.

If Keep Alive is non-zero and in the absence of sending any other MQTT Control Packets, the Client MUST send a PINGREQ packet.

If the Server returns a Server Keep Alive on the CONNACK packet, the Client MUST use that value instead of the value it sent as the Keep Alive.

If the Server does not send the Server Keep Alive, the Server MUST use the Keep Alive value set by the Client on CONNECT.

If the Keep Alive value is non-zero and the Server does not receive an MQTT Control Packet from the Client within one and a half times the Keep Alive time period, it MUST close the Network Connection to the Client as if the network had failed.

If a Client does not receive a PINGRESP packet within a reasonable amount of time after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server.

A Keep Alive value of 0 has the effect of turning off the Keep Alive mechanism. If Keep Alive is 0 the Client is not obliged to send MQTT Control Packets on any particular schedule.

class ohmqtt.connection.keepalive.KeepAlive

Bases: object

Tracks the keep alive timer for a connection.

get_next_timeout(max_wait: float | None = None) float | None

Check how long until the next next ping or closure check is due.

max_wait is the maximum time to wait for a timeout. If this is set, the timeout will be the minimum of the calculated timeout and max_wait.

If the keep alive interval is 0, this will return None unless max_wait is set.

property keepalive_interval: int

Get the keep alive interval in seconds.

mark_init() None

Initialize the keep alive timer for a new connection.

mark_ping() None

Mark that a PINGREQ was sent.

This also marks that data has been sent.

mark_pong() None

Mark that a PINGRESP was received.

This also marks that data has been received.

mark_send() None

Mark that data has been sent.

should_close() bool

Check if the keep alive timer has expired.

should_send_ping() bool

Check if a PINGREQ packet should be sent.

ohmqtt.connection.selector module

class ohmqtt.connection.selector.InterruptibleSelector(lock: Optional[Union[RLock, allocate_lock]] = None)

Bases: Protected

A select() method which can be interrupted by a call to interrupt().

This can be used to interrupt a blocking select() call from another thread.

change_sock(sock: socket.socket | ssl.SSLSocket) None

Change the socket for the selector.

close() None

Finalize this instance.

interrupt() None

Interrupt the select call, if we are in select.

select(*, read: bool = False, write: bool = False, timeout: float | None = None) tuple[bool, bool]

Select an optional socket with a timeout, allowing for interruption.

This method must be called with the lock already held.

ohmqtt.connection.states module

ohmqtt.connection.timeout module

class ohmqtt.connection.timeout.Timeout(interval: float | None = None)

Bases: object

A simple timer class for getting timeouts since the last event.

exceeded() bool

Check if the timeout has been exceeded.

If the interval is None, always returns False.

get_timeout(max_wait: float | None = None) float | None

Get the difference between the interval and the last mark.

If the value would be negative, returns 0.

If the interval is None, returns None.

If max_wait is not None, returns the minimum of max_wait and the interval.

interval
mark() None

Mark an event.

ohmqtt.connection.types module

class ohmqtt.connection.types.ConnectParams(address: ~ohmqtt.connection.address.Address = <factory>, client_id: str = '', connect_timeout: float | None = None, reconnect_delay: int = 0, keepalive_interval: int = 0, tcp_nodelay: bool = True, tls_context: ssl.SSLContext | None = None, tls_hostname: str = '', protocol_version: int = 5, clean_start: bool = False, username: str | None = None, password: bytes | None = None, will_topic: str = '', will_payload: bytes = b'', will_qos: int = 0, will_retain: bool = False, will_properties: ~ohmqtt.property.MQTTWillProps = <factory>, connect_properties: ~ohmqtt.property.MQTTConnectProps = <factory>)

Bases: object

Parameters for the MQTT connection.

address: Address
clean_start: bool
client_id: str
connect_properties: MQTTConnectProps
connect_timeout: float | None
keepalive_interval: int
password: bytes | None
protocol_version: int
reconnect_delay: int
tcp_nodelay: bool
tls_context: ssl.SSLContext | None
tls_hostname: str
username: str | None
will_payload: bytes
will_properties: MQTTWillProps
will_qos: int
will_retain: bool
will_topic: str
class ohmqtt.connection.types.StateData

Bases: object

State data for the connection.

This should contain any attributes needed by multiple states.

The data in this class should never be accessed from outside the state methods.

connack: ohmqtt.packet.connect.MQTTConnAckPacket | None
decoder: IncrementalDecoder
disconnect_rc: ohmqtt.mqtt_spec.MQTTReasonCode | None
keepalive: KeepAlive
open_called: bool
sock: socket.socket | ssl.SSLSocket
timeout: Timeout
write_buffer: bytearray
ws_decoder: WebsocketDecoder
ws_handshake_buffer: bytearray
ws_nonce: str
class ohmqtt.connection.types.StateEnvironment(*, packet_callback: Callable[[ohmqtt.packet.connect.MQTTConnAckPacket | ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubAckPacket | ohmqtt.packet.subscribe.MQTTUnsubAckPacket | ohmqtt.packet.auth.MQTTAuthPacket | ohmqtt.packet.connect.MQTTDisconnectPacket], None])

Bases: object

State environment for the connection.

Data in this class is shared with the outside world.

packet_buffer: deque[MQTTPacket]
packet_callback: Callable[[ohmqtt.packet.connect.MQTTConnAckPacket | ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubAckPacket | ohmqtt.packet.subscribe.MQTTUnsubAckPacket | ohmqtt.packet.auth.MQTTAuthPacket | ohmqtt.packet.connect.MQTTDisconnectPacket], None]

Module contents

class ohmqtt.connection.Connection(handlers: MessageHandlers)

Bases: object

Interface for the MQTT connection.

can_send() bool

Check if the connection is in a state where data can be sent.

Returns:

True if the connection is in a state where data can be sent, False otherwise.

connect(params: ConnectParams) None

Connect to the MQTT broker.

disconnect() None

Disconnect from the MQTT broker.

fsm
handle_packet(packet: ohmqtt.packet.connect.MQTTConnAckPacket | ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubAckPacket | ohmqtt.packet.subscribe.MQTTUnsubAckPacket | ohmqtt.packet.auth.MQTTAuthPacket | ohmqtt.packet.connect.MQTTDisconnectPacket) None

Handle incoming packets by routing them to registered handlers.

is_connected() bool

Check if the connection is established.

loop_forever() None

Run the state machine until the connection is shut down.

loop_once(max_wait: float | None = 0.0) None

Run a single iteration of the state machine.

If max_wait is None, wait indefinitely. Otherwise, wait for the specified time.

loop_until_connected(timeout: float | None = None) bool

Run the state machine until the connection is established.

Returns:

True if the connection is established, False otherwise.

send(packet: ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubscribePacket | ohmqtt.packet.subscribe.MQTTUnsubscribePacket | ohmqtt.packet.auth.MQTTAuthPacket) None

Send data to the connection.

Raises:

InvalidStateError – If the connection is not in a state where data can be sent.

shutdown() None

Shutdown the connection.

wait_for_connect(timeout: float | None = None) bool

Wait for the connection to be established.

Returns:

True if the connection is established, False if the timeout is reached.

wait_for_disconnect(timeout: float | None = None) bool

Wait for the connection to be closed.

Returns:

True if the connection is closed, False if the timeout is reached.

wait_for_shutdown(timeout: float | None = None) bool

Wait for the connection to be closed and finalized.

Returns:

True if the connection is closed, False if the timeout is reached.