ohmqtt.connection package
Submodules
ohmqtt.connection.address module
- class ohmqtt.connection.address.Address(address: str = '')
Bases: object
- family: AddressFamily
- host: str
- is_default_port() → bool
Check if the port is the default for the scheme.
- is_websocket() → bool
Check if the address uses WebSocket.
- password: str | None
- path: str
- port: int
- scheme: str
- property use_tls: bool
Check if the address uses TLS.
- username: str | None
- ohmqtt.connection.address.is_ipv6(hostname: str) → bool
Check if the hostname is an IPv6 address.
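A check of this kind can be sketched with the standard library alone. The helper below is an illustration, not the library's implementation: the name looks_like_ipv6 is hypothetical, and stripping URL-style brackets is an assumption about how such hostnames might arrive.

```python
import ipaddress


def looks_like_ipv6(hostname: str) -> bool:
    """Return True if hostname parses as an IPv6 literal (hypothetical helper)."""
    # Assumption: IPv6 literals may arrive wrapped in URL brackets, e.g. "[::1]".
    candidate = hostname.strip("[]")
    try:
        ipaddress.IPv6Address(candidate)
    except ValueError:
        # AddressValueError is a ValueError subclass, so this covers parse failures.
        return False
    return True
```

DNS names and IPv4 literals fail to parse as IPv6 and therefore return False.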
ohmqtt.connection.decoder module
- exception ohmqtt.connection.decoder.ClosedSocketError
Bases: Exception
Exception raised when the socket is closed.
- class ohmqtt.connection.decoder.IncrementalDecoder
Bases: object
Incremental decoder for MQTT messages coming in from a socket.
- data: bytearray
The remaining data of the packet.
- decode(sock: socket.socket | ssl.SSLSocket) → ohmqtt.packet.base.MQTTPacket | None
Decode a single packet straight from the socket.
- Returns:
None if the socket doesn’t have enough data for us to decode a packet.
- Raises:
ClosedSocketError – The socket is closed.
- head: int
The first byte of the packet.
- length: VarintDecodeResult
Variable integer decoding state of the packet length.
- reset() → None
Reset the decoder state.
- class ohmqtt.connection.decoder.VarintDecodeResult(value: int, multiplier: int, complete: bool)
Bases: NamedTuple
Result of decoding a variable length integer, in part or whole.
This state can be used to resume decoding if the socket doesn’t have enough data.
- complete: bool
Alias for field number 2
- multiplier: int
Alias for field number 1
- value: int
Alias for field number 0
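To illustrate how a (value, multiplier, complete) triple lets decoding resume when bytes arrive in pieces, here is a minimal sketch of MQTT Variable Byte Integer decoding. VarintState and feed_varint are hypothetical names that only mirror the documented fields; the library's own decoder may be structured differently.

```python
from typing import NamedTuple


class VarintState(NamedTuple):
    """Hypothetical stand-in with the same three fields as documented above."""
    value: int = 0
    multiplier: int = 1
    complete: bool = False


def feed_varint(state: VarintState, data: bytes) -> VarintState:
    """Consume bytes until the integer is complete or the input runs out."""
    value, multiplier = state.value, state.multiplier
    for byte in data:
        # The low 7 bits carry data; the high bit flags a continuation byte.
        value += (byte & 0x7F) * multiplier
        if byte & 0x80 == 0:
            # Bytes after the final one are ignored in this sketch.
            return VarintState(value, multiplier, True)
        multiplier *= 128
        if multiplier > 128 ** 3:
            # A fourth byte with the continuation bit set is malformed.
            raise ValueError("malformed variable byte integer")
    # Input exhausted: return the partial state so the caller can resume later.
    return VarintState(value, multiplier, False)
```

For example, the value 321 is encoded as the bytes 0xC1 0x02. Feeding 0xC1 alone yields an incomplete state, and feeding 0x02 into that state completes the value.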
- exception ohmqtt.connection.decoder.WantReadError
Bases: Exception
Indicates that the socket is not ready for reading.
ohmqtt.connection.fsm module
- class ohmqtt.connection.fsm.FSM(env: StateEnvironment, init_state: type[ohmqtt.connection.states.base.FSMState], error_state: type[ohmqtt.connection.states.base.FSMState])
Bases: object
Threadsafe Finite State Machine.
- change_state(state: type[ohmqtt.connection.states.base.FSMState]) → None
Change to a new state.
This method must only be called from within a state.
- cond: Condition
- env: StateEnvironment
- error_state: type[ohmqtt.connection.states.base.FSMState]
- get_state() → type[ohmqtt.connection.states.base.FSMState]
Get the current state.
- lock: RLock
- loop_once(max_wait: float | None = 0.0) → bool
Run the current state.
A state transition is run first if one is needed.
- Returns:
True if the state is finished and the calling thread should wait for a change.
- loop_until_state(targets: Sequence[type[ohmqtt.connection.states.base.FSMState]], timeout: float | None = None) → bool
Run the state machine until one of the target states has been entered.
- Returns:
True if a target state is reached, False if another final state finished or the timeout was reached.
- params: ConnectParams
- previous_state: type[ohmqtt.connection.states.base.FSMState]
- request_state(state: type[ohmqtt.connection.states.base.FSMState]) → None
Request a state change from outside the FSM.
- requested_state: type[ohmqtt.connection.states.base.FSMState]
- selector: InterruptibleSelector
- set_params(params: ConnectParams) → None
Set the connection parameters.
- state: type[ohmqtt.connection.states.base.FSMState]
- wait_for_state(states: Sequence[type[ohmqtt.connection.states.base.FSMState]], timeout: float | None = None) → bool
Wait for one of the given states to be reached.
- Returns:
True if the state is reached, False if the timeout is reached.
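The wait-with-timeout pattern described above is commonly built on threading.Condition. The sketch below shows that pattern in isolation; StateBox and set_state are hypothetical names, states are plain strings rather than FSMState classes, and nothing here is taken from the library's implementation.

```python
import threading
from typing import Optional, Sequence


class StateBox:
    """Hypothetical holder for a current state guarded by a Condition."""

    def __init__(self, state: str) -> None:
        self.cond = threading.Condition(threading.RLock())
        self.state = state

    def set_state(self, state: str) -> None:
        """Change the state and wake every thread waiting on it."""
        with self.cond:
            self.state = state
            self.cond.notify_all()

    def wait_for_state(self, states: Sequence[str],
                       timeout: Optional[float] = None) -> bool:
        """Block until the state is one of `states`; False on timeout."""
        with self.cond:
            # wait_for re-checks the predicate on every wakeup and returns
            # its last value, so spurious wakeups are handled for us.
            return self.cond.wait_for(lambda: self.state in states, timeout)
```

A waiter that passes timeout=None blocks until another thread calls set_state with a matching value.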
- exception ohmqtt.connection.fsm.InvalidStateError
Bases: Exception
Exception raised when an operation is performed in an invalid state.
ohmqtt.connection.handlers module
- class ohmqtt.connection.handlers.MessageHandlers
Bases: object
Container for intentional registration of message handlers for MQTT packets.
Handlers may only be registered within the context manager.
The context manager may only be entered once.
- get_handlers(packet_type: type[PacketT]) → list[Callable[[PacketT], NoneType]]
Get the handlers for a given packet type.
- Raises:
RuntimeError – Message handlers registration has not been completed.
- handle(packet: ohmqtt.packet.connect.MQTTConnAckPacket | ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubAckPacket | ohmqtt.packet.subscribe.MQTTUnsubAckPacket | ohmqtt.packet.auth.MQTTAuthPacket | ohmqtt.packet.connect.MQTTDisconnectPacket) → list[Exception]
Handle a packet by calling the appropriate handlers.
Guarantees that all handlers are called in the order they were registered.
Guarantees that all handlers are run regardless of Exceptions.
- Returns:
A list of Exceptions raised by handlers.
- register(packet_type: type[PacketT], handler: Callable[[PacketT], None]) → None
Register a handler for a given packet type.
- Raises:
RuntimeError – Not in the context manager.
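The two guarantees stated for handle (handlers run in registration order, and every handler runs even if an earlier one raises) can be sketched as a simple collect-and-continue loop. The function run_all is a hypothetical illustration and does not use the library's classes.

```python
from typing import Any, Callable, List


def run_all(handlers: List[Callable[[Any], None]], packet: Any) -> List[Exception]:
    """Call every handler in order, collecting exceptions instead of stopping."""
    errors: List[Exception] = []
    for handler in handlers:
        try:
            handler(packet)
        except Exception as exc:
            # Record the failure and keep going so later handlers still run.
            errors.append(exc)
    return errors
```

Returning the exceptions rather than re-raising leaves it to the caller to decide whether a handler failure should be logged, ignored, or treated as fatal.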
ohmqtt.connection.keepalive module
The Keep Alive is a Two Byte Integer (a required field in CONNECT packets) which is a time interval measured in seconds.
It is the maximum time interval that is permitted to elapse between the point at which the Client finishes transmitting one MQTT Control Packet and the point it starts sending the next.
It is the responsibility of the Client to ensure that the interval between MQTT Control Packets being sent does not exceed the Keep Alive value.
If Keep Alive is non-zero and in the absence of sending any other MQTT Control Packets, the Client MUST send a PINGREQ packet.
If the Server returns a Server Keep Alive on the CONNACK packet, the Client MUST use that value instead of the value it sent as the Keep Alive.
If the Server does not send the Server Keep Alive, the Server MUST use the Keep Alive value set by the Client on CONNECT.
If the Keep Alive value is non-zero and the Server does not receive an MQTT Control Packet from the Client within one and a half times the Keep Alive time period, it MUST close the Network Connection to the Client as if the network had failed.
If a Client does not receive a PINGRESP packet within a reasonable amount of time after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server.
A Keep Alive value of 0 has the effect of turning off the Keep Alive mechanism. If Keep Alive is 0 the Client is not obliged to send MQTT Control Packets on any particular schedule.
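The rules above reduce to a few comparisons against elapsed time. The functions below are a minimal sketch with hypothetical names; timestamps are passed in explicitly (in seconds) so the logic is easy to follow, and the exact boundary comparisons are an assumption.

```python
from __future__ import annotations


def effective_keep_alive(client_value: int, server_keep_alive: int | None) -> int:
    """A Server Keep Alive in CONNACK, when present, overrides the client's value."""
    return client_value if server_keep_alive is None else server_keep_alive


def client_should_ping(keep_alive: int, last_send: float, now: float) -> bool:
    """True once a full Keep Alive interval has passed without sending a packet."""
    if keep_alive == 0:
        return False  # a value of 0 turns the mechanism off
    return now - last_send >= keep_alive


def server_should_close(keep_alive: int, last_receive: float, now: float) -> bool:
    """True once 1.5 times the Keep Alive has passed with nothing received."""
    if keep_alive == 0:
        return False
    return now - last_receive >= keep_alive * 1.5
```

In practice a client would likely send PINGREQ somewhat before the interval fully elapses, to leave room for network latency.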
- class ohmqtt.connection.keepalive.KeepAlive
Bases: object
Tracks the keep alive timer for a connection.
- get_next_timeout(max_wait: float | None = None) → float | None
Check how long until the next ping or closure check is due.
max_wait is the maximum time to wait for a timeout. If this is set, the timeout will be the minimum of the calculated timeout and max_wait.
If the keep alive interval is 0, this will return None unless max_wait is set.
- property keepalive_interval: int
Get the keep alive interval in seconds.
- mark_init() → None
Initialize the keep alive timer for a new connection.
- mark_ping() → None
Mark that a PINGREQ was sent.
This also marks that data has been sent.
- mark_pong() → None
Mark that a PINGRESP was received.
This also marks that data has been received.
- mark_send() → None
Mark that data has been sent.
- should_close() → bool
Check if the keep alive timer has expired.
- should_send_ping() → bool
Check if a PINGREQ packet should be sent.
ohmqtt.connection.selector module
- class ohmqtt.connection.selector.InterruptibleSelector(lock: Optional[Union[RLock, allocate_lock]] = None)
Bases: Protected
A select() method which can be interrupted by a call to interrupt().
This can be used to interrupt a blocking select() call from another thread.
- change_sock(sock: socket.socket | ssl.SSLSocket) → None
Change the socket for the selector.
- close() → None
Finalize this instance.
- interrupt() → None
Interrupt the select call, if we are in select.
- select(*, read: bool = False, write: bool = False, timeout: float | None = None) → tuple[bool, bool]
Select an optional socket with a timeout, allowing for interruption.
This method must be called with the lock already held.
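One common way to make a blocking select interruptible from another thread is the self-pipe technique: register one end of a socket pair with the selector and write a byte to the other end to wake it. The sketch below shows that technique only; WakeableSelect is a hypothetical class, it watches nothing but the wake-up socket, and the source does not say that InterruptibleSelector works this way.

```python
import selectors
import socket
from typing import Optional


class WakeableSelect:
    """Hypothetical sketch: a select() that another thread can wake up."""

    def __init__(self) -> None:
        # A connected socket pair: writing to _tx makes _rx readable.
        self._rx, self._tx = socket.socketpair()
        self._rx.setblocking(False)
        self._sel = selectors.DefaultSelector()
        self._sel.register(self._rx, selectors.EVENT_READ)

    def interrupt(self) -> None:
        """Wake a thread blocked in select(); safe to call from any thread."""
        self._tx.send(b"\x00")

    def select(self, timeout: Optional[float] = None) -> bool:
        """Return True if woken by interrupt(), False if the timeout expired."""
        if not self._sel.select(timeout):
            return False
        try:
            # Drain pending wake-up bytes so the next select() blocks again.
            self._rx.recv(1024)
        except BlockingIOError:
            pass
        return True

    def close(self) -> None:
        """Release the selector and both sockets."""
        self._sel.close()
        self._rx.close()
        self._tx.close()
```

A real implementation would also register the connection's own socket and report its read and write readiness separately from the wake-up event.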
ohmqtt.connection.states module
ohmqtt.connection.timeout module
- class ohmqtt.connection.timeout.Timeout(interval: float | None = None)
Bases: object
A simple timer class for getting timeouts since the last event.
- exceeded() → bool
Check if the timeout has been exceeded.
If the interval is None, always returns False.
- get_timeout(max_wait: float | None = None) → float | None
Get the difference between the interval and the last mark.
If the value would be negative, returns 0.
If the interval is None, returns None.
If max_wait is not None, returns the minimum of max_wait and the interval.
- interval
- mark() → None
Mark an event.
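A timer with roughly these semantics can be sketched on top of time.monotonic. SimpleTimeout is a hypothetical class, the injectable clock parameter exists only to make the sketch testable, and two behaviours are assumptions: max_wait caps the remaining time, and it is returned as-is when the interval is None.

```python
from __future__ import annotations

import time
from typing import Callable


class SimpleTimeout:
    """Hypothetical timer that measures time since the last marked event."""

    def __init__(self, interval: float | None = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self._clock = clock  # injectable so tests can control time
        self._mark = clock()

    def mark(self) -> None:
        """Record that an event happened now."""
        self._mark = self._clock()

    def get_timeout(self, max_wait: float | None = None) -> float | None:
        """Time left before the interval elapses, never negative."""
        if self.interval is None:
            return max_wait  # assumption: only max_wait bounds the wait
        remaining = max(0.0, self.interval - (self._clock() - self._mark))
        return remaining if max_wait is None else min(remaining, max_wait)

    def exceeded(self) -> bool:
        """True once more than `interval` seconds have passed since the mark."""
        if self.interval is None:
            return False
        return self._clock() - self._mark > self.interval
```

A monotonic clock is used so that wall-clock adjustments cannot make the timer jump forwards or backwards.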
ohmqtt.connection.types module
- class ohmqtt.connection.types.ConnectParams(address: ohmqtt.connection.address.Address = <factory>, client_id: str = '', connect_timeout: float | None = None, reconnect_delay: int = 0, keepalive_interval: int = 0, tcp_nodelay: bool = True, tls_context: ssl.SSLContext | None = None, tls_hostname: str = '', protocol_version: int = 5, clean_start: bool = False, username: str | None = None, password: bytes | None = None, will_topic: str = '', will_payload: bytes = b'', will_qos: int = 0, will_retain: bool = False, will_properties: ohmqtt.property.MQTTWillProps = <factory>, connect_properties: ohmqtt.property.MQTTConnectProps = <factory>)
Bases: object
Parameters for the MQTT connection.
- clean_start: bool
- client_id: str
- connect_properties: MQTTConnectProps
- connect_timeout: float | None
- keepalive_interval: int
- password: bytes | None
- protocol_version: int
- reconnect_delay: int
- tcp_nodelay: bool
- tls_context: ssl.SSLContext | None
- tls_hostname: str
- username: str | None
- will_payload: bytes
- will_properties: MQTTWillProps
- will_qos: int
- will_retain: bool
- will_topic: str
- class ohmqtt.connection.types.StateData
Bases: object
State data for the connection.
This should contain any attributes needed by multiple states.
The data in this class should never be accessed from outside the state methods.
- connack: ohmqtt.packet.connect.MQTTConnAckPacket | None
- decoder: IncrementalDecoder
- disconnect_rc: ohmqtt.mqtt_spec.MQTTReasonCode | None
- open_called: bool
- sock: socket.socket | ssl.SSLSocket
- write_buffer: bytearray
- ws_decoder: WebsocketDecoder
- ws_handshake_buffer: bytearray
- ws_nonce: str
- class ohmqtt.connection.types.StateEnvironment(*, packet_callback: Callable[[ohmqtt.packet.connect.MQTTConnAckPacket | ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubAckPacket | ohmqtt.packet.subscribe.MQTTUnsubAckPacket | ohmqtt.packet.auth.MQTTAuthPacket | ohmqtt.packet.connect.MQTTDisconnectPacket], None])
Bases: object
State environment for the connection.
Data in this class is shared with the outside world.
- packet_buffer: deque[MQTTPacket]
- packet_callback: Callable[[ohmqtt.packet.connect.MQTTConnAckPacket | ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubAckPacket | ohmqtt.packet.subscribe.MQTTUnsubAckPacket | ohmqtt.packet.auth.MQTTAuthPacket | ohmqtt.packet.connect.MQTTDisconnectPacket], None]
Module contents
- class ohmqtt.connection.Connection(handlers: MessageHandlers)
Bases: object
Interface for the MQTT connection.
- can_send() → bool
Check if the connection is in a state where data can be sent.
- Returns:
True if the connection is in a state where data can be sent, False otherwise.
- connect(params: ConnectParams) → None
Connect to the MQTT broker.
- disconnect() → None
Disconnect from the MQTT broker.
- fsm
- handle_packet(packet: ohmqtt.packet.connect.MQTTConnAckPacket | ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubAckPacket | ohmqtt.packet.subscribe.MQTTUnsubAckPacket | ohmqtt.packet.auth.MQTTAuthPacket | ohmqtt.packet.connect.MQTTDisconnectPacket) → None
Handle incoming packets by routing them to registered handlers.
- is_connected() → bool
Check if the connection is established.
- loop_forever() → None
Run the state machine until the connection is shut down.
- loop_once(max_wait: float | None = 0.0) → None
Run a single iteration of the state machine.
If max_wait is None, wait indefinitely. Otherwise, wait for the specified time.
- loop_until_connected(timeout: float | None = None) → bool
Run the state machine until the connection is established.
- Returns:
True if the connection is established, False otherwise.
- send(packet: ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubRelPacket | ohmqtt.packet.publish.MQTTPubCompPacket | ohmqtt.packet.subscribe.MQTTSubscribePacket | ohmqtt.packet.subscribe.MQTTUnsubscribePacket | ohmqtt.packet.auth.MQTTAuthPacket) → None
Send data to the connection.
- Raises:
InvalidStateError – If the connection is not in a state where data can be sent.
- shutdown() → None
Shut down the connection.
- wait_for_connect(timeout: float | None = None) → bool
Wait for the connection to be established.
- Returns:
True if the connection is established, False if the timeout is reached.
- wait_for_disconnect(timeout: float | None = None) → bool
Wait for the connection to be closed.
- Returns:
True if the connection is closed, False if the timeout is reached.
- wait_for_shutdown(timeout: float | None = None) → bool
Wait for the connection to be closed and finalized.
- Returns:
True if the connection is closed, False if the timeout is reached.