ohmqtt.persistence package

Submodules

ohmqtt.persistence.base module

exception ohmqtt.persistence.base.LostMessageError

Bases: Exception

Raised when a message is lost from the persistence store and can not be acknowledged.

class ohmqtt.persistence.base.Persistence

Bases: object

Abstract base class for message persistence.

abstract ack(packet: ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket) None

Ack a PUBLISH or PUBREL message in the persistence store.

Raises ValueError if the packet_id is not found in the store.

abstract add(topic: str, payload: bytes, qos: MQTTQoS, retain: bool, properties: MQTTPublishProps, alias_policy: AliasPolicy) Handle[ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket]

Add a PUBLISH message to the persistence store.

abstract check_rec(packet: MQTTPublishPacket) bool

Validate that a QoS 2 PUBLISH packet has not already been received.

Returns True if the packet has not already been received, otherwise False.

Raises ValueError if the packet is not a QoS 2 PUBLISH packet.

abstract clear() None

Clear the persistence store, discarding all pending messages.

abstract close() None

Finalize and close the persistence store.

The store must not be used after this call.

abstract get(count: int) Sequence[int]

Get the packet ids of some pending messages from the store.

abstract open(client_id: str, clear: bool = False) None

Indicate to the persistence store that the broker has acknowledged our connection.

This may clear the persistence store if the client_id is different from the persisted, or if clear is True.

abstract rel(packet: MQTTPubRelPacket) None

Release a QoS 2 PUBLISH message.

abstract render(packet_id: int) RenderedPacket

Render a PUBLISH message from the persistence store.

This also indicates to the persistence store that the message is inflight.

Raises KeyError if the ID is not retained.

abstract set_rec(packet: MQTTPublishPacket) None

Indicate that a QoS 2 PUBLISH message has been received.

Raises ValueError if the packet is not a QoS 2 PUBLISH packet.

class ohmqtt.persistence.base.RenderedPacket(packet: ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubRelPacket, alias_policy: AliasPolicy)

Bases: NamedTuple

Represents a rendered packet.

alias_policy: AliasPolicy

Alias for field number 1

packet: ohmqtt.packet.publish.MQTTPublishPacket | ohmqtt.packet.publish.MQTTPubRelPacket

Alias for field number 0

ohmqtt.persistence.in_memory module

class ohmqtt.persistence.in_memory.InMemoryPersistence

Bases: Persistence

Store for retained messages in the session.

This store is in memory only and is not persistent.

ack(packet: ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket) None

Ack a PUBLISH or PUBREL message in the persistence store.

Raises ValueError if the packet_id is not found in the store.

add(topic: str, payload: bytes, qos: MQTTQoS, retain: bool, properties: MQTTPublishProps, alias_policy: AliasPolicy) Handle[ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket]

Add a PUBLISH message to the persistence store.

check_rec(packet: MQTTPublishPacket) bool

Validate that a QoS 2 PUBLISH packet has not already been received.

Returns True if the packet has not already been received, otherwise False.

Raises ValueError if the packet is not a QoS 2 PUBLISH packet.

clear() None

Clear the persistence store, discarding all pending messages.

close() None

Finalize and close the persistence store.

The store must not be used after this call.

get(count: int) Sequence[int]

Get the packet ids of some pending messages from the store.

open(client_id: str, clear: bool = False) None

Indicate to the persistence store that the broker has acknowledged our connection.

This may clear the persistence store if the client_id is different from the persisted, or if clear is True.

rel(packet: MQTTPubRelPacket) None

Release a QoS 2 PUBLISH message.

render(msg_id: int) RenderedPacket

Render a PUBLISH message from the persistence store.

This also indicates to the persistence store that the message is inflight.

Raises KeyError if the ID is not retained.

set_rec(packet: MQTTPublishPacket) None

Indicate that a QoS 2 PUBLISH message has been received.

Raises ValueError if the packet is not a QoS 2 PUBLISH packet.

class ohmqtt.persistence.in_memory.RetainedMessage(topic: str, payload: bytes, msg_id: int, qos: MQTTQoS, retain: bool, properties: MQTTPublishProps, dup: bool, received: bool, handle: ReferenceType[Handle[ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket]], alias_policy: AliasPolicy)

Bases: object

Represents a qos>0 message in the session.

alias_policy: AliasPolicy
dup: bool
handle: ReferenceType[Handle[ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket]]
msg_id: int
payload: bytes
properties: MQTTPublishProps
qos: MQTTQoS
received: bool
retain: bool
topic: str

ohmqtt.persistence.sqlite module

class ohmqtt.persistence.sqlite.SQLitePersistence(db_path: str, *, db_fast: bool = False)

Bases: Persistence

SQLite persistence for MQTT messages.

This class provides a SQLite-based persistence layer for MQTT messages.

ack(packet: ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket) None

Ack a PUBLISH or PUBREL message in the persistence store.

Raises ValueError if the packet_id is not found in the store.

add(topic: str, payload: bytes, qos: MQTTQoS, retain: bool, properties: MQTTPublishProps, alias_policy: AliasPolicy) Handle[ohmqtt.packet.publish.MQTTPubAckPacket | ohmqtt.packet.publish.MQTTPubRecPacket | ohmqtt.packet.publish.MQTTPubCompPacket]

Add a PUBLISH message to the persistence store.

check_rec(packet: MQTTPublishPacket) bool

Validate that a QoS 2 PUBLISH packet has not already been received.

Returns True if the packet has not already been received, otherwise False.

Raises ValueError if the packet is not a QoS 2 PUBLISH packet.

clear() None

Clear the persistence store, discarding all pending messages.

close() None

Finalize and close the persistence store.

The store must not be used after this call.

get(count: int) list[int]

Get the packet ids of some pending messages from the store.

open(client_id: str, clear: bool = False) None

Indicate to the persistence store that the broker has acknowledged our connection.

This may clear the persistence store if the client_id is different from the persisted, or if clear is True.

rel(packet: MQTTPubRelPacket) None

Release a QoS 2 PUBLISH message.

render(message_id: int) RenderedPacket

Render a PUBLISH message from the persistence store.

This also indicates to the persistence store that the message is inflight.

Raises KeyError if the ID is not retained.

set_rec(packet: MQTTPublishPacket) None

Indicate that a QoS 2 PUBLISH message has been received.

Raises ValueError if the packet is not a QoS 2 PUBLISH packet.

exception ohmqtt.persistence.sqlite.SchemaError

Bases: Exception

Database schema error.

Module contents