server.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package httppm
  2. import (
  3. "context"
  4. "fmt"
  5. "git.swzry.com/zry/GoHiedaLogger/hiedalog"
  6. "git.swzry.com/zry/ran-proc/rpcore"
  7. "github.com/gin-gonic/gin"
  8. "github.com/gorilla/websocket"
  9. "net/http"
  10. "time"
  11. )
// ServerHandlerConfig carries the settings that NewServerHandler copies into
// a new HPMServerHandler.
type ServerHandlerConfig struct {
	// GinGrp is the router group on which NewServerHandler registers the
	// GET "/attach.nagae" route.
	GinGrp *gin.RouterGroup
	// BaseURL is the prefix NewProcess puts in front of "/attach.nagae" when
	// it builds the attach URI it returns.
	BaseURL string
	// TimeoutStopFunc is invoked by the goroutine started in NewProcess when
	// no client attaches within ConnectTimeout and the process already has a
	// non-zero CPID.
	TimeoutStopFunc StopFunc
	// Logger receives the handler's log output.
	Logger *hiedalog.HiedaLogger
	// LogModuleName is passed as the first argument of every LogPrintf call
	// the handler makes.
	LogModuleName string
	// WebsocketPingDuration is the interval of the ticker that drives the
	// websocket ping frames sent by the attach handler.
	WebsocketPingDuration time.Duration
}
  20. func NewServerHandler(cfg *ServerHandlerConfig) *HPMServerHandler {
  21. wsup := &websocket.Upgrader{
  22. CheckOrigin: func(r *http.Request) bool {
  23. return true
  24. },
  25. EnableCompression: false,
  26. }
  27. h := &HPMServerHandler{
  28. ginGrp: cfg.GinGrp,
  29. baseURL: cfg.BaseURL,
  30. processMapByAuthID: map[string]*ProcessInfoItem{},
  31. processMapByCPID: map[int64]*ProcessInfoItem{},
  32. stopFunc: cfg.TimeoutStopFunc,
  33. logger: cfg.Logger,
  34. logModuleName: cfg.LogModuleName,
  35. upgrader: wsup,
  36. wsPingDur: cfg.WebsocketPingDuration,
  37. }
  38. h.ginGrp.GET("/attach.nagae", h.whAttach)
  39. return h
  40. }
// StopFunc is the callback type stored in HPMServerHandler.stopFunc. The
// goroutine started by NewProcess calls it with the process's CPID when the
// client does not attach before the connect timeout expires.
type StopFunc func(cpid int64)
// NewProcessConfig is the input of HPMServerHandler.NewProcess.
type NewProcessConfig struct {
	// AuthID is the key under which the process is registered and the value
	// placed in the "authid" query parameter of the returned attach URI.
	AuthID string
	// ConnectTimeout is how long NewProcess's background goroutine waits for
	// the client to attach before treating the attach as timed out.
	ConnectTimeout time.Duration
}
// ProcessInfoItem is the per-process record kept in HPMServerHandler's
// processMapByAuthID and processMapByCPID maps.
type ProcessInfoItem struct {
	// CPID is 0 when the item is created by NewProcess and is filled in later
	// by UpdateCPID.
	CPID int64
	// AuthID is the identifier the client presents when attaching.
	AuthID string
	// chAttached is an unbuffered channel: whAttach sends on it after the
	// websocket upgrade, and the goroutine started by NewProcess receives
	// from it while waiting for the client.
	chAttached chan int
	// isAttached is set to true by whAttach once a client has attached;
	// Shutdown refuses to proceed while it is false.
	isAttached bool
	// wsCancel cancels the context watched by whAttach's ping loop; Shutdown
	// calls it to end the websocket session.
	wsCancel context.CancelFunc
}
// HPMServerHandler tracks processes that are waiting for, or already have, an
// attached websocket client, and serves the attach endpoint for them.
//
// NOTE(review): no lock is visible in this file for the two maps below, yet
// they are touched from the HTTP handler, from NewProcess's goroutine and from
// UpdateCPID/Shutdown — confirm how callers serialize access.
type HPMServerHandler struct {
	// ginGrp is the router group on which the attach route is registered.
	ginGrp *gin.RouterGroup
	// baseURL prefixes the attach URI built by NewProcess.
	baseURL string
	// processMapByAuthID indexes registered processes by their AuthID.
	processMapByAuthID map[string]*ProcessInfoItem
	// processMapByCPID indexes processes by CPID; entries are added by
	// UpdateCPID.
	processMapByCPID map[int64]*ProcessInfoItem
	// stopFunc is called with the CPID when a client attach times out.
	stopFunc StopFunc
	// logger and logModuleName are used together for every LogPrintf call.
	logger        *hiedalog.HiedaLogger
	logModuleName string
	// upgrader upgrades the attach HTTP request to a websocket connection.
	upgrader *websocket.Upgrader
	// wsPingDur is the interval between websocket ping frames.
	wsPingDur time.Duration
}
  64. func (h *HPMServerHandler) whAttach(ctx *gin.Context) {
  65. authID := ctx.Query("authid")
  66. pii, ok := h.processMapByAuthID[authID]
  67. if !ok {
  68. h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_VERBOSE, "attach authID not found: authID=%d", authID)
  69. ctx.JSON(404, &gin.H{
  70. "status": "auth-id-not-found",
  71. "err_msg": "authID not found",
  72. })
  73. return
  74. }
  75. wsconn, err := h.upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
  76. if err != nil {
  77. h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_VERBOSE, "failed upgrading to websocket (cpid=%d): %v", pii.CPID, err)
  78. ctx.JSON(500, gin.H{
  79. "err_msg": "failed upgrading to websocket",
  80. "status": "ws-upgrade-error",
  81. })
  82. return
  83. }
  84. defer wsconn.Close()
  85. ticker := time.NewTicker(h.wsPingDur)
  86. defer ticker.Stop()
  87. pii.chAttached <- 1
  88. wsctx, wscncl := context.WithCancel(context.Background())
  89. pii.wsCancel = wscncl
  90. pii.isAttached = true
  91. go func() {
  92. for {
  93. _, _, rerr := wsconn.ReadMessage()
  94. if rerr != nil {
  95. h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_VERBOSE, "websocket read failed (cpid=%d): %v", pii.CPID, rerr)
  96. _ = wsconn.Close()
  97. wscncl()
  98. return
  99. }
  100. }
  101. }()
  102. for {
  103. select {
  104. case <-wsctx.Done():
  105. _ = wsconn.Close()
  106. return
  107. case <-ticker.C:
  108. xerr := wsconn.WriteMessage(websocket.PingMessage, nil)
  109. if xerr != nil {
  110. _ = wsconn.Close()
  111. return
  112. }
  113. }
  114. }
  115. }
  116. func (h *HPMServerHandler) NewProcess(npcfg *NewProcessConfig) (string, error) {
  117. pii := &ProcessInfoItem{
  118. CPID: 0,
  119. AuthID: npcfg.AuthID,
  120. chAttached: make(chan int),
  121. isAttached: false,
  122. }
  123. h.processMapByAuthID[npcfg.AuthID] = pii
  124. uri := fmt.Sprintf("%s/attach.nagae?authid=%s", h.baseURL, npcfg.AuthID)
  125. go func() {
  126. select {
  127. case <-pii.chAttached:
  128. h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_INFO, "HPM client attached (CPID=%d).", pii.CPID)
  129. return
  130. case <-time.After(npcfg.ConnectTimeout):
  131. h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_WARN, "HPM client attach timeout (CPID=%d).", pii.CPID)
  132. if pii.CPID != 0 {
  133. h.stopFunc(pii.CPID)
  134. delete(h.processMapByCPID, pii.CPID)
  135. }
  136. delete(h.processMapByAuthID, npcfg.AuthID)
  137. return
  138. }
  139. }()
  140. return uri, nil
  141. }
  142. func (h *HPMServerHandler) UpdateCPID(authID string, CPID int64) error {
  143. pii, ok := h.processMapByAuthID[authID]
  144. if !ok {
  145. return fmt.Errorf("authID not found")
  146. }
  147. pii.CPID = CPID
  148. h.processMapByCPID[CPID] = pii
  149. return nil
  150. }
  151. func (h *HPMServerHandler) Shutdown(info *rpcore.ShutdownInfo, ctx context.Context) bool {
  152. pii, ok := h.processMapByCPID[info.CPID]
  153. if !ok {
  154. h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_WARN, "can not shutdown (CPID=%d): CPID not found.", pii.CPID)
  155. return false
  156. }
  157. if !pii.isAttached {
  158. h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_WARN, "can not shutdown (CPID=%d): not attached.", pii.CPID)
  159. return false
  160. }
  161. pii.wsCancel()
  162. if pii.CPID != 0 {
  163. delete(h.processMapByCPID, pii.CPID)
  164. }
  165. delete(h.processMapByAuthID, pii.AuthID)
  166. return true
  167. }