tcp_server.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "git.swzry.com/zry/zry-go-program-framework/core"
  7. "github.com/GUAIK-ORG/go-snowflake/snowflake"
  8. "github.com/puzpuzpuz/xsync"
  9. "net"
  10. "sync"
  11. )
// Compile-time check that *TCPServerSubSvc satisfies core.ISubService.
var _ core.ISubService = (*TCPServerSubSvc)(nil)

// TCPServerSubSvc is a sub-service that accepts TCP clients, forwards the
// bytes they send to SubSvcSerial.TxChan, and broadcasts everything received
// from SubSvcSerial.RxChan to all connected clients.
type TCPServerSubSvc struct {
	// bindAddr is the address handed to net.Listen; it is set in Prepare.
	bindAddr string
	// listener is the TCP listener opened by Run and closed by Stop.
	listener net.Listener
	// snflk generates the session IDs assigned to accepted connections.
	snflk *snowflake.Snowflake
	// sessions holds the currently connected clients, keyed by session ID.
	sessions *xsync.MapOf[int64, *ClientSession]
	// rxRoutineCtx is the context whose cancellation makes rxRoutine return.
	rxRoutineCtx context.Context
	// rxRoutineCncl cancels rxRoutineCtx; Stop invokes it.
	rxRoutineCncl context.CancelFunc
}
  21. func NewTCPServerSubSvc() *TCPServerSubSvc {
  22. s := &TCPServerSubSvc{
  23. sessions: xsync.NewIntegerMapOf[int64, *ClientSession](),
  24. }
  25. return s
  26. }
  27. func (s *TCPServerSubSvc) Prepare(ctx *core.SubServiceContext) error {
  28. s.bindAddr = Config.Server.TCPConfig.Bind
  29. if s.bindAddr == "" {
  30. ctx.Info("bind address not specify, will use 'localhost:9600'")
  31. s.bindAddr = "localhost:9600"
  32. }
  33. snflk, err := snowflake.NewSnowflake(0, 0)
  34. if err != nil {
  35. ctx.Error("failed init snowflake generator: ", err)
  36. return errors.New("failed init snowflake generator")
  37. }
  38. s.snflk = snflk
  39. return nil
  40. }
  41. func (s *TCPServerSubSvc) Run(ctx *core.SubServiceContext) error {
  42. s.rxRoutineCtx, s.rxRoutineCncl = context.WithCancel(ctx.GetParentContext())
  43. listener, err := net.Listen("tcp", s.bindAddr)
  44. if err != nil {
  45. ctx.ErrorF("failed listen at '%s': %v", s.bindAddr, err)
  46. return errors.New("listen error")
  47. }
  48. s.listener = listener
  49. var wg sync.WaitGroup
  50. var rxwg sync.WaitGroup
  51. rxwg.Add(1)
  52. go s.rxRoutine(&rxwg)
  53. ctx.Info("rx routine started.")
  54. ctx.InfoF("listening at '%s'", s.bindAddr)
  55. for {
  56. var conn net.Conn
  57. conn, err = listener.Accept()
  58. if err != nil {
  59. ctx.Warn("error in accepting connection: ", err)
  60. break
  61. }
  62. sid := s.snflk.NextVal()
  63. clog := ctx.GetSubLog(fmt.Sprintf("client-%16X", sid))
  64. clog.VerboseF("client '%16X' from '%s' connected.", sid, conn.RemoteAddr())
  65. sess := &ClientSession{
  66. ID: sid,
  67. Conn: conn,
  68. Logger: clog,
  69. ListenerWg: &wg,
  70. }
  71. s.sessions.Store(sid, sess)
  72. wg.Add(1)
  73. go s.handleClient(sess)
  74. }
  75. ctx.Info("listener end.")
  76. ctx.Info("wait for rx routines end...")
  77. rxwg.Wait()
  78. ctx.Info("wait for all conn routines end...")
  79. wg.Wait()
  80. ctx.Info("tcp server end.")
  81. return nil
  82. }
  83. func (s *TCPServerSubSvc) Stop(ctx *core.SubServiceContext) {
  84. if s.listener != nil {
  85. err := s.listener.Close()
  86. if err != nil {
  87. ctx.Warn("error in closing listener: ", err)
  88. }
  89. }
  90. if s.rxRoutineCncl != nil {
  91. s.rxRoutineCncl()
  92. }
  93. s.sessions.Range(func(key int64, value *ClientSession) bool {
  94. ctx.DebugF("closing client '%16X'...", key)
  95. c := value.Conn
  96. if c != nil {
  97. err := c.Close()
  98. if err != nil {
  99. if value.Logger != nil {
  100. value.Logger.Warn("error in closing client: ", err)
  101. } else {
  102. ctx.WarnF("error in closing client '%16X': %v", key, err)
  103. }
  104. }
  105. }
  106. return true
  107. })
  108. }
  109. func (s *TCPServerSubSvc) rxRoutine(wg *sync.WaitGroup) {
  110. defer wg.Done()
  111. for {
  112. select {
  113. case <-s.rxRoutineCtx.Done():
  114. return
  115. case rxdata := <-SubSvcSerial.RxChan:
  116. s.broadcastToClient(rxdata)
  117. }
  118. }
  119. }
  120. func (s *TCPServerSubSvc) broadcastToClient(data []byte) {
  121. s.sessions.Range(func(key int64, value *ClientSession) bool {
  122. c := value.Conn
  123. if c != nil {
  124. _, err := c.Write(data)
  125. if err != nil {
  126. if value.Logger != nil {
  127. value.Logger.Verbose("write data error: ", err)
  128. }
  129. err = c.Close()
  130. if err != nil && value.Logger != nil {
  131. value.Logger.Verbose("error in closing conn: ", err)
  132. }
  133. }
  134. }
  135. return true
  136. })
  137. }
  138. func (s *TCPServerSubSvc) handleClient(sess *ClientSession) {
  139. defer s.sessions.Delete(sess.ID)
  140. defer sess.ListenerWg.Done()
  141. defer sess.Conn.Close()
  142. defer sess.Logger.CloseThisLog()
  143. defer sess.Logger.VerboseF("client '%16X' end.", sess.ID)
  144. buf := make([]byte, Config.Performance.TxBufferSize)
  145. for {
  146. n, err := sess.Conn.Read(buf)
  147. if err != nil {
  148. sess.Logger.Verbose("error in read: ", err)
  149. return
  150. }
  151. data := buf[:n]
  152. if n > 0 {
  153. SubSvcSerial.TxChan <- data
  154. }
  155. }
  156. }
// ClientSession bundles the per-connection state of one accepted TCP client.
type ClientSession struct {
	// ID is the session identifier; it is the key under which the session is
	// stored in the server's session map.
	ID int64
	// Conn is the client's network connection.
	Conn net.Conn
	// Logger is the sub-logger created for this client.
	Logger core.IModuleLogger
	// ListenerWg is the wait group on which the client's handler goroutine
	// calls Done when it exits.
	ListenerWg *sync.WaitGroup
}