// Source: maeqtt/session/packetVisitors.go

package session
import (
"io"
"log"
"badat.dev/maeqtt/v2/mqtt/packets"
"badat.dev/maeqtt/v2/subscription"
)
// VisitConnect handles a CONNECT packet received on a session that is already
// running. The initial CONNECT is processed in Connect, so seeing one here
// means the connection is already established: a warning is logged and the
// session is disconnected with the protocol-error reason code.
func (s *Session) VisitConnect(_ packets.ConnectPacket) {
	const warning = "WARN: Got a connect packet on an already estabilished connection"

	log.Println(warning)
	s.Disconnect(packets.DisconnectReasonCodeProtocolError)
}
func (s *Session) VisitPublish(p packets.PublishPacket) {
subNodes := subscription.Subscriptions.GetSubscriptions(p.TopicName)
for _, subNode := range subNodes {
subNode.NodeLock.RLock()
defer subNode.NodeLock.RUnlock()
if p.QOSLevel == 0 {
if p.PacketId != nil {
log.Printf("Client: %v, Got publish with qos 0 and a packet id, ignoring\n", s.ClientID)
return
}
} else if p.QOSLevel == 1 {
var reason packets.PubackReasonCode = packets.PubackReasonCodeSuccess
ack := packets.PubackPacket{
PacketID: *p.PacketId,
Reason: reason,
}
2021-10-17 20:58:16 +02:00
err := s.Connection.sendPacket(ack)
if err != nil {
panic("TODO")
}
2021-10-16 23:38:23 +02:00
} else if p.QOSLevel == 2 {
panic("UNIMPLEMENTED QOS level 2")
}
for _, sub := range subNode.Subscriptions {
if !(sub.NoLocal && sub.SubscriptionChannel == s.SubscriptionChannel) {
go func(sub subscription.Subscription) { sub.SubscriptionChannel <- p }(sub)
}
}
}
}
// VisitDisconnect handles a DISCONNECT packet from the client: it closes the
// underlying connection and then runs the session's onDisconnect hook.
// A close error is logged unless it is io.ErrClosedPipe.
func (s *Session) VisitDisconnect(p packets.DisconnectPacket) {
	closeErr := s.Connection.close()
	switch closeErr {
	case nil, io.ErrClosedPipe:
		// Nothing worth reporting.
	default:
		log.Println("Error closing connection", closeErr)
	}

	s.onDisconnect()
}
func (s *Session) VisitSubscribe(p packets.SubscribePacket) {
for _, filter := range p.TopicFilters {
subscription.Subscriptions.Subscribe(filter, s.SubscriptionChannel)
}
2021-10-17 20:58:16 +02:00
err := s.Connection.sendPacket(packets.SubAckPacket{
2021-10-16 23:38:23 +02:00
PacketID: p.PacketId,
Reason: packets.SubackReasonGrantedQoSTwo,
})
2021-10-17 20:58:16 +02:00
if err != nil {
panic("TODO")
}
2021-10-16 23:38:23 +02:00
}
func (s *Session) VisitUnsubscribe(p packets.UnsubscribePacket) {
for _, topic := range p.Topics {
subscription.Subscriptions.Unsubscribe(topic, s.SubscriptionChannel)
}
2021-10-17 20:58:16 +02:00
err := s.Connection.sendPacket(packets.UnsubAckPacket{
2021-10-16 23:38:23 +02:00
PacketID: p.PacketID,
Reason: packets.UnsubackReasonSuccess,
})
2021-10-17 20:58:16 +02:00
if err != nil {
panic("TODO")
}
2021-10-16 23:38:23 +02:00
}
func (s *Session) VisitPing(p packets.PingreqPacket) {
2021-10-17 20:58:16 +02:00
_ = s.Connection.sendPacket(packets.PingrespPacket{})
2021-10-16 23:38:23 +02:00
}
// TODO: implement QoS level 2.

func (s *Session) VisitPubackPacket(_ packets.PubackPacket) {
2021-10-16 23:58:47 +02:00
panic("not implemented")
2021-10-16 23:38:23 +02:00
}
func (s *Session) VisitPubrecPacket(_ packets.PubrecPacket) {
2021-10-16 23:58:47 +02:00
panic("not implemented")
2021-10-16 23:38:23 +02:00
}
func (s *Session) VisitPubrelPacket(_ packets.PubrelPacket) {
2021-10-16 23:58:47 +02:00
panic("not implemented")
2021-10-16 23:38:23 +02:00
}
func (s *Session) VisitPubcompPacket(_ packets.PubcompPacket) {
2021-10-16 23:58:47 +02:00
panic("not implemented")
2021-10-16 23:38:23 +02:00
}