maeqtt/session/connection.go

112 lines
2.5 KiB
Go
Raw Normal View History

2021-10-01 22:18:48 +02:00
package session
2021-09-28 12:30:32 +02:00
import (
"bufio"
2021-10-07 22:01:52 +02:00
"errors"
2021-09-28 12:30:32 +02:00
"io"
2021-10-07 22:01:52 +02:00
"log"
2021-09-28 12:30:32 +02:00
"time"
"badat.dev/maeqtt/v2/mqtt/packets"
)
type Connection struct {
MaxPacketSize *uint32
RecvMax uint16
TopicAliasMax uint16
WantsRespInf bool
WantsProblemInf bool
Will packets.Will
KeepAliveInterval time.Duration
keepAliveTicker time.Ticker
PacketChannel chan packets.ClientPacket
ClientDisconnectedChan chan bool
rw io.ReadWriteCloser
}
func (c *Connection) resetKeepAlive() {
if c.KeepAliveInterval != 0 {
2021-10-17 20:58:16 +02:00
panic("TODO")
2021-09-28 12:30:32 +02:00
// TODO IMPLEMENT THIS
//s.keepAliveTicker.Reset(s.KeepAliveInterval)
}
}
func (c *Connection) readPacket() (*packets.ClientPacket, error) {
return packets.ReadPacket(bufio.NewReader(c.rw))
}
// sendPacket resets the keep-alive state and then serialises p onto the
// connection's underlying stream, returning whatever error the write reports.
func (c *Connection) sendPacket(p packets.ServerPacket) error {
	c.resetKeepAlive()

	writeErr := p.Write(c.rw)
	return writeErr
}
// close shuts the connection down: it first closes PacketChannel and then
// closes the underlying stream, returning the stream's close error.
func (c *Connection) close() error {
	close(c.PacketChannel)

	closeErr := c.rw.Close()
	return closeErr
}
2021-10-07 22:01:52 +02:00
func (c *Connection) PacketReadLoop() {
2021-09-28 12:30:32 +02:00
for {
pack, err := c.readPacket()
2021-10-07 22:01:52 +02:00
if err != nil {
2021-09-28 12:30:32 +02:00
c.ClientDisconnectedChan <- true
2021-10-07 22:01:52 +02:00
c.close()
2021-09-28 12:30:32 +02:00
} else {
c.PacketChannel <- *pack
}
}
}
2021-10-07 22:01:52 +02:00
// FirstPackNotConnect is the error NewConnection returns when the first packet
// received from a client is not a CONNECT packet.
var FirstPackNotConnect error = errors.New(
	"Failed to connect, first packet is not connect",
)
func NewConnection(rw io.ReadWriteCloser) (ConnectionRequest, error) {
connReq := ConnectionRequest{}
2021-09-28 12:30:32 +02:00
conn := Connection{}
2021-10-07 22:01:52 +02:00
connReq.Connection = &conn
2021-09-28 12:30:32 +02:00
conn.rw = rw
2021-10-07 22:01:52 +02:00
packet, err := conn.readPacket()
conPack, isConn := (*packet).(packets.ConnectPacket)
if !isConn {
log.Println("Didn't recieve a connect packet")
err := packets.DisconnectPacket{
ReasonCode: packets.DisconnectReasonCodeProtocolError,
}.Write(rw)
if err != nil {
log.Println("Failed to disconnect after not recieving a connect packet", err)
}
return connReq, FirstPackNotConnect
}
connReq.ConnectPakcet = conPack
2021-09-28 12:30:32 +02:00
2021-10-07 22:01:52 +02:00
if conPack.Properties.ReceiveMaximum.Value != nil {
conn.RecvMax = *conPack.Properties.ReceiveMaximum.Value
2021-09-28 12:30:32 +02:00
} else {
conn.RecvMax = 65535
}
2021-10-07 22:01:52 +02:00
conn.MaxPacketSize = conPack.Properties.MaximumPacketSize.Value
2021-09-28 12:30:32 +02:00
2021-10-07 22:01:52 +02:00
if conPack.Properties.TopicAliasMaximum.Value != nil {
conn.TopicAliasMax = *conPack.Properties.TopicAliasMaximum.Value
2021-09-28 12:30:32 +02:00
} else {
conn.TopicAliasMax = 0
}
2021-10-07 22:01:52 +02:00
if conPack.Properties.RequestProblemInformation.Value != nil {
conn.WantsRespInf = *conPack.Properties.RequestProblemInformation.Value != 0
2021-09-28 12:30:32 +02:00
} else {
conn.WantsRespInf = false
}
2021-10-07 22:01:52 +02:00
conn.KeepAliveInterval = time.Duration(conPack.KeepAliveInterval) * time.Second
2021-09-28 12:30:32 +02:00
2021-10-07 22:01:52 +02:00
conn.PacketChannel = make(chan packets.ClientPacket, 1)
2021-09-28 12:30:32 +02:00
2021-10-07 22:01:52 +02:00
return connReq, err
2021-09-28 12:30:32 +02:00
}