main_msg_loop.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package rpcore
import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/smallnest/chanx"
)
  8. const InitialQueueCapacity = 10
  9. const InspectionInterval = 10 * time.Second
  10. const RestartWaitTime = 3 * time.Second
  11. type MainMsgLoop struct {
  12. mgr *ChildProcManager
  13. msgLoopCtx context.Context
  14. msgLoopCncl context.CancelFunc
  15. msgQueue *chanx.UnboundedChan[*Msg]
  16. inspLoopCtx context.Context
  17. inspLoopCncl context.CancelFunc
  18. }
  19. func NewMainMsgLoop(p *ChildProcManager, pctx context.Context) *MainMsgLoop {
  20. mlctx, mlcncl := context.WithCancel(pctx)
  21. ilctx, ilcncl := context.WithCancel(pctx)
  22. return &MainMsgLoop{
  23. mgr: p,
  24. msgLoopCtx: mlctx,
  25. msgLoopCncl: mlcncl,
  26. msgQueue: chanx.NewUnboundedChan[*Msg](InitialQueueCapacity),
  27. inspLoopCtx: ilctx,
  28. inspLoopCncl: ilcncl,
  29. }
  30. }
  31. func (l *MainMsgLoop) AddMsg(msg *Msg) {
  32. l.msgQueue.In <- msg
  33. }
  34. func (l *MainMsgLoop) RunInspectionLoop() error {
  35. LabelInspLoop:
  36. for {
  37. select {
  38. case <-l.inspLoopCtx.Done():
  39. {
  40. break LabelInspLoop
  41. }
  42. case <-time.After(InspectionInterval):
  43. {
  44. allinst, err := l.mgr.cpt.ListAll()
  45. if err == nil {
  46. for _, v := range allinst {
  47. l.AddMsg(&Msg{CPID: v.ID})
  48. }
  49. }
  50. }
  51. }
  52. }
  53. return nil
  54. }
  55. func (l *MainMsgLoop) RunMsgLoop() error {
  56. LabelMainMsgLoop:
  57. for {
  58. select {
  59. case <-l.msgLoopCtx.Done():
  60. {
  61. break LabelMainMsgLoop
  62. }
  63. case m := <-l.msgQueue.Out:
  64. {
  65. l.doMsg(m)
  66. }
  67. }
  68. }
  69. if !l.mgr.freezed {
  70. l.mgr.freezed = true
  71. }
  72. spmlCtx, spmlCncl := context.WithCancel(context.Background())
  73. spmlDoneCh := make(chan int, 0)
  74. go l.doStopProcedureMsgLoop(spmlCtx, spmlDoneCh)
  75. LabelCheckStopProcedureOkLoop:
  76. for {
  77. allinst, err := l.mgr.cpt.ListAll()
  78. if err == nil {
  79. if len(allinst) <= 0 {
  80. break LabelCheckStopProcedureOkLoop
  81. }
  82. for _, v := range allinst {
  83. v.Enabled = false
  84. v.Status = PROCSTAT_PendingToStop
  85. go l.AddMsg(&Msg{CPID: v.ID})
  86. }
  87. } else {
  88. break LabelCheckStopProcedureOkLoop
  89. }
  90. time.Sleep(RestartWaitTime)
  91. }
  92. spmlCncl()
  93. <-spmlDoneCh
  94. return nil
  95. }
  96. func (l *MainMsgLoop) doKillProcess(inst *ChildProcTableItem) {
  97. if inst.Cmd == nil {
  98. return
  99. }
  100. p := inst.Cmd.Process
  101. if p == nil {
  102. return
  103. }
  104. err := p.Kill()
  105. if err != nil {
  106. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("error in kill process: %v", err))
  107. go func() {
  108. time.Sleep(RestartWaitTime)
  109. l.AddMsg(&Msg{CPID: inst.ID})
  110. }()
  111. } else {
  112. inst.Status = PROCSTAT_Exited
  113. go func() {
  114. l.AddMsg(&Msg{CPID: inst.ID})
  115. }()
  116. }
  117. }
  118. func (l *MainMsgLoop) doStopProcedureMsgLoop(ctx context.Context, doneCh chan int) {
  119. LabelStopProcedureMsgLoop:
  120. for {
  121. select {
  122. case <-ctx.Done():
  123. {
  124. break LabelStopProcedureMsgLoop
  125. }
  126. case m := <-l.msgQueue.Out:
  127. {
  128. l.doMsg(m)
  129. }
  130. }
  131. }
  132. doneCh <- 0
  133. }
  134. func (l *MainMsgLoop) doMsg(msg *Msg) {
  135. inst, err := l.mgr.cpt.GetByID(msg.CPID)
  136. if err != nil || inst == nil {
  137. return
  138. }
  139. defer inst.Unlock()
  140. inst.Lock()
  141. switch inst.Status {
  142. case PROCSTAT_PendingToDelete:
  143. {
  144. err = l.mgr.cpt.Delete(inst)
  145. if err != nil {
  146. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("delete: error: %s", err))
  147. } else {
  148. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("delete: ok"))
  149. }
  150. break
  151. }
  152. case PROCSTAT_Exited:
  153. {
  154. if inst.IsDaemon {
  155. if l.mgr.freezed {
  156. inst.Status = PROCSTAT_PendingToDelete
  157. go func() {
  158. l.AddMsg(&Msg{CPID: msg.CPID})
  159. }()
  160. } else {
  161. if inst.Enabled {
  162. inst.Status = PROCSTAT_PendingToRun
  163. inst.Cmd = nil
  164. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("pending to restart"))
  165. go func() {
  166. time.Sleep(RestartWaitTime)
  167. l.AddMsg(&Msg{CPID: msg.CPID})
  168. }()
  169. } else {
  170. inst.Status = PROCSTAT_ManuallyStopped
  171. }
  172. }
  173. /* if inst.Enabled {
  174. inst.Status = PROCSTAT_PendingToRun
  175. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("pending to restart"))
  176. go func() {
  177. time.Sleep(RestartWaitTime)
  178. l.AddMsg(&Msg{CPID: msg.CPID})
  179. }()
  180. } else {
  181. inst.Status = PROCSTAT_ManuallyStopped
  182. }
  183. */
  184. } else {
  185. if inst.TaskDoneCallback != nil {
  186. go inst.TaskDoneCallback(inst.ID, inst.Name)
  187. }
  188. inst.Status = PROCSTAT_PendingToDelete
  189. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("pending to delete"))
  190. go func() {
  191. l.AddMsg(&Msg{CPID: msg.CPID})
  192. }()
  193. }
  194. break
  195. }
  196. case PROCSTAT_Running:
  197. {
  198. if inst.IsDaemon {
  199. if !inst.Enabled {
  200. inst.Status = PROCSTAT_PendingToStop
  201. go func() {
  202. l.AddMsg(&Msg{CPID: msg.CPID})
  203. }()
  204. }
  205. } else {
  206. if inst.TaskExecTimeout > 0 && (!inst.LastStartTime.IsZero()) {
  207. if time.Now().After(inst.LastStartTime.Add(inst.TaskExecTimeout)) {
  208. inst.Status = PROCSTAT_PendingToStop
  209. go func() {
  210. l.AddMsg(&Msg{CPID: msg.CPID})
  211. }()
  212. }
  213. }
  214. }
  215. break
  216. }
  217. case PROCSTAT_ManuallyStopped:
  218. {
  219. if inst.IsDaemon {
  220. if inst.Enabled {
  221. inst.Status = PROCSTAT_PendingToRun
  222. go func() {
  223. l.AddMsg(&Msg{CPID: msg.CPID})
  224. }()
  225. }
  226. }
  227. break
  228. }
  229. case PROCSTAT_PendingToRun:
  230. {
  231. if inst.CmdInfo == nil {
  232. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("no command to start, pending to delete"))
  233. inst.Status = PROCSTAT_PendingToDelete
  234. go func() {
  235. l.AddMsg(&Msg{CPID: msg.CPID})
  236. }()
  237. } else {
  238. if inst.Cmd != nil {
  239. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("seemed already running, pending to stop for restarting..."))
  240. inst.Status = PROCSTAT_PendingToStop
  241. go func() {
  242. l.AddMsg(&Msg{CPID: msg.CPID})
  243. }()
  244. } else {
  245. inst.Cmd = GetExecCmdFromCmdInfo(inst.CmdInfo)
  246. err = inst.Cmd.Start()
  247. if err != nil {
  248. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("start failed: %v", err))
  249. inst.Status = PROCSTAT_Exited
  250. go func() {
  251. l.AddMsg(&Msg{CPID: msg.CPID})
  252. }()
  253. } else {
  254. inst.Status = PROCSTAT_Running
  255. inst.LastStartTime = time.Now()
  256. inst.ProcessID = inst.Cmd.Process.Pid
  257. go func() {
  258. l.mgr.msgHdl.ProcessStarted(inst.ID)
  259. err = inst.Cmd.Wait()
  260. inst.Status = PROCSTAT_Exited
  261. l.mgr.msgHdl.ProcessQuit(inst.ID, err)
  262. l.AddMsg(&Msg{CPID: msg.CPID})
  263. }()
  264. }
  265. }
  266. }
  267. break
  268. }
  269. case PROCSTAT_PendingToStop:
  270. {
  271. if inst.Cmd == nil {
  272. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("no command to stop"))
  273. inst.Status = PROCSTAT_Exited
  274. go func() {
  275. l.AddMsg(&Msg{CPID: msg.CPID})
  276. }()
  277. } else {
  278. p := inst.Cmd.Process
  279. if p == nil {
  280. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprintf("no command to stop"))
  281. inst.Status = PROCSTAT_Exited
  282. go func() {
  283. l.AddMsg(&Msg{CPID: msg.CPID})
  284. }()
  285. } else {
  286. if inst.ShutdownActor == nil {
  287. l.doKillProcess(inst)
  288. } else {
  289. gsactx, gsacncl := context.WithCancel(context.Background())
  290. donech := make(chan int, 0)
  291. go func() {
  292. ok := inst.ShutdownActor.Shutdown(&ShutdownInfo{
  293. CPID: inst.ID,
  294. Name: inst.Name,
  295. PID: inst.ProcessID,
  296. Cmd: inst.Cmd,
  297. LogEmitFunc: func(text string) {
  298. l.mgr.msgHdl.MsgLoopVerboseLog(inst.ID, fmt.Sprint("graceful shutdown actor: ", text))
  299. },
  300. }, gsactx)
  301. if !ok {
  302. l.doKillProcess(inst)
  303. }
  304. close(donech)
  305. }()
  306. if inst.ShutdownTimeout > 0 {
  307. go func() {
  308. select {
  309. case <-donech:
  310. return
  311. case <-time.After(inst.TaskExecTimeout):
  312. gsacncl()
  313. l.doKillProcess(inst)
  314. return
  315. }
  316. }()
  317. }
  318. }
  319. }
  320. }
  321. break
  322. }
  323. }
  324. }
  325. func (l *MainMsgLoop) StopMainLoop(err error) {
  326. if l.msgLoopCncl != nil {
  327. l.msgLoopCncl()
  328. }
  329. }
  330. func (l *MainMsgLoop) StopInspectionLoop(err error) {
  331. if l.inspLoopCncl != nil {
  332. l.inspLoopCncl()
  333. }
  334. }