123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- package rpcore
- import (
- "context"
- "fmt"
- "github.com/smallnest/chanx"
- "time"
- )
- const InitialQueueCapacity = 10
- const InspectionInterval = 10 * time.Second
- const RestartWaitTime = 3 * time.Second
- type MainMsgLoop struct {
- mgr *ChildProcManager
- msgLoopCtx context.Context
- msgLoopCncl context.CancelFunc
- msgQueue *chanx.UnboundedChan[*Msg]
- inspLoopCtx context.Context
- inspLoopCncl context.CancelFunc
- }
- func NewMainMsgLoop(p *ChildProcManager, pctx context.Context) *MainMsgLoop {
- mlctx, mlcncl := context.WithCancel(pctx)
- ilctx, ilcncl := context.WithCancel(pctx)
- return &MainMsgLoop{
- mgr: p,
- msgLoopCtx: mlctx,
- msgLoopCncl: mlcncl,
- msgQueue: chanx.NewUnboundedChan[*Msg](InitialQueueCapacity),
- inspLoopCtx: ilctx,
- inspLoopCncl: ilcncl,
- }
- }
- func (l *MainMsgLoop) AddMsg(msg *Msg) {
- l.msgQueue.In <- msg
- }
- func (l *MainMsgLoop) RunInspectionLoop() error {
- LabelInspLoop:
- for {
- select {
- case <-l.inspLoopCtx.Done():
- {
- break LabelInspLoop
- }
- case <-time.After(InspectionInterval):
- {
- allinst, err := l.mgr.cpt.ListAll()
- if err == nil {
- for _, v := range allinst {
- l.AddMsg(&Msg{CPID: v.ID})
- }
- }
- }
- }
- }
- return nil
- }
- func (l *MainMsgLoop) RunMsgLoop() error {
- LabelMainMsgLoop:
- for {
- select {
- case <-l.msgLoopCtx.Done():
- {
- break LabelMainMsgLoop
- }
- case m := <-l.msgQueue.Out:
- {
- l.doMsg(m)
- }
- }
- }
- if !l.mgr.freezed {
- l.mgr.freezed = true
- }
- spmlCtx, spmlCncl := context.WithCancel(context.Background())
- spmlDoneCh := make(chan int, 0)
- go l.doStopProcedureMsgLoop(spmlCtx, spmlDoneCh)
- LabelCheckStopProcedureOkLoop:
- for {
- allinst, err := l.mgr.cpt.ListAll()
- if err == nil {
- if len(allinst) <= 0 {
- break LabelCheckStopProcedureOkLoop
- }
- for _, v := range allinst {
- v.Enabled = false
- v.Status = PROCSTAT_PendingToStop
- go l.AddMsg(&Msg{CPID: v.ID})
- }
- } else {
- break LabelCheckStopProcedureOkLoop
- }
- time.Sleep(RestartWaitTime)
- }
- spmlCncl()
- <-spmlDoneCh
- return nil
- }
- func (l *MainMsgLoop) doKillProcess(inst *ChildProcTableItem) {
- if inst.Cmd == nil {
- return
- }
- p := inst.Cmd.Process
- if p == nil {
- return
- }
- err := p.Kill()
- if err != nil {
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("error in kill process: %v", err))
- go func() {
- time.Sleep(RestartWaitTime)
- l.AddMsg(&Msg{CPID: inst.ID})
- }()
- } else {
- inst.Status = PROCSTAT_Exited
- go func() {
- l.AddMsg(&Msg{CPID: inst.ID})
- }()
- }
- }
- func (l *MainMsgLoop) doStopProcedureMsgLoop(ctx context.Context, doneCh chan int) {
- LabelStopProcedureMsgLoop:
- for {
- select {
- case <-ctx.Done():
- {
- break LabelStopProcedureMsgLoop
- }
- case m := <-l.msgQueue.Out:
- {
- l.doMsg(m)
- }
- }
- }
- doneCh <- 0
- }
- func (l *MainMsgLoop) doMsg(msg *Msg) {
- inst, err := l.mgr.cpt.GetByID(msg.CPID)
- if err != nil || inst == nil {
- return
- }
- defer inst.Unlock()
- inst.Lock()
- switch inst.Status {
- case PROCSTAT_PendingToDelete:
- {
- err = l.mgr.cpt.Delete(inst)
- if err != nil {
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("delete: error: %s", err))
- } else {
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("delete: ok"))
- }
- break
- }
- case PROCSTAT_Exited:
- {
- if inst.IsDaemon {
- if l.mgr.freezed {
- inst.Status = PROCSTAT_PendingToDelete
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- } else {
- if inst.Enabled {
- inst.Status = PROCSTAT_PendingToRun
- inst.Cmd = nil
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("pending to restart"))
- go func() {
- time.Sleep(RestartWaitTime)
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- } else {
- inst.Status = PROCSTAT_ManuallyStopped
- }
- }
- /* if inst.Enabled {
- inst.Status = PROCSTAT_PendingToRun
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("pending to restart"))
- go func() {
- time.Sleep(RestartWaitTime)
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- } else {
- inst.Status = PROCSTAT_ManuallyStopped
- }
- */
- } else {
- if inst.TaskDoneCallback != nil {
- go inst.TaskDoneCallback(inst.ID, inst.Name)
- }
- inst.Status = PROCSTAT_PendingToDelete
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("pending to delete"))
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- }
- break
- }
- case PROCSTAT_Running:
- {
- if inst.IsDaemon {
- if !inst.Enabled {
- inst.Status = PROCSTAT_PendingToStop
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- }
- } else {
- if inst.TaskExecTimeout > 0 && (!inst.LastStartTime.IsZero()) {
- if time.Now().After(inst.LastStartTime.Add(inst.TaskExecTimeout)) {
- inst.Status = PROCSTAT_PendingToStop
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- }
- }
- }
- break
- }
- case PROCSTAT_ManuallyStopped:
- {
- if inst.IsDaemon {
- if inst.Enabled {
- inst.Status = PROCSTAT_PendingToRun
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- }
- }
- break
- }
- case PROCSTAT_PendingToRun:
- {
- if inst.CmdInfo == nil {
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("no command to start, pending to delete"))
- inst.Status = PROCSTAT_PendingToDelete
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- } else {
- if inst.Cmd != nil {
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("seemed already running, pending to stop for restarting..."))
- inst.Status = PROCSTAT_PendingToStop
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- } else {
- inst.Cmd = GetExecCmdFromCmdInfo(inst.CmdInfo)
- err = inst.Cmd.Start()
- if err != nil {
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("start failed: %v", err))
- inst.Status = PROCSTAT_Exited
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- } else {
- inst.Status = PROCSTAT_Running
- inst.LastStartTime = time.Now()
- inst.ProcessID = inst.Cmd.Process.Pid
- go func() {
- l.mgr.msgHdl.ProcessStarted(inst.ID)
- err = inst.Cmd.Wait()
- inst.Status = PROCSTAT_Exited
- l.mgr.msgHdl.ProcessQuit(inst.ID, err)
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- }
- }
- }
- break
- }
- case PROCSTAT_PendingToStop:
- {
- if inst.Cmd == nil {
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("no command to stop"))
- inst.Status = PROCSTAT_Exited
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- } else {
- p := inst.Cmd.Process
- if p == nil {
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("no command to stop"))
- inst.Status = PROCSTAT_Exited
- go func() {
- l.AddMsg(&Msg{CPID: msg.CPID})
- }()
- } else {
- if inst.ShutdownActor == nil {
- l.doKillProcess(inst)
- } else {
- gsactx, gsacncl := context.WithCancel(context.Background())
- donech := make(chan int, 0)
- go func() {
- ok := inst.ShutdownActor.Shutdown(&ShutdownInfo{
- CPID: inst.ID,
- Name: inst.Name,
- PID: inst.ProcessID,
- Cmd: inst.Cmd,
- LogEmitFunc: func(text string) {
- l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprint("graceful shutdown actor: ", text))
- },
- }, gsactx)
- if !ok {
- l.doKillProcess(inst)
- }
- close(donech)
- }()
- if inst.ShutdownTimeout > 0 {
- go func() {
- select {
- case <-donech:
- return
- case <-time.After(inst.TaskExecTimeout):
- gsacncl()
- l.doKillProcess(inst)
- return
- }
- }()
- }
- }
- }
- }
- break
- }
- }
- }
- func (l *MainMsgLoop) StopMainLoop(err error) {
- if l.msgLoopCncl != nil {
- l.msgLoopCncl()
- }
- }
- func (l *MainMsgLoop) StopInspectionLoop(err error) {
- if l.inspLoopCncl != nil {
- l.inspLoopCncl()
- }
- }
|