181 lines
4.5 KiB
Go
181 lines
4.5 KiB
Go
|
package packets
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"errors"
|
||
|
"io"
|
||
|
|
||
|
"badat.dev/maeqtt/v2/mqtt/properties"
|
||
|
"badat.dev/maeqtt/v2/mqtt/types"
|
||
|
)
|
||
|
|
||
|
// TopicFilter is a single topic filter entry together with the option
// fields that parseTopicFilter decodes right after the topic string.
type TopicFilter struct {
	// Topic is the topic filter string as decoded from the packet.
	Topic string
	// MaxQoS is assembled from elements 0 and 1 of the decoded option bits.
	MaxQoS uint
	// NoLocal is element 2 of the decoded option bits.
	NoLocal bool
	// RetainAsPublished is element 3 of the decoded option bits.
	RetainAsPublished bool
	// RetainHandling is assembled from elements 4 and 5 of the decoded option bits.
	RetainHandling uint
}
|
||
|
|
||
|
func parseTopicFilter(r *bufio.Reader) (TopicFilter, error) {
|
||
|
filter := TopicFilter{}
|
||
|
var err error
|
||
|
|
||
|
filter.Topic, err = types.DecodeUTF8String(r)
|
||
|
if err != nil {
|
||
|
return filter, err
|
||
|
}
|
||
|
|
||
|
options, err := types.DecodeBits(r)
|
||
|
if err != nil {
|
||
|
return filter, err
|
||
|
}
|
||
|
filter.MaxQoS = types.BoolsToUint(options[0], options[1])
|
||
|
filter.NoLocal = options[2]
|
||
|
filter.RetainAsPublished = options[3]
|
||
|
filter.RetainHandling = types.BoolsToUint(options[4], options[5])
|
||
|
return filter, nil
|
||
|
}
|
||
|
|
||
|
// Both sub and unsubscribe packets are identitcal so we can reuse the parsing logic
|
||
|
type SubscriptionPacket struct {
|
||
|
PacketId uint16
|
||
|
TopicFilters []TopicFilter
|
||
|
}
|
||
|
|
||
|
func parseSubscriptionPacket(control controlPacket, props []properties.Property) (SubscriptionPacket, error) {
|
||
|
var err error
|
||
|
r := bufio.NewReader(control.reader)
|
||
|
packet := SubscriptionPacket{}
|
||
|
|
||
|
if control.flags != 2 {
|
||
|
return packet, errors.New("Malformed subscription packet")
|
||
|
}
|
||
|
|
||
|
packet.PacketId, err = types.DecodeUint16(r)
|
||
|
if err != nil {
|
||
|
return packet, err
|
||
|
}
|
||
|
|
||
|
err = properties.ParseProperties(r, props)
|
||
|
if err != nil {
|
||
|
return packet, err
|
||
|
}
|
||
|
|
||
|
for err != io.EOF {
|
||
|
filter, err := parseTopicFilter(r)
|
||
|
packet.TopicFilters = append(packet.TopicFilters, filter)
|
||
|
if err != nil {
|
||
|
return packet, err
|
||
|
}
|
||
|
_, err = r.Peek(1)
|
||
|
if err != nil || err != io.EOF {
|
||
|
return packet, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return packet, nil
|
||
|
}
|
||
|
|
||
|
type SubscribePacket struct {
|
||
|
*SubscriptionPacket
|
||
|
props properties.SubscribePacketProperties
|
||
|
}
|
||
|
|
||
|
func parseSubscribePacket(control controlPacket) (SubscribePacket, error) {
|
||
|
if control.packetType != PacketTypeSubscribe {
|
||
|
panic("Wrong packet type for parseSubscribePacket")
|
||
|
}
|
||
|
|
||
|
pack := SubscribePacket{}
|
||
|
subscriptionPack, err := parseSubscriptionPacket(control, pack.props.ArrayOf())
|
||
|
if err != nil {
|
||
|
return pack, err
|
||
|
}
|
||
|
pack.PacketId = subscriptionPack.PacketId
|
||
|
pack.TopicFilters = subscriptionPack.TopicFilters
|
||
|
return pack, nil
|
||
|
}
|
||
|
|
||
|
// SubackReasonCode is the one-byte reason code sent in a SUBACK packet.
type SubackReasonCode byte

// SUBACK reason codes as defined by MQTT 5.0. Every constant carries the
// SubackReasonCode type so it can be assigned to SubAckPacket.Reason
// directly.
const (
	SubackReasonGrantedQoSZero         SubackReasonCode = 0   // 0x00
	SubackReasonGrantedQoSOne          SubackReasonCode = 1   // 0x01
	SubackReasonGrantedQoSTwo          SubackReasonCode = 2   // 0x02
	SubackReasonUnspecified            SubackReasonCode = 128 // 0x80
	SubackReasonImplSpecificError      SubackReasonCode = 131 // 0x83
	SubackReasonNotAuthorized          SubackReasonCode = 135 // 0x87
	SubackReasonTopicFilterInvalid     SubackReasonCode = 143 // 0x8F
	SubackReasonPacketIDInUse          SubackReasonCode = 145 // 0x91
	SubackReasonQuotaExceeded          SubackReasonCode = 151 // 0x97
	SubackReasonSharedSubNotSupported  SubackReasonCode = 158 // 0x9E
	SubackReasonSubIDUnsupported       SubackReasonCode = 161 // 0xA1
	SubackReasonWildcardSubUnsupported SubackReasonCode = 162 // 0xA2
)
|
||
|
|
||
|
type SubAckPacket struct {
|
||
|
PacketID uint16
|
||
|
Properties properties.SubackPacketProperties
|
||
|
Reason SubackReasonCode
|
||
|
}
|
||
|
|
||
|
|
||
|
func (p SubAckPacket) Write(w io.Writer) error {
|
||
|
resp := pubRespPacket{
|
||
|
PacketType: PacketTypeSuback,
|
||
|
PacketID: p.PacketID,
|
||
|
Properties: p.Properties.ArrayOf(),
|
||
|
Reason: byte(p.Reason),
|
||
|
}
|
||
|
return resp.Write(w)
|
||
|
}
|
||
|
|
||
|
type UnsubscribePacket struct {
|
||
|
*SubscriptionPacket
|
||
|
props properties.UnsubscribePacketProperties
|
||
|
}
|
||
|
|
||
|
func parseUnsubscribePacket(control controlPacket) (UnsubscribePacket, error) {
|
||
|
if control.packetType != PacketTypeUnsubscribe {
|
||
|
panic("Wrong packet type for parseSubscribePacket")
|
||
|
}
|
||
|
|
||
|
pack := UnsubscribePacket{}
|
||
|
subscriptionPack, err := parseSubscriptionPacket(control, pack.props.ArrayOf())
|
||
|
if err != nil {
|
||
|
return pack, err
|
||
|
}
|
||
|
pack.PacketId = subscriptionPack.PacketId
|
||
|
pack.TopicFilters = subscriptionPack.TopicFilters
|
||
|
return pack, nil
|
||
|
}
|
||
|
|
||
|
// UnsubackReasonCode is the one-byte reason code sent in an UNSUBACK packet.
type UnsubackReasonCode byte

// UNSUBACK reason codes as defined by MQTT 5.0. Every constant carries the
// UnsubackReasonCode type so it can be assigned to UnsubAckPacket.Reason
// directly.
const (
	UnsubackReasonSuccess               UnsubackReasonCode = 0   // 0x00
	UnSubackReasonNoSubscriptionExisted UnsubackReasonCode = 17  // 0x11
	UnSubackReasonUnspecified           UnsubackReasonCode = 128 // 0x80
	UnSubackReasonImplSpecificError     UnsubackReasonCode = 131 // 0x83
	UnSubackReasonNotAuthorized         UnsubackReasonCode = 135 // 0x87
	UnSubackReasonTopicFilterInvalid    UnsubackReasonCode = 143 // 0x8F
	UnSubackReasonPacketIDInUse         UnsubackReasonCode = 145 // 0x91
)
|
||
|
|
||
|
type UnsubAckPacket struct {
|
||
|
PacketID uint16
|
||
|
Properties properties.UnsubackPacketProperties
|
||
|
Reason UnsubackReasonCode
|
||
|
}
|
||
|
|
||
|
|
||
|
func (p UnsubAckPacket) Write(w io.Writer) error {
|
||
|
resp := pubRespPacket{
|
||
|
PacketType: PacketTypeUnsuback,
|
||
|
PacketID: p.PacketID,
|
||
|
Properties: p.Properties.ArrayOf(),
|
||
|
Reason: byte(p.Reason),
|
||
|
}
|
||
|
return resp.Write(w)
|
||
|
}
|