Compare commits

..

No commits in common. "9cd116c5e42126319bcb56fa5aadb5c50115d41c" and "a4b5af256dd41e81f268bab28cc7913f9251ac3d" have entirely different histories.

12 changed files with 715 additions and 66 deletions

12
main.go
View file

@ -10,11 +10,11 @@ import (
)
func main() {
listenAddr := ":1883"
listener, err := net.Listen("tcp", listenAddr)
listen_addr := ":1883"
listener, err := net.Listen("tcp", listen_addr)
if err != nil {
log.Fatalf("Coulde't start a listener on tcp %v. Error: %e", listenAddr, err)
log.Fatalf("Coulde't start a listener on tcp %v. Error: %e", listen_addr, err)
}
var sessions map[string]*session.Session = make(map[string]*session.Session)
@ -35,10 +35,10 @@ func main() {
for {
select {
case con := <-connChan:
case con := <- connChan:
handleConnection(con, sessions, removeSessChan)
case sesID := <-removeSessChan:
delete(sessions, sesID)
case sesId := <- removeSessChan:
delete(sessions, sesId)
}
}
}

687
mqtt/GeneratedProperties.go Normal file
View file

@ -0,0 +1,687 @@
package mqtt
// This code has been generated with the genProps.py script. Do not modify
import "bufio"
type PayloadFormatIndicator struct {
value *byte
}
func (p PayloadFormatIndicator) id() int {
return 1
}
func (p *PayloadFormatIndicator) parse(r *bufio.Reader) error {
val, err := r.ReadByte()
if err != nil {
return err
}
p.value = &val
return nil
}
type MessageExpiryInterval struct {
value *uint32
}
func (p MessageExpiryInterval) id() int {
return 2
}
func (p *MessageExpiryInterval) parse(r *bufio.Reader) error {
val, err := decodeUint32(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type ContentType struct {
value *string
}
func (p ContentType) id() int {
return 3
}
func (p *ContentType) parse(r *bufio.Reader) error {
val, err := decodeUTF8String(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type ResponseTopic struct {
value *string
}
func (p ResponseTopic) id() int {
return 8
}
func (p *ResponseTopic) parse(r *bufio.Reader) error {
val, err := decodeUTF8String(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type CorrelationData struct {
value *[]byte
}
func (p CorrelationData) id() int {
return 9
}
func (p *CorrelationData) parse(r *bufio.Reader) error {
val, err := decodeBinaryData(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type SubscriptionIdentifier struct {
value *int
}
func (p SubscriptionIdentifier) id() int {
return 11
}
func (p *SubscriptionIdentifier) parse(r *bufio.Reader) error {
val, err := decodeVariableByteInt(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type SessionExpiryInterval struct {
value *uint32
}
func (p SessionExpiryInterval) id() int {
return 17
}
func (p *SessionExpiryInterval) parse(r *bufio.Reader) error {
val, err := decodeUint32(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type AssignedClientIdentifier struct {
value *string
}
func (p AssignedClientIdentifier) id() int {
return 18
}
func (p *AssignedClientIdentifier) parse(r *bufio.Reader) error {
val, err := decodeUTF8String(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type ServerKeepAlive struct {
value *uint16
}
func (p ServerKeepAlive) id() int {
return 19
}
func (p *ServerKeepAlive) parse(r *bufio.Reader) error {
val, err := decodeUint16(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type AuthenticationMethod struct {
value *string
}
func (p AuthenticationMethod) id() int {
return 21
}
func (p *AuthenticationMethod) parse(r *bufio.Reader) error {
val, err := decodeUTF8String(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type AuthenticationData struct {
value *[]byte
}
func (p AuthenticationData) id() int {
return 22
}
func (p *AuthenticationData) parse(r *bufio.Reader) error {
val, err := decodeBinaryData(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type RequestProblemInformation struct {
value *byte
}
func (p RequestProblemInformation) id() int {
return 23
}
func (p *RequestProblemInformation) parse(r *bufio.Reader) error {
val, err := r.ReadByte()
if err != nil {
return err
}
p.value = &val
return nil
}
type WillDelayInterval struct {
value *uint32
}
func (p WillDelayInterval) id() int {
return 24
}
func (p *WillDelayInterval) parse(r *bufio.Reader) error {
val, err := decodeUint32(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type RequestResponseInformation struct {
value *byte
}
func (p RequestResponseInformation) id() int {
return 25
}
func (p *RequestResponseInformation) parse(r *bufio.Reader) error {
val, err := r.ReadByte()
if err != nil {
return err
}
p.value = &val
return nil
}
type ResponseInformation struct {
value *string
}
func (p ResponseInformation) id() int {
return 26
}
func (p *ResponseInformation) parse(r *bufio.Reader) error {
val, err := decodeUTF8String(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type ServerReference struct {
value *string
}
func (p ServerReference) id() int {
return 28
}
func (p *ServerReference) parse(r *bufio.Reader) error {
val, err := decodeUTF8String(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type ReasonString struct {
value *string
}
func (p ReasonString) id() int {
return 31
}
func (p *ReasonString) parse(r *bufio.Reader) error {
val, err := decodeUTF8String(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type ReceiveMaximum struct {
value *uint16
}
func (p ReceiveMaximum) id() int {
return 33
}
func (p *ReceiveMaximum) parse(r *bufio.Reader) error {
val, err := decodeUint16(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type TopicAliasMaximum struct {
value *uint16
}
func (p TopicAliasMaximum) id() int {
return 34
}
func (p *TopicAliasMaximum) parse(r *bufio.Reader) error {
val, err := decodeUint16(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type TopicAlias struct {
value *uint16
}
func (p TopicAlias) id() int {
return 35
}
func (p *TopicAlias) parse(r *bufio.Reader) error {
val, err := decodeUint16(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type MaximumQoS struct {
value *byte
}
func (p MaximumQoS) id() int {
return 36
}
func (p *MaximumQoS) parse(r *bufio.Reader) error {
val, err := r.ReadByte()
if err != nil {
return err
}
p.value = &val
return nil
}
type RetainAvailable struct {
value *byte
}
func (p RetainAvailable) id() int {
return 37
}
func (p *RetainAvailable) parse(r *bufio.Reader) error {
val, err := r.ReadByte()
if err != nil {
return err
}
p.value = &val
return nil
}
type MaximumPacketSize struct {
value *uint32
}
func (p MaximumPacketSize) id() int {
return 39
}
func (p *MaximumPacketSize) parse(r *bufio.Reader) error {
val, err := decodeUint32(r)
if err != nil {
return err
}
p.value = &val
return nil
}
type WildcardSubscriptionAvailable struct {
value *byte
}
func (p WildcardSubscriptionAvailable) id() int {
return 40
}
func (p *WildcardSubscriptionAvailable) parse(r *bufio.Reader) error {
val, err := r.ReadByte()
if err != nil {
return err
}
p.value = &val
return nil
}
type SubscriptionIdentifierAvailable struct {
value *byte
}
func (p SubscriptionIdentifierAvailable) id() int {
return 41
}
func (p *SubscriptionIdentifierAvailable) parse(r *bufio.Reader) error {
val, err := r.ReadByte()
if err != nil {
return err
}
p.value = &val
return nil
}
type SharedSubscriptionAvailable struct {
value *byte
}
func (p SharedSubscriptionAvailable) id() int {
return 42
}
func (p *SharedSubscriptionAvailable) parse(r *bufio.Reader) error {
val, err := r.ReadByte()
if err != nil {
return err
}
p.value = &val
return nil
}
type PublishPacketProperties struct {
PayloadFormatIndicator PayloadFormatIndicator
MessageExpiryInterval MessageExpiryInterval
ContentType ContentType
ResponseTopic ResponseTopic
CorrelationData CorrelationData
SubscriptionIdentifier SubscriptionIdentifier
TopicAlias TopicAlias
UserProperty UserProperty
}
func (p *PublishPacketProperties) arrayOf() []Property {
return []Property{
&p.PayloadFormatIndicator,
&p.MessageExpiryInterval,
&p.ContentType,
&p.ResponseTopic,
&p.CorrelationData,
&p.SubscriptionIdentifier,
&p.TopicAlias,
&p.UserProperty,
}
}
type WillProperties struct {
PayloadFormatIndicator PayloadFormatIndicator
MessageExpiryInterval MessageExpiryInterval
ContentType ContentType
ResponseTopic ResponseTopic
CorrelationData CorrelationData
WillDelayInterval WillDelayInterval
UserProperty UserProperty
}
func (p *WillProperties) arrayOf() []Property {
return []Property{
&p.PayloadFormatIndicator,
&p.MessageExpiryInterval,
&p.ContentType,
&p.ResponseTopic,
&p.CorrelationData,
&p.WillDelayInterval,
&p.UserProperty,
}
}
type SubscribePacketProperties struct {
SubscriptionIdentifier SubscriptionIdentifier
UserProperty UserProperty
}
func (p *SubscribePacketProperties) arrayOf() []Property {
return []Property{
&p.SubscriptionIdentifier,
&p.UserProperty,
}
}
type ConnectPacketProperties struct {
SessionExpiryInterval SessionExpiryInterval
AuthenticationMethod AuthenticationMethod
AuthenticationData AuthenticationData
RequestProblemInformation RequestProblemInformation
RequestResponseInformation RequestResponseInformation
ReceiveMaximum ReceiveMaximum
TopicAliasMaximum TopicAliasMaximum
UserProperty UserProperty
MaximumPacketSize MaximumPacketSize
}
func (p *ConnectPacketProperties) arrayOf() []Property {
return []Property{
&p.SessionExpiryInterval,
&p.AuthenticationMethod,
&p.AuthenticationData,
&p.RequestProblemInformation,
&p.RequestResponseInformation,
&p.ReceiveMaximum,
&p.TopicAliasMaximum,
&p.UserProperty,
&p.MaximumPacketSize,
}
}
type ConnackPacketProperties struct {
SessionExpiryInterval SessionExpiryInterval
AssignedClientIdentifier AssignedClientIdentifier
ServerKeepAlive ServerKeepAlive
AuthenticationMethod AuthenticationMethod
AuthenticationData AuthenticationData
ResponseInformation ResponseInformation
ServerReference ServerReference
ReasonString ReasonString
ReceiveMaximum ReceiveMaximum
TopicAliasMaximum TopicAliasMaximum
MaximumQoS MaximumQoS
RetainAvailable RetainAvailable
UserProperty UserProperty
MaximumPacketSize MaximumPacketSize
WildcardSubscriptionAvailable WildcardSubscriptionAvailable
SubscriptionIdentifierAvailable SubscriptionIdentifierAvailable
SharedSubscriptionAvailable SharedSubscriptionAvailable
}
func (p *ConnackPacketProperties) arrayOf() []Property {
return []Property{
&p.SessionExpiryInterval,
&p.AssignedClientIdentifier,
&p.ServerKeepAlive,
&p.AuthenticationMethod,
&p.AuthenticationData,
&p.ResponseInformation,
&p.ServerReference,
&p.ReasonString,
&p.ReceiveMaximum,
&p.TopicAliasMaximum,
&p.MaximumQoS,
&p.RetainAvailable,
&p.UserProperty,
&p.MaximumPacketSize,
&p.WildcardSubscriptionAvailable,
&p.SubscriptionIdentifierAvailable,
&p.SharedSubscriptionAvailable,
}
}
type DisconnectPacketProperties struct {
SessionExpiryInterval SessionExpiryInterval
ServerReference ServerReference
ReasonString ReasonString
UserProperty UserProperty
}
func (p *DisconnectPacketProperties) arrayOf() []Property {
return []Property{
&p.SessionExpiryInterval,
&p.ServerReference,
&p.ReasonString,
&p.UserProperty,
}
}
type AuthPacketProperties struct {
AuthenticationMethod AuthenticationMethod
AuthenticationData AuthenticationData
ReasonString ReasonString
UserProperty UserProperty
}
func (p *AuthPacketProperties) arrayOf() []Property {
return []Property{
&p.AuthenticationMethod,
&p.AuthenticationData,
&p.ReasonString,
&p.UserProperty,
}
}
type PubackPacketProperties struct {
ReasonString ReasonString
UserProperty UserProperty
}
func (p *PubackPacketProperties) arrayOf() []Property {
return []Property{
&p.ReasonString,
&p.UserProperty,
}
}
type PubrecPacketProperties struct {
ReasonString ReasonString
UserProperty UserProperty
}
func (p *PubrecPacketProperties) arrayOf() []Property {
return []Property{
&p.ReasonString,
&p.UserProperty,
}
}
type PubrelPacketProperties struct {
ReasonString ReasonString
UserProperty UserProperty
}
func (p *PubrelPacketProperties) arrayOf() []Property {
return []Property{
&p.ReasonString,
&p.UserProperty,
}
}
type PubcompPacketProperties struct {
ReasonString ReasonString
UserProperty UserProperty
}
func (p *PubcompPacketProperties) arrayOf() []Property {
return []Property{
&p.ReasonString,
&p.UserProperty,
}
}
type SubackPacketProperties struct {
ReasonString ReasonString
UserProperty UserProperty
}
func (p *SubackPacketProperties) arrayOf() []Property {
return []Property{
&p.ReasonString,
&p.UserProperty,
}
}
type UnsubackPacketProperties struct {
ReasonString ReasonString
UserProperty UserProperty
}
func (p *UnsubackPacketProperties) arrayOf() []Property {
return []Property{
&p.ReasonString,
&p.UserProperty,
}
}
type UnsubscribePacketProperties struct {
UserProperty UserProperty
}
func (p *UnsubscribePacketProperties) arrayOf() []Property {
return []Property{
&p.UserProperty,
}
}

View file

@ -26,13 +26,7 @@ func (c controlPacket) write(w io.Writer) error {
return err
}
err = types.WriteDataWithVarIntLen(buf, data)
if err != nil {
return err
}
_, err = w.Write(buf.Bytes())
if err != nil {
return err
}
types.WriteDataWithVarIntLen(buf, data)
w.Write(buf.Bytes())
return nil
}

View file

@ -74,10 +74,7 @@ func WriteVariableByteInt(w io.Writer, v uint32) error {
encodedByte = encodedByte | 128
}
_,err := w.Write([]byte{encodedByte})
if err != nil {
return err
}
w.Write([]byte{encodedByte})
if v == 0 {
return nil

View file

@ -1,17 +0,0 @@
#!/usr/bin/env bash
# stolen from somewhere idk where tho tbh
GOPATH="${GOPATH:-"$HOME/go"}"
PATH="$PATH:$GOPATH/bin/"
# Check for golangci-lint
if [[ ! "which golangci-lint 2>&1 > /dev/null" ]]; then
printf "\t\033[41mPlease install golangci-lint\033"
exit 1
fi
golangci-lint run --fix --new-from-rev HEAD ./...
exit 0

View file

@ -59,10 +59,7 @@ func (s *Session) Connect(req ConnectionRequest) {
connAck.Properties.SharedSubscriptionAvailable.Value = &false
s.Connection = req.Connection
err := s.Connection.sendPacket(connAck)
if err != nil {
panic("TODO, handle this")
}
s.Connection.sendPacket(connAck)
}
// Starts a loop the recieves and responds to packets
@ -72,7 +69,7 @@ func (s *Session) HandlerLoop() {
select {
case packet := <-s.Connection.PacketChannel:
packet.Visit(s)
case <-s.Connection.ClientDisconnectedChan:
case _ = <-s.Connection.ClientDisconnectedChan:
s.onDisconnect()
case c := <-s.ConnecionChannel:
s.Connect(c)
@ -80,10 +77,7 @@ func (s *Session) HandlerLoop() {
// TODO implement other qos levels
subMessage.QOSLevel = 0
subMessage.Dup = false
err := s.Connection.sendPacket(subMessage)
if err != nil {
panic("TOOO handle this")
}
s.Connection.sendPacket(subMessage)
}
}
@ -92,7 +86,7 @@ func (s *Session) HandlerLoop() {
s.Connect(c)
// Tail recursion baybeeee
s.HandlerLoop()
case <- s.expireTimer.C:
case _ = <- s.expireTimer.C:
s.expireSession()
}
}

View file

@ -29,7 +29,6 @@ type Connection struct {
func (c *Connection) resetKeepAlive() {
if c.KeepAliveInterval != 0 {
panic("TODO")
// TODO IMPLEMENT THIS
//s.keepAliveTicker.Reset(s.KeepAliveInterval)
}

View file

@ -31,7 +31,7 @@ func (s *Session) expireSession() {
// newTime is nullable
func (s *Session) SetExpireTimer(newTime *uint32) {
expiry := uint32(0)
var expiry = uint32(0)
if newTime != nil {
expiry = *newTime
} else {

View file

@ -31,10 +31,7 @@ func (s *Session) VisitPublish(p packets.PublishPacket) {
PacketID: *p.PacketId,
Reason: reason,
}
err := s.Connection.sendPacket(ack)
if err != nil {
panic("TODO")
}
s.Connection.sendPacket(ack)
} else if p.QOSLevel == 2 {
panic("UNIMPLEMENTED QOS level 2")
}
@ -59,30 +56,24 @@ func (s *Session) VisitSubscribe(p packets.SubscribePacket) {
for _, filter := range p.TopicFilters {
subscription.Subscriptions.Subscribe(filter, s.SubscriptionChannel)
}
err := s.Connection.sendPacket(packets.SubAckPacket{
s.Connection.sendPacket(packets.SubAckPacket{
PacketID: p.PacketId,
Reason: packets.SubackReasonGrantedQoSTwo,
})
if err != nil {
panic("TODO")
}
}
func (s *Session) VisitUnsubscribe(p packets.UnsubscribePacket) {
for _, topic := range p.Topics {
subscription.Subscriptions.Unsubscribe(topic, s.SubscriptionChannel)
}
err := s.Connection.sendPacket(packets.UnsubAckPacket{
s.Connection.sendPacket(packets.UnsubAckPacket{
PacketID: p.PacketID,
Reason: packets.UnsubackReasonSuccess,
})
if err != nil {
panic("TODO")
}
}
func (s *Session) VisitPing(p packets.PingreqPacket) {
_ = s.Connection.sendPacket(packets.PingrespPacket{})
s.Connection.sendPacket(packets.PingrespPacket{})
}

View file

@ -24,14 +24,17 @@ func genClientID() *string {
return &id
}
func (s *Session) Disconnect(code packets.DisconnectReasonCode) {
// If disconnetion fails that means we are already disconnected, great!
_ = s.Connection.sendPacket(packets.DisconnectPacket{
func (s *Session) Disconnect(code packets.DisconnectReasonCode) error {
s.Connection.sendPacket(packets.DisconnectPacket{
ReasonCode: code,
})
_ = s.Connection.close()
err := s.Connection.close()
if err != nil {
return err
}
s.onDisconnect()
return nil
}

View file

@ -1,4 +1,5 @@
{ pkgs ? import <nixpkgs> {} }:
pkgs.mkShell {
nativeBuildInputs = with pkgs; [ go golangci-lint (enableDebugging mosquitto) wireshark delve ];
# nativeBuildInputs is usually what you want -- tools you need to run
nativeBuildInputs = with pkgs; [ (enableDebugging mosquitto) wireshark delve ];
}

View file

@ -52,7 +52,7 @@ func (s *SubscriptionTreeNode) findNode(fields []string) *SubscriptionTreeNode {
s.NodeLock.RLock()
}
child := s.children[field]
child, _ := s.children[field]
s.NodeLock.RUnlock()
return child.findNode(fields[1:])
}