maeqtt/session/Session.go

105 lines
2.4 KiB
Go
Raw Normal View History

2021-10-16 23:38:23 +02:00
package session
import (
"fmt"
"log"
"badat.dev/maeqtt/v2/mqtt/packets"
)
// Session is the broker-side state kept for one MQTT client. It outlives a
// single network connection: onDisconnect only clears Connection, and
// HandlerLoop keeps the session around until the expiry timer fires.
type Session struct {
// ClientID identifies the client. Connect either adopts the identifier from
// the CONNECT packet or stores one produced by genClientID.
ClientID *string
// Connection is the currently attached client connection. It is nullable:
// onDisconnect sets it to nil and HandlerLoop treats nil as "no client
// attached".
Connection *Connection
// SubscriptionChannel carries publish packets that HandlerLoop forwards to
// the attached client.
SubscriptionChannel chan packets.PublishPacket
// ConnecionChannel (sic, the spelling is part of the exported name) carries
// connection requests that HandlerLoop hands to Connect.
ConnecionChannel chan ConnectionRequest
// freePacketID is not referenced in this part of the file; presumably the
// next unused packet identifier. TODO confirm against its users.
freePacketID uint16
// Expiry is embedded; NewSession fills it from NewExpiry, and the
// expireTimer used by Connect and HandlerLoop is reached through it.
Expiry
}
// NewSession builds a session for the client that sent req and immediately
// attaches that client's connection by calling Connect (which also sends the
// CONNACK).
//
// rmSessChan is passed to NewExpiry to construct the embedded Expiry.
//
// The session is returned by value; callers that go on to use its
// pointer-receiver methods should keep it in an addressable variable.
func NewSession(req ConnectionRequest, rmSessChan RemoveSessionChannel) Session {
	sess := Session{
		SubscriptionChannel: make(chan packets.PublishPacket),
		// A nil channel is never ready for communication: without this
		// allocation every sender of a reconnect/takeover request would block
		// forever and HandlerLoop could never receive one.
		ConnecionChannel: make(chan ConnectionRequest),
		Expiry:           NewExpiry(rmSessChan),
	}
	sess.Connect(req)
	return sess
}
func (s *Session) Connect(req ConnectionRequest) {
if s.Connection != nil {
s.Disconnect(packets.DisconnectReasonCodeSessionTakenOver)
}
connAck := packets.ConnackPacket{}
s.SetExpireTimer(req.ConnectPakcet.Properties.SessionExpiryInterval.Value)
s.expireTimer.Stop()
if req.ConnectPakcet.ClientId == nil {
if s.ClientID == nil {
s.ClientID = genClientID()
}
connAck.Properties.AssignedClientIdentifier.Value = s.ClientID
} else if s.ClientID != nil && s.ClientID != req.ConnectPakcet.ClientId {
panic(fmt.Errorf("Session %s connect called with a connect packet with an ID: %s", *s.ClientID, *req.ConnectPakcet.ClientId))
} else {
s.ClientID = req.ConnectPakcet.ClientId
}
true := byte(1)
false := byte(0)
connAck.Properties.WildcardSubscriptionAvailable.Value = &true
connAck.Properties.RetainAvailable.Value = &false
connAck.Properties.SharedSubscriptionAvailable.Value = &false
s.Connection = req.Connection
2021-10-17 20:58:16 +02:00
err := s.Connection.sendPacket(connAck)
if err != nil {
panic("TODO, handle this")
}
2021-10-16 23:38:23 +02:00
}
2021-10-17 21:27:17 +02:00
// Starts a loop the receives and responds to packets
2021-10-16 23:38:23 +02:00
func (s *Session) HandlerLoop() {
go s.Connection.PacketReadLoop()
for s.Connection != nil {
select {
case packet := <-s.Connection.PacketChannel:
packet.Visit(s)
2021-10-17 20:58:16 +02:00
case <-s.Connection.ClientDisconnectedChan:
2021-10-16 23:38:23 +02:00
s.onDisconnect()
case c := <-s.ConnecionChannel:
s.Connect(c)
case subMessage := <-s.SubscriptionChannel:
// TODO implement other qos levels
subMessage.QOSLevel = 0
subMessage.Dup = false
2021-10-17 20:58:16 +02:00
err := s.Connection.sendPacket(subMessage)
if err != nil {
panic("TOOO handle this")
}
2021-10-16 23:38:23 +02:00
}
}
select {
case c := <-s.ConnecionChannel:
s.Connect(c)
// Tail recursion baybeeee
s.HandlerLoop()
2021-10-17 20:58:16 +02:00
case <- s.expireTimer.C:
2021-10-16 23:38:23 +02:00
s.expireSession()
}
}
// onDisconnect records that the client has gone away: it detaches the
// connection, calls resetExpireTimer and logs the client's identifier.
func (s *Session) onDisconnect() {
	s.Connection = nil
	s.resetExpireTimer()

	clientID := *s.ClientID
	log.Printf("Client disconnected, id: %s", clientID)
}