Implementation Interfaces

Subpackages

Submodules

ohmqtt.client module

class ohmqtt.client.Client(db_path: str = '', *, db_fast: bool = False)

Bases: object

High level interface for the MQTT client.

Parameters:
  • db_path – Path to the database file for persistence. If none provided, in-memory store will be used.

  • db_fast – If True, use a faster database implementation (e.g. SQLite WAL).

auth(*, authentication_method: str | None = None, authentication_data: bytes | None = None, reason_string: str | None = None, user_properties: Optional[Sequence[tuple[str, str]]] = None, reason_code: MQTTReasonCode = MQTTReasonCode.Success) None

Send an AUTH packet to the broker.

Parameters:
  • authentication_method – The authentication method to use.

  • authentication_data – Authentication data to send.

  • reason_string – A reason string to include in the AUTH packet.

  • user_properties – Optional user properties to include in the AUTH packet.

  • reason_code – The reason code for the AUTH packet.

connect(address: str, *, client_id: str = '', clean_start: bool = False, connect_timeout: float | None = None, reconnect_delay: int = 0, keepalive_interval: int = 0, tcp_nodelay: bool = True, tls_context: ssl.SSLContext | None = None, tls_hostname: str = '', username: str | None = None, password: bytes | None = None, will_topic: str = '', will_payload: bytes = b'', will_qos: int = 0, will_retain: bool = False, will_properties: ohmqtt.property.MQTTWillProps | None = None, connect_properties: ohmqtt.property.MQTTConnectProps | None = None) None

Connect to the broker.

Parameters:
  • address

    The address of the broker to connect to. Addresses may be in the form: - mqtt://host:port (TCP) - mqtts://host:port (TLS) - ws://host:port (WebSocket) - wss://host:port (WebSocket/TLS) - unix:/path/to/socket (if supported on the platform) - When the protocol is omitted, mqtt:// is assumed. - When the port is omitted, the default port for the protocol is used.

    • mqtt: 1883

    • mqtts: 8883

    • ws: 80

    • wss: 443

  • client_id – The client ID to use for the connection, or empty string to request one from the broker.

  • clean_start – If True, an existing session will not be resumed.

  • connect_timeout – Timeout for the connection attempt in seconds, or None for no timeout.

  • reconnect_delay – Delay in seconds before attempting to reconnect after a disconnection, or 0 to disable reconnect.

  • keepalive_interval – The keep alive interval in seconds, or 0 to disable keep alive.

  • tcp_nodelay – If True, enable TCP_NODELAY to disable Nagle’s algorithm.

  • tls_context – An SSLContext for TLS connections, or None to use the default.

  • tls_hostname – The hostname to use for TLS connections, or empty string to determine from the address.

  • username – The username for MQTT authentication, or None to disable.

  • password – The password for MQTT authentication, or None to disable.

  • will_topic – The topic for the Will message, or empty string to disable.

  • will_payload – The payload for the Will message.

  • will_qos – The QoS level for the Will message (0, 1, or 2).

  • will_retain – If True, the Will message will be retained.

  • will_properties – Properties for the Will message.

  • connect_properties – Properties for the CONNECT packet.

connection
disconnect() None

Disconnect from the broker.

handle_auth(packet: MQTTAuthPacket) None

Callback for an AUTH packet from the broker.

is_connected() bool

Check if the client is connected to the broker.

loop_forever() None

Run the MQTT client loop.

This will run until the client is stopped or shutdown.

loop_once(max_wait: float | None = 0.0) None

Run a single iteration of the MQTT client loop.

If max_wait is 0.0 (the default), this call will not block.

If max_wait is None, this call will block until the next event.

Any other numeric max_wait value may block for maximum that amount of time in seconds.

loop_until_connected(timeout: float | None = None) bool

Run the MQTT client loop until the client is connected to the broker.

If a timeout is provided, the loop will give up after that amount of time.

Returns True if the client is connected, False if the timeout was reached.

publish(topic: str, payload: bytes | bytearray | str, *, qos: int | ohmqtt.mqtt_spec.MQTTQoS = MQTTQoS.Q0, retain: bool = False, properties: ohmqtt.property.MQTTPublishProps | None = None, alias_policy: AliasPolicy = AliasPolicy.NEVER) Handle[ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket]

Publish a message to a topic.

Parameters:
  • topic – The topic to publish to.

  • payload – The payload of the message. If a string is provided, it will be encoded as UTF-8.

  • qos – The QoS level for the message (0, 1, or 2).

  • retain – If True, the message will be retained by the broker.

  • properties – Properties for the PUBLISH packet.

  • alias_policy – The policy for using automatic topic aliases.

session
shutdown() None

Shutdown the client and close the connection.

start_loop() None

Start the client state machine in a separate thread.

subscribe(topic_filter: str, callback: Callable[[Client, MQTTPublishPacket], None], max_qos: int | ohmqtt.mqtt_spec.MQTTQoS = MQTTQoS.Q2, *, share_name: str | None = None, no_local: bool = False, retain_as_published: bool = False, retain_policy: RetainPolicy = RetainPolicy.ALWAYS, sub_id: int | None = None, user_properties: Optional[Sequence[tuple[str, str]]] = None) Optional[Handle[MQTTSubAckPacket]]

Subscribe to a topic filter with a callback.

If the client is connected, returns a handle which can be used to unsubscribe from the topic filter or wait for the subscription to be acknowledged.

If the client is not connected, returns None.

Parameters:
  • topic_filter – The topic filter to subscribe to.

  • callback – The callback to call when a message is received on the subscribed topic.

  • max_qos – The maximum QoS level for the subscription (0, 1, or 2).

  • share_name – The name of a shared subscription to use.

  • no_local – If True, do not receive messages published by this client.

  • retain_as_published – If True, the retain flag of messages will match the original message.

  • retain_policy – The policy for retained messages.

  • sub_id – An optional subscription ID for the subscription.

  • user_properties – Optional user properties to include in the subscription request.

unsubscribe(topic_filter: str, *, share_name: str | None = None) Optional[Handle[MQTTUnsubAckPacket]]

Unsubscribe from a topic filter.

If the client is connected, returns a handle which can be used to wait for the unsubscription to be acknowledged.

If the client is not connected, returns None.

Parameters:
  • topic_filter – The topic filter to unsubscribe from.

  • share_name – The name of a shared subscription to use.

wait_for_connect(timeout: float | None = None) None

Wait for the client to connect to the broker.

Raises:

TimeoutError – The timeout was exceeded.

wait_for_disconnect(timeout: float | None = None) None

Wait for the client to disconnect from the broker.

Raises:

TimeoutError – The timeout was exceeded.

wait_for_shutdown(timeout: float | None = None) None

Wait for the client to disconnect and finalize.

Raises:

TimeoutError – The timeout was exceeded.

ohmqtt.error module

exception ohmqtt.error.MQTTError(message: str, reason_code: MQTTReasonCode = MQTTReasonCode.UnspecifiedError)

Bases: Exception

ohmqtt.logger module

ohmqtt.logger.get_logger(name: str) Logger

Get a logger with the specified name.

ohmqtt.logger.set_log_level(level: int) None

Set the log level for the ohmqtt logger.

ohmqtt.mqtt_spec module

Constants from the MQTT specification.

enum ohmqtt.mqtt_spec.MQTTPacketType(value)

Bases: IntEnum

Types of MQTT control packets, mapped to identifiers from the specification.

Member Type:

int

Valid values are as follows:

CONNECT = <MQTTPacketType.CONNECT: 1>
CONNACK = <MQTTPacketType.CONNACK: 2>
PUBLISH = <MQTTPacketType.PUBLISH: 3>
PUBACK = <MQTTPacketType.PUBACK: 4>
PUBREC = <MQTTPacketType.PUBREC: 5>
PUBREL = <MQTTPacketType.PUBREL: 6>
PUBCOMP = <MQTTPacketType.PUBCOMP: 7>
SUBSCRIBE = <MQTTPacketType.SUBSCRIBE: 8>
SUBACK = <MQTTPacketType.SUBACK: 9>
UNSUBSCRIBE = <MQTTPacketType.UNSUBSCRIBE: 10>
UNSUBACK = <MQTTPacketType.UNSUBACK: 11>
PINGREQ = <MQTTPacketType.PINGREQ: 12>
PINGRESP = <MQTTPacketType.PINGRESP: 13>
DISCONNECT = <MQTTPacketType.DISCONNECT: 14>
AUTH = <MQTTPacketType.AUTH: 15>
enum ohmqtt.mqtt_spec.MQTTPropertyId(value)

Bases: IntEnum

Types of MQTT properties, mapped to identifiers from the specification.

Member Type:

int

Valid values are as follows:

PayloadFormatIndicator = <MQTTPropertyId.PayloadFormatIndicator: 1>
MessageExpiryInterval = <MQTTPropertyId.MessageExpiryInterval: 2>
ContentType = <MQTTPropertyId.ContentType: 3>
ResponseTopic = <MQTTPropertyId.ResponseTopic: 8>
CorrelationData = <MQTTPropertyId.CorrelationData: 9>
SubscriptionIdentifier = <MQTTPropertyId.SubscriptionIdentifier: 11>
SessionExpiryInterval = <MQTTPropertyId.SessionExpiryInterval: 17>
AssignedClientIdentifier = <MQTTPropertyId.AssignedClientIdentifier: 18>
ServerKeepAlive = <MQTTPropertyId.ServerKeepAlive: 19>
AuthenticationMethod = <MQTTPropertyId.AuthenticationMethod: 21>
AuthenticationData = <MQTTPropertyId.AuthenticationData: 22>
RequestProblemInformation = <MQTTPropertyId.RequestProblemInformation: 23>
WillDelayInterval = <MQTTPropertyId.WillDelayInterval: 24>
RequestResponseInformation = <MQTTPropertyId.RequestResponseInformation: 25>
ResponseInformation = <MQTTPropertyId.ResponseInformation: 26>
ServerReference = <MQTTPropertyId.ServerReference: 28>
ReasonString = <MQTTPropertyId.ReasonString: 31>
ReceiveMaximum = <MQTTPropertyId.ReceiveMaximum: 33>
TopicAliasMaximum = <MQTTPropertyId.TopicAliasMaximum: 34>
TopicAlias = <MQTTPropertyId.TopicAlias: 35>
MaximumQoS = <MQTTPropertyId.MaximumQoS: 36>
RetainAvailable = <MQTTPropertyId.RetainAvailable: 37>
UserProperty = <MQTTPropertyId.UserProperty: 38>
MaximumPacketSize = <MQTTPropertyId.MaximumPacketSize: 39>
WildcardSubscriptionAvailable = <MQTTPropertyId.WildcardSubscriptionAvailable: 40>
SubscriptionIdentifierAvailable = <MQTTPropertyId.SubscriptionIdentifierAvailable: 41>
SharedSubscriptionAvailable = <MQTTPropertyId.SharedSubscriptionAvailable: 42>
enum ohmqtt.mqtt_spec.MQTTQoS(value)

Bases: IntEnum

Quality of Service level for messaging.

Member Type:

int

Valid values are as follows:

Q0 = <MQTTQoS.Q0: 0>
Q1 = <MQTTQoS.Q1: 1>
Q2 = <MQTTQoS.Q2: 2>
enum ohmqtt.mqtt_spec.MQTTReasonCode(value)

Bases: IntEnum

Indicates the result of an operation.

Member Type:

int

Valid values are as follows:

Success = <MQTTReasonCode.Success: 0>
GrantedQoS1 = <MQTTReasonCode.GrantedQoS1: 1>
GrantedQoS2 = <MQTTReasonCode.GrantedQoS2: 2>
DisconnectWithWillMessage = <MQTTReasonCode.DisconnectWithWillMessage: 4>
NoMatchingSubscribers = <MQTTReasonCode.NoMatchingSubscribers: 16>
NoSubscriptionExisted = <MQTTReasonCode.NoSubscriptionExisted: 17>
ContinueAuthentication = <MQTTReasonCode.ContinueAuthentication: 24>
ReAuthenticate = <MQTTReasonCode.ReAuthenticate: 25>
UnspecifiedError = <MQTTReasonCode.UnspecifiedError: 128>
MalformedPacket = <MQTTReasonCode.MalformedPacket: 129>
ProtocolError = <MQTTReasonCode.ProtocolError: 130>
ImplementationSpecificError = <MQTTReasonCode.ImplementationSpecificError: 131>
UnsupportedProtocolVersion = <MQTTReasonCode.UnsupportedProtocolVersion: 132>
ClientIdentifierNotValid = <MQTTReasonCode.ClientIdentifierNotValid: 133>
BadUserNameOrPassword = <MQTTReasonCode.BadUserNameOrPassword: 134>
NotAuthorized = <MQTTReasonCode.NotAuthorized: 135>
ServerUnavailable = <MQTTReasonCode.ServerUnavailable: 136>
ServerBusy = <MQTTReasonCode.ServerBusy: 137>
Banned = <MQTTReasonCode.Banned: 138>
ServerShuttingDown = <MQTTReasonCode.ServerShuttingDown: 139>
BadAuthenticationMethod = <MQTTReasonCode.BadAuthenticationMethod: 140>
KeepAliveTimeout = <MQTTReasonCode.KeepAliveTimeout: 141>
SessionTakenOver = <MQTTReasonCode.SessionTakenOver: 142>
TopicFilterInvalid = <MQTTReasonCode.TopicFilterInvalid: 143>
TopicNameInvalid = <MQTTReasonCode.TopicNameInvalid: 144>
PacketIdentifierInUse = <MQTTReasonCode.PacketIdentifierInUse: 145>
PacketIdentifierNotFound = <MQTTReasonCode.PacketIdentifierNotFound: 146>
ReceiveMaximumExceeded = <MQTTReasonCode.ReceiveMaximumExceeded: 147>
TopicAliasInvalid = <MQTTReasonCode.TopicAliasInvalid: 148>
PacketTooLarge = <MQTTReasonCode.PacketTooLarge: 149>
MessageRateTooHigh = <MQTTReasonCode.MessageRateTooHigh: 150>
QuotaExceeded = <MQTTReasonCode.QuotaExceeded: 151>
AdministrativeAction = <MQTTReasonCode.AdministrativeAction: 152>
PayloadFormatInvalid = <MQTTReasonCode.PayloadFormatInvalid: 153>
RetainNotSupported = <MQTTReasonCode.RetainNotSupported: 154>
QoSNotSupported = <MQTTReasonCode.QoSNotSupported: 155>
UseAnotherServer = <MQTTReasonCode.UseAnotherServer: 156>
ServerMoved = <MQTTReasonCode.ServerMoved: 157>
SharedSubscriptionsNotSupported = <MQTTReasonCode.SharedSubscriptionsNotSupported: 158>
ConnectionRateExceeded = <MQTTReasonCode.ConnectionRateExceeded: 159>
MaximumConnectTime = <MQTTReasonCode.MaximumConnectTime: 160>
SubscriptionIdentifiersNotSupported = <MQTTReasonCode.SubscriptionIdentifiersNotSupported: 161>
WildcardSubscriptionsNotSupported = <MQTTReasonCode.WildcardSubscriptionsNotSupported: 162>

The Enum and its members also have the following methods:

is_error() bool

Check if the reason code indicates an error.

ohmqtt.platform module

exception ohmqtt.platform.PlatformError

Bases: Exception

A feature is not supported on this platform.

ohmqtt.property module

class ohmqtt.property.MQTTAuthProps

Bases: MQTTProperties

Properties for MQTT AUTH packet.

AuthenticationData: bytes | None = None
AuthenticationMethod: str | None = None
ReasonString: str | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTConnAckProps

Bases: MQTTProperties

Properties for MQTT CONNACK packet.

AssignedClientIdentifier: str | None = None
AuthenticationData: bytes | None = None
AuthenticationMethod: str | None = None
MaximumPacketSize: int | None = None
MaximumQoS: int | None = None
ReasonString: str | None = None
ReceiveMaximum: int | None = None
ResponseInformation: str | None = None
RetainAvailable: bool | None = None
ServerKeepAlive: int | None = None
ServerReference: str | None = None
SessionExpiryInterval: int | None = None
SharedSubscriptionAvailable: bool | None = None
SubscriptionIdentifierAvailable: bool | None = None
TopicAliasMaximum: int | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
WildcardSubscriptionAvailable: bool | None = None
class ohmqtt.property.MQTTConnectProps

Bases: MQTTProperties

Properties for MQTT CONNECT packet.

AuthenticationData: bytes | None = None
AuthenticationMethod: str | None = None
MaximumPacketSize: int | None = None
ReceiveMaximum: int | None = None
RequestProblemInformation: bool | None = None
RequestResponseInformation: bool | None = None
SessionExpiryInterval: int | None = None
TopicAliasMaximum: int | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTDisconnectProps

Bases: MQTTProperties

Properties for MQTT DISCONNECT packet.

ReasonString: str | None = None
ServerReference: str | None = None
SessionExpiryInterval: int | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTProperties

Bases: MQTTPropertiesBase

classmethod decode(data: memoryview) tuple[Self, int]

Decode MQTT properties from a buffer.

Returns a tuple of the decoded properties and the number of bytes consumed.

encode() bytes

Encode MQTT properties to a buffer.

class ohmqtt.property.MQTTPropertiesBase

Bases: SimpleNamespace

Represents MQTT packet properties.

class ohmqtt.property.MQTTPubAckProps

Bases: MQTTProperties

Properties for MQTT PUBACK packet.

ReasonString: str | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTPubCompProps

Bases: MQTTProperties

Properties for MQTT PUBCOMP packet.

ReasonString: str | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTPubRecProps

Bases: MQTTProperties

Properties for MQTT PUBREC packet.

ReasonString: str | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTPubRelProps

Bases: MQTTProperties

Properties for MQTT PUBREL packet.

ReasonString: str | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTPublishProps

Bases: MQTTProperties

Properties for MQTT PUBLISH packet.

ContentType: str | None = None
CorrelationData: bytes | None = None
MessageExpiryInterval: int | None = None
PayloadFormatIndicator: int | None = None
ResponseTopic: str | None = None
SubscriptionIdentifier: set[int] | None = None
TopicAlias: int | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTSubAckProps

Bases: MQTTProperties

Properties for MQTT SUBACK packet.

ReasonString: str | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTSubscribeProps

Bases: MQTTProperties

Properties for MQTT SUBSCRIBE packet.

SubscriptionIdentifier: set[int] | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTUnsubAckProps

Bases: MQTTProperties

Properties for MQTT UNSUBACK packet.

ReasonString: str | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTUnsubscribeProps

Bases: MQTTProperties

Properties for MQTT UNSUBSCRIBE packet.

UserProperty: Optional[Sequence[tuple[str, str]]] = None
class ohmqtt.property.MQTTWillProps

Bases: MQTTProperties

Properties for MQTT Will message.

ContentType: str | None = None
CorrelationData: bytes | None = None
MessageExpiryInterval: int | None = None
PayloadFormatIndicator: int | None = None
ResponseTopic: str | None = None
UserProperty: Optional[Sequence[tuple[str, str]]] = None
WillDelayInterval: int | None = None

ohmqtt.protected module

class ohmqtt.protected.Protected(lock: Optional[Union[RLock, allocate_lock]] = None)

Bases: object

A wrapper to protect a resource or resources.

Combine with the @protect decorator to protect methods of this class.

acquire
release
ohmqtt.protected.protect(func: Callable[[Concatenate[ProtectedT, ProtectP]], ProtectR]) Callable[[Concatenate[ProtectedT, ProtectP]], ProtectR]

Decorator to protect a method of a Protected instance.

ohmqtt.serialization module

Primitives for encoding and decoding MQTT packet fields.

ohmqtt.serialization.decode_binary(data: memoryview) tuple[bytes, int]

Decode binary data from a buffer.

Returns a tuple of the decoded data and the number of bytes consumed.

ohmqtt.serialization.decode_bool(data: memoryview) tuple[bool, int]

Decode a boolean from a buffer.

Returns a tuple of the decoded boolean and the number of bytes consumed.

ohmqtt.serialization.decode_string(data: memoryview) tuple[str, int]

Decode a UTF-8 string from a buffer.

Returns a tuple of the decoded string and the number of bytes consumed.

ohmqtt.serialization.decode_string_pair(data: memoryview) tuple[tuple[str, str], int]

Decode a UTF-8 string pair from a buffer.

Returns a tuple of the decoded string pair and the number of bytes consumed.

ohmqtt.serialization.decode_uint16(data: memoryview) tuple[int, int]

Decode a 16-bit integer from a buffer.

Returns a tuple of the decoded integer and the number of bytes consumed.

ohmqtt.serialization.decode_uint32(data: memoryview) tuple[int, int]

Decode a 32-bit integer from a buffer.

Returns a tuple of the decoded integer and the number of bytes consumed.

ohmqtt.serialization.decode_uint8(data: memoryview) tuple[int, int]

Decode an 8-bit integer from a buffer.

Returns a tuple of the decoded integer and the number of bytes consumed.

ohmqtt.serialization.decode_varint(data: memoryview) tuple[int, int]

Decode a variable length integer from a buffer.

Returns a tuple of the decoded integer and the number of bytes consumed.

ohmqtt.serialization.encode_binary(data: bytes) bytes

Encode binary data to a buffer.

ohmqtt.serialization.encode_bool(x: bool) bytes

Encode a boolean to a buffer.

ohmqtt.serialization.encode_string(s: str) bytes

Encode a UTF-8 string to a buffer.

ohmqtt.serialization.encode_string_pair(values: tuple[str, str]) bytes

Encode a UTF-8 string pair to a buffer.

ohmqtt.serialization.encode_uint16(x: int) bytes

Encode a 16-bit integer to a buffer.

ohmqtt.serialization.encode_uint32(x: int) bytes

Encode a 32-bit integer to a buffer.

ohmqtt.serialization.encode_uint8(x: int) bytes

Encode an 8-bit integer to a buffer.

ohmqtt.serialization.encode_varint(x: int) bytes

Encode a variable length integer to a buffer.

ohmqtt.session module

class ohmqtt.session.Session(handlers: MessageHandlers, subscriptions: Subscriptions, connection: Connection, *, db_path: str = '', db_fast: bool = False)

Bases: object

connection
handle_connack(packet: MQTTConnAckPacket) None

Handle a connection open event.

handle_puback(packet: MQTTPubAckPacket) None

Handle a PUBACK packet from the server.

handle_pubcomp(packet: MQTTPubCompPacket) None

Handle a PUBCOMP packet from the server.

handle_publish(packet: MQTTPublishPacket) None

Handle a PUBLISH packet from the server.

handle_pubrec(packet: MQTTPubRecPacket) None

Handle a PUBREC packet from the server.

handle_pubrel(packet: MQTTPubRelPacket) None

Handle a PUBREL packet from the server.

inflight
params
persistence: Persistence
publish(topic: str, payload: bytes, *, qos: MQTTQoS = MQTTQoS.Q0, retain: bool = False, properties: ohmqtt.property.MQTTPublishProps | None = None, alias_policy: AliasPolicy = AliasPolicy.NEVER) Handle[ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket]

Publish a message to a topic.

server_receive_maximum
set_params(params: ConnectParams) None
subscriptions
topic_alias

ohmqtt.subscriptions module

exception ohmqtt.subscriptions.NoMatchingSubscriptionError

Bases: Exception

Exception raised when no matching subscription is found when unsubscribing.

exception ohmqtt.subscriptions.ReconfiguredError

Bases: Exception

A subscribe operation was reconfigured before being acknowledged.

enum ohmqtt.subscriptions.RetainPolicy(value)

Bases: IntEnum

Policy for broker sending retained messages upon a subscription.

ALWAYS: Always send retained messages on subscription.

ONCE: Only send retained messages on the first subscription to a topic.

NEVER: Never send retained messages on subscription.

Member Type:

int

Valid values are as follows:

ALWAYS = <RetainPolicy.ALWAYS: 0>
ONCE = <RetainPolicy.ONCE: 1>
NEVER = <RetainPolicy.NEVER: 2>
class ohmqtt.subscriptions.Subscription(topic_filter: str, callback: SubscribeCallback, max_qos: MQTTQoS = MQTTQoS.Q2, share_name: str | None = None, no_local: bool = False, retain_as_published: bool = False, retain_policy: RetainPolicy = RetainPolicy.ALWAYS, sub_id: int | None = None, user_properties: tuple[tuple[str, str], ...] | None = None, state: _SubscriptionState = _SubscriptionState.SUBSCRIBING)

Bases: object

All the data about a request for subscription.

callback: SubscribeCallback
effective_filter: str
max_qos: MQTTQoS
no_local: bool
render_sub() MQTTSubscribePacket

Render the subscription into a SUBSCRIBE packet.

render_unsub() MQTTUnsubscribePacket

Render the subscription into an UNSUBSCRIBE packet.

retain_as_published: bool
retain_policy: RetainPolicy
share_name: str | None
state: _SubscriptionState
sub_id: int | None
topic_filter: str
user_properties: tuple[tuple[str, str], ...] | None
class ohmqtt.subscriptions.Subscriptions(handlers: MessageHandlers, connection: Connection, client: weakref.ReferenceType[Client])

Bases: object

Container for MQTT subscriptions and their callbacks.

handle_connack(packet: MQTTConnAckPacket) None

Handle incoming CONNACK packets.

handle_publish(packet: MQTTPublishPacket) None

Handle incoming PUBLISH packets.

handle_suback(packet: MQTTSubAckPacket) None

Handle incoming SUBACK packets.

handle_unsuback(packet: MQTTUnsubAckPacket) None

Handle incoming UNSUBACK packets.

subscribe(topic_filter: str, callback: SubscribeCallback, max_qos: MQTTQoS = MQTTQoS.Q2, *, share_name: str | None = None, no_local: bool = False, retain_as_published: bool = False, retain_policy: RetainPolicy = RetainPolicy.ALWAYS, sub_id: int | None = None, user_properties: Sequence[tuple[str, str]] | None = None) SubscribeHandle

Add a subscription with a callback.

Repeated calls to subscribe with the same combination of share_name and topic_filter will reconfigure the existing subscription with the new parameters, discarding the previous callback.

Returns:

A SubscribeHandle which can be used to wait for ack.

unsubscribe(topic_filter: str, *, share_name: str | None = None) Optional[Handle[MQTTUnsubAckPacket]]

Unsubscribe from a topic filter.

Returns:

UnsubscribeHandle if the topic was subscribed, otherwise None.

ohmqtt.topic_alias module

enum ohmqtt.topic_alias.AliasPolicy(value)

Bases: IntEnum

Topic alias policy.

NEVER: Never use topic aliases.

TRY: Use topic aliases if possible.

If an alias does not exist, attempt to create a new one. If the maximum number of aliases is reached, an alias will not be used.

ALWAYS: Always use topic aliases.

If an alias does not exist, attempt to create a new one. If the maximum number of aliases is reached, an exception will be raised.

Member Type:

int

Valid values are as follows:

NEVER = <AliasPolicy.NEVER: 0>
TRY = <AliasPolicy.TRY: 1>
ALWAYS = <AliasPolicy.ALWAYS: 2>
class ohmqtt.topic_alias.InboundTopicAlias(aliases: dict[int, str] = <factory>)

Bases: object

Inbound topic alias store.

aliases: dict[int, str]
handle(packet: MQTTPublishPacket) None

Handle the topic alias in a publish packet.

Stores the topic alias in the store if it is not recognized.

If the topic alias is recognized, the topic is replaced with the stored topic.

max_alias: int
reset() None

Reset the topic alias state.

This should be called at least when the connection is opened.

exception ohmqtt.topic_alias.MaxOutboundAliasError

Bases: Exception

Exception raised when the maximum number of topic aliases is reached.

class ohmqtt.topic_alias.OutboundLookupResult(alias: int, existed: bool)

Bases: NamedTuple

Result of an outbound lookup.

If the alias is 0, a topic alias should not be used in the publish packet.

If the alias is not 0 and existed is False,

the alias and topic should both be sent in the publish packet.

If the alias is not 0 and existed is True,

the alias should be sent in the publish packet and the topic should not be sent.

alias: int

Alias for field number 0

existed: bool

Alias for field number 1

class ohmqtt.topic_alias.OutboundTopicAlias(aliases: dict[str, int] = <factory>)

Bases: object

Inbound topic alias store.

aliases: dict[str, int]
lookup(topic: str, policy: AliasPolicy) OutboundLookupResult

Get the topic alias for a given topic from the client.

An alias integer and a boolean indicating if the alias already existed will be returned.

If the alias integer is 0, the alias was not created and the topic is not in the store. In this case, the topic alias must not be used in the publish packet.

max_alias: int
next_alias: int
pop() None

Remove the last assigned alias.

reset() None

Reset the topic alias state.

This should be called at least when the connection is opened.

ohmqtt.topic_filter module

ohmqtt.topic_filter.join_share(topic_filter: str, share_name: str | None) str

Join a topic filter with a share name.

ohmqtt.topic_filter.match_topic_filter(topic_filter: str, topic: str) bool

Check if the topic matches the filter.

This method will validate the topic, but assumes that the filter is already validated.

ohmqtt.topic_filter.validate_share_name(share_name: str) None

Validate an MQTT shared subscription name.

Raises ValueError if the share name is invalid.

ohmqtt.topic_filter.validate_topic(topic: str) None

Validate an MQTT topic name.

Raises ValueError if the topic name is invalid.

ohmqtt.topic_filter.validate_topic_filter(topic_filter: str) None

Validate an MQTT topic filter.

Raises ValueError if the topic filter is invalid.

Module contents