package main import ( "context" "errors" "fmt" "git.swzry.com/zry/zry-go-program-framework/core" "github.com/GUAIK-ORG/go-snowflake/snowflake" "github.com/puzpuzpuz/xsync" "net" "sync" ) var _ core.ISubService = (*TCPServerSubSvc)(nil) type TCPServerSubSvc struct { bindAddr string listener net.Listener snflk *snowflake.Snowflake sessions *xsync.MapOf[int64, *ClientSession] rxRoutineCtx context.Context rxRoutineCncl context.CancelFunc } func NewTCPServerSubSvc() *TCPServerSubSvc { s := &TCPServerSubSvc{ sessions: xsync.NewIntegerMapOf[int64, *ClientSession](), } return s } func (s *TCPServerSubSvc) Prepare(ctx *core.SubServiceContext) error { s.bindAddr = Config.Server.TCPConfig.Bind if s.bindAddr == "" { ctx.Info("bind address not specify, will use 'localhost:9600'") s.bindAddr = "localhost:9600" } snflk, err := snowflake.NewSnowflake(0, 0) if err != nil { ctx.Error("failed init snowflake generator: ", err) return errors.New("failed init snowflake generator") } s.snflk = snflk return nil } func (s *TCPServerSubSvc) Run(ctx *core.SubServiceContext) error { s.rxRoutineCtx, s.rxRoutineCncl = context.WithCancel(ctx.GetParentContext()) listener, err := net.Listen("tcp", s.bindAddr) if err != nil { ctx.ErrorF("failed listen at '%s': %v", s.bindAddr, err) return errors.New("listen error") } s.listener = listener var wg sync.WaitGroup var rxwg sync.WaitGroup rxwg.Add(1) go s.rxRoutine(&rxwg) ctx.Info("rx routine started.") ctx.InfoF("listening at '%s'", s.bindAddr) for { var conn net.Conn conn, err = listener.Accept() if err != nil { ctx.Warn("error in accepting connection: ", err) break } sid := s.snflk.NextVal() clog := ctx.GetSubLog(fmt.Sprintf("client-%16X", sid)) clog.VerboseF("client '%16X' from '%s' connected.", sid, conn.RemoteAddr()) sess := &ClientSession{ ID: sid, Conn: conn, Logger: clog, ListenerWg: &wg, } s.sessions.Store(sid, sess) wg.Add(1) go s.handleClient(sess) } ctx.Info("listener end.") ctx.Info("wait for rx routines end...") rxwg.Wait() ctx.Info("wait for all conn routines end...") wg.Wait() ctx.Info("tcp server end.") return nil } func (s *TCPServerSubSvc) Stop(ctx *core.SubServiceContext) { if s.listener != nil { err := s.listener.Close() if err != nil { ctx.Warn("error in closing listener: ", err) } } if s.rxRoutineCncl != nil { s.rxRoutineCncl() } s.sessions.Range(func(key int64, value *ClientSession) bool { ctx.DebugF("closing client '%16X'...", key) c := value.Conn if c != nil { err := c.Close() if err != nil { if value.Logger != nil { value.Logger.Warn("error in closing client: ", err) } else { ctx.WarnF("error in closing client '%16X': %v", key, err) } } } return true }) } func (s *TCPServerSubSvc) rxRoutine(wg *sync.WaitGroup) { defer wg.Done() for { select { case <-s.rxRoutineCtx.Done(): return case rxdata := <-SubSvcSerial.RxChan: s.broadcastToClient(rxdata) } } } func (s *TCPServerSubSvc) broadcastToClient(data []byte) { s.sessions.Range(func(key int64, value *ClientSession) bool { c := value.Conn if c != nil { _, err := c.Write(data) if err != nil { if value.Logger != nil { value.Logger.Verbose("write data error: ", err) } err = c.Close() if err != nil && value.Logger != nil { value.Logger.Verbose("error in closing conn: ", err) } } } return true }) } func (s *TCPServerSubSvc) handleClient(sess *ClientSession) { defer s.sessions.Delete(sess.ID) defer sess.ListenerWg.Done() defer sess.Conn.Close() defer sess.Logger.CloseThisLog() defer sess.Logger.VerboseF("client '%16X' end.", sess.ID) buf := make([]byte, Config.Performance.TxBufferSize) for { n, err := sess.Conn.Read(buf) if err != nil { sess.Logger.Verbose("error in read: ", err) return } data := buf[:n] if n > 0 { SubSvcSerial.TxChan <- data } } } type ClientSession struct { ID int64 Conn net.Conn Logger core.IModuleLogger ListenerWg *sync.WaitGroup }