ohmqtt.packet package
Submodules
ohmqtt.packet.auth module
AUTH packets.
- class ohmqtt.packet.auth.MQTTAuthPacket(reason_code: 'MQTTReasonCode' = <MQTTReasonCode.Success: 0>, properties: 'MQTTAuthProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTAuthPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_type: ClassVar[MQTTPacketType] = 15
- properties: MQTTAuthProps
- props_type
alias of MQTTAuthProps
- reason_code: MQTTReasonCode
ohmqtt.packet.base module
- class ohmqtt.packet.base.MQTTPacket
Bases: object
Base class for MQTT packets.
- abstract classmethod decode(flags: int, data: memoryview) -> MQTTPacket
Decode a packet from bytes.
- abstract encode() -> bytes
Encode the packet to bytes.
- packet_type: ClassVar[MQTTPacketType]
- props_type: ClassVar[type[MQTTProperties]]
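The contract above (a class-level `packet_type`, a `decode` classmethod, and an `encode` method) can be illustrated with a minimal standard-library sketch. `SketchPacket` and `SketchPingReq` are hypothetical names invented for this example and are not ohmqtt's implementation; the sketch also assumes that `flags` is the low four bits of the first packet byte, which this reference does not state.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import ClassVar


class SketchPacket(ABC):
    """Hypothetical stand-in for a packet base class (not ohmqtt code)."""

    # Numeric MQTT control packet type, fixed per subclass.
    packet_type: ClassVar[int]

    @classmethod
    @abstractmethod
    def decode(cls, flags: int, data: memoryview) -> "SketchPacket":
        """Build a packet from the header flags and the packet body."""

    @abstractmethod
    def encode(self) -> bytes:
        """Serialize the packet, fixed header included."""


@dataclass
class SketchPingReq(SketchPacket):
    """PINGREQ has no variable header and no payload."""

    packet_type: ClassVar[int] = 12

    @classmethod
    def decode(cls, flags: int, data: memoryview) -> "SketchPingReq":
        # A well-formed PINGREQ has zero flags and an empty body.
        if flags != 0 or len(data) != 0:
            raise ValueError("malformed PINGREQ")
        return cls()

    def encode(self) -> bytes:
        # First byte: packet type in the high nibble. Second byte: length 0.
        return bytes([self.packet_type << 4, 0])
```

Keeping `packet_type` as a `ClassVar` means it is shared by every instance and is not treated as a dataclass field, which matches how the listings here annotate it.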
ohmqtt.packet.connect module
CONNECT, CONNACK, and DISCONNECT packets.
- class ohmqtt.packet.connect.MQTTConnAckPacket(reason_code: 'MQTTReasonCode' = <MQTTReasonCode.Success: 0>, session_present: 'bool' = False, properties: 'MQTTConnAckProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTConnAckPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_type: ClassVar[MQTTPacketType] = 2
- properties: MQTTConnAckProps
- props_type
alias of MQTTConnAckProps
- reason_code: MQTTReasonCode
- session_present: bool
- class ohmqtt.packet.connect.MQTTConnectPacket(client_id: 'str' = '', keep_alive: 'int' = 0, protocol_version: 'int' = 5, clean_start: 'bool' = False, will_props: 'MQTTWillProps' = <factory>, will_topic: 'str' = '', will_payload: 'bytes' = b'', will_qos: 'int' = 0, will_retain: 'bool' = False, username: 'str | None' = None, password: 'bytes | None' = None, properties: 'MQTTConnectProps' = <factory>)
Bases: MQTTPacket
- clean_start: bool
- client_id: str
- classmethod decode(flags: int, data: memoryview) -> MQTTConnectPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- keep_alive: int
- packet_type: ClassVar[MQTTPacketType] = 1
- password: bytes | None
- properties: MQTTConnectProps
- props_type
alias of MQTTConnectProps
- protocol_version: int
- username: str | None
- will_payload: bytes
- will_props: MQTTWillProps
- will_qos: int
- will_retain: bool
- will_topic: str
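Several of the CONNECT attributes listed above (`clean_start`, `will_qos`, `will_retain`, and the presence of `username` and `password`) end up in the single Connect Flags byte defined by the MQTT specification. The sketch below shows that bit layout using only the standard library. `connect_flags` and its `will` parameter are hypothetical names for illustration; how ohmqtt itself decides whether a will message is present is not described in this reference.

```python
def connect_flags(
    *,
    clean_start: bool = False,
    will: bool = False,
    will_qos: int = 0,
    will_retain: bool = False,
    username: bool = False,
    password: bool = False,
) -> int:
    """Pack the MQTT Connect Flags byte (bit 0 is reserved and stays 0)."""
    if will_qos not in (0, 1, 2):
        raise ValueError("will_qos must be 0, 1, or 2")
    if not will and (will_qos != 0 or will_retain):
        # The spec requires Will QoS and Will Retain to be 0 without a will.
        raise ValueError("will_qos/will_retain set without a will message")
    return (
        (int(username) << 7)       # bit 7: User Name flag
        | (int(password) << 6)     # bit 6: Password flag
        | (int(will_retain) << 5)  # bit 5: Will Retain
        | (will_qos << 3)          # bits 4-3: Will QoS
        | (int(will) << 2)         # bit 2: Will flag
        | (int(clean_start) << 1)  # bit 1: Clean Start
    )
```

For instance, `connect_flags(clean_start=True, username=True, password=True)` evaluates to `0xC2`.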
- class ohmqtt.packet.connect.MQTTDisconnectPacket(reason_code: 'MQTTReasonCode' = <MQTTReasonCode.Success: 0>, properties: 'MQTTDisconnectProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTDisconnectPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_type: ClassVar[MQTTPacketType] = 14
- properties: MQTTDisconnectProps
- props_type
alias of MQTTDisconnectProps
- reason_code: MQTTReasonCode
ohmqtt.packet.ping module
PINGREQ and PINGRESP packets.
- class ohmqtt.packet.ping.MQTTPingReqPacket
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTPingReqPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_type: ClassVar[MQTTPacketType] = 12
- class ohmqtt.packet.ping.MQTTPingRespPacket
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTPingRespPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_type: ClassVar[MQTTPacketType] = 13
ohmqtt.packet.publish module
PUBLISH, PUBACK, PUBREC, PUBREL, and PUBCOMP packets.
- class ohmqtt.packet.publish.MQTTPubAckPacket(packet_id: 'int', reason_code: 'MQTTReasonCode' = <MQTTReasonCode.Success: 0>, properties: 'MQTTPubAckProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTPubAckPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_id: int
- packet_type: ClassVar[MQTTPacketType] = 4
- properties: MQTTPubAckProps
- props_type
alias of MQTTPubAckProps
- reason_code: MQTTReasonCode
- class ohmqtt.packet.publish.MQTTPubCompPacket(packet_id: 'int', reason_code: 'MQTTReasonCode' = <MQTTReasonCode.Success: 0>, properties: 'MQTTPubCompProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTPubCompPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_id: int
- packet_type: ClassVar[MQTTPacketType] = 7
- properties: MQTTPubCompProps
- props_type
alias of MQTTPubCompProps
- reason_code: MQTTReasonCode
- class ohmqtt.packet.publish.MQTTPubRecPacket(packet_id: 'int', reason_code: 'MQTTReasonCode' = <MQTTReasonCode.Success: 0>, properties: 'MQTTPubRecProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTPubRecPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_id: int
- packet_type: ClassVar[MQTTPacketType] = 5
- properties: MQTTPubRecProps
- props_type
alias of MQTTPubRecProps
- reason_code: MQTTReasonCode
- class ohmqtt.packet.publish.MQTTPubRelPacket(packet_id: 'int', reason_code: 'MQTTReasonCode' = <MQTTReasonCode.Success: 0>, properties: 'MQTTPubRelProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTPubRelPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_id: int
- packet_type: ClassVar[MQTTPacketType] = 6
- properties: MQTTPubRelProps
- props_type
alias of MQTTPubRelProps
- reason_code: MQTTReasonCode
- class ohmqtt.packet.publish.MQTTPublishPacket(topic: 'str' = '', payload: 'bytes' = b'', qos: 'MQTTQoS' = <MQTTQoS.Q0: 0>, retain: 'bool' = False, packet_id: 'int' = 0, properties: 'MQTTPublishProps' = <factory>, dup: 'bool' = False)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTPublishPacket
Decode a packet from bytes.
- dup: bool
- encode() -> bytes
Encode the packet to bytes.
- packet_id: int
- packet_type: ClassVar[MQTTPacketType] = 3
- payload: bytes
- properties: MQTTPublishProps
- props_type
alias of MQTTPublishProps
- retain: bool
- topic: str
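In the MQTT wire format, the `dup`, `qos`, and `retain` values of a PUBLISH packet travel in the low four bits of the first byte, next to packet type 3 in the high four bits. The following standard-library sketch packs and unpacks those bits. Both function names are hypothetical, and the sketch assumes that the `flags` argument passed to `decode` is that low nibble, which this reference does not confirm.

```python
def publish_first_byte(qos: int, retain: bool = False, dup: bool = False) -> int:
    """Build the first byte of a PUBLISH packet's fixed header."""
    if qos not in (0, 1, 2):
        raise ValueError("qos must be 0, 1, or 2")
    # High nibble: packet type 3. Bit 3: DUP. Bits 2-1: QoS. Bit 0: RETAIN.
    return (3 << 4) | (int(dup) << 3) | (qos << 1) | int(retain)


def split_publish_flags(flags: int) -> tuple[bool, int, bool]:
    """Return (dup, qos, retain) from the low nibble of the first byte."""
    dup = bool(flags & 0x08)
    qos = (flags >> 1) & 0x03
    retain = bool(flags & 0x01)
    return dup, qos, retain
```

A QoS 1 retained publish, for example, starts with the byte `0x33`.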
ohmqtt.packet.subscribe module
SUBSCRIBE, SUBACK, UNSUBSCRIBE, and UNSUBACK packets.
- class ohmqtt.packet.subscribe.MQTTSubAckPacket(packet_id: 'int', reason_codes: 'Sequence[MQTTReasonCode]' = <factory>, properties: 'MQTTSubAckProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTSubAckPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_id: int
- packet_type: ClassVar[MQTTPacketType] = 9
- properties: MQTTSubAckProps
- props_type
alias of MQTTSubAckProps
- reason_codes: Sequence[MQTTReasonCode]
- class ohmqtt.packet.subscribe.MQTTSubscribePacket(topics: 'Sequence[tuple[str, int]]' = <factory>, packet_id: 'int' = 0, properties: 'MQTTSubscribeProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTSubscribePacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_id: int
- packet_type: ClassVar[MQTTPacketType] = 8
- properties: MQTTSubscribeProps
- props_type
alias of MQTTSubscribeProps
- topics: Sequence[tuple[str, int]]
- class ohmqtt.packet.subscribe.MQTTUnsubAckPacket(packet_id: 'int', reason_codes: 'Sequence[MQTTReasonCode]' = <factory>, properties: 'MQTTUnsubAckProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTUnsubAckPacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_id: int
- packet_type: ClassVar[MQTTPacketType] = 11
- properties: MQTTUnsubAckProps
- props_type
alias of MQTTUnsubAckProps
- reason_codes: Sequence[MQTTReasonCode]
- class ohmqtt.packet.subscribe.MQTTUnsubscribePacket(topics: 'Sequence[str]' = <factory>, packet_id: 'int' = 0, properties: 'MQTTUnsubscribeProps' = <factory>)
Bases: MQTTPacket
- classmethod decode(flags: int, data: memoryview) -> MQTTUnsubscribePacket
Decode a packet from bytes.
- encode() -> bytes
Encode the packet to bytes.
- packet_id: int
- packet_type: ClassVar[MQTTPacketType] = 10
- properties: MQTTUnsubscribeProps
- props_type
alias of MQTTUnsubscribeProps
- topics: Sequence[str]
Module contents
- ohmqtt.packet.decode_packet(data: bytes) -> MQTTPacket
Decode a packet from binary data.
The packet must be complete and correctly framed.
- ohmqtt.packet.decode_packet_from_parts(head: int, data: memoryview) -> MQTTPacket
Finish decoding a packet which has already been split into parts by an incremental reader.
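To make "complete and correctly framed" concrete: an MQTT packet starts with one header byte, followed by a Remaining Length encoded as a Variable Byte Integer of one to four bytes, followed by exactly that many body bytes. The sketch below splits one such packet into its first byte and a zero-copy view of its body using only the standard library. `split_fixed_header` is a hypothetical helper, not part of ohmqtt, and treating `head` as the first byte of the packet is an assumption, since this reference does not define that parameter.

```python
def split_fixed_header(buf: bytes) -> tuple[int, memoryview]:
    """Split one complete MQTT packet into (first byte, body view)."""
    if len(buf) < 2:
        raise ValueError("a packet needs at least two bytes")
    head = buf[0]
    remaining = 0
    shift = 0
    pos = 1
    while True:
        if pos > 4:
            # The Remaining Length field is at most four bytes long.
            raise ValueError("Remaining Length is longer than four bytes")
        if pos >= len(buf):
            raise ValueError("truncated Remaining Length")
        byte = buf[pos]
        pos += 1
        # Each byte carries seven bits of the length, least significant first.
        remaining |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break  # The top bit is clear on the final length byte.
        shift += 7
    body = memoryview(buf)[pos:]
    if len(body) != remaining:
        raise ValueError("body length does not match Remaining Length")
    return head, body


# Usage: the packet type is the high nibble, the flags the low nibble.
head, body = split_fixed_header(b"\xc0\x00")  # PINGREQ
packet_type, flags = head >> 4, head & 0x0F
```

Returning a `memoryview` avoids copying the body, which fits the `data: memoryview` parameters used throughout this package.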