diff --git a/main.go b/main.go index 1a89c79..125f09e 100644 --- a/main.go +++ b/main.go @@ -10,11 +10,11 @@ import ( ) func main() { - listen_addr := ":1883" - listener, err := net.Listen("tcp", listen_addr) + listenAddr := ":1883" + listener, err := net.Listen("tcp", listenAddr) if err != nil { - log.Fatalf("Coulde't start a listener on tcp %v. Error: %e", listen_addr, err) + log.Fatalf("Coulde't start a listener on tcp %v. Error: %e", listenAddr, err) } var sessions map[string]*session.Session = make(map[string]*session.Session) @@ -35,10 +35,10 @@ func main() { for { select { - case con := <- connChan: + case con := <-connChan: handleConnection(con, sessions, removeSessChan) - case sesId := <- removeSessChan: - delete(sessions, sesId) + case sesID := <-removeSessChan: + delete(sessions, sesID) } } } diff --git a/mqtt/packets/ControlPacket.go b/mqtt/packets/ControlPacket.go index 9c36317..3864f96 100644 --- a/mqtt/packets/ControlPacket.go +++ b/mqtt/packets/ControlPacket.go @@ -26,7 +26,13 @@ func (c controlPacket) write(w io.Writer) error { return err } - types.WriteDataWithVarIntLen(buf, data) - w.Write(buf.Bytes()) + err = types.WriteDataWithVarIntLen(buf, data) + if err != nil { + return err + } + _, err = w.Write(buf.Bytes()) + if err != nil { + return err + } return nil } diff --git a/mqtt/types/Encoding.go b/mqtt/types/Encoding.go index a07baee..df076ce 100644 --- a/mqtt/types/Encoding.go +++ b/mqtt/types/Encoding.go @@ -74,7 +74,10 @@ func WriteVariableByteInt(w io.Writer, v uint32) error { encodedByte = encodedByte | 128 } - w.Write([]byte{encodedByte}) + _,err := w.Write([]byte{encodedByte}) + if err != nil { + return err + } if v == 0 { return nil diff --git a/session/Session.go b/session/Session.go index 9f84d76..a7c23ac 100644 --- a/session/Session.go +++ b/session/Session.go @@ -59,7 +59,10 @@ func (s *Session) Connect(req ConnectionRequest) { connAck.Properties.SharedSubscriptionAvailable.Value = &false s.Connection = req.Connection - s.Connection.sendPacket(connAck) + err := s.Connection.sendPacket(connAck) + if err != nil { + panic("TODO, handle this") + } } // Starts a loop the recieves and responds to packets @@ -69,7 +72,7 @@ func (s *Session) HandlerLoop() { select { case packet := <-s.Connection.PacketChannel: packet.Visit(s) - case _ = <-s.Connection.ClientDisconnectedChan: + case <-s.Connection.ClientDisconnectedChan: s.onDisconnect() case c := <-s.ConnecionChannel: s.Connect(c) @@ -77,7 +80,10 @@ func (s *Session) HandlerLoop() { // TODO implement other qos levels subMessage.QOSLevel = 0 subMessage.Dup = false - s.Connection.sendPacket(subMessage) + err := s.Connection.sendPacket(subMessage) + if err != nil { + panic("TOOO handle this") + } } } @@ -86,7 +92,7 @@ func (s *Session) HandlerLoop() { s.Connect(c) // Tail recursion baybeeee s.HandlerLoop() - case _ = <- s.expireTimer.C: + case <- s.expireTimer.C: s.expireSession() } } diff --git a/session/connection.go b/session/connection.go index afdc69a..eb9ae1a 100644 --- a/session/connection.go +++ b/session/connection.go @@ -29,6 +29,7 @@ type Connection struct { func (c *Connection) resetKeepAlive() { if c.KeepAliveInterval != 0 { + panic("TODO") // TODO IMPLEMENT THIS //s.keepAliveTicker.Reset(s.KeepAliveInterval) } diff --git a/session/expiry.go b/session/expiry.go index 23ded86..81a827a 100644 --- a/session/expiry.go +++ b/session/expiry.go @@ -31,7 +31,7 @@ func (s *Session) expireSession() { // newTime is nullable func (s *Session) SetExpireTimer(newTime *uint32) { - var expiry = uint32(0) + expiry := uint32(0) if newTime != nil { expiry = *newTime } else { diff --git a/session/packetVisitors.go b/session/packetVisitors.go index cf59deb..f27b951 100644 --- a/session/packetVisitors.go +++ b/session/packetVisitors.go @@ -31,7 +31,10 @@ func (s *Session) VisitPublish(p packets.PublishPacket) { PacketID: *p.PacketId, Reason: reason, } - s.Connection.sendPacket(ack) + err := s.Connection.sendPacket(ack) + if err != nil { + panic("TODO") + } } else if p.QOSLevel == 2 { panic("UNIMPLEMENTED QOS level 2") } @@ -56,24 +59,30 @@ func (s *Session) VisitSubscribe(p packets.SubscribePacket) { for _, filter := range p.TopicFilters { subscription.Subscriptions.Subscribe(filter, s.SubscriptionChannel) } - s.Connection.sendPacket(packets.SubAckPacket{ + err := s.Connection.sendPacket(packets.SubAckPacket{ PacketID: p.PacketId, Reason: packets.SubackReasonGrantedQoSTwo, }) + if err != nil { + panic("TODO") + } } func (s *Session) VisitUnsubscribe(p packets.UnsubscribePacket) { for _, topic := range p.Topics { subscription.Subscriptions.Unsubscribe(topic, s.SubscriptionChannel) } - s.Connection.sendPacket(packets.UnsubAckPacket{ + err := s.Connection.sendPacket(packets.UnsubAckPacket{ PacketID: p.PacketID, Reason: packets.UnsubackReasonSuccess, }) + if err != nil { + panic("TODO") + } } func (s *Session) VisitPing(p packets.PingreqPacket) { - s.Connection.sendPacket(packets.PingrespPacket{}) + _ = s.Connection.sendPacket(packets.PingrespPacket{}) } diff --git a/session/utils.go b/session/utils.go index ea707d6..0b3f7f0 100644 --- a/session/utils.go +++ b/session/utils.go @@ -24,17 +24,14 @@ func genClientID() *string { return &id } -func (s *Session) Disconnect(code packets.DisconnectReasonCode) error { - s.Connection.sendPacket(packets.DisconnectPacket{ +func (s *Session) Disconnect(code packets.DisconnectReasonCode) { + // If disconnetion fails that means we are already disconnected, great! + _ = s.Connection.sendPacket(packets.DisconnectPacket{ ReasonCode: code, }) + _ = s.Connection.close() - err := s.Connection.close() - if err != nil { - return err - } s.onDisconnect() - return nil } diff --git a/subscription/subscription.go b/subscription/subscription.go index 4146f51..e218d19 100644 --- a/subscription/subscription.go +++ b/subscription/subscription.go @@ -52,7 +52,7 @@ func (s *SubscriptionTreeNode) findNode(fields []string) *SubscriptionTreeNode { s.NodeLock.RLock() } - child, _ := s.children[field] + child := s.children[field] s.NodeLock.RUnlock() return child.findNode(fields[1:]) }