123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package main
- import (
- "context"
- "errors"
- "fmt"
- "git.swzry.com/zry/zry-go-program-framework/core"
- "github.com/GUAIK-ORG/go-snowflake/snowflake"
- "github.com/puzpuzpuz/xsync"
- "net"
- "sync"
- )
- var _ core.ISubService = (*TCPServerSubSvc)(nil)
- type TCPServerSubSvc struct {
- bindAddr string
- listener net.Listener
- snflk *snowflake.Snowflake
- sessions *xsync.MapOf[int64, *ClientSession]
- rxRoutineCtx context.Context
- rxRoutineCncl context.CancelFunc
- }
- func NewTCPServerSubSvc() *TCPServerSubSvc {
- s := &TCPServerSubSvc{
- sessions: xsync.NewIntegerMapOf[int64, *ClientSession](),
- }
- return s
- }
- func (s *TCPServerSubSvc) Prepare(ctx *core.SubServiceContext) error {
- s.bindAddr = Config.Server.TCPConfig.Bind
- if s.bindAddr == "" {
- ctx.Info("bind address not specify, will use 'localhost:9600'")
- s.bindAddr = "localhost:9600"
- }
- snflk, err := snowflake.NewSnowflake(0, 0)
- if err != nil {
- ctx.Error("failed init snowflake generator: ", err)
- return errors.New("failed init snowflake generator")
- }
- s.snflk = snflk
- return nil
- }
- func (s *TCPServerSubSvc) Run(ctx *core.SubServiceContext) error {
- s.rxRoutineCtx, s.rxRoutineCncl = context.WithCancel(ctx.GetParentContext())
- listener, err := net.Listen("tcp", s.bindAddr)
- if err != nil {
- ctx.ErrorF("failed listen at '%s': %v", s.bindAddr, err)
- return errors.New("listen error")
- }
- s.listener = listener
- var wg sync.WaitGroup
- var rxwg sync.WaitGroup
- rxwg.Add(1)
- go s.rxRoutine(&rxwg)
- ctx.Info("rx routine started.")
- ctx.InfoF("listening at '%s'", s.bindAddr)
- for {
- var conn net.Conn
- conn, err = listener.Accept()
- if err != nil {
- ctx.Warn("error in accepting connection: ", err)
- break
- }
- sid := s.snflk.NextVal()
- clog := ctx.GetSubLog(fmt.Sprintf("client-%16X", sid))
- clog.VerboseF("client '%16X' from '%s' connected.", sid, conn.RemoteAddr())
- sess := &ClientSession{
- ID: sid,
- Conn: conn,
- Logger: clog,
- ListenerWg: &wg,
- }
- s.sessions.Store(sid, sess)
- wg.Add(1)
- go s.handleClient(sess)
- }
- ctx.Info("listener end.")
- ctx.Info("wait for rx routines end...")
- rxwg.Wait()
- ctx.Info("wait for all conn routines end...")
- wg.Wait()
- ctx.Info("tcp server end.")
- return nil
- }
- func (s *TCPServerSubSvc) Stop(ctx *core.SubServiceContext) {
- if s.listener != nil {
- err := s.listener.Close()
- if err != nil {
- ctx.Warn("error in closing listener: ", err)
- }
- }
- if s.rxRoutineCncl != nil {
- s.rxRoutineCncl()
- }
- s.sessions.Range(func(key int64, value *ClientSession) bool {
- ctx.DebugF("closing client '%16X'...", key)
- c := value.Conn
- if c != nil {
- err := c.Close()
- if err != nil {
- if value.Logger != nil {
- value.Logger.Warn("error in closing client: ", err)
- } else {
- ctx.WarnF("error in closing client '%16X': %v", key, err)
- }
- }
- }
- return true
- })
- }
- func (s *TCPServerSubSvc) rxRoutine(wg *sync.WaitGroup) {
- defer wg.Done()
- for {
- select {
- case <-s.rxRoutineCtx.Done():
- return
- case rxdata := <-SubSvcSerial.RxChan:
- s.broadcastToClient(rxdata)
- }
- }
- }
- func (s *TCPServerSubSvc) broadcastToClient(data []byte) {
- s.sessions.Range(func(key int64, value *ClientSession) bool {
- c := value.Conn
- if c != nil {
- _, err := c.Write(data)
- if err != nil {
- if value.Logger != nil {
- value.Logger.Verbose("write data error: ", err)
- }
- err = c.Close()
- if err != nil && value.Logger != nil {
- value.Logger.Verbose("error in closing conn: ", err)
- }
- }
- }
- return true
- })
- }
- func (s *TCPServerSubSvc) handleClient(sess *ClientSession) {
- defer s.sessions.Delete(sess.ID)
- defer sess.ListenerWg.Done()
- defer sess.Conn.Close()
- defer sess.Logger.CloseThisLog()
- defer sess.Logger.VerboseF("client '%16X' end.", sess.ID)
- buf := make([]byte, Config.Performance.TxBufferSize)
- for {
- n, err := sess.Conn.Read(buf)
- if err != nil {
- sess.Logger.Verbose("error in read: ", err)
- return
- }
- data := buf[:n]
- if n > 0 {
- SubSvcSerial.TxChan <- data
- }
- }
- }
- type ClientSession struct {
- ID int64
- Conn net.Conn
- Logger core.IModuleLogger
- ListenerWg *sync.WaitGroup
- }
|