123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- package httppm
import (
	"context"
	"fmt"
	"net/http"
	"net/url"
	"time"

	"git.swzry.com/zry/GoHiedaLogger/hiedalog"
	"git.swzry.com/zry/ran-proc/rpcore"
	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
)
- type ServerHandlerConfig struct {
- GinGrp *gin.RouterGroup
- BaseURL string
- TimeoutStopFunc StopFunc
- Logger *hiedalog.HiedaLogger
- LogModuleName string
- WebsocketPingDuration time.Duration
- }
- func NewServerHandler(cfg *ServerHandlerConfig) *HPMServerHandler {
- wsup := &websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- EnableCompression: false,
- }
- h := &HPMServerHandler{
- ginGrp: cfg.GinGrp,
- baseURL: cfg.BaseURL,
- processMapByAuthID: map[string]*ProcessInfoItem{},
- processMapByCPID: map[int64]*ProcessInfoItem{},
- stopFunc: cfg.TimeoutStopFunc,
- logger: cfg.Logger,
- logModuleName: cfg.LogModuleName,
- upgrader: wsup,
- wsPingDur: cfg.WebsocketPingDuration,
- }
- h.ginGrp.GET("/attach.nagae", h.whAttach)
- return h
- }
// StopFunc is a callback taking the CPID of a process. The handler invokes
// it when a client has not attached before the connect timeout expires.
type StopFunc func(cpid int64)
// NewProcessConfig describes a process that is about to be registered with
// HPMServerHandler.NewProcess.
type NewProcessConfig struct {
	// AuthID is the key under which the process is registered; the client
	// presents it as the "authid" query parameter when attaching.
	AuthID string

	// ConnectTimeout is how long NewProcess waits for the client to attach
	// before it gives up and removes the registration.
	ConnectTimeout time.Duration
}
// ProcessInfoItem is the bookkeeping record the handler keeps per process.
type ProcessInfoItem struct {
	// CPID identifies the process; it stays 0 until UpdateCPID sets it.
	CPID int64

	// AuthID is the key the client must present to attach.
	AuthID string

	// chAttached is sent on by the attach handler once the websocket is
	// established; the waiter started by NewProcess receives from it.
	chAttached chan int

	// isAttached is set to true by the attach handler after it has
	// signalled chAttached.
	isAttached bool

	// wsCancel cancels the context that keeps the websocket ping loop
	// running; Shutdown calls it to end the session.
	wsCancel context.CancelFunc
}
- type HPMServerHandler struct {
- ginGrp *gin.RouterGroup
- baseURL string
- processMapByAuthID map[string]*ProcessInfoItem
- processMapByCPID map[int64]*ProcessInfoItem
- stopFunc StopFunc
- logger *hiedalog.HiedaLogger
- logModuleName string
- upgrader *websocket.Upgrader
- wsPingDur time.Duration
- }
- func (h *HPMServerHandler) whAttach(ctx *gin.Context) {
- authID := ctx.Query("authid")
- pii, ok := h.processMapByAuthID[authID]
- if !ok {
- h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_VERBOSE, "attach authID not found: authID=%d", authID)
- ctx.JSON(404, &gin.H{
- "status": "auth-id-not-found",
- "err_msg": "authID not found",
- })
- return
- }
- wsconn, err := h.upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
- if err != nil {
- h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_VERBOSE, "failed upgrading to websocket (cpid=%d): %v", pii.CPID, err)
- ctx.JSON(500, gin.H{
- "err_msg": "failed upgrading to websocket",
- "status": "ws-upgrade-error",
- })
- return
- }
- defer wsconn.Close()
- ticker := time.NewTicker(h.wsPingDur)
- defer ticker.Stop()
- pii.chAttached <- 1
- wsctx, wscncl := context.WithCancel(context.Background())
- pii.wsCancel = wscncl
- pii.isAttached = true
- go func() {
- for {
- _, _, rerr := wsconn.ReadMessage()
- if rerr != nil {
- h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_VERBOSE, "websocket read failed (cpid=%d): %v", pii.CPID, rerr)
- _ = wsconn.Close()
- wscncl()
- return
- }
- }
- }()
- for {
- select {
- case <-wsctx.Done():
- _ = wsconn.Close()
- return
- case <-ticker.C:
- xerr := wsconn.WriteMessage(websocket.PingMessage, nil)
- if xerr != nil {
- _ = wsconn.Close()
- return
- }
- }
- }
- }
- func (h *HPMServerHandler) NewProcess(npcfg *NewProcessConfig) (string, error) {
- pii := &ProcessInfoItem{
- CPID: 0,
- AuthID: npcfg.AuthID,
- chAttached: make(chan int),
- isAttached: false,
- }
- h.processMapByAuthID[npcfg.AuthID] = pii
- uri := fmt.Sprintf("%s/attach.nagae?authid=%s", h.baseURL, npcfg.AuthID)
- go func() {
- select {
- case <-pii.chAttached:
- h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_INFO, "HPM client attached (CPID=%d).", pii.CPID)
- return
- case <-time.After(npcfg.ConnectTimeout):
- h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_WARN, "HPM client attach timeout (CPID=%d).", pii.CPID)
- if pii.CPID != 0 {
- h.stopFunc(pii.CPID)
- delete(h.processMapByCPID, pii.CPID)
- }
- delete(h.processMapByAuthID, npcfg.AuthID)
- return
- }
- }()
- return uri, nil
- }
- func (h *HPMServerHandler) UpdateCPID(authID string, CPID int64) error {
- pii, ok := h.processMapByAuthID[authID]
- if !ok {
- return fmt.Errorf("authID not found")
- }
- pii.CPID = CPID
- h.processMapByCPID[CPID] = pii
- return nil
- }
- func (h *HPMServerHandler) Shutdown(info *rpcore.ShutdownInfo, ctx context.Context) bool {
- pii, ok := h.processMapByCPID[info.CPID]
- if !ok {
- h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_WARN, "can not shutdown (CPID=%d): CPID not found.", pii.CPID)
- return false
- }
- if !pii.isAttached {
- h.logger.LogPrintf(h.logModuleName, hiedalog.DLN_WARN, "can not shutdown (CPID=%d): not attached.", pii.CPID)
- return false
- }
- pii.wsCancel()
- if pii.CPID != 0 {
- delete(h.processMapByCPID, pii.CPID)
- }
- delete(h.processMapByAuthID, pii.AuthID)
- return true
- }
|