maeqtt/mqtt/packets/Subscriptions.go
2021-10-17 21:09:44 +02:00

258 lines
5.6 KiB
Go

package packets
import (
"bufio"
"bytes"
"errors"
"io"
"strings"
"badat.dev/maeqtt/v2/mqtt/properties"
"badat.dev/maeqtt/v2/mqtt/types"
)
type Topic struct {
Fields []string
}
var multiLevelWildcardNotLast = errors.New("Multi level wildcard isn't the field in a topic")
func ParseTopic(topic_name string) (Topic, error) {
topic := Topic{}
fields := strings.Split(topic_name, "/")
for i, field := range fields {
if field == "#" && len(fields) > i+1 {
return topic, multiLevelWildcardNotLast
}
}
topic.Fields = fields
return topic, nil
}
type TopicFilter struct {
Topic Topic
MaxQoS uint
NoLocal bool
RetainAsPublished bool
RetainHandling uint
}
func parseTopicFilter(r *bufio.Reader) (TopicFilter, error) {
filter := TopicFilter{}
var err error
topic_str, err := types.DecodeUTF8String(r)
if err != nil {
return filter, err
}
filter.Topic, err = ParseTopic(topic_str)
if err != nil {
return filter, err
}
options, err := types.DecodeBits(r)
if err != nil {
return filter, err
}
filter.MaxQoS = types.BoolsToUint(options[0], options[1])
filter.NoLocal = options[2]
filter.RetainAsPublished = options[3]
filter.RetainHandling = types.BoolsToUint(options[4], options[5])
return filter, nil
}
type SubscribePacket struct {
PacketId uint16
TopicFilters []TopicFilter
Properties properties.SubscribePacketProperties
}
func parseSubscribePacket(control controlPacket) (SubscribePacket, error) {
if control.packetType != PacketTypeSubscribe {
panic("Wrong packet type for parseSubscribePacket")
}
packet := SubscribePacket{}
r := bufio.NewReader(control.reader)
if control.flags != 2 {
return packet, errors.New("Malformed subscription packet")
}
var err error
packet.PacketId, err = types.DecodeUint16(r)
if err != nil {
return packet, err
}
err = properties.ParseProperties(r, packet.Properties.ArrayOf())
if err != nil {
return packet, err
}
for err != io.EOF {
filter, err := parseTopicFilter(r)
packet.TopicFilters = append(packet.TopicFilters, filter)
if err != nil {
return packet, err
}
_, err = r.Peek(1)
if err != nil && err != io.EOF {
return packet, err
}
if err == io.EOF {
return packet, nil
}
}
return packet, nil
}
func (p SubscribePacket) Visit(v PacketVisitor) {
v.VisitSubscribe(p)
}
type SubackReasonCode byte
const (
SubackReasonGrantedQoSZero SubackReasonCode = 0
SubackReasonGrantedQoSOne SubackReasonCode = 1
SubackReasonGrantedQoSTwo SubackReasonCode = 2
SubackReasonUnspecified SubackReasonCode = 128
SubackReasonImplSpecificError SubackReasonCode = 131
SubackReasonNotAuthorized SubackReasonCode = 135
SubackReasonTopicFilterInvalid SubackReasonCode = 143
SubackReasonPacketIDInUse SubackReasonCode = 145
SubackReasonQuotaExceeded SubackReasonCode = 151
SubackReasonSharedSubNotSupported SubackReasonCode = 158
SubackReasonSubIDUnsupported SubackReasonCode = 161
SubackReasonWildcardSubUnsupported SubackReasonCode = 162
)
type SubAckPacket struct {
PacketID uint16
Properties properties.SubackPacketProperties
Reason SubackReasonCode
}
func (p SubAckPacket) Write(w io.Writer) error {
buf := bytes.NewBuffer([]byte{})
err := types.WriteUint16(buf, p.PacketID)
if err != nil {
return err
}
err = properties.WriteProps(buf, p.Properties.ArrayOf())
if err != nil {
return err
}
err = buf.WriteByte(byte(p.Reason))
if err != nil {
return err
}
conPack := controlPacket{
packetType: PacketTypeSuback,
flags: 0,
reader: buf,
}
return conPack.write(w)
}
type UnsubscribePacket struct {
PacketID uint16
Topics []Topic
Properties properties.UnsubscribePacketProperties
}
func parseUnsubscribePacket(control controlPacket) (UnsubscribePacket, error) {
if control.packetType != PacketTypeUnsubscribe {
panic("Wrong packet type for parseSubscribePacket")
}
packet := UnsubscribePacket{}
r := bufio.NewReader(control.reader)
if control.flags != 2 {
return packet, errors.New("Malformed subscription packet")
}
var err error
packet.PacketID, err = types.DecodeUint16(r)
if err != nil {
return packet, err
}
err = properties.ParseProperties(r, packet.Properties.ArrayOf())
if err != nil {
return packet, err
}
for err != io.EOF {
topic_str, err := types.DecodeUTF8String(r)
if err != nil && err != io.EOF {
return packet, err
} else if err == io.EOF {
return packet, nil
}
filter, err := ParseTopic(topic_str)
if err != nil {
return packet, err
}
packet.Topics = append(packet.Topics, filter)
}
return packet, nil
}
func (p UnsubscribePacket) Visit(v PacketVisitor) {
v.VisitUnsubscribe(p)
}
type UnsubackReasonCode byte
const (
UnsubackReasonSuccess UnsubackReasonCode = 0
UnSubackReasonUnspecified UnsubackReasonCode = 128
UnSubackReasonImplSpecificError UnsubackReasonCode = 131
UnSubackReasonNotAuthorized UnsubackReasonCode = 135
UnSubackReasonTopicFilterInvalid UnsubackReasonCode = 143
UnSubackReasonPacketIDInUse UnsubackReasonCode = 145
)
type UnsubAckPacket struct {
PacketID uint16
Properties properties.UnsubackPacketProperties
Reason UnsubackReasonCode
}
func (p UnsubAckPacket) Write(w io.Writer) error {
buf := bytes.NewBuffer([]byte{})
err := types.WriteUint16(buf, p.PacketID)
if err != nil {
return err
}
err = properties.WriteProps(buf, p.Properties.ArrayOf())
if err != nil {
return err
}
err = buf.WriteByte(byte(p.Reason))
if err != nil {
return err
}
conPack := controlPacket{
packetType: PacketTypeUnsuback,
flags: 0,
reader: buf,
}
return conPack.write(w)
}