// Package main starts the maeqtt server: it listens for TCP connections on
// :1883 and hands each accepted connection to a client session.
package main

import (
	"log"
	"net"
	"runtime/debug"

	"badat.dev/maeqtt/v2/mqtt/packets"
	"badat.dev/maeqtt/v2/session"
)

// main opens the TCP listener and then runs the dispatch loop forever.
//
// The sessions map is read and written only from this goroutine (directly and
// via handleConnection), which is why it needs no lock: accepted connections
// and session-removal requests are both funnelled through channels into the
// single select loop below.
func main() {
	listenAddr := ":1883"
	listener, err := net.Listen("tcp", listenAddr)
	if err != nil {
		// %v, not %e: %e is a floating-point verb and renders an error value
		// as "%!e(...)" garbage (go vet's printf check flags it).
		log.Fatalf("Coulde't start a listener on tcp %v. Error: %v", listenAddr, err)
	}

	sessions := make(map[string]*session.Session)
	removeSessChan := make(session.RemoveSessionChannel)
	connChan := make(chan net.Conn)

	// Accept in a separate goroutine so the dispatch loop can also service
	// removeSessChan while no client is connecting. This goroutine runs for
	// the lifetime of the process; the listener is never closed.
	go func() {
		for {
			conn, err := listener.Accept()
			if err != nil {
				log.Println("Failed accepting connection ", err)
				continue
			}
			connChan <- conn
		}
	}()

	for {
		select {
		case con := <-connChan:
			handleConnection(con, sessions, removeSessChan)
		case sesID := <-removeSessChan:
			delete(sessions, sesID)
		}
	}
}

// handleConnection performs the connection handshake on con and attaches the
// result to a session.
//
// If the connect packet carries a client ID that is already present in
// sessions, the connection request is handed to that session through its
// ConnecionChannel. Otherwise a new session is created, registered in sessions
// under its ClientID, and its HandlerLoop is started in a new goroutine.
//
// sessions is mutated without synchronisation, so this function must only be
// called from the goroutine that owns the map (the dispatch loop in main).
// rmSessChan is passed through to session.NewSession; main deletes every ID
// received on it from sessions.
//
// A panic raised while handling the connection is recovered by handlePanic.
func handleConnection(con net.Conn, sessions map[string]*session.Session, rmSessChan session.RemoveSessionChannel) {
	defer handlePanic(con)

	// NOTE(review): this runs on the dispatch loop, so if NewConnection blocks
	// on network reads (presumably it waits for the client's connect packet —
	// confirm), one slow client stalls all accepts and session removals.
	conReq, err := session.NewConnection(con)
	if err != nil {
		log.Println("Failed to create connection ", err)
		// The connection is unusable without a successful handshake; close it
		// so the socket is not leaked. Best effort: the close error is ignored
		// because there is nothing further to do with this connection.
		_ = con.Close()
		return
	}

	if id := conReq.ConnectPakcet.ClientId; id != nil {
		if sess, ok := sessions[*id]; ok {
			log.Printf("Resuming session %v", *sess.ClientID)
			// NOTE(review): this send happens on the dispatch loop; if the
			// channel is unbuffered and the session is not receiving, the
			// loop blocks here — verify against the session package.
			sess.ConnecionChannel <- conReq
			return
		}
	}

	// No client ID was supplied, or no session is registered under it.
	newSess := session.NewSession(conReq, rmSessChan)
	sess := &newSess
	sessions[*sess.ClientID] = sess
	log.Printf("New session %v", *sess.ClientID)

	go func() {
		// recover only works in the panicking goroutine, so the session's
		// goroutine needs its own deferred handler.
		defer handlePanic(con)
		sess.HandlerLoop()
	}()
}

// handlePanic recovers from a panic in the calling goroutine, logs the panic
// value and a stack trace, and tries to notify the peer by writing a
// disconnect packet with an implementation-error reason code to con.
//
// It must be invoked directly via defer (defer handlePanic(con)); recover has
// no effect otherwise. When no panic is in flight it does nothing.
func handlePanic(con net.Conn) {
	r := recover()
	if r == nil {
		return
	}

	log.Println("Recovering from panic:", r)
	log.Println("Stack Trace:")
	debug.PrintStack()

	err := packets.DisconnectPacket{
		ReasonCode: packets.DisconnectReasonCodeImplErorr,
	}.Write(con)
	if err != nil {
		log.Println("Failed to send a disconnect packet after recovering from panic", err)
	}
}