2021-10-16 23:38:23 +02:00
|
|
|
package session
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
|
|
|
|
"badat.dev/maeqtt/v2/mqtt/packets"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Session struct {
|
|
|
|
ClientID *string
|
|
|
|
|
|
|
|
// Nullable
|
|
|
|
Connection *Connection
|
|
|
|
SubscriptionChannel chan packets.PublishPacket
|
|
|
|
ConnecionChannel chan ConnectionRequest
|
|
|
|
|
|
|
|
freePacketID uint16
|
|
|
|
|
|
|
|
Expiry
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewSession(req ConnectionRequest, rmSessChan RemoveSessionChannel) Session {
|
|
|
|
sess := Session{}
|
2021-10-19 12:00:53 +02:00
|
|
|
sess.SubscriptionChannel = make(chan packets.PublishPacket)
|
|
|
|
sess.ConnecionChannel = make(chan ConnectionRequest)
|
|
|
|
|
2021-10-16 23:38:23 +02:00
|
|
|
sess.Expiry = NewExpiry(rmSessChan)
|
|
|
|
|
|
|
|
sess.Connect(req)
|
|
|
|
return sess
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Session) Connect(req ConnectionRequest) {
|
2021-10-19 12:00:53 +02:00
|
|
|
s.stopExpireTimer()
|
|
|
|
|
2021-10-16 23:38:23 +02:00
|
|
|
if s.Connection != nil {
|
|
|
|
s.Disconnect(packets.DisconnectReasonCodeSessionTakenOver)
|
|
|
|
}
|
|
|
|
connAck := packets.ConnackPacket{}
|
|
|
|
|
2021-10-19 12:00:53 +02:00
|
|
|
s.SetExpireTimerDuration(req.ConnectPakcet.Properties.SessionExpiryInterval.Value)
|
2021-10-16 23:38:23 +02:00
|
|
|
|
|
|
|
if req.ConnectPakcet.ClientId == nil {
|
|
|
|
if s.ClientID == nil {
|
|
|
|
s.ClientID = genClientID()
|
|
|
|
}
|
|
|
|
connAck.Properties.AssignedClientIdentifier.Value = s.ClientID
|
2021-10-19 12:00:53 +02:00
|
|
|
} else if s.ClientID != nil && *s.ClientID != *req.ConnectPakcet.ClientId {
|
2021-10-16 23:38:23 +02:00
|
|
|
panic(fmt.Errorf("Session %s connect called with a connect packet with an ID: %s", *s.ClientID, *req.ConnectPakcet.ClientId))
|
|
|
|
} else {
|
|
|
|
s.ClientID = req.ConnectPakcet.ClientId
|
|
|
|
}
|
|
|
|
|
|
|
|
true := byte(1)
|
|
|
|
false := byte(0)
|
|
|
|
connAck.Properties.WildcardSubscriptionAvailable.Value = &true
|
|
|
|
|
|
|
|
connAck.Properties.RetainAvailable.Value = &false
|
|
|
|
connAck.Properties.SharedSubscriptionAvailable.Value = &false
|
|
|
|
|
|
|
|
s.Connection = req.Connection
|
2021-10-19 12:00:53 +02:00
|
|
|
_ = s.Connection.sendPacket(connAck)
|
|
|
|
go s.Connection.PacketReadLoop()
|
2021-10-16 23:38:23 +02:00
|
|
|
}
|
|
|
|
|
2021-10-17 21:27:17 +02:00
|
|
|
// Starts a loop the receives and responds to packets
|
2021-10-16 23:38:23 +02:00
|
|
|
func (s *Session) HandlerLoop() {
|
2021-10-19 12:00:53 +02:00
|
|
|
for {
|
|
|
|
var packetChan chan packets.ClientPacket
|
|
|
|
if s.Connection != nil {
|
|
|
|
packetChan = s.Connection.PacketChannel
|
|
|
|
}
|
2021-10-16 23:38:23 +02:00
|
|
|
select {
|
2021-10-19 12:00:53 +02:00
|
|
|
case packet, more := <-packetChan:
|
|
|
|
if more {
|
|
|
|
packet.Visit(s)
|
|
|
|
} else {
|
|
|
|
s.onDisconnect()
|
|
|
|
}
|
2021-10-16 23:38:23 +02:00
|
|
|
case c := <-s.ConnecionChannel:
|
|
|
|
s.Connect(c)
|
2021-10-19 12:00:53 +02:00
|
|
|
case <-s.expiryChannel():
|
|
|
|
s.expireSession()
|
|
|
|
break
|
2021-10-16 23:38:23 +02:00
|
|
|
case subMessage := <-s.SubscriptionChannel:
|
2021-10-19 12:00:53 +02:00
|
|
|
if s.Connection != nil {
|
|
|
|
// TODO implement other qos levels
|
|
|
|
subMessage.QOSLevel = 0
|
|
|
|
subMessage.Dup = false
|
|
|
|
err := s.Connection.sendPacket(subMessage)
|
|
|
|
if err != nil {
|
|
|
|
panic("TOOO handle this")
|
|
|
|
}
|
2021-10-17 20:58:16 +02:00
|
|
|
}
|
2021-10-16 23:38:23 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Session) onDisconnect() {
|
|
|
|
s.Connection = nil
|
2021-10-19 12:00:53 +02:00
|
|
|
s.startExpireTimer()
|
2021-10-16 23:38:23 +02:00
|
|
|
log.Printf("Client disconnected, id: %s", *s.ClientID)
|
|
|
|
}
|