Implementation Interfaces
Subpackages
- ohmqtt.connection package
- Submodules
- ohmqtt.connection.address module
- ohmqtt.connection.decoder module
- ohmqtt.connection.fsm module
- ohmqtt.connection.handlers module
- ohmqtt.connection.keepalive module
- ohmqtt.connection.selector module
- ohmqtt.connection.states module
- ohmqtt.connection.timeout module
- ohmqtt.connection.types module
- ConnectParams
- ConnectParams.address
- ConnectParams.clean_start
- ConnectParams.client_id
- ConnectParams.connect_properties
- ConnectParams.connect_timeout
- ConnectParams.keepalive_interval
- ConnectParams.password
- ConnectParams.protocol_version
- ConnectParams.reconnect_delay
- ConnectParams.tcp_nodelay
- ConnectParams.tls_context
- ConnectParams.tls_hostname
- ConnectParams.username
- ConnectParams.will_payload
- ConnectParams.will_properties
- ConnectParams.will_qos
- ConnectParams.will_retain
- ConnectParams.will_topic
- StateData
- StateEnvironment
- Module contents
- Connection
- Connection.can_send()
- Connection.connect()
- Connection.disconnect()
- Connection.fsm
- Connection.handle_packet()
- Connection.is_connected()
- Connection.loop_forever()
- Connection.loop_once()
- Connection.loop_until_connected()
- Connection.send()
- Connection.shutdown()
- Connection.wait_for_connect()
- Connection.wait_for_disconnect()
- Connection.wait_for_shutdown()
- ohmqtt.packet package
- Submodules
- ohmqtt.packet.auth module
- ohmqtt.packet.base module
- ohmqtt.packet.connect module
- MQTTConnAckPacket
- MQTTConnectPacket
- MQTTConnectPacket.clean_start
- MQTTConnectPacket.client_id
- MQTTConnectPacket.decode()
- MQTTConnectPacket.encode()
- MQTTConnectPacket.keep_alive
- MQTTConnectPacket.packet_type
- MQTTConnectPacket.password
- MQTTConnectPacket.properties
- MQTTConnectPacket.props_type
- MQTTConnectPacket.protocol_version
- MQTTConnectPacket.username
- MQTTConnectPacket.will_payload
- MQTTConnectPacket.will_props
- MQTTConnectPacket.will_qos
- MQTTConnectPacket.will_retain
- MQTTConnectPacket.will_topic
MQTTDisconnectPacket
- ohmqtt.packet.ping module
- ohmqtt.packet.publish module
- ohmqtt.packet.subscribe module
- Module contents
- ohmqtt.persistence package
Submodules
ohmqtt.client module
- class ohmqtt.client.Client(db_path: str = '', *, db_fast: bool = False)
Bases: object
High-level interface for the MQTT client.
- Parameters:
db_path – Path to the database file for persistence. If none is provided, an in-memory store is used.
db_fast – If True, use a faster database implementation (e.g. SQLite WAL).
- auth(*, authentication_method: str | None = None, authentication_data: bytes | None = None, reason_string: str | None = None, user_properties: Optional[Sequence[tuple[str, str]]] = None, reason_code: MQTTReasonCode = MQTTReasonCode.Success) None
Send an AUTH packet to the broker.
- Parameters:
authentication_method – The authentication method to use.
authentication_data – Authentication data to send.
reason_string – A reason string to include in the AUTH packet.
user_properties – Optional user properties to include in the AUTH packet.
reason_code – The reason code for the AUTH packet.
- connect(address: str, *, client_id: str = '', clean_start: bool = False, connect_timeout: float | None = None, reconnect_delay: int = 0, keepalive_interval: int = 0, tcp_nodelay: bool = True, tls_context: ssl.SSLContext | None = None, tls_hostname: str = '', username: str | None = None, password: bytes | None = None, will_topic: str = '', will_payload: bytes = b'', will_qos: int = 0, will_retain: bool = False, will_properties: ohmqtt.property.MQTTWillProps | None = None, connect_properties: ohmqtt.property.MQTTConnectProps | None = None) None
Connect to the broker.
- Parameters:
address – The address of the broker to connect to. Addresses may take the following forms:
- mqtt://host:port (TCP)
- mqtts://host:port (TLS)
- ws://host:port (WebSocket)
- wss://host:port (WebSocket/TLS)
- unix:/path/to/socket (if supported on the platform)
When the protocol is omitted, mqtt:// is assumed.
When the port is omitted, the default port for the protocol is used:
- mqtt: 1883
- mqtts: 8883
- ws: 80
- wss: 443
client_id – The client ID to use for the connection, or empty string to request one from the broker.
clean_start – If True, an existing session will not be resumed.
connect_timeout – Timeout for the connection attempt in seconds, or None for no timeout.
reconnect_delay – Delay in seconds before attempting to reconnect after a disconnection, or 0 to disable reconnect.
keepalive_interval – The keep alive interval in seconds, or 0 to disable keep alive.
tcp_nodelay – If True, enable TCP_NODELAY to disable Nagle’s algorithm.
tls_context – An SSLContext for TLS connections, or None to use the default.
tls_hostname – The hostname to use for TLS connections, or empty string to determine from the address.
username – The username for MQTT authentication, or None to disable.
password – The password for MQTT authentication, or None to disable.
will_topic – The topic for the Will message, or empty string to disable.
will_payload – The payload for the Will message.
will_qos – The QoS level for the Will message (0, 1, or 2).
will_retain – If True, the Will message will be retained.
will_properties – Properties for the Will message.
connect_properties – Properties for the CONNECT packet.
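The address forms and default ports described for the address parameter can be illustrated with a small standalone parser. This is a sketch built on the standard library, not ohmqtt's own parser; `parse_address` and `DEFAULT_PORTS` are hypothetical names introduced only for illustration.

```python
from urllib.parse import urlsplit

# Default ports as listed for the address parameter.
DEFAULT_PORTS = {"mqtt": 1883, "mqtts": 8883, "ws": 80, "wss": 443}


def parse_address(address: str):
    """Return (scheme, host_or_path, port) for a broker address string."""
    # Unix socket addresses carry a filesystem path and no port.
    if address.startswith("unix:"):
        return ("unix", address[len("unix:"):], None)
    # When the protocol is omitted, mqtt:// is assumed.
    if "://" not in address:
        address = "mqtt://" + address
    parts = urlsplit(address)
    if parts.scheme not in DEFAULT_PORTS:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    # When the port is omitted, fall back to the default for the protocol.
    port = parts.port if parts.port is not None else DEFAULT_PORTS[parts.scheme]
    return (parts.scheme, parts.hostname, port)
```

For example, `parse_address("localhost")` yields the mqtt scheme with port 1883, while `parse_address("wss://broker.example")` falls back to port 443.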
- connection
- disconnect() None
Disconnect from the broker.
- handle_auth(packet: MQTTAuthPacket) None
Callback for an AUTH packet from the broker.
- is_connected() bool
Check if the client is connected to the broker.
- loop_forever() None
Run the MQTT client loop.
This will run until the client is stopped or shut down.
- loop_once(max_wait: float | None = 0.0) None
Run a single iteration of the MQTT client loop.
If max_wait is 0.0 (the default), this call will not block.
If max_wait is None, this call will block until the next event.
Any other numeric max_wait value may block for at most that many seconds.
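The three max_wait cases map naturally onto the timeout argument of a selector. The sketch below uses the standard `selectors` module to show the same semantics on a socket pair; whether ohmqtt forwards max_wait to a selector in this way is an assumption, and `poll_once` is a hypothetical helper.

```python
import selectors
import socket


def poll_once(selector, max_wait=0.0):
    """Return the file objects that are ready, waiting at most max_wait seconds.

    A timeout of 0.0 never blocks; a timeout of None blocks until an event.
    """
    events = selector.select(timeout=max_wait)
    return [key.fileobj for key, _mask in events]


reader, writer = socket.socketpair()
sel = selectors.DefaultSelector()
sel.register(reader, selectors.EVENT_READ)

idle = poll_once(sel, 0.0)    # nothing has been written yet, returns immediately
writer.send(b"x")
ready = poll_once(sel, None)  # data is already waiting, so this returns at once

sel.close()
reader.close()
writer.close()
```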
- loop_until_connected(timeout: float | None = None) bool
Run the MQTT client loop until the client is connected to the broker.
If a timeout is provided, the loop will give up after that amount of time.
Returns True if the client is connected, False if the timeout was reached.
- publish(topic: str, payload: bytes | bytearray | str, *, qos: int | ohmqtt.mqtt_spec.MQTTQoS = MQTTQoS.Q0, retain: bool = False, properties: ohmqtt.property.MQTTPublishProps | None = None, alias_policy: AliasPolicy = AliasPolicy.NEVER) Handle[ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket]
Publish a message to a topic.
- Parameters:
topic – The topic to publish to.
payload – The payload of the message. If a string is provided, it will be encoded as UTF-8.
qos – The QoS level for the message (0, 1, or 2).
retain – If True, the message will be retained by the broker.
properties – Properties for the PUBLISH packet.
alias_policy – The policy for using automatic topic aliases.
- session
- shutdown() None
Shut down the client and close the connection.
- start_loop() None
Start the client state machine in a separate thread.
- subscribe(topic_filter: str, callback: Callable[[Client, MQTTPublishPacket], None], max_qos: int | ohmqtt.mqtt_spec.MQTTQoS = MQTTQoS.Q2, *, share_name: str | None = None, no_local: bool = False, retain_as_published: bool = False, retain_policy: RetainPolicy = RetainPolicy.ALWAYS, sub_id: int | None = None, user_properties: Optional[Sequence[tuple[str, str]]] = None) Optional[Handle[MQTTSubAckPacket]]
Subscribe to a topic filter with a callback.
If the client is connected, returns a handle which can be used to unsubscribe from the topic filter or wait for the subscription to be acknowledged.
If the client is not connected, returns None.
- Parameters:
topic_filter – The topic filter to subscribe to.
callback – The callback to call when a message is received on the subscribed topic.
max_qos – The maximum QoS level for the subscription (0, 1, or 2).
share_name – The name of a shared subscription to use.
no_local – If True, do not receive messages published by this client.
retain_as_published – If True, the retain flag of messages will match the original message.
retain_policy – The policy for retained messages.
sub_id – An optional subscription ID for the subscription.
user_properties – Optional user properties to include in the subscription request.
- unsubscribe(topic_filter: str, *, share_name: str | None = None) Optional[Handle[MQTTUnsubAckPacket]]
Unsubscribe from a topic filter.
If the client is connected, returns a handle which can be used to wait for the unsubscription to be acknowledged.
If the client is not connected, returns None.
- Parameters:
topic_filter – The topic filter to unsubscribe from.
share_name – The name of a shared subscription to use.
- wait_for_connect(timeout: float | None = None) None
Wait for the client to connect to the broker.
- Raises:
TimeoutError – The timeout was exceeded.
- wait_for_disconnect(timeout: float | None = None) None
Wait for the client to disconnect from the broker.
- Raises:
TimeoutError – The timeout was exceeded.
- wait_for_shutdown(timeout: float | None = None) None
Wait for the client to disconnect and finalize.
- Raises:
TimeoutError – The timeout was exceeded.
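As a rough illustration of how the Client methods documented in this section might fit together, the following sketch drives a client through one connect, publish, and disconnect cycle. The call order is an assumption, and `publish_once` and `RecordingClient` are hypothetical names: `RecordingClient` is a stand-in that only records method calls so the sketch runs without a broker. With ohmqtt installed, an `ohmqtt.client.Client()` instance could presumably be passed in its place.

```python
class RecordingClient:
    """Hypothetical stand-in that records method names instead of doing I/O."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Only reached for attributes that do not exist, i.e. the client methods.
        def method(*args, **kwargs):
            self.calls.append(name)
        return method


def publish_once(client, address, topic, payload, timeout=5.0):
    """Connect, publish one QoS 1 message, then disconnect and shut down."""
    client.start_loop()                        # run the state machine in a thread
    client.connect(address)
    client.wait_for_connect(timeout=timeout)   # documented to raise TimeoutError
    handle = client.publish(topic, payload, qos=1)
    # Waiting on the returned handle is omitted here because the Handle API
    # is not described in this section.
    client.disconnect()
    client.wait_for_disconnect(timeout=timeout)
    client.shutdown()
    client.wait_for_shutdown(timeout=timeout)
    return handle


recorder = RecordingClient()
publish_once(recorder, "mqtt://localhost", "demo/topic", "hello")
```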
ohmqtt.error module
- exception ohmqtt.error.MQTTError(message: str, reason_code: MQTTReasonCode = MQTTReasonCode.UnspecifiedError)
Bases:
Exception
ohmqtt.logger module
- ohmqtt.logger.get_logger(name: str) Logger
Get a logger with the specified name.
- ohmqtt.logger.set_log_level(level: int) None
Set the log level for the ohmqtt logger.
ohmqtt.mqtt_spec module
Constants from the MQTT specification.
- enum ohmqtt.mqtt_spec.MQTTPacketType(value)
Bases: IntEnum
Types of MQTT control packets, mapped to identifiers from the specification.
- Member Type:
int
Valid values are as follows:
- CONNECT = <MQTTPacketType.CONNECT: 1>
- CONNACK = <MQTTPacketType.CONNACK: 2>
- PUBLISH = <MQTTPacketType.PUBLISH: 3>
- PUBACK = <MQTTPacketType.PUBACK: 4>
- PUBREC = <MQTTPacketType.PUBREC: 5>
- PUBREL = <MQTTPacketType.PUBREL: 6>
- PUBCOMP = <MQTTPacketType.PUBCOMP: 7>
- SUBSCRIBE = <MQTTPacketType.SUBSCRIBE: 8>
- SUBACK = <MQTTPacketType.SUBACK: 9>
- UNSUBSCRIBE = <MQTTPacketType.UNSUBSCRIBE: 10>
- UNSUBACK = <MQTTPacketType.UNSUBACK: 11>
- PINGREQ = <MQTTPacketType.PINGREQ: 12>
- PINGRESP = <MQTTPacketType.PINGRESP: 13>
- DISCONNECT = <MQTTPacketType.DISCONNECT: 14>
- AUTH = <MQTTPacketType.AUTH: 15>
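In the MQTT wire format, these packet type identifiers occupy the upper four bits of the first byte of every packet, followed by four flag bits and a variable-length "remaining length" field. The sketch below builds such a fixed header; `fixed_header` is a hypothetical helper written for illustration and is not part of ohmqtt.

```python
def fixed_header(packet_type: int, flags: int, remaining_length: int) -> bytes:
    """Build an MQTT fixed header: type and flags byte plus remaining length."""
    if not 1 <= packet_type <= 15:
        raise ValueError("packet type must be in 1..15")
    if not 0 <= flags <= 0x0F:
        raise ValueError("flags must fit in four bits")
    if not 0 <= remaining_length <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray([(packet_type << 4) | flags])
    # Remaining length is a variable byte integer: 7 data bits per byte,
    # with the top bit set when more bytes follow.
    while True:
        byte = remaining_length % 128
        remaining_length //= 128
        if remaining_length:
            byte |= 0x80
        out.append(byte)
        if not remaining_length:
            break
    return bytes(out)
```

A PINGREQ (type 12) with no flags and no body is therefore the two bytes `c0 00`.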
- enum ohmqtt.mqtt_spec.MQTTPropertyId(value)
Bases: IntEnum
Types of MQTT properties, mapped to identifiers from the specification.
- Member Type:
int
Valid values are as follows:
- PayloadFormatIndicator = <MQTTPropertyId.PayloadFormatIndicator: 1>
- MessageExpiryInterval = <MQTTPropertyId.MessageExpiryInterval: 2>
- ContentType = <MQTTPropertyId.ContentType: 3>
- ResponseTopic = <MQTTPropertyId.ResponseTopic: 8>
- CorrelationData = <MQTTPropertyId.CorrelationData: 9>
- SubscriptionIdentifier = <MQTTPropertyId.SubscriptionIdentifier: 11>
- SessionExpiryInterval = <MQTTPropertyId.SessionExpiryInterval: 17>
- AssignedClientIdentifier = <MQTTPropertyId.AssignedClientIdentifier: 18>
- ServerKeepAlive = <MQTTPropertyId.ServerKeepAlive: 19>
- AuthenticationMethod = <MQTTPropertyId.AuthenticationMethod: 21>
- AuthenticationData = <MQTTPropertyId.AuthenticationData: 22>
- RequestProblemInformation = <MQTTPropertyId.RequestProblemInformation: 23>
- WillDelayInterval = <MQTTPropertyId.WillDelayInterval: 24>
- RequestResponseInformation = <MQTTPropertyId.RequestResponseInformation: 25>
- ResponseInformation = <MQTTPropertyId.ResponseInformation: 26>
- ServerReference = <MQTTPropertyId.ServerReference: 28>
- ReasonString = <MQTTPropertyId.ReasonString: 31>
- ReceiveMaximum = <MQTTPropertyId.ReceiveMaximum: 33>
- TopicAliasMaximum = <MQTTPropertyId.TopicAliasMaximum: 34>
- TopicAlias = <MQTTPropertyId.TopicAlias: 35>
- MaximumQoS = <MQTTPropertyId.MaximumQoS: 36>
- RetainAvailable = <MQTTPropertyId.RetainAvailable: 37>
- UserProperty = <MQTTPropertyId.UserProperty: 38>
- MaximumPacketSize = <MQTTPropertyId.MaximumPacketSize: 39>
- WildcardSubscriptionAvailable = <MQTTPropertyId.WildcardSubscriptionAvailable: 40>
- SubscriptionIdentifierAvailable = <MQTTPropertyId.SubscriptionIdentifierAvailable: 41>
- enum ohmqtt.mqtt_spec.MQTTQoS(value)
Bases: IntEnum
Quality of Service level for messaging.
- Member Type:
int
Valid values are as follows:
- Q0 = <MQTTQoS.Q0: 0>
- Q1 = <MQTTQoS.Q1: 1>
- Q2 = <MQTTQoS.Q2: 2>
- enum ohmqtt.mqtt_spec.MQTTReasonCode(value)
Bases: IntEnum
Indicates the result of an operation.
- Member Type:
int
Valid values are as follows:
- Success = <MQTTReasonCode.Success: 0>
- GrantedQoS1 = <MQTTReasonCode.GrantedQoS1: 1>
- GrantedQoS2 = <MQTTReasonCode.GrantedQoS2: 2>
- DisconnectWithWillMessage = <MQTTReasonCode.DisconnectWithWillMessage: 4>
- NoMatchingSubscribers = <MQTTReasonCode.NoMatchingSubscribers: 16>
- NoSubscriptionExisted = <MQTTReasonCode.NoSubscriptionExisted: 17>
- ContinueAuthentication = <MQTTReasonCode.ContinueAuthentication: 24>
- ReAuthenticate = <MQTTReasonCode.ReAuthenticate: 25>
- UnspecifiedError = <MQTTReasonCode.UnspecifiedError: 128>
- MalformedPacket = <MQTTReasonCode.MalformedPacket: 129>
- ProtocolError = <MQTTReasonCode.ProtocolError: 130>
- ImplementationSpecificError = <MQTTReasonCode.ImplementationSpecificError: 131>
- UnsupportedProtocolVersion = <MQTTReasonCode.UnsupportedProtocolVersion: 132>
- ClientIdentifierNotValid = <MQTTReasonCode.ClientIdentifierNotValid: 133>
- BadUserNameOrPassword = <MQTTReasonCode.BadUserNameOrPassword: 134>
- NotAuthorized = <MQTTReasonCode.NotAuthorized: 135>
- ServerBusy = <MQTTReasonCode.ServerBusy: 137>
- Banned = <MQTTReasonCode.Banned: 138>
- ServerShuttingDown = <MQTTReasonCode.ServerShuttingDown: 139>
- BadAuthenticationMethod = <MQTTReasonCode.BadAuthenticationMethod: 140>
- KeepAliveTimeout = <MQTTReasonCode.KeepAliveTimeout: 141>
- SessionTakenOver = <MQTTReasonCode.SessionTakenOver: 142>
- TopicFilterInvalid = <MQTTReasonCode.TopicFilterInvalid: 143>
- TopicNameInvalid = <MQTTReasonCode.TopicNameInvalid: 144>
- PacketIdentifierInUse = <MQTTReasonCode.PacketIdentifierInUse: 145>
- PacketIdentifierNotFound = <MQTTReasonCode.PacketIdentifierNotFound: 146>
- ReceiveMaximumExceeded = <MQTTReasonCode.ReceiveMaximumExceeded: 147>
- TopicAliasInvalid = <MQTTReasonCode.TopicAliasInvalid: 148>
- PacketTooLarge = <MQTTReasonCode.PacketTooLarge: 149>
- MessageRateTooHigh = <MQTTReasonCode.MessageRateTooHigh: 150>
- QuotaExceeded = <MQTTReasonCode.QuotaExceeded: 151>
- AdministrativeAction = <MQTTReasonCode.AdministrativeAction: 152>
- PayloadFormatInvalid = <MQTTReasonCode.PayloadFormatInvalid: 153>
- RetainNotSupported = <MQTTReasonCode.RetainNotSupported: 154>
- QoSNotSupported = <MQTTReasonCode.QoSNotSupported: 155>
- UseAnotherServer = <MQTTReasonCode.UseAnotherServer: 156>
- ServerMoved = <MQTTReasonCode.ServerMoved: 157>
- ConnectionRateExceeded = <MQTTReasonCode.ConnectionRateExceeded: 159>
- MaximumConnectTime = <MQTTReasonCode.MaximumConnectTime: 160>
- SubscriptionIdentifiersNotSupported = <MQTTReasonCode.SubscriptionIdentifiersNotSupported: 161>
- WildcardSubscriptionsNotSupported = <MQTTReasonCode.WildcardSubscriptionsNotSupported: 162>
The Enum and its members also have the following methods:
- is_error() bool
Check if the reason code indicates an error.
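In MQTT 5, reason codes of 0x80 (128) and above indicate failure, which matches the split visible in the member list. The sketch below shows how an `is_error()` method could be written on that basis; it uses a small hypothetical `ReasonCode` subset, and whether ohmqtt implements the check exactly this way is an assumption.

```python
from enum import IntEnum


class ReasonCode(IntEnum):
    """Hypothetical subset of reason codes, with values from the list above."""

    Success = 0
    NoMatchingSubscribers = 16
    UnspecifiedError = 128
    NotAuthorized = 135

    def is_error(self) -> bool:
        # Codes with the high bit set (0x80 and above) signal failure.
        return self >= 0x80
```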
ohmqtt.platform module
- exception ohmqtt.platform.PlatformError
Bases: Exception
A feature is not supported on this platform.
ohmqtt.property module
- class ohmqtt.property.MQTTAuthProps
Bases: MQTTProperties
Properties for MQTT AUTH packet.
- AuthenticationData: bytes | None = None
- AuthenticationMethod: str | None = None
- ReasonString: str | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTConnAckProps
Bases: MQTTProperties
Properties for MQTT CONNACK packet.
- AssignedClientIdentifier: str | None = None
- AuthenticationData: bytes | None = None
- AuthenticationMethod: str | None = None
- MaximumPacketSize: int | None = None
- MaximumQoS: int | None = None
- ReasonString: str | None = None
- ReceiveMaximum: int | None = None
- ResponseInformation: str | None = None
- RetainAvailable: bool | None = None
- ServerKeepAlive: int | None = None
- ServerReference: str | None = None
- SessionExpiryInterval: int | None = None
- SubscriptionIdentifierAvailable: bool | None = None
- TopicAliasMaximum: int | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- WildcardSubscriptionAvailable: bool | None = None
- class ohmqtt.property.MQTTConnectProps
Bases: MQTTProperties
Properties for MQTT CONNECT packet.
- AuthenticationData: bytes | None = None
- AuthenticationMethod: str | None = None
- MaximumPacketSize: int | None = None
- ReceiveMaximum: int | None = None
- RequestProblemInformation: bool | None = None
- RequestResponseInformation: bool | None = None
- SessionExpiryInterval: int | None = None
- TopicAliasMaximum: int | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTDisconnectProps
Bases: MQTTProperties
Properties for MQTT DISCONNECT packet.
- ReasonString: str | None = None
- ServerReference: str | None = None
- SessionExpiryInterval: int | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTProperties
Bases: MQTTPropertiesBase
- classmethod decode(data: memoryview) tuple[Self, int]
Decode MQTT properties from a buffer.
Returns a tuple of the decoded properties and the number of bytes consumed.
- encode() bytes
Encode MQTT properties to a buffer.
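On the wire, an MQTT 5 property block is a variable byte integer giving the total length, followed by each property as an identifier and a typed value. The sketch below encodes two PUBLISH properties by hand, using the identifiers listed under MQTTPropertyId (MessageExpiryInterval = 2, ContentType = 3). `encode_publish_props` is a hypothetical helper and is not how ohmqtt's `encode()` is necessarily implemented; for brevity it only handles blocks shorter than 128 bytes.

```python
import struct


def encode_publish_props(message_expiry=None, content_type=None) -> bytes:
    """Encode a small property block: length prefix, then id/value pairs."""
    body = bytearray()
    if message_expiry is not None:
        # MessageExpiryInterval (id 2) is a four-byte big-endian integer.
        body += bytes([2]) + struct.pack(">I", message_expiry)
    if content_type is not None:
        # ContentType (id 3) is a UTF-8 string with a two-byte length prefix.
        raw = content_type.encode("utf-8")
        body += bytes([3]) + struct.pack(">H", len(raw)) + raw
    if len(body) > 127:
        raise ValueError("this sketch only supports a single-byte length prefix")
    return bytes([len(body)]) + bytes(body)
```

An empty property block encodes as the single byte `00`.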
- class ohmqtt.property.MQTTPropertiesBase
Bases: SimpleNamespace
Represents MQTT packet properties.
- class ohmqtt.property.MQTTPubAckProps
Bases: MQTTProperties
Properties for MQTT PUBACK packet.
- ReasonString: str | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTPubCompProps
Bases: MQTTProperties
Properties for MQTT PUBCOMP packet.
- ReasonString: str | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTPubRecProps
Bases: MQTTProperties
Properties for MQTT PUBREC packet.
- ReasonString: str | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTPubRelProps
Bases: MQTTProperties
Properties for MQTT PUBREL packet.
- ReasonString: str | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTPublishProps
Bases: MQTTProperties
Properties for MQTT PUBLISH packet.
- ContentType: str | None = None
- CorrelationData: bytes | None = None
- MessageExpiryInterval: int | None = None
- PayloadFormatIndicator: int | None = None
- ResponseTopic: str | None = None
- SubscriptionIdentifier: set[int] | None = None
- TopicAlias: int | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTSubAckProps
Bases: MQTTProperties
Properties for MQTT SUBACK packet.
- ReasonString: str | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTSubscribeProps
Bases: MQTTProperties
Properties for MQTT SUBSCRIBE packet.
- SubscriptionIdentifier: set[int] | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTUnsubAckProps
Bases: MQTTProperties
Properties for MQTT UNSUBACK packet.
- ReasonString: str | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTUnsubscribeProps
Bases: MQTTProperties
Properties for MQTT UNSUBSCRIBE packet.
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- class ohmqtt.property.MQTTWillProps
Bases: MQTTProperties
Properties for MQTT Will message.
- ContentType: str | None = None
- CorrelationData: bytes | None = None
- MessageExpiryInterval: int | None = None
- PayloadFormatIndicator: int | None = None
- ResponseTopic: str | None = None
- UserProperty: Optional[Sequence[tuple[str, str]]] = None
- WillDelayInterval: int | None = None
ohmqtt.protected module
- class ohmqtt.protected.Protected(lock: Optional[Union[RLock, allocate_lock]] = None)
Bases: object
A wrapper to protect a resource or resources.
Combine with the @protect decorator to protect methods of this class.
- acquire
- release
- ohmqtt.protected.protect(func: Callable[[Concatenate[ProtectedT, ProtectP]], ProtectR]) Callable[[Concatenate[ProtectedT, ProtectP]], ProtectR]
Decorator to protect a method of a Protected instance.
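The documentation does not spell out what "protect" does, so the following is only one plausible reading: the decorator holds the instance's lock for the duration of the call. The library's actual behavior may differ (for example, it might instead require the caller to already hold the lock). The `Protected`, `protect`, and `Counter` definitions below are standalone stand-ins written for illustration.

```python
import threading
from functools import wraps


class Protected:
    """Stand-in base class that owns a lock guarding its state."""

    def __init__(self, lock=None):
        self._lock = lock if lock is not None else threading.RLock()

    def acquire(self):
        return self._lock.acquire()

    def release(self):
        self._lock.release()


def protect(func):
    """Assumed behavior: run the method while holding the instance's lock."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        self.acquire()
        try:
            return func(self, *args, **kwargs)
        finally:
            self.release()
    return wrapper


class Counter(Protected):
    def __init__(self):
        super().__init__()
        self.value = 0

    @protect
    def increment(self):
        self.value += 1
        return self.value


def bump_many(counter, times):
    for _ in range(times):
        counter.increment()


counter = Counter()
threads = [threading.Thread(target=bump_many, args=(counter, 1000)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Because every increment runs under the lock, the four threads cannot interleave their read-modify-write steps.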
ohmqtt.serialization module
Primitives for encoding and decoding MQTT packet fields.
- ohmqtt.serialization.decode_binary(data: memoryview) tuple[bytes, int]
Decode binary data from a buffer.
Returns a tuple of the decoded data and the number of bytes consumed.
- ohmqtt.serialization.decode_bool(data: memoryview) tuple[bool, int]
Decode a boolean from a buffer.
Returns a tuple of the decoded boolean and the number of bytes consumed.
- ohmqtt.serialization.decode_string(data: memoryview) tuple[str, int]
Decode a UTF-8 string from a buffer.
Returns a tuple of the decoded string and the number of bytes consumed.
- ohmqtt.serialization.decode_string_pair(data: memoryview) tuple[tuple[str, str], int]
Decode a UTF-8 string pair from a buffer.
Returns a tuple of the decoded string pair and the number of bytes consumed.
- ohmqtt.serialization.decode_uint16(data: memoryview) tuple[int, int]
Decode a 16-bit integer from a buffer.
Returns a tuple of the decoded integer and the number of bytes consumed.
- ohmqtt.serialization.decode_uint32(data: memoryview) tuple[int, int]
Decode a 32-bit integer from a buffer.
Returns a tuple of the decoded integer and the number of bytes consumed.
- ohmqtt.serialization.decode_uint8(data: memoryview) tuple[int, int]
Decode an 8-bit integer from a buffer.
Returns a tuple of the decoded integer and the number of bytes consumed.
- ohmqtt.serialization.decode_varint(data: memoryview) tuple[int, int]
Decode a variable length integer from a buffer.
Returns a tuple of the decoded integer and the number of bytes consumed.
- ohmqtt.serialization.encode_binary(data: bytes) bytes
Encode binary data to a buffer.
- ohmqtt.serialization.encode_bool(x: bool) bytes
Encode a boolean to a buffer.
- ohmqtt.serialization.encode_string(s: str) bytes
Encode a UTF-8 string to a buffer.
- ohmqtt.serialization.encode_string_pair(values: tuple[str, str]) bytes
Encode a UTF-8 string pair to a buffer.
- ohmqtt.serialization.encode_uint16(x: int) bytes
Encode a 16-bit integer to a buffer.
- ohmqtt.serialization.encode_uint32(x: int) bytes
Encode a 32-bit integer to a buffer.
- ohmqtt.serialization.encode_uint8(x: int) bytes
Encode an 8-bit integer to a buffer.
- ohmqtt.serialization.encode_varint(x: int) bytes
Encode a variable length integer to a buffer.
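The variable length integer is the least obvious of these primitives. The sketch below follows the MQTT variable byte integer encoding (seven data bits per byte, a continuation bit, at most four bytes) and mirrors the documented signatures, including the (value, bytes consumed) return convention. It is an independent illustration, not the library's source.

```python
def encode_varint(x: int) -> bytes:
    """Encode an integer in 0..268435455 as an MQTT variable byte integer."""
    if not 0 <= x <= 268_435_455:
        raise ValueError("value out of range for a variable byte integer")
    out = bytearray()
    while True:
        byte = x & 0x7F
        x >>= 7
        if x:
            byte |= 0x80  # continuation bit: more bytes follow
        out.append(byte)
        if not x:
            return bytes(out)


def decode_varint(data: memoryview) -> tuple[int, int]:
    """Decode a variable byte integer; return (value, bytes consumed)."""
    value = 0
    for index in range(4):
        if index >= len(data):
            raise ValueError("buffer ended inside a variable byte integer")
        byte = data[index]
        value |= (byte & 0x7F) << (7 * index)
        if not byte & 0x80:
            return value, index + 1
    raise ValueError("variable byte integer longer than four bytes")
```

Returning the number of bytes consumed lets a caller advance through a buffer field by field, for instance by slicing the memoryview with `data[consumed:]` before decoding the next value.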
ohmqtt.session module
- class ohmqtt.session.Session(handlers: MessageHandlers, subscriptions: Subscriptions, connection: Connection, *, db_path: str = '', db_fast: bool = False)
Bases: object
- connection
- handle_connack(packet: MQTTConnAckPacket) None
Handle a connection open event.
- handle_puback(packet: MQTTPubAckPacket) None
Handle a PUBACK packet from the server.
- handle_pubcomp(packet: MQTTPubCompPacket) None
Handle a PUBCOMP packet from the server.
- handle_publish(packet: MQTTPublishPacket) None
Handle a PUBLISH packet from the server.
- handle_pubrec(packet: MQTTPubRecPacket) None
Handle a PUBREC packet from the server.
- handle_pubrel(packet: MQTTPubRelPacket) None
Handle a PUBREL packet from the server.
- inflight
- params
- persistence: Persistence
- publish(topic: str, payload: bytes, *, qos: MQTTQoS = MQTTQoS.Q0, retain: bool = False, properties: ohmqtt.property.MQTTPublishProps | None = None, alias_policy: AliasPolicy = AliasPolicy.NEVER) Handle[ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket]
Publish a message to a topic.
- server_receive_maximum
- set_params(params: ConnectParams) None
- subscriptions
- topic_alias
ohmqtt.subscriptions module
- exception ohmqtt.subscriptions.NoMatchingSubscriptionError
Bases: Exception
Exception raised when no matching subscription is found when unsubscribing.
- exception ohmqtt.subscriptions.ReconfiguredError
Bases: Exception
A subscribe operation was reconfigured before being acknowledged.
- enum ohmqtt.subscriptions.RetainPolicy(value)
Bases: IntEnum
Policy for broker sending retained messages upon a subscription.
ALWAYS: Always send retained messages on subscription.
ONCE: Only send retained messages on the first subscription to a topic.
NEVER: Never send retained messages on subscription.
- Member Type:
int
Valid values are as follows:
- ALWAYS = <RetainPolicy.ALWAYS: 0>
- ONCE = <RetainPolicy.ONCE: 1>
- NEVER = <RetainPolicy.NEVER: 2>
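The numeric values 0, 1, and 2 line up with the Retain Handling field of the MQTT 5 Subscription Options byte, which also carries the maximum QoS, No Local, and Retain As Published settings. The sketch below packs those four settings into one byte following the specification's bit layout; `subscription_options` is a hypothetical helper, and how ohmqtt builds this byte internally is not shown in this section.

```python
def subscription_options(max_qos: int, no_local: bool,
                         retain_as_published: bool, retain_policy: int) -> int:
    """Pack subscription settings into an MQTT 5 Subscription Options byte."""
    if max_qos not in (0, 1, 2):
        raise ValueError("max_qos must be 0, 1, or 2")
    if retain_policy not in (0, 1, 2):
        raise ValueError("retain_policy must be 0, 1, or 2")
    return (
        max_qos                            # bits 0-1: maximum QoS
        | (int(no_local) << 2)             # bit 2: No Local
        | (int(retain_as_published) << 3)  # bit 3: Retain As Published
        | (retain_policy << 4)             # bits 4-5: Retain Handling
    )
```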
- class ohmqtt.subscriptions.Subscription(topic_filter: str, callback: SubscribeCallback, max_qos: MQTTQoS = MQTTQoS.Q2, share_name: str | None = None, no_local: bool = False, retain_as_published: bool = False, retain_policy: RetainPolicy = RetainPolicy.ALWAYS, sub_id: int | None = None, user_properties: tuple[tuple[str, str], ...] | None = None, state: _SubscriptionState = _SubscriptionState.SUBSCRIBING)
Bases: object
All the data about a request for subscription.
- callback: SubscribeCallback
- effective_filter: str
- no_local: bool
- render_sub() MQTTSubscribePacket
Render the subscription into a SUBSCRIBE packet.
- render_unsub() MQTTUnsubscribePacket
Render the subscription into an UNSUBSCRIBE packet.
- retain_as_published: bool
- retain_policy: RetainPolicy
- state: _SubscriptionState
- sub_id: int | None
- topic_filter: str
- user_properties: tuple[tuple[str, str], ...] | None
- class ohmqtt.subscriptions.Subscriptions(handlers: MessageHandlers, connection: Connection, client: weakref.ReferenceType[Client])
Bases: object
Container for MQTT subscriptions and their callbacks.
- handle_connack(packet: MQTTConnAckPacket) None
Handle incoming CONNACK packets.
- handle_publish(packet: MQTTPublishPacket) None
Handle incoming PUBLISH packets.
- handle_suback(packet: MQTTSubAckPacket) None
Handle incoming SUBACK packets.
- handle_unsuback(packet: MQTTUnsubAckPacket) None
Handle incoming UNSUBACK packets.
- subscribe(topic_filter: str, callback: SubscribeCallback, max_qos: MQTTQoS = MQTTQoS.Q2, *, share_name: str | None = None, no_local: bool = False, retain_as_published: bool = False, retain_policy: RetainPolicy = RetainPolicy.ALWAYS, sub_id: int | None = None, user_properties: Sequence[tuple[str, str]] | None = None) SubscribeHandle
Add a subscription with a callback.
Repeated calls to subscribe with the same combination of share_name and topic_filter will reconfigure the existing subscription with the new parameters, discarding the previous callback.
- Returns:
A SubscribeHandle which can be used to wait for ack.
- unsubscribe(topic_filter: str, *, share_name: str | None = None) Optional[Handle[MQTTUnsubAckPacket]]
Unsubscribe from a topic filter.
- Returns:
UnsubscribeHandle if the topic was subscribed, otherwise None.
ohmqtt.topic_alias module
- enum ohmqtt.topic_alias.AliasPolicy(value)
Bases: IntEnum
Topic alias policy.
- NEVER: Never use topic aliases.
- TRY: Use topic aliases if possible. If an alias does not exist, attempt to create a new one. If the maximum number of aliases is reached, an alias will not be used.
- ALWAYS: Always use topic aliases. If an alias does not exist, attempt to create a new one. If the maximum number of aliases is reached, an exception will be raised.
- Member Type:
int
Valid values are as follows:
- NEVER = <AliasPolicy.NEVER: 0>
- TRY = <AliasPolicy.TRY: 1>
- ALWAYS = <AliasPolicy.ALWAYS: 2>
- class ohmqtt.topic_alias.InboundTopicAlias(aliases: dict[int, str] = <factory>)
Bases: object
Inbound topic alias store.
- aliases: dict[int, str]
- handle(packet: MQTTPublishPacket) None
Handle the topic alias in a publish packet.
Stores the topic alias in the store if it is not recognized.
If the topic alias is recognized, the topic is replaced with the stored topic.
- max_alias: int
- reset() None
Reset the topic alias state.
This should be called at least when the connection is opened.
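The inbound side can be pictured as a dictionary from alias number to topic. In the sketch below, a packet that carries both a topic and an alias registers the mapping, and a packet that carries only an alias has its topic filled in from the store. `resolve_inbound` is a hypothetical function operating on plain values rather than on MQTTPublishPacket, and the exact conditions the library checks are an assumption.

```python
def resolve_inbound(aliases: dict, topic: str, alias):
    """Return the effective topic for an incoming publish.

    aliases maps alias number to topic; alias is None when the packet has none.
    """
    if alias is None:
        return topic
    if topic:
        # Topic and alias together: remember the mapping for later packets.
        aliases[alias] = topic
        return topic
    if alias not in aliases:
        raise ValueError(f"unknown topic alias: {alias}")
    # Alias only: substitute the stored topic.
    return aliases[alias]


store = {}
first = resolve_inbound(store, "sensors/temp", 1)   # registers alias 1
second = resolve_inbound(store, "", 1)              # resolved from the store
plain = resolve_inbound(store, "sensors/humidity", None)
```

Clearing the dictionary when a connection is opened corresponds to the `reset()` step, since aliases do not carry over between network connections.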
- exception ohmqtt.topic_alias.MaxOutboundAliasError
Bases: Exception
Exception raised when the maximum number of topic aliases is reached.
- class ohmqtt.topic_alias.OutboundLookupResult(alias: int, existed: bool)
Bases: NamedTuple
Result of an outbound lookup.
- If the alias is 0, a topic alias should not be used in the publish packet.
- If the alias is not 0 and existed is False, the alias and topic should both be sent in the publish packet.
- If the alias is not 0 and existed is True, the alias should be sent in the publish packet and the topic should not be sent.
- alias: int
Alias for field number 0
- existed: bool
Alias for field number 1
- class ohmqtt.topic_alias.OutboundTopicAlias(aliases: dict[str, int] = <factory>)
Bases: object
Outbound topic alias store.
- aliases: dict[str, int]
- lookup(topic: str, policy: AliasPolicy) OutboundLookupResult
Get the topic alias for a given topic from the client.
An alias integer and a boolean indicating if the alias already existed will be returned.
If the alias integer is 0, the alias was not created and the topic is not in the store. In this case, the topic alias must not be used in the publish packet.
- max_alias: int
- next_alias: int
- pop() None
Remove the last assigned alias.
- reset() None
Reset the topic alias state.
This should be called at least when the connection is opened.
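The interaction between the alias policy, the alias limit, and the (alias, existed) result can be sketched as follows. This is a standalone model of the behavior described in this module, with hypothetical class names (`Policy`, `LookupResult`, `OutboundAliases`) and a plain `RuntimeError` standing in for the library's exception; sequential allocation starting at 1 is an assumption.

```python
from enum import IntEnum
from typing import NamedTuple


class Policy(IntEnum):
    NEVER = 0
    TRY = 1
    ALWAYS = 2


class LookupResult(NamedTuple):
    alias: int
    existed: bool


class OutboundAliases:
    def __init__(self, max_alias: int):
        self.aliases = {}        # topic -> alias number
        self.max_alias = max_alias
        self.next_alias = 1      # alias 0 is reserved to mean "no alias"

    def lookup(self, topic: str, policy: Policy) -> LookupResult:
        if policy == Policy.NEVER:
            return LookupResult(0, False)
        if topic in self.aliases:
            # Known alias: the topic itself can be left out of the packet.
            return LookupResult(self.aliases[topic], True)
        if self.next_alias > self.max_alias:
            if policy == Policy.ALWAYS:
                raise RuntimeError("no topic aliases left")
            return LookupResult(0, False)
        alias = self.next_alias
        self.aliases[topic] = alias
        self.next_alias += 1
        # New alias: send both alias and topic so the peer learns the mapping.
        return LookupResult(alias, False)


outbound = OutboundAliases(max_alias=1)
created = outbound.lookup("a/b", Policy.TRY)
reused = outbound.lookup("a/b", Policy.TRY)
exhausted = outbound.lookup("c/d", Policy.TRY)
```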
ohmqtt.topic_filter module
Join a topic filter with a share name.
- ohmqtt.topic_filter.match_topic_filter(topic_filter: str, topic: str) bool
Check if the topic matches the filter.
This method will validate the topic, but assumes that the filter is already validated.
Validate an MQTT shared subscription name.
Raises ValueError if the share name is invalid.
- ohmqtt.topic_filter.validate_topic(topic: str) None
Validate an MQTT topic name.
Raises ValueError if the topic name is invalid.
- ohmqtt.topic_filter.validate_topic_filter(topic_filter: str) None
Validate an MQTT topic filter.
Raises ValueError if the topic filter is invalid.
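The matching rules behind these functions come from the MQTT specification: `+` matches exactly one topic level, `#` matches the remaining levels (including none), and a filter that begins with a wildcard does not match topics beginning with `$`. Shared subscriptions use filters of the form `$share/{ShareName}/{filter}`. The sketch below implements those rules with hypothetical names (`topic_matches`, `with_share_name`); it performs no validation and is not the library's implementation.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Check a topic against a filter that is assumed to be valid already."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    # Filters starting with a wildcard never match $-prefixed topics.
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False
    for index, level in enumerate(filter_levels):
        if level == "#":
            return True  # matches this level and everything below it
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(filter_levels) == len(topic_levels)


def with_share_name(topic_filter: str, share_name) -> str:
    """Build the on-the-wire filter for a shared subscription."""
    if share_name is None:
        return topic_filter
    return f"$share/{share_name}/{topic_filter}"
```

Note that `sport/#` matches the parent topic `sport`, while `sport/+` does not.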