maeqtt/main.go
2021-10-16 23:49:49 +02:00

86 lines
1.8 KiB
Go

package main
import (
"log"
"net"
"runtime/debug"
"badat.dev/maeqtt/v2/mqtt/packets"
"badat.dev/maeqtt/v2/session"
)
func main() {
listen_addr := ":1883"
listener, err := net.Listen("tcp", listen_addr)
if err != nil {
log.Fatalf("Coulde't start a listener on tcp %v. Error: %e", listen_addr, err)
}
var sessions map[string]*session.Session = make(map[string]*session.Session)
removeSessChan := make(session.RemoveSessionChannel)
connChan := make(chan net.Conn)
go func() {
for {
conn, err := listener.Accept()
if err != nil {
log.Println("Failed accepting connection ", err)
} else {
connChan <- conn
}
}
}()
for {
select {
case con := <- connChan:
handleConnection(con, sessions, removeSessChan)
case sesId := <- removeSessChan:
delete(sessions, sesId)
}
}
}
func handleConnection(con net.Conn, sessions map[string]*session.Session, rmSessChan session.RemoveSessionChannel) {
defer handlePanic(con)
conReq, err := session.NewConnection(con)
if err != nil {
log.Println("Failed to create connection ", err)
return
}
var sess *session.Session
if conReq.ConnectPakcet.ClientId != nil {
sess, exists := sessions[*conReq.ConnectPakcet.ClientId]
if exists {
sess.ConnecionChannel <- conReq
}
}
if sess == nil {
newSess := session.NewSession(conReq, rmSessChan)
sess = &newSess
go func() {
defer handlePanic(con)
sess.HandlerLoop()
}()
}
}
func handlePanic(con net.Conn) {
if r := recover(); r != nil {
log.Println("Recovering from panic:", r)
log.Println("Stack Trace:")
debug.PrintStack()
err := packets.DisconnectPacket{
ReasonCode: packets.DisconnectReasonCodeImplErorr,
}.Write(con)
if err != nil {
log.Println("Failed to send a disconnect packet after recovering from panic", err)
}
}
}