258 lines
5.6 KiB
Go
258 lines
5.6 KiB
Go
package packets
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"errors"
|
|
"io"
|
|
"strings"
|
|
|
|
"badat.dev/maeqtt/v2/mqtt/properties"
|
|
"badat.dev/maeqtt/v2/mqtt/types"
|
|
)
|
|
|
|
type Topic struct {
|
|
Fields []string
|
|
}
|
|
|
|
var multiLevelWildcardNotLast = errors.New("Multi level wildcard isn't the field in a topic")
|
|
|
|
func ParseTopic(topic_name string) (Topic, error) {
|
|
topic := Topic{}
|
|
fields := strings.Split(topic_name, "/")
|
|
for i, field := range fields {
|
|
if field == "#" && len(fields) > i+1 {
|
|
return topic, multiLevelWildcardNotLast
|
|
}
|
|
}
|
|
topic.Fields = fields
|
|
|
|
return topic, nil
|
|
}
|
|
|
|
type TopicFilter struct {
|
|
Topic Topic
|
|
MaxQoS uint
|
|
NoLocal bool
|
|
RetainAsPublished bool
|
|
RetainHandling uint
|
|
}
|
|
|
|
func parseTopicFilter(r *bufio.Reader) (TopicFilter, error) {
|
|
filter := TopicFilter{}
|
|
var err error
|
|
|
|
topic_str, err := types.DecodeUTF8String(r)
|
|
if err != nil {
|
|
return filter, err
|
|
}
|
|
|
|
filter.Topic, err = ParseTopic(topic_str)
|
|
if err != nil {
|
|
return filter, err
|
|
}
|
|
|
|
options, err := types.DecodeBits(r)
|
|
if err != nil {
|
|
return filter, err
|
|
}
|
|
filter.MaxQoS = types.BoolsToUint(options[0], options[1])
|
|
filter.NoLocal = options[2]
|
|
filter.RetainAsPublished = options[3]
|
|
filter.RetainHandling = types.BoolsToUint(options[4], options[5])
|
|
return filter, nil
|
|
}
|
|
|
|
type SubscribePacket struct {
|
|
PacketId uint16
|
|
TopicFilters []TopicFilter
|
|
Properties properties.SubscribePacketProperties
|
|
}
|
|
|
|
func parseSubscribePacket(control controlPacket) (SubscribePacket, error) {
|
|
if control.packetType != PacketTypeSubscribe {
|
|
panic("Wrong packet type for parseSubscribePacket")
|
|
}
|
|
|
|
packet := SubscribePacket{}
|
|
|
|
r := bufio.NewReader(control.reader)
|
|
|
|
if control.flags != 2 {
|
|
return packet, errors.New("Malformed subscription packet")
|
|
}
|
|
|
|
var err error
|
|
packet.PacketId, err = types.DecodeUint16(r)
|
|
if err != nil {
|
|
return packet, err
|
|
}
|
|
|
|
err = properties.ParseProperties(r, packet.Properties.ArrayOf())
|
|
if err != nil {
|
|
return packet, err
|
|
}
|
|
|
|
for err != io.EOF {
|
|
filter, err := parseTopicFilter(r)
|
|
packet.TopicFilters = append(packet.TopicFilters, filter)
|
|
if err != nil {
|
|
return packet, err
|
|
}
|
|
_, err = r.Peek(1)
|
|
if err != nil && err != io.EOF {
|
|
return packet, err
|
|
}
|
|
if err == io.EOF {
|
|
return packet, nil
|
|
}
|
|
}
|
|
|
|
return packet, nil
|
|
}
|
|
|
|
func (p SubscribePacket) Visit(v PacketVisitor) {
|
|
v.VisitSubscribe(p)
|
|
}
|
|
|
|
type SubackReasonCode byte
|
|
|
|
const (
|
|
SubackReasonGrantedQoSZero PubackReasonCode = 0
|
|
SubackReasonGrantedQoSOne = 1
|
|
SubackReasonGrantedQoSTwo = 2
|
|
SubackReasonUnspecified = 128
|
|
SubackReasonImplSpecificError = 131
|
|
SubackReasonNotAuthorized = 135
|
|
SubackReasonTopicFilterInvalid = 143
|
|
SubackReasonPacketIDInUse = 145
|
|
SubackReasonQuotaExceeded = 151
|
|
SubackReasonSharedSubNotSupported = 158
|
|
SubackReasonSubIDUnsupported = 161
|
|
SubackReasonWildcardSubUnsupported = 162
|
|
)
|
|
|
|
type SubAckPacket struct {
|
|
PacketID uint16
|
|
Properties properties.SubackPacketProperties
|
|
Reason SubackReasonCode
|
|
}
|
|
|
|
func (p SubAckPacket) Write(w io.Writer) error {
|
|
buf := bytes.NewBuffer([]byte{})
|
|
err := types.WriteUint16(buf, p.PacketID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = properties.WriteProps(buf, p.Properties.ArrayOf())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = buf.WriteByte(byte(p.Reason))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
conPack := controlPacket{
|
|
packetType: PacketTypeSuback,
|
|
flags: 0,
|
|
reader: buf,
|
|
}
|
|
return conPack.write(w)
|
|
}
|
|
|
|
type UnsubscribePacket struct {
|
|
PacketID uint16
|
|
Topics []Topic
|
|
Properties properties.UnsubscribePacketProperties
|
|
}
|
|
|
|
func parseUnsubscribePacket(control controlPacket) (UnsubscribePacket, error) {
|
|
if control.packetType != PacketTypeUnsubscribe {
|
|
panic("Wrong packet type for parseSubscribePacket")
|
|
}
|
|
|
|
packet := UnsubscribePacket{}
|
|
r := bufio.NewReader(control.reader)
|
|
|
|
if control.flags != 2 {
|
|
return packet, errors.New("Malformed subscription packet")
|
|
}
|
|
|
|
var err error
|
|
packet.PacketID, err = types.DecodeUint16(r)
|
|
if err != nil {
|
|
return packet, err
|
|
}
|
|
|
|
err = properties.ParseProperties(r, packet.Properties.ArrayOf())
|
|
if err != nil {
|
|
return packet, err
|
|
}
|
|
|
|
for err != io.EOF {
|
|
topic_str, err := types.DecodeUTF8String(r)
|
|
if err != nil && err != io.EOF {
|
|
return packet, err
|
|
} else if err == io.EOF {
|
|
return packet, nil
|
|
}
|
|
|
|
filter, err := ParseTopic(topic_str)
|
|
if err != nil {
|
|
return packet, err
|
|
}
|
|
|
|
packet.Topics = append(packet.Topics, filter)
|
|
}
|
|
|
|
return packet, nil
|
|
}
|
|
|
|
func (p UnsubscribePacket) Visit(v PacketVisitor) {
|
|
v.VisitUnsubscribe(p)
|
|
}
|
|
|
|
type UnsubackReasonCode byte
|
|
|
|
const (
|
|
UnsubackReasonSuccess UnsubackReasonCode = 0
|
|
UnSubackReasonUnspecified = 128
|
|
UnSubackReasonImplSpecificError = 131
|
|
UnSubackReasonNotAuthorized = 135
|
|
UnSubackReasonTopicFilterInvalid = 143
|
|
UnSubackReasonPacketIDInUse = 145
|
|
)
|
|
|
|
type UnsubAckPacket struct {
|
|
PacketID uint16
|
|
Properties properties.UnsubackPacketProperties
|
|
Reason UnsubackReasonCode
|
|
}
|
|
|
|
func (p UnsubAckPacket) Write(w io.Writer) error {
|
|
buf := bytes.NewBuffer([]byte{})
|
|
err := types.WriteUint16(buf, p.PacketID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = properties.WriteProps(buf, p.Properties.ArrayOf())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = buf.WriteByte(byte(p.Reason))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
conPack := controlPacket{
|
|
packetType: PacketTypeUnsuback,
|
|
flags: 0,
|
|
reader: buf,
|
|
}
|
|
return conPack.write(w)
|
|
}
|