|
@@ -0,0 +1,173 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "context"
|
|
|
+ "errors"
|
|
|
+ "fmt"
|
|
|
+ "git.swzry.com/zry/zry-go-program-framework/core"
|
|
|
+ "github.com/GUAIK-ORG/go-snowflake/snowflake"
|
|
|
+ "github.com/puzpuzpuz/xsync"
|
|
|
+ "net"
|
|
|
+ "sync"
|
|
|
+)
|
|
|
+
|
|
|
+var _ core.ISubService = (*TCPServerSubSvc)(nil)
|
|
|
+
|
|
|
+type TCPServerSubSvc struct {
|
|
|
+ bindAddr string
|
|
|
+ listener net.Listener
|
|
|
+ snflk *snowflake.Snowflake
|
|
|
+ sessions *xsync.MapOf[int64, *ClientSession]
|
|
|
+ rxRoutineCtx context.Context
|
|
|
+ rxRoutineCncl context.CancelFunc
|
|
|
+}
|
|
|
+
|
|
|
+func NewTCPServerSubSvc() *TCPServerSubSvc {
|
|
|
+ s := &TCPServerSubSvc{
|
|
|
+ sessions: xsync.NewIntegerMapOf[int64, *ClientSession](),
|
|
|
+ }
|
|
|
+ return s
|
|
|
+}
|
|
|
+
|
|
|
+func (s *TCPServerSubSvc) Prepare(ctx *core.SubServiceContext) error {
|
|
|
+ s.bindAddr = Config.Server.TCPConfig.Bind
|
|
|
+ if s.bindAddr == "" {
|
|
|
+ ctx.Info("bind address not specify, will use 'localhost:9600'")
|
|
|
+ s.bindAddr = "localhost:9600"
|
|
|
+ }
|
|
|
+ snflk, err := snowflake.NewSnowflake(0, 0)
|
|
|
+ if err != nil {
|
|
|
+ ctx.Error("failed init snowflake generator: ", err)
|
|
|
+ return errors.New("failed init snowflake generator")
|
|
|
+ }
|
|
|
+ s.snflk = snflk
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (s *TCPServerSubSvc) Run(ctx *core.SubServiceContext) error {
|
|
|
+ s.rxRoutineCtx, s.rxRoutineCncl = context.WithCancel(ctx.GetParentContext())
|
|
|
+ listener, err := net.Listen("tcp", s.bindAddr)
|
|
|
+ if err != nil {
|
|
|
+ ctx.ErrorF("failed listen at '%s': %v", s.bindAddr, err)
|
|
|
+ return errors.New("listen error")
|
|
|
+ }
|
|
|
+ s.listener = listener
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ var rxwg sync.WaitGroup
|
|
|
+ rxwg.Add(1)
|
|
|
+ go s.rxRoutine(&rxwg)
|
|
|
+ ctx.Info("rx routine started.")
|
|
|
+ ctx.InfoF("listening at '%s'", s.bindAddr)
|
|
|
+ for {
|
|
|
+ var conn net.Conn
|
|
|
+ conn, err = listener.Accept()
|
|
|
+ if err != nil {
|
|
|
+ ctx.Warn("error in accepting connection: ", err)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ sid := s.snflk.NextVal()
|
|
|
+ clog := ctx.GetSubLog(fmt.Sprintf("client-%16X", sid))
|
|
|
+ clog.VerboseF("client '%16X' from '%s' connected.", sid, conn.RemoteAddr())
|
|
|
+ sess := &ClientSession{
|
|
|
+ ID: sid,
|
|
|
+ Conn: conn,
|
|
|
+ Logger: clog,
|
|
|
+ ListenerWg: &wg,
|
|
|
+ }
|
|
|
+ s.sessions.Store(sid, sess)
|
|
|
+ wg.Add(1)
|
|
|
+ go s.handleClient(sess)
|
|
|
+ }
|
|
|
+ ctx.Info("listener end.")
|
|
|
+ ctx.Info("wait for rx routines end...")
|
|
|
+ rxwg.Wait()
|
|
|
+ ctx.Info("wait for all conn routines end...")
|
|
|
+ wg.Wait()
|
|
|
+ ctx.Info("tcp server end.")
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (s *TCPServerSubSvc) Stop(ctx *core.SubServiceContext) {
|
|
|
+ if s.listener != nil {
|
|
|
+ err := s.listener.Close()
|
|
|
+ if err != nil {
|
|
|
+ ctx.Warn("error in closing listener: ", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if s.rxRoutineCncl != nil {
|
|
|
+ s.rxRoutineCncl()
|
|
|
+ }
|
|
|
+ s.sessions.Range(func(key int64, value *ClientSession) bool {
|
|
|
+ ctx.DebugF("closing client '%16X'...", key)
|
|
|
+ c := value.Conn
|
|
|
+ if c != nil {
|
|
|
+ err := c.Close()
|
|
|
+ if err != nil {
|
|
|
+ if value.Logger != nil {
|
|
|
+ value.Logger.Warn("error in closing client: ", err)
|
|
|
+ } else {
|
|
|
+ ctx.WarnF("error in closing client '%16X': %v", key, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func (s *TCPServerSubSvc) rxRoutine(wg *sync.WaitGroup) {
|
|
|
+ defer wg.Done()
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-s.rxRoutineCtx.Done():
|
|
|
+ return
|
|
|
+ case rxdata := <-SubSvcSerial.RxChan:
|
|
|
+ s.broadcastToClient(rxdata)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (s *TCPServerSubSvc) broadcastToClient(data []byte) {
|
|
|
+ s.sessions.Range(func(key int64, value *ClientSession) bool {
|
|
|
+ c := value.Conn
|
|
|
+ if c != nil {
|
|
|
+ _, err := c.Write(data)
|
|
|
+ if err != nil {
|
|
|
+ if value.Logger != nil {
|
|
|
+ value.Logger.Verbose("write data error: ", err)
|
|
|
+ }
|
|
|
+ err = c.Close()
|
|
|
+ if err != nil && value.Logger != nil {
|
|
|
+ value.Logger.Verbose("error in closing conn: ", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func (s *TCPServerSubSvc) handleClient(sess *ClientSession) {
|
|
|
+ defer s.sessions.Delete(sess.ID)
|
|
|
+ defer sess.ListenerWg.Done()
|
|
|
+ defer sess.Conn.Close()
|
|
|
+ defer sess.Logger.CloseThisLog()
|
|
|
+ defer sess.Logger.VerboseF("client '%16X' end.", sess.ID)
|
|
|
+ buf := make([]byte, Config.Performance.TxBufferSize)
|
|
|
+ for {
|
|
|
+ n, err := sess.Conn.Read(buf)
|
|
|
+ if err != nil {
|
|
|
+ sess.Logger.Verbose("error in read: ", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ data := buf[:n]
|
|
|
+ if n > 0 {
|
|
|
+ SubSvcSerial.TxChan <- data
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+type ClientSession struct {
|
|
|
+ ID int64
|
|
|
+ Conn net.Conn
|
|
|
+ Logger core.IModuleLogger
|
|
|
+ ListenerWg *sync.WaitGroup
|
|
|
+}
|