commit a927a5ac1a982d30210b63afb017ec88be6563d1 Author: bad Date: Thu Aug 26 15:08:24 2021 +0200 Basic packet parsing diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..1d953f4 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use nix diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7c145b2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.direnv/ +maeqtt +mqtt/properties/GeneratedProperties.go +*.test +*.exe +*.out +*.exe~ +*.dll +*.so +*.dylib diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c48e2ce --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module badat.dev/maeqtt/v2 + +go 1.16 + +require github.com/gdexlab/go-render v1.0.1 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..349e9bd --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/gdexlab/go-render v1.0.1 h1:rxqB3vo5s4n1kF0ySmoNeSPRYkEsyHgln4jFIQY7v0U= +github.com/gdexlab/go-render v1.0.1/go.mod h1:wRi5nW2qfjiGj4mPukH4UV0IknS1cHD4VgFTmJX5JzM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..8ccef38 --- /dev/null +++ b/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "bufio" + "log" + "net" + + "badat.dev/maeqtt/v2/mqtt/packets" + "badat.dev/maeqtt/v2/mqtt/properties" + "github.com/gdexlab/go-render/render" // For testing +) + +func main() { + listen_addr := ":1883" + listener, err := net.Listen("tcp", listen_addr) + + if err != nil { + log.Fatal(err) + } + + for { + conn, err := listener.Accept() + if err != nil { + log.Println("Failed accepting connection ", err) + } + go handleConnection(conn) + } +} + +func handleConnection(con net.Conn) { + defer closeConnection(con) + + for { + reader := bufio.NewReader(con) + packet, err := packets.ReadPacket(reader) + if err != nil { + log.Println("Error reading packet ", err) + break + } + log.Println(render.AsCode(packet)) + clientId := "aa" + resp := packets.ConnackPacket{ + ResonCode: packets.ConnectReasonCodeSuccess, + SessionPresent: false, + Properties: properties.ConnackPacketProperties{ + AssignedClientIdentifier: properties.AssignedClientIdentifier{Value: &clientId}, + }, + } + err = resp.Write(con) + log.Println("Wrote response") + if err != nil { + log.Println("Error writing response ", err) + break + } + } + +} + +func closeConnection(con net.Conn) { + err := con.Close() + if err != nil { + log.Println("Failed to close connection", err) + } +} diff --git a/mqtt/GeneratedProperties.go b/mqtt/GeneratedProperties.go new file mode 100644 index 0000000..7a27990 --- /dev/null +++ b/mqtt/GeneratedProperties.go @@ -0,0 +1,685 @@ +package mqtt +// This code has been generated with the genProps.py script. Do not modify + + +import "bufio" + +type PayloadFormatIndicator struct { + value *byte +} + +func (p PayloadFormatIndicator) id() int { + return 1 +} + +func (p *PayloadFormatIndicator) parse(r *bufio.Reader) error { + val, err := r.ReadByte() + if err != nil { + return err + } + p.value = &val + return nil +} + + +type MessageExpiryInterval struct { + value *uint32 +} + +func (p MessageExpiryInterval) id() int { + return 2 +} + +func (p *MessageExpiryInterval) parse(r *bufio.Reader) error { + val, err := decodeUint32(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type ContentType struct { + value *string +} + +func (p ContentType) id() int { + return 3 +} + +func (p *ContentType) parse(r *bufio.Reader) error { + val, err := decodeUTF8String(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type ResponseTopic struct { + value *string +} + +func (p ResponseTopic) id() int { + return 8 +} + +func (p *ResponseTopic) parse(r *bufio.Reader) error { + val, err := decodeUTF8String(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type CorrelationData struct { + value *[]byte +} + +func (p CorrelationData) id() int { + return 9 +} + +func (p *CorrelationData) parse(r *bufio.Reader) error { + val, err := decodeBinaryData(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type SubscriptionIdentifier struct { + value *int +} + +func (p SubscriptionIdentifier) id() int { + return 11 +} + +func (p *SubscriptionIdentifier) parse(r *bufio.Reader) error { + val, err := decodeVariableByteInt(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type SessionExpiryInterval struct { + value *uint32 +} + +func (p SessionExpiryInterval) id() int { + return 17 +} + +func (p *SessionExpiryInterval) parse(r *bufio.Reader) error { + val, err := decodeUint32(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type AssignedClientIdentifier struct { + value *string +} + +func (p AssignedClientIdentifier) id() int { + return 18 +} + +func (p *AssignedClientIdentifier) parse(r *bufio.Reader) error { + val, err := decodeUTF8String(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type ServerKeepAlive struct { + value *uint16 +} + +func (p ServerKeepAlive) id() int { + return 19 +} + +func (p *ServerKeepAlive) parse(r *bufio.Reader) error { + val, err := decodeUint16(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type AuthenticationMethod struct { + value *string +} + +func (p AuthenticationMethod) id() int { + return 21 +} + +func (p *AuthenticationMethod) parse(r *bufio.Reader) error { + val, err := decodeUTF8String(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type AuthenticationData struct { + value *[]byte +} + +func (p AuthenticationData) id() int { + return 22 +} + +func (p *AuthenticationData) parse(r *bufio.Reader) error { + val, err := decodeBinaryData(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type RequestProblemInformation struct { + value *byte +} + +func (p RequestProblemInformation) id() int { + return 23 +} + +func (p *RequestProblemInformation) parse(r *bufio.Reader) error { + val, err := r.ReadByte() + if err != nil { + return err + } + p.value = &val + return nil +} + + +type WillDelayInterval struct { + value *uint32 +} + +func (p WillDelayInterval) id() int { + return 24 +} + +func (p *WillDelayInterval) parse(r *bufio.Reader) error { + val, err := decodeUint32(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type RequestResponseInformation struct { + value *byte +} + +func (p RequestResponseInformation) id() int { + return 25 +} + +func (p *RequestResponseInformation) parse(r *bufio.Reader) error { + val, err := r.ReadByte() + if err != nil { + return err + } + p.value = &val + return nil +} + + +type ResponseInformation struct { + value *string +} + +func (p ResponseInformation) id() int { + return 26 +} + +func (p *ResponseInformation) parse(r *bufio.Reader) error { + val, err := decodeUTF8String(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type ServerReference struct { + value *string +} + +func (p ServerReference) id() int { + return 28 +} + +func (p *ServerReference) parse(r *bufio.Reader) error { + val, err := decodeUTF8String(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type ReasonString struct { + value *string +} + +func (p ReasonString) id() int { + return 31 +} + +func (p *ReasonString) parse(r *bufio.Reader) error { + val, err := decodeUTF8String(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type ReceiveMaximum struct { + value *uint16 +} + +func (p ReceiveMaximum) id() int { + return 33 +} + +func (p *ReceiveMaximum) parse(r *bufio.Reader) error { + val, err := decodeUint16(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type TopicAliasMaximum struct { + value *uint16 +} + +func (p TopicAliasMaximum) id() int { + return 34 +} + +func (p *TopicAliasMaximum) parse(r *bufio.Reader) error { + val, err := decodeUint16(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type TopicAlias struct { + value *uint16 +} + +func (p TopicAlias) id() int { + return 35 +} + +func (p *TopicAlias) parse(r *bufio.Reader) error { + val, err := decodeUint16(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type MaximumQoS struct { + value *byte +} + +func (p MaximumQoS) id() int { + return 36 +} + +func (p *MaximumQoS) parse(r *bufio.Reader) error { + val, err := r.ReadByte() + if err != nil { + return err + } + p.value = &val + return nil +} + + +type RetainAvailable struct { + value *byte +} + +func (p RetainAvailable) id() int { + return 37 +} + +func (p *RetainAvailable) parse(r *bufio.Reader) error { + val, err := r.ReadByte() + if err != nil { + return err + } + p.value = &val + return nil +} + + +type MaximumPacketSize struct { + value *uint32 +} + +func (p MaximumPacketSize) id() int { + return 39 +} + +func (p *MaximumPacketSize) parse(r *bufio.Reader) error { + val, err := decodeUint32(r) + if err != nil { + return err + } + p.value = &val + return nil +} + + +type WildcardSubscriptionAvailable struct { + value *byte +} + +func (p WildcardSubscriptionAvailable) id() int { + return 40 +} + +func (p *WildcardSubscriptionAvailable) parse(r *bufio.Reader) error { + val, err := r.ReadByte() + if err != nil { + return err + } + p.value = &val + return nil +} + + +type SubscriptionIdentifierAvailable struct { + value *byte +} + +func (p SubscriptionIdentifierAvailable) id() int { + return 41 +} + +func (p *SubscriptionIdentifierAvailable) parse(r *bufio.Reader) error { + val, err := r.ReadByte() + if err != nil { + return err + } + p.value = &val + return nil +} + + +type SharedSubscriptionAvailable struct { + value *byte +} + +func (p SharedSubscriptionAvailable) id() int { + return 42 +} + +func (p *SharedSubscriptionAvailable) parse(r *bufio.Reader) error { + val, err := r.ReadByte() + if err != nil { + return err + } + p.value = &val + return nil +} + +type PublishPacketProperties struct { +PayloadFormatIndicator PayloadFormatIndicator +MessageExpiryInterval MessageExpiryInterval +ContentType ContentType +ResponseTopic ResponseTopic +CorrelationData CorrelationData +SubscriptionIdentifier SubscriptionIdentifier +TopicAlias TopicAlias +UserProperty UserProperty +} +func (p *PublishPacketProperties) arrayOf() []Property { +return []Property { +&p.PayloadFormatIndicator, +&p.MessageExpiryInterval, +&p.ContentType, +&p.ResponseTopic, +&p.CorrelationData, +&p.SubscriptionIdentifier, +&p.TopicAlias, +&p.UserProperty, +} +} +type WillProperties struct { +PayloadFormatIndicator PayloadFormatIndicator +MessageExpiryInterval MessageExpiryInterval +ContentType ContentType +ResponseTopic ResponseTopic +CorrelationData CorrelationData +WillDelayInterval WillDelayInterval +UserProperty UserProperty +} +func (p *WillProperties) arrayOf() []Property { +return []Property { +&p.PayloadFormatIndicator, +&p.MessageExpiryInterval, +&p.ContentType, +&p.ResponseTopic, +&p.CorrelationData, +&p.WillDelayInterval, +&p.UserProperty, +} +} +type SubscribePacketProperties struct { +SubscriptionIdentifier SubscriptionIdentifier +UserProperty UserProperty +} +func (p *SubscribePacketProperties) arrayOf() []Property { +return []Property { +&p.SubscriptionIdentifier, +&p.UserProperty, +} +} +type ConnectPacketProperties struct { +SessionExpiryInterval SessionExpiryInterval +AuthenticationMethod AuthenticationMethod +AuthenticationData AuthenticationData +RequestProblemInformation RequestProblemInformation +RequestResponseInformation RequestResponseInformation +ReceiveMaximum ReceiveMaximum +TopicAliasMaximum TopicAliasMaximum +UserProperty UserProperty +MaximumPacketSize MaximumPacketSize +} +func (p *ConnectPacketProperties) arrayOf() []Property { +return []Property { +&p.SessionExpiryInterval, +&p.AuthenticationMethod, +&p.AuthenticationData, +&p.RequestProblemInformation, +&p.RequestResponseInformation, +&p.ReceiveMaximum, +&p.TopicAliasMaximum, +&p.UserProperty, +&p.MaximumPacketSize, +} +} +type ConnackPacketProperties struct { +SessionExpiryInterval SessionExpiryInterval +AssignedClientIdentifier AssignedClientIdentifier +ServerKeepAlive ServerKeepAlive +AuthenticationMethod AuthenticationMethod +AuthenticationData AuthenticationData +ResponseInformation ResponseInformation +ServerReference ServerReference +ReasonString ReasonString +ReceiveMaximum ReceiveMaximum +TopicAliasMaximum TopicAliasMaximum +MaximumQoS MaximumQoS +RetainAvailable RetainAvailable +UserProperty UserProperty +MaximumPacketSize MaximumPacketSize +WildcardSubscriptionAvailable WildcardSubscriptionAvailable +SubscriptionIdentifierAvailable SubscriptionIdentifierAvailable +SharedSubscriptionAvailable SharedSubscriptionAvailable +} +func (p *ConnackPacketProperties) arrayOf() []Property { +return []Property { +&p.SessionExpiryInterval, +&p.AssignedClientIdentifier, +&p.ServerKeepAlive, +&p.AuthenticationMethod, +&p.AuthenticationData, +&p.ResponseInformation, +&p.ServerReference, +&p.ReasonString, +&p.ReceiveMaximum, +&p.TopicAliasMaximum, +&p.MaximumQoS, +&p.RetainAvailable, +&p.UserProperty, +&p.MaximumPacketSize, +&p.WildcardSubscriptionAvailable, +&p.SubscriptionIdentifierAvailable, +&p.SharedSubscriptionAvailable, +} +} +type DisconnectPacketProperties struct { +SessionExpiryInterval SessionExpiryInterval +ServerReference ServerReference +ReasonString ReasonString +UserProperty UserProperty +} +func (p *DisconnectPacketProperties) arrayOf() []Property { +return []Property { +&p.SessionExpiryInterval, +&p.ServerReference, +&p.ReasonString, +&p.UserProperty, +} +} +type AuthPacketProperties struct { +AuthenticationMethod AuthenticationMethod +AuthenticationData AuthenticationData +ReasonString ReasonString +UserProperty UserProperty +} +func (p *AuthPacketProperties) arrayOf() []Property { +return []Property { +&p.AuthenticationMethod, +&p.AuthenticationData, +&p.ReasonString, +&p.UserProperty, +} +} +type PubackPacketProperties struct { +ReasonString ReasonString +UserProperty UserProperty +} +func (p *PubackPacketProperties) arrayOf() []Property { +return []Property { +&p.ReasonString, +&p.UserProperty, +} +} +type PubrecPacketProperties struct { +ReasonString ReasonString +UserProperty UserProperty +} +func (p *PubrecPacketProperties) arrayOf() []Property { +return []Property { +&p.ReasonString, +&p.UserProperty, +} +} +type PubrelPacketProperties struct { +ReasonString ReasonString +UserProperty UserProperty +} +func (p *PubrelPacketProperties) arrayOf() []Property { +return []Property { +&p.ReasonString, +&p.UserProperty, +} +} +type PubcompPacketProperties struct { +ReasonString ReasonString +UserProperty UserProperty +} +func (p *PubcompPacketProperties) arrayOf() []Property { +return []Property { +&p.ReasonString, +&p.UserProperty, +} +} +type SubackPacketProperties struct { +ReasonString ReasonString +UserProperty UserProperty +} +func (p *SubackPacketProperties) arrayOf() []Property { +return []Property { +&p.ReasonString, +&p.UserProperty, +} +} +type UnsubackPacketProperties struct { +ReasonString ReasonString +UserProperty UserProperty +} +func (p *UnsubackPacketProperties) arrayOf() []Property { +return []Property { +&p.ReasonString, +&p.UserProperty, +} +} +type UnsubscribePacketProperties struct { +UserProperty UserProperty +} +func (p *UnsubscribePacketProperties) arrayOf() []Property { +return []Property { +&p.UserProperty, +} +} diff --git a/mqtt/packets/Connack.go b/mqtt/packets/Connack.go new file mode 100644 index 0000000..a23e078 --- /dev/null +++ b/mqtt/packets/Connack.go @@ -0,0 +1,70 @@ +package packets + +import ( + "bytes" + "io" + + "badat.dev/maeqtt/v2/mqtt/properties" + "badat.dev/maeqtt/v2/mqtt/types" +) + +type ConnectReasonCode byte + +const ( + ConnectReasonCodeSuccess ConnectReasonCode = 0 + ConnectReasonCodeUnspecified = 128 + ConnectReasonCodeMalformedPacket = 129 + ConnectReasonCodeProtocolError = 130 + ConnectReasonCodeImplErorr = 131 + ConnectReasonCodeUnsupportedProtoVer = 132 + ConnectReasonCodeClientIDNotValid = 133 + ConnectReasonCodeBadUsernameOrPassword = 134 + ConnectReasonCodeNotAuthorized = 135 + ConnectReasonCodeServerUnavaliable = 136 + ConnectReasonCodeServerBusy = 137 + ConnectReasonCodeBanned = 138 + ConnectReasonCodeBadAuthenticationMethod = 140 + ConnectReasonCodeTopicNameInvalid = 144 + ConnectReasonCodePacketTooLarge = 149 + ConnectReasonCodeQuotaExceeded = 151 + ConnectReasonCodePayloadFormatInvalid = 153 + ConnectReasonCodeRetainNotSupported = 154 + ConnectReasonCodeQoSNotSupported = 155 + ConnectReasonCodeUseAnotherServer = 156 + ConnectReasonCodeServerMoved = 157 + ConnectReasonCodeConnectionRateExceeded = 159 +) + +type ConnackPacket struct { + ResonCode ConnectReasonCode + SessionPresent bool + Properties properties.ConnackPacketProperties +} + +func (p ConnackPacket) Write(w io.Writer) error { + buf := bytes.NewBuffer([]byte{}) + var ackFlags [8]bool + ackFlags[0] = p.SessionPresent + err := types.WriteBits(buf, ackFlags) + if err != nil { + return err + } + + err = buf.WriteByte(byte(p.ResonCode)) + if err != nil { + return err + } + + err = properties.WriteProps(buf, p.Properties.ArrayOf()) + if err != nil { + return err + } + + conPack := controlPacket{ + packetType: PacketTypeConnack, + flags: 0, + reader: buf, + } + + return conPack.write(w) +} diff --git a/mqtt/packets/Connect.go b/mqtt/packets/Connect.go new file mode 100644 index 0000000..f7560e8 --- /dev/null +++ b/mqtt/packets/Connect.go @@ -0,0 +1,124 @@ +package packets + +import ( + "badat.dev/maeqtt/v2/mqtt/properties" + "badat.dev/maeqtt/v2/mqtt/types" + "bufio" + "errors" +) + +type Will struct { + retain bool + properties properties.WillProperties +} + +type ConnectPacket struct { + ClientId *string + Username *string + Password *[]byte + CleanStart bool + KeepAliveInterval uint16 + + Will *Will // Optional + Properties properties.ConnectPacketProperties +} + +func (c ConnectPacket) visit(visitor PacketVisitor) { + visitor.visitConnect(c) +} + +func parseConnectPacket(control controlPacket) (ConnectPacket, error) { + packet := ConnectPacket{} + + if control.packetType != PacketTypeConnect { + panic("Wrong packet type for parseConnectPacket") + } + if control.flags != 0 { + return packet, errors.New("Malformed connect packet") + } + + r := bufio.NewReader(control.reader) + + protocolName, err := types.DecodeUTF8String(r) + if err != nil { + return packet, err + } + if protocolName != "MQTT" { + return ConnectPacket{}, errors.New("Malformed connect packet, invalid protocol name") + } + + protocolVersion, err := r.ReadByte() + if err != nil { + return ConnectPacket{}, err + } + if protocolVersion != 5 { + return ConnectPacket{}, errors.New("Malformed connect packet, unsupported protocol version") + } + + connectFlags, err := types.DecodeBits(r) + if err != nil { + return packet, err + } + userNameFlag := connectFlags[7] + passwordFlag := connectFlags[6] + willRetainFlag := connectFlags[5] + willFlag := connectFlags[2] + packet.CleanStart = connectFlags[1] + reserved := connectFlags[0] + + if reserved { + return ConnectPacket{}, errors.New("Malformed connect packet, reserved connect flag set") + } + + QOSLevel := types.BoolToUint(connectFlags[4])*2 + types.BoolToUint(connectFlags[3]) + if QOSLevel > 3 { + return ConnectPacket{}, errors.New("Malformed connect packet, invalid QOS Level") + } + + keepAlive, err := types.DecodeUint16(r) + if err != nil { + return packet, err + } + packet.KeepAliveInterval = keepAlive + + err = properties.ParseProperties(r, packet.Properties.ArrayOf()) + if err != nil { + return packet, err + } + + // Parse payload(3.1.3) + clientId, err := types.DecodeUTF8String(r) + if err != nil { + return packet, err + } + packet.ClientId = &clientId + + if willFlag { + packet.Will = &Will{} + err = properties.ParseProperties(r, packet.Will.properties.ArrayOf()) + if err != nil { + return packet, err + } + packet.Will.retain = willRetainFlag + } + + var username string + if userNameFlag { + username, err = types.DecodeUTF8String(r) + if err != nil { + return packet, err + } + packet.Username = &username + } + + var password []byte + if passwordFlag { + password, err = types.DecodeBinaryData(r) + if err != nil { + return packet, err + } + packet.Password = &password + } + + return packet, nil +} diff --git a/mqtt/packets/ControlPacket.go b/mqtt/packets/ControlPacket.go new file mode 100644 index 0000000..9c36317 --- /dev/null +++ b/mqtt/packets/ControlPacket.go @@ -0,0 +1,32 @@ +package packets + +import ( + "bytes" + "io" + + "badat.dev/maeqtt/v2/mqtt/types" +) + +type controlPacket struct { + packetType PacketType + flags uint + reader io.Reader +} + +func (c controlPacket) write(w io.Writer) error { + buf := bytes.NewBuffer([]byte{}) + + var fixedHeader byte = (byte(c.packetType) << 4 & 0b11110000) + (byte(c.flags) & 0b1111) + _, err := buf.Write([]byte{fixedHeader}) + if err != nil { + return err + } + data, err := io.ReadAll(c.reader) + if err != nil { + return err + } + + types.WriteDataWithVarIntLen(buf, data) + w.Write(buf.Bytes()) + return nil +} diff --git a/mqtt/packets/Disconnect.go b/mqtt/packets/Disconnect.go new file mode 100644 index 0000000..47637e9 --- /dev/null +++ b/mqtt/packets/Disconnect.go @@ -0,0 +1,104 @@ +package packets + +import ( + "bufio" + "bytes" + "errors" + "io" + + "badat.dev/maeqtt/v2/mqtt/properties" +) + +type DisconnectReasonCode byte + +const ( + DisconnectReasonCodeNormal DisconnectReasonCode = 0 + DisconnectReasonCodeWithWill DisconnectReasonCode = 0 + DisconnectReasonCodeUnspecified = 128 + DisconnectReasonCodeMalformedPacket = 129 + DisconnectReasonCodeProtocolError = 130 + DisconnectReasonCodeImplErorr = 131 + DisconnectReasonCodeNotAuthorized = 135 + DisconnectReasonCodeServerBusy = 137 + DisconnectReasonServerShuttingDown = 139 + DisconnectReasonCodeKeepAliveTimeout = 141 + DisconnectReasonCodeSessionTakenOver = 142 + DisconnectReasonCodeTopicFilterInvalid = 143 + DisconnectReasonCodeTopicNameInvalid = 144 + DisconnectReasonCodeReceiveMaxiumExceeded = 147 + DisconnectReasonCodeTopicAliasInvalid = 148 + DisconnectReasonCodePacketTooLarge = 149 + DisconnectReasonCodeMessageRateTooHigh = 150 + DisconnectReasonCodeQuotaExceeded = 151 + DisconnectReasonCodeAdminiAction = 152 + DisconnectReasonCodePayloadFormatInvalid = 153 + DisconnectReasonCodeRetainNotSupported = 154 + DisconnectReasonCodeQoSNotSupported = 155 + DisconnectReasonCodeUseAnotherServer = 156 + DisconnectReasonCodeServerMoved = 157 + DisconnectReasonCodeSharedSubscriptionNotSupported = 158 + DisconnectReasonCodeConnectionRateExceeded = 159 + DisconnectReasonCodeMaximumConnectTime = 160 + DisconnectReasonCodeSubscriptionIdNotSupported = 161 + DisconnectReasonCodeWildcardSubscriptionNotSupported = 162 +) + +type DisconnectPacket struct { + ReasonCode DisconnectReasonCode + Properties properties.DisconnectPacketProperties +} + +func parseDisconnectPacket(control controlPacket) (DisconnectPacket, error) { + packet := DisconnectPacket{} + + if control.packetType != PacketTypeDisconnect { + panic("Wrong packet type for parseDisconnect") + } + if control.flags != 0 { + return packet, errors.New("Malformed disconnect packet") + } + + r := bufio.NewReader(control.reader) + + + // If there is less then a byte in the reader assume the reason code == 0 + reason,err := r.ReadByte() + if err == io.EOF { + reason = 0 + } else if err != nil { + return packet, err + } + packet.ReasonCode = DisconnectReasonCode(reason) + + // If there are less than 2 bytes remaining in the reader assume that the packet has no properties + _, err = r.Peek(2) + if err == nil { + err = properties.ParseProperties(r,packet.Properties.ArrayOf()) + } else if err != io.EOF { + return packet, err + } else if err == io.EOF { + err = nil + } + + return packet, err +} + +func (p DisconnectPacket) Write(w io.Writer) error { + buf := bytes.NewBuffer([]byte{}) + buf.WriteByte(byte(p.ReasonCode)) + err := properties.WriteProps(w, p.Properties.ArrayOf()) + if err != nil { + return err + } + + control := controlPacket { + packetType: PacketTypeDisconnect, + flags: 0, + reader: buf, + } + return control.write(w) +} + +func (p DisconnectPacket) visit(v PacketVisitor) { + v.visitDisconnect(p) +} diff --git a/mqtt/packets/Ping.go b/mqtt/packets/Ping.go new file mode 100644 index 0000000..89f4b0c --- /dev/null +++ b/mqtt/packets/Ping.go @@ -0,0 +1,35 @@ +package packets + +import ( + "bytes" + "errors" + "io" +) + + +type PingreqPacket struct {} + +func ParesPingreq(control controlPacket) (PingreqPacket, error) { + packet := PingreqPacket{} + + if control.packetType != PacketTypePingreq { + panic("Wrong packet type for parsePingreq") + } + if control.flags != 0 { + return packet, errors.New("Malformed connect packet") + } + + return packet, nil +} + +type PingrespPacket struct {} + +func (p PingrespPacket) Write(w io.Writer) error { + control := controlPacket { + packetType: PacketTypePingresp, + flags: 0, + reader: bytes.NewReader([]byte{}), + } + + return control.write(w) +} diff --git a/mqtt/packets/PubAckRecRel.go b/mqtt/packets/PubAckRecRel.go new file mode 100644 index 0000000..15355ab --- /dev/null +++ b/mqtt/packets/PubAckRecRel.go @@ -0,0 +1,92 @@ +package packets + +import ( + "io" + "badat.dev/maeqtt/v2/mqtt/properties" +) + + +type PubackReasonCode byte + +const ( + PubackReasonCodeSuccess PubackReasonCode = 0 + PubackReasonCodeNoMatchingSubscribers = 16 + PubackReasonCodeUnspecifiedError = 128 + PubackReasonCodeImplementationSpecyficEror = 131 + PubackReasonCodeNotAuthorized = 135 + PubackReasonCodeTopicNameInvalid = 144 + PubackReasonCodePacketIDInUse = 145 + PubackReasonCodeQuotaExceeded = 151 + PubackReasonCodePayloadFormatInvalid = 153 +) + +type PubackPacket struct { + PacketID uint16 + Properties properties.PubackPacketProperties + Reason PubackReasonCode +} + +func (p PubackPacket) Write(w io.Writer) error { + resp := pubRespPacket{ + PacketType: PacketTypePuback, + PacketID: p.PacketID, + Properties: p.Properties.ArrayOf(), + Reason: byte(p.Reason), + } + return resp.Write(w) +} + +type PubrecPacket struct { + PacketID uint16 + Properties properties.PubrecPacketProperties + Reason PubackReasonCode +} + +func (p PubrecPacket) Write(w io.Writer) error { + resp := pubRespPacket{ + PacketType: PacketTypePubrec, + PacketID: p.PacketID, + Properties: p.Properties.ArrayOf(), + Reason: byte(p.Reason), + } + return resp.Write(w) +} + +type PubrelReasonCode byte + +const ( + PubrelReasonCodeSuccess PubackReasonCode = 0 + PubrelReasonPacketIDNotFound = 146 +) + +type PubrelPacket struct { + PacketID uint16 + Properties properties.PubrecPacketProperties + Reason PubrelReasonCode +} + +func (p PubrelPacket) Write(w io.Writer) error { + resp := pubRespPacket{ + PacketType: PacketTypePubrel, + PacketID: p.PacketID, + Properties: p.Properties.ArrayOf(), + Reason: byte(p.Reason), + } + return resp.Write(w) +} + +type PubcompPacket struct { + PacketID uint16 + Properties properties.PubcompPacketProperties + Reason PubrelReasonCode +} + +func (p PubcompPacket) Write(w io.Writer) error { + resp := pubRespPacket{ + PacketType: PacketTypePubrel, + PacketID: p.PacketID, + Properties: p.Properties.ArrayOf(), + Reason: byte(p.Reason), + } + return resp.Write(w) +} diff --git a/mqtt/packets/Publish.go b/mqtt/packets/Publish.go new file mode 100644 index 0000000..5d551c2 --- /dev/null +++ b/mqtt/packets/Publish.go @@ -0,0 +1,62 @@ +package packets + +import ( + "bufio" + "errors" + "io" + + "badat.dev/maeqtt/v2/mqtt/properties" + "badat.dev/maeqtt/v2/mqtt/types" +) + +type PublishPacket struct { + Dup bool + Retain bool + QOSLevel byte + TopicName string + Payload []byte + PacketId *uint16 + Properties properties.PublishPacketProperties +} + +func (p PublishPacket) visit(v PacketVisitor) { + v.visitPublish(p) +} + +func parsePublishPacket(control controlPacket) (PublishPacket, error) { + var err error + r := bufio.NewReader(control.reader) + packet := PublishPacket{} + + if control.packetType != PacketTypePublish { + return packet, errors.New("Wrong packet type for parseConnectPacket") + } + + packet.Retain = control.flags&1 == 1 + packet.QOSLevel = byte((control.flags >> 1) & 0b11) + packet.Dup = (control.flags>>3)&1 == 0 + + packet.TopicName, err = types.DecodeUTF8String(r) + if err != nil { + return packet, err + } + + if packet.QOSLevel != 0 { + packId, err := types.DecodeUint16(r) + if err != nil { + return packet, err + } + packet.PacketId = &packId + } + err = properties.ParseProperties(r, packet.Properties.ArrayOf()) + if err != nil { + return packet, err + } + + packet.Payload, err = io.ReadAll(r) + if err != nil { + return packet, err + } + + return packet, nil +} diff --git a/mqtt/packets/Subscriptions.go b/mqtt/packets/Subscriptions.go new file mode 100644 index 0000000..64c587a --- /dev/null +++ b/mqtt/packets/Subscriptions.go @@ -0,0 +1,180 @@ +package packets + +import ( + "bufio" + "errors" + "io" + + "badat.dev/maeqtt/v2/mqtt/properties" + "badat.dev/maeqtt/v2/mqtt/types" +) + +type TopicFilter struct { + Topic string + MaxQoS uint + NoLocal bool + RetainAsPublished bool + RetainHandling uint +} + +func parseTopicFilter(r *bufio.Reader) (TopicFilter, error) { + filter := TopicFilter{} + var err error + + filter.Topic, err = types.DecodeUTF8String(r) + if err != nil { + return filter, err + } + + options, err := types.DecodeBits(r) + if err != nil { + return filter, err + } + filter.MaxQoS = types.BoolsToUint(options[0], options[1]) + filter.NoLocal = options[2] + filter.RetainAsPublished = options[3] + filter.RetainHandling = types.BoolsToUint(options[4], options[5]) + return filter, nil +} + +// Both sub and unsubscribe packets are identitcal so we can reuse the parsing logic +type SubscriptionPacket struct { + PacketId uint16 + TopicFilters []TopicFilter +} + +func parseSubscriptionPacket(control controlPacket, props []properties.Property) (SubscriptionPacket, error) { + var err error + r := bufio.NewReader(control.reader) + packet := SubscriptionPacket{} + + if control.flags != 2 { + return packet, errors.New("Malformed subscription packet") + } + + packet.PacketId, err = types.DecodeUint16(r) + if err != nil { + return packet, err + } + + err = properties.ParseProperties(r, props) + if err != nil { + return packet, err + } + + for err != io.EOF { + filter, err := parseTopicFilter(r) + packet.TopicFilters = append(packet.TopicFilters, filter) + if err != nil { + return packet, err + } + _, err = r.Peek(1) + if err != nil || err != io.EOF { + return packet, err + } + } + + return packet, nil +} + +type SubscribePacket struct { + *SubscriptionPacket + props properties.SubscribePacketProperties +} + +func parseSubscribePacket(control controlPacket) (SubscribePacket, error) { + if control.packetType != PacketTypeSubscribe { + panic("Wrong packet type for parseSubscribePacket") + } + + pack := SubscribePacket{} + subscriptionPack, err := parseSubscriptionPacket(control, pack.props.ArrayOf()) + if err != nil { + return pack, err + } + pack.PacketId = subscriptionPack.PacketId + pack.TopicFilters = subscriptionPack.TopicFilters + return pack, nil +} + +type SubackReasonCode byte + +const ( + SubackReasonGrantedQoSZero PubackReasonCode = 0 + SubackReasonGrantedQoSOne = 1 + SubackReasonGrantedQoSTwo = 2 + SubackReasonUnspecified = 128 + SubackReasonImplSpecificError = 131 + SubackReasonNotAuthorized = 135 + SubackReasonTopicFilterInvalid = 143 + SubackReasonPacketIDInUse = 145 + SubackReasonQuotaExceeded = 151 + SubackReasonSharedSubNotSupported = 151 + SubackReasonSubIDUnsupported = 151 + SubackReasonWildcardSubUnsupported = 151 +) + +type SubAckPacket struct { + PacketID uint16 + Properties properties.SubackPacketProperties + Reason SubackReasonCode +} + + +func (p SubAckPacket) Write(w io.Writer) error { + resp := pubRespPacket{ + PacketType: PacketTypeSuback, + PacketID: p.PacketID, + Properties: p.Properties.ArrayOf(), + Reason: byte(p.Reason), + } + return resp.Write(w) +} + +type UnsubscribePacket struct { + *SubscriptionPacket + props properties.UnsubscribePacketProperties +} + +func parseUnsubscribePacket(control controlPacket) (UnsubscribePacket, error) { + if control.packetType != PacketTypeUnsubscribe { + panic("Wrong packet type for parseSubscribePacket") + } + + pack := UnsubscribePacket{} + subscriptionPack, err := parseSubscriptionPacket(control, pack.props.ArrayOf()) + if err != nil { + return pack, err + } + pack.PacketId = subscriptionPack.PacketId + pack.TopicFilters = subscriptionPack.TopicFilters + return pack, nil +} + +type UnsubackReasonCode byte + +const ( + UnsubackReasonSuccess PubackReasonCode = 0 + UnSubackReasonUnspecified = 128 + UnSubackReasonImplSpecificError = 131 + UnSubackReasonNotAuthorized = 135 + UnSubackReasonTopicFilterInvalid = 143 + UnSubackReasonPacketIDInUse = 145 +) + +type UnsubAckPacket struct { + PacketID uint16 + Properties properties.UnsubackPacketProperties + Reason UnsubackReasonCode +} + + +func (p UnsubAckPacket) Write(w io.Writer) error { + resp := pubRespPacket{ + PacketType: PacketTypeUnsuback, + PacketID: p.PacketID, + Properties: p.Properties.ArrayOf(), + Reason: byte(p.Reason), + } + return resp.Write(w) +} diff --git a/mqtt/packets/ack.go b/mqtt/packets/ack.go new file mode 100644 index 0000000..61533a3 --- /dev/null +++ b/mqtt/packets/ack.go @@ -0,0 +1,42 @@ +package packets + +import ( + "bytes" + "io" + + "badat.dev/maeqtt/v2/mqtt/properties" + "badat.dev/maeqtt/v2/mqtt/types" +) + +// Boilerplate struct for de/serializing various ack packets +type pubRespPacket struct { + PacketType PacketType + PacketID uint16 + Properties []properties.Property + Reason byte +} + +func (p pubRespPacket) Write(w io.Writer) error { + buf := bytes.NewBuffer([]byte{}) + err := types.WriteUint16(buf, p.PacketID) + if err != nil { + return err + } + + err = buf.WriteByte(byte(p.Reason)) + if err != nil { + return err + } + + err = properties.WriteProps(buf, p.Properties) + if err != nil { + return err + } + + conPack := controlPacket{ + packetType: PacketTypePuback, + flags: 0, + reader: buf, + } + return conPack.write(w) +} diff --git a/mqtt/packets/packets.go b/mqtt/packets/packets.go new file mode 100644 index 0000000..3210264 --- /dev/null +++ b/mqtt/packets/packets.go @@ -0,0 +1,82 @@ +package packets + +import ( + "bufio" + "bytes" + "fmt" + "io" + + "badat.dev/maeqtt/v2/mqtt/types" +) + +type PacketVisitor interface { + visitConnect(ConnectPacket) + visitPublish(PublishPacket) + visitDisconnect(DisconnectPacket) +} + +type ClientPacket interface { + visit(PacketVisitor) +} + +type ServerPacket interface { + Encode() (bytes.Buffer, error) +} + +type PacketType byte + +const ( + PacketTypeReserved PacketType = 0 // Forbidden + PacketTypeConnect = 1 + PacketTypeConnack = 2 + PacketTypePublish = 3 + PacketTypePuback = 4 + PacketTypePubrec = 5 + PacketTypePubrel = 6 + PacketTypePubcomp = 7 + PacketTypeSubscribe = 8 + PacketTypeSuback = 9 + PacketTypeUnsubscribe = 10 + PacketTypeUnsuback = 11 + PacketTypePingreq = 12 + PacketTypePingresp = 13 + PacketTypeDisconnect = 14 + PacketTypeAuth = 15 +) + +func ReadPacket(r *bufio.Reader) (*ClientPacket, error) { + println("AAAA") + fixedHeader, err := r.ReadByte() + if err != nil { + return nil, err + } + println("BBB") + + highestFourBits := uint((fixedHeader >> 4) & 0b1111) + lowerFourBits := uint(fixedHeader & 0b1111) + + dataLength, err := types.DecodeVariableByteInt(r) + if err != nil { + return nil, err + } + reader := io.LimitReader(r, int64(dataLength)) + control := controlPacket{ + packetType: PacketType(highestFourBits), + flags: lowerFourBits, + reader: reader, + } + + var packet ClientPacket + switch control.packetType { + case PacketTypeConnect: + packet, err = parseConnectPacket(control) + case PacketTypePublish: + packet, err = parsePublishPacket(control) + case PacketTypeDisconnect: + packet, err = parseDisconnectPacket(control) + default: + return nil, fmt.Errorf("Unknown packet type %v", control.packetType) + } + + return &packet, err +} diff --git a/mqtt/properties/Properties.go b/mqtt/properties/Properties.go new file mode 100644 index 0000000..f61b4de --- /dev/null +++ b/mqtt/properties/Properties.go @@ -0,0 +1,121 @@ +package properties + +import ( + "bufio" + "bytes" + "errors" + "io" + + "badat.dev/maeqtt/v2/mqtt/types" +) + +type Property interface { + id() int + parse(r *bufio.Reader) error + write(w io.Writer) error + hasValue() bool +} + +func findMatchingProp(props []Property, id int) *Property { + for _, prop := range props { + if prop.id() == id { + return (&prop) + } + } + return nil + +} + +func ParseProperties(r *bufio.Reader, props []Property) error { + propLen, err := types.DecodeVariableByteInt(r) + if err != nil { + return err + } + + limitReader := io.LimitReader(r, int64(propLen)) + r = bufio.NewReader(limitReader) + + for { + propId, err := r.ReadByte() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + prop := findMatchingProp(props, int(propId)) + if prop == nil { + return errors.New("Malformed packet invalid propid") + } + err = (*prop).parse(r) + if err != nil { + return err + } + } + + return nil +} + +func WriteProps(w io.Writer, props []Property) error { + buf := bytes.NewBuffer([]byte{}) + for _, p := range props { + if p.hasValue() { + buf.Write([]byte{byte(p.id())}) + err := p.write(buf) + if err != nil { + return err + } + } + } + return types.WriteDataWithVarIntLen(w, buf.Bytes()) +} + +type KVPair struct { + key string + value string +} + +type UserProperty struct { + values []KVPair +} + +func (*UserProperty) id() int { + return 38 +} + +func (u *UserProperty) parse(r *bufio.Reader) error { + key, err := types.DecodeUTF8String(r) + if err != nil { + return err + } + + value, err := types.DecodeUTF8String(r) + if err != nil { + return err + } + u.values = append(u.values, KVPair{ + key: key, + value: value, + }) + return nil +} + +func (u *UserProperty) write(w io.Writer) error { + for _, k := range u.values { + err := types.WriteUTF8String(w, k.key) + if err != nil { + return err + } + err = types.WriteUTF8String(w, k.value) + if err != nil { + return err + } + + } + return nil +} + +func (u *UserProperty) hasValue() bool { + return len(u.values) > 0 +} diff --git a/mqtt/properties/genProps.py b/mqtt/properties/genProps.py new file mode 100644 index 0000000..a0e9f2b --- /dev/null +++ b/mqtt/properties/genProps.py @@ -0,0 +1,104 @@ +import json +from collections import defaultdict + +print("package properties") +print("// This code has been generated with the genProps.py script. Do not modify\n\n") + +print("import \"bufio\"") +print("import \"io\"") +print("import . \"badat.dev/maeqtt/v2/mqtt/types\"") + +TYPE_TO_GOTYPE = { + "Byte": "byte", + "Two Byte Integer": "uint16", + "Four Byte Integer": "uint32", + "Binary Data": "[]byte", + "UTF-8 Encoded String": "string", + "Variable Byte Integer": "uint32", + } +TYPE_DECODE_CODE = { + "Byte": "val, err := r.ReadByte()", + "Two Byte Integer": "val, err := DecodeUint16(r)", + "Four Byte Integer": "val, err := DecodeUint32(r)", + "Binary Data": "val, err := DecodeBinaryData(r)", + "UTF-8 Encoded String": "val, err := DecodeUTF8String(r)", + "Variable Byte Integer": "val, err := DecodeVariableByteInt(r)" +} + +TYPE_WRITE_CODE = { + "Byte": "(func () error {_, err := w.Write([]byte{*p.Value}); return err})()", + "Two Byte Integer": "WriteUint16(w, *p.Value)", + "Four Byte Integer": "WriteUint32(w, *p.Value)", + "Binary Data": "WriteBinaryData(w, *p.Value)", + "UTF-8 Encoded String": "WriteUTF8String(w, *p.Value)", + "Variable Byte Integer": "WriteVariableByteInt(w, *p.Value)" +} + +applicationToProp = defaultdict(lambda: []) + +with open("./properties.json") as f: + properties = json.load(f) + +for prop in properties: + prop["name"] = "".join(prop["name"].split(" ")) + +for prop in properties: + for application in prop["appliesTo"]: + applicationToProp[application].append(prop) + + val = prop["val"] + name= prop["name"] + if val == "38": + #needs manual handling + continue + + gotype = TYPE_TO_GOTYPE[prop["type"]] + godecode = TYPE_DECODE_CODE[prop["type"]] + gowrite = TYPE_WRITE_CODE[prop["type"]] + + print(""" +type {name} struct {{ + Value *{gotype} +}} + +func (p {name}) id() int {{ + return {val} +}} + +func (p *{name}) parse(r *bufio.Reader) error {{ + {godecode} + if err != nil {{ + return err + }} + p.Value = &val + return nil +}} + +func (p {name}) hasValue() bool {{ + return p.Value != nil +}} + +func (p {name}) write(w io.Writer) error {{ + return {gowrite} +}} + """.format(name= name, gotype = gotype, gowrite = gowrite, godecode = godecode, val = val + )) + +for k,v in applicationToProp.items(): + if k == "Will Properties": + arrName = "WillProperties" + else: + arrName = k.lower().capitalize() + "PacketProperties" + + print("type", arrName, "struct {") + + for prop in v: + print(prop["name"], prop["name"]) + print("}") + print(f"func (p *{arrName}) ArrayOf() []Property {{") + print("return []Property {") + for prop in v: + propName = prop["name"] + print(f"&p.{propName},") + print("}") + print("}") diff --git a/mqtt/properties/properties.json b/mqtt/properties/properties.json new file mode 100644 index 0000000..d7685ee --- /dev/null +++ b/mqtt/properties/properties.json @@ -0,0 +1,189 @@ +[ + { + "val": "1", + "name": "Payload Format Indicator", + "type": "Byte", + "appliesTo": ["PUBLISH", "Will Properties"] + }, + { + "val": "2", + "name": "Message Expiry Interval", + "type": "Four Byte Integer", + "appliesTo": ["PUBLISH", "Will Properties"] + }, + { + "val": "3", + "name": "Content Type", + "type": "UTF-8 Encoded String", + "appliesTo": ["PUBLISH", "Will Properties"] + }, + { + "val": "8", + "name": "Response Topic", + "type": "UTF-8 Encoded String", + "appliesTo": ["PUBLISH", "Will Properties"] + }, + { + "val": "9", + "name": "Correlation Data", + "type": "Binary Data", + "appliesTo": ["PUBLISH", "Will Properties"] + }, + { + "val": "11", + "name": "Subscription Identifier", + "type": "Variable Byte Integer", + "appliesTo": ["PUBLISH", "SUBSCRIBE"] + }, + { + "val": "17", + "name": "Session Expiry Interval", + "type": "Four Byte Integer", + "appliesTo": ["CONNECT", "CONNACK", "DISCONNECT"] + }, + { + "val": "18", + "name": "Assigned Client Identifier", + "type": "UTF-8 Encoded String", + "appliesTo": ["CONNACK"] + }, + { + "val": "19", + "name": "Server Keep Alive", + "type": "Two Byte Integer", + "appliesTo": ["CONNACK"] + }, + { + "val": "21", + "name": "Authentication Method", + "type": "UTF-8 Encoded String", + "appliesTo": ["CONNECT", "CONNACK", "AUTH"] + }, + { + "val": "22", + "name": "Authentication Data", + "type": "Binary Data", + "appliesTo": ["CONNECT", "CONNACK", "AUTH"] + }, + { + "val": "23", + "name": "Request Problem Information", + "type": "Byte", + "appliesTo": ["CONNECT"] + }, + { + "val": "24", + "name": "Will Delay Interval", + "type": "Four Byte Integer", + "appliesTo": ["Will Properties"] + }, + { + "val": "25", + "name": "Request Response Information", + "type": "Byte", + "appliesTo": ["CONNECT"] + }, + { + "val": "26", + "name": "Response Information", + "type": "UTF-8 Encoded String", + "appliesTo": ["CONNACK"] + }, + { + "val": "28", + "name": "Server Reference", + "type": "UTF-8 Encoded String", + "appliesTo": ["CONNACK", "DISCONNECT"] + }, + { + "val": "31", + "name": "Reason String", + "type": "UTF-8 Encoded String", + "appliesTo": [ + "CONNACK", + "PUBACK", + "PUBREC", + "PUBREL", + "PUBCOMP", + "SUBACK", + "UNSUBACK", + "DISCONNECT", + "AUTH" + ] + }, + { + "val": "33", + "name": "Receive Maximum", + "type": "Two Byte Integer", + "appliesTo": ["CONNECT", "CONNACK"] + }, + { + "val": "34", + "name": "Topic Alias Maximum", + "type": "Two Byte Integer", + "appliesTo": ["CONNECT", "CONNACK"] + }, + { + "val": "35", + "name": "Topic Alias", + "type": "Two Byte Integer", + "appliesTo": ["PUBLISH"] + }, + { + "val": "36", + "name": "Maximum QoS", + "type": "Byte", + "appliesTo": ["CONNACK"] + }, + { + "val": "37", + "name": "Retain Available", + "type": "Byte", + "appliesTo": ["CONNACK"] + }, + { + "val": "38", + "name": "User Property", + "type": "UTF-8 String Pair", + "appliesTo": [ + "CONNECT", + "CONNACK", + "PUBLISH", + "Will Properties", + "PUBACK", + "PUBREC", + "PUBREL", + "PUBCOMP", + "SUBSCRIBE", + "SUBACK", + "UNSUBSCRIBE", + "UNSUBACK", + "DISCONNECT", + "AUTH" + ] + }, + { + "val": "39", + "name": "Maximum Packet Size", + "type": "Four Byte Integer", + "appliesTo": ["CONNECT", "CONNACK"] + }, + { + "val": "40", + "name": "Wildcard Subscription Available", + "type": "Byte", + "appliesTo": ["CONNACK"] + }, + { + "val": "41", + "name": "Subscription Identifier Available", + "type": "Byte", + "appliesTo": ["CONNACK"] + }, + { + "val": "42", + "name": "Shared Subscription Available", + "type": "Byte", + "appliesTo": ["CONNACK"] + } +] diff --git a/mqtt/types/Decoding.go b/mqtt/types/Decoding.go new file mode 100644 index 0000000..edd20e0 --- /dev/null +++ b/mqtt/types/Decoding.go @@ -0,0 +1,86 @@ +package types + +import ( + "bufio" + "encoding/binary" + "errors" +) + +func DecodeBits(r *bufio.Reader) ([8]bool, error) { + bitflags, err := r.ReadByte() + if err != nil { + return [8]bool{}, err + } + + var res [8]bool + for i := range res { + res[i] = (bitflags>>i)&1 == 1 + } + return res, nil +} + +func DecodeUint16(r *bufio.Reader) (uint16, error) { + buf := make([]byte, 2) + _, err := r.Read(buf) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint16(buf), nil +} + +func DecodeUint32(r *bufio.Reader) (uint32, error) { + buf := make([]byte, 4) + _, err := r.Read(buf) + if err != nil { + return 0, err + } + return binary.BigEndian.Uint32(buf), nil +} + +func DecodeDataWithVarIntLen(r *bufio.Reader) ([]byte, error) { + len, err := DecodeVariableByteInt(r) + if err != nil { + return []byte{}, err + } + buffer := make([]byte, len) + _, err = r.Read(buffer) + return buffer, err +} + +func DecodeBinaryData(r *bufio.Reader) ([]byte, error) { + len, err := DecodeUint16(r) + if err != nil { + return []byte{}, err + } + buffer := make([]byte, len) + _, err = r.Read(buffer) + return buffer, err +} + + +func DecodeUTF8String(r *bufio.Reader) (string, error) { + binary, err := DecodeBinaryData(r) + return string(binary[:]), err +} + +func DecodeVariableByteInt(r *bufio.Reader) (value uint32, err error) { + multiplier := uint32(1) + value = 0 + + for { + encodedByte, err := r.ReadByte() + if err != nil { + return value, err + } + value += uint32((encodedByte & 127)) * multiplier + if multiplier > 128*128*128 { + return value, errors.New("Malformed Variable Byte Integer") + } + multiplier *= 128 + + if encodedByte&128 == 0 { + break + } + } + return value, nil +} diff --git a/mqtt/types/Encoding.go b/mqtt/types/Encoding.go new file mode 100644 index 0000000..dd8dae1 --- /dev/null +++ b/mqtt/types/Encoding.go @@ -0,0 +1,83 @@ +package types + +import ( + "encoding/binary" + "errors" + "io" +) + +func WriteBits(w io.Writer, data [8]bool) error { + encoded := byte(0) + for i, v := range data { + encoded = encoded | byte(BoolToUint(v)< int(uint32Max) { + return errors.New("Tried to write more data than max varint size") + } + + err := WriteVariableByteInt(w, uint32(len(data))) + if err != nil { + return err + } + + _, err = w.Write(data) + return err +} + +const uint16Max uint16 = ^uint16(0) +func WriteBinaryData(w io.Writer, data []byte) error { + if len(data) > int(uint16Max) { + return errors.New("Tried to write more data than max uint16 size") + } + + err := WriteUint16(w, uint16(len(data))) + if err != nil { + return err + } + + _, err = w.Write(data) + return err +} + +func WriteUTF8String(w io.Writer, str string) error { + return WriteBinaryData(w, []byte(str)) +} + + +func WriteVariableByteInt(w io.Writer, v uint32) error { + for { + encodedByte := byte(v % 128) + v = v / 128 + + if v > 0 { + encodedByte = encodedByte | 128 + } + + w.Write([]byte{encodedByte}) + + if v == 0 { + return nil + } + } +} diff --git a/mqtt/types/Utils.go b/mqtt/types/Utils.go new file mode 100644 index 0000000..3c321f6 --- /dev/null +++ b/mqtt/types/Utils.go @@ -0,0 +1,19 @@ +package types + +// Returns 1 if b is true and 0 if false +func BoolToUint(b bool) uint { + if b { + return 1 + } + return 0 +} + +// Reinterprets a slice of bools as a uint +// This method doesn't check for overflow +func BoolsToUint(b ...bool) uint { + var res uint + for i, v := range b { + res += BoolToUint(v) << i + } + return res +} diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..2db1ebe --- /dev/null +++ b/shell.nix @@ -0,0 +1,5 @@ +{ pkgs ? import {} }: + pkgs.mkShell { + # nativeBuildInputs is usually what you want -- tools you need to run + nativeBuildInputs = with pkgs; [ (enableDebugging mosquitto) wireshark delve ]; +}