package session import ( "io" "log" "badat.dev/maeqtt/v2/mqtt/packets" "badat.dev/maeqtt/v2/subscription" ) func (s *Session) VisitConnect(_ packets.ConnectPacket) { // Disconnect, we handle the connect packet in Connect, // this means that we have an estabilished connection already log.Println("WARN: Got a connect packet on an already estabilished connection") s.Disconnect(packets.DisconnectReasonCodeProtocolError) } func (s *Session) VisitPublish(p packets.PublishPacket) { subNodes := subscription.Subscriptions.GetSubscriptions(p.TopicName) for _, subNode := range subNodes { subNode.NodeLock.RLock() defer subNode.NodeLock.RUnlock() if p.QOSLevel == 0 { if p.PacketId != nil { log.Printf("Client: %v, Got publish with qos 0 and a packet id, ignoring\n", s.ClientID) return } } else if p.QOSLevel == 1 { var reason packets.PubackReasonCode = packets.PubackReasonCodeSuccess ack := packets.PubackPacket{ PacketID: *p.PacketId, Reason: reason, } s.Connection.sendPacket(ack) } else if p.QOSLevel == 2 { panic("UNIMPLEMENTED QOS level 2") } for _, sub := range subNode.Subscriptions { if !(sub.NoLocal && sub.SubscriptionChannel == s.SubscriptionChannel) { go func(sub subscription.Subscription) { sub.SubscriptionChannel <- p }(sub) } } } } func (s *Session) VisitDisconnect(p packets.DisconnectPacket) { err := s.Connection.close() if err != nil && err != io.ErrClosedPipe { log.Println("Error closing connection", err) } s.onDisconnect() } func (s *Session) VisitSubscribe(p packets.SubscribePacket) { for _, filter := range p.TopicFilters { subscription.Subscriptions.Subscribe(filter, s.SubscriptionChannel) } s.Connection.sendPacket(packets.SubAckPacket{ PacketID: p.PacketId, Reason: packets.SubackReasonGrantedQoSTwo, }) } func (s *Session) VisitUnsubscribe(p packets.UnsubscribePacket) { for _, topic := range p.Topics { subscription.Subscriptions.Unsubscribe(topic, s.SubscriptionChannel) } s.Connection.sendPacket(packets.UnsubAckPacket{ PacketID: p.PacketID, Reason: packets.UnsubackReasonSuccess, }) } func (s *Session) VisitPing(p packets.PingreqPacket) { s.Connection.sendPacket(packets.PingrespPacket{}) } //TODO implement QoSLevel 2 func (s *Session) VisitPubackPacket(_ packets.PubackPacket) { panic("not implemented") } func (s *Session) VisitPubrecPacket(_ packets.PubrecPacket) { panic("not implemented") } func (s *Session) VisitPubrelPacket(_ packets.PubrelPacket) { panic("not implemented") } func (s *Session) VisitPubcompPacket(_ packets.PubcompPacket) { panic("not implemented") }