cpmgr.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package rpcore
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/GUAIK-ORG/go-snowflake/snowflake"
  6. "github.com/oklog/run"
  7. "sync"
  8. "time"
  9. )
  10. type ChildProcManager struct {
  11. cpt *ChildProcTable
  12. snowflaker *snowflake.Snowflake
  13. mainCtx context.Context
  14. mainCancelFunc context.CancelFunc
  15. isMgrRunning bool
  16. mainRunGroup *run.Group
  17. msgHdl ChildProcManagerMsgHandler
  18. mainMsgLoop *MainMsgLoop
  19. freezed bool
  20. }
  21. func NewChildProcManager(msgHandler ChildProcManagerMsgHandler) *ChildProcManager {
  22. flake, err := snowflake.NewSnowflake(int64(0), int64(0))
  23. if err != nil {
  24. panic(fmt.Errorf("failed init ran-proc ChildProcManager: snowflake error: %v", err))
  25. }
  26. cpm := &ChildProcManager{
  27. cpt: NewChildProcTable(),
  28. snowflaker: flake,
  29. mainCtx: nil,
  30. mainCancelFunc: nil,
  31. isMgrRunning: false,
  32. msgHdl: msgHandler,
  33. mainMsgLoop: nil,
  34. freezed: true,
  35. }
  36. if cpm.msgHdl == nil {
  37. cpm.msgHdl = &DefaultChildProcManagerMsgHandler{}
  38. }
  39. return cpm
  40. }
  41. func (m *ChildProcManager) CreateTask(cfg *NewTaskConfig) (cpid int64, rerr error) {
  42. cpid = m.snowflaker.NextVal()
  43. cpti := &ChildProcTableItem{
  44. ID: cpid,
  45. Name: cfg.Name,
  46. ProcessID: 0,
  47. IsDaemon: false,
  48. Status: PROCSTAT_PendingToRun,
  49. CmdInfo: cfg.CmdInfo,
  50. Cmd: nil,
  51. LastStartTime: time.Time{},
  52. Enabled: true,
  53. TaskDoneCallback: cfg.DoneCallback,
  54. TaskExecTimeout: cfg.ExecTimeout,
  55. ShutdownTimeout: cfg.ShutdownTimeout,
  56. ShutdownActor: cfg.ShutdownActor,
  57. Mutex: sync.Mutex{},
  58. }
  59. err := m.createProc(cpti)
  60. if err != nil {
  61. return -1, fmt.Errorf("failed create task: %v", err)
  62. }
  63. return cpid, nil
  64. }
  65. func (m *ChildProcManager) CreateDaemon(cfg *NewDaemonConfig) (cpid int64, rerr error) {
  66. cpid = m.snowflaker.NextVal()
  67. cpti := &ChildProcTableItem{
  68. ID: cpid,
  69. Name: cfg.Name,
  70. ProcessID: 0,
  71. IsDaemon: true,
  72. Status: PROCSTAT_ManuallyStopped,
  73. CmdInfo: cfg.CmdInfo,
  74. Cmd: nil,
  75. LastStartTime: time.Time{},
  76. Enabled: cfg.EnableAfterCreate,
  77. TaskDoneCallback: nil,
  78. TaskExecTimeout: 0,
  79. ShutdownTimeout: cfg.ShutdownTimeout,
  80. ShutdownActor: cfg.ShutdownActor,
  81. Mutex: sync.Mutex{},
  82. }
  83. err := m.createProc(cpti)
  84. if err != nil {
  85. return -1, fmt.Errorf("failed create daemon: %v", err)
  86. }
  87. return cpid, nil
  88. }
  89. func (m *ChildProcManager) EnableDaemonByID(CPID int64) error {
  90. if m.freezed {
  91. return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status")
  92. }
  93. inst, err := m.cpt.GetByID(CPID)
  94. if err != nil {
  95. return err
  96. }
  97. inst.Lock()
  98. inst.Enabled = true
  99. inst.Unlock()
  100. go func() {
  101. m.mainMsgLoop.AddMsg(&Msg{CPID: inst.ID})
  102. }()
  103. return nil
  104. }
  105. func (m *ChildProcManager) DisableDaemonByID(CPID int64) error {
  106. if m.freezed {
  107. return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status")
  108. }
  109. inst, err := m.cpt.GetByID(CPID)
  110. if err != nil {
  111. return err
  112. }
  113. inst.Lock()
  114. inst.Enabled = false
  115. inst.Unlock()
  116. go func() {
  117. m.mainMsgLoop.AddMsg(&Msg{CPID: inst.ID})
  118. }()
  119. return nil
  120. }
  121. func (m *ChildProcManager) ForceKill(CPID int64) error {
  122. inst, err := m.cpt.GetByID(CPID)
  123. if err != nil {
  124. return err
  125. }
  126. inst.Lock()
  127. err = inst.Cmd.Process.Kill()
  128. inst.Unlock()
  129. if err != nil {
  130. return err
  131. }
  132. return nil
  133. }
  134. func (m *ChildProcManager) GetStatusByID(CPID int64) (*DisplayStatusInfo, error) {
  135. inst, err := m.cpt.GetByID(CPID)
  136. if err != nil {
  137. return nil, err
  138. }
  139. return &DisplayStatusInfo{
  140. CPID: inst.ID,
  141. Name: inst.Name,
  142. Status: inst.Status,
  143. StatusText: inst.Status.ToString(),
  144. StatusAbbr: inst.Status.ToStringAbbr(),
  145. PID: inst.ProcessID,
  146. LastStartTime: inst.LastStartTime,
  147. }, nil
  148. }
  149. func (m *ChildProcManager) ListAllStatus() ([]*DisplayStatusInfo, error) {
  150. all, err := m.cpt.ListAll()
  151. if err != nil {
  152. return nil, err
  153. }
  154. ret := make([]*DisplayStatusInfo, len(all))
  155. for i, v := range all {
  156. ret[i] = &DisplayStatusInfo{
  157. CPID: v.ID,
  158. Name: v.Name,
  159. Status: v.Status,
  160. StatusText: v.Status.ToString(),
  161. StatusAbbr: v.Status.ToStringAbbr(),
  162. PID: v.ProcessID,
  163. LastStartTime: v.LastStartTime,
  164. }
  165. }
  166. return ret, nil
  167. }
  168. func (m *ChildProcManager) QueryStatus(index string, args ...interface{}) ([]*DisplayStatusInfo, error) {
  169. all, err := m.cpt.DoQuery(index, args...)
  170. if err != nil {
  171. return nil, err
  172. }
  173. ret := make([]*DisplayStatusInfo, len(all))
  174. for i, v := range all {
  175. ret[i] = &DisplayStatusInfo{
  176. CPID: v.ID,
  177. Name: v.Name,
  178. Status: v.Status,
  179. StatusText: v.Status.ToString(),
  180. StatusAbbr: v.Status.ToStringAbbr(),
  181. PID: v.ProcessID,
  182. LastStartTime: v.LastStartTime,
  183. }
  184. }
  185. return ret, nil
  186. }
  187. func (m *ChildProcManager) createProc(cpti *ChildProcTableItem) error {
  188. if m.freezed {
  189. return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status")
  190. }
  191. err := m.cpt.Add(cpti)
  192. if err != nil {
  193. return err
  194. }
  195. go func() {
  196. m.mainMsgLoop.AddMsg(&Msg{CPID: cpti.ID})
  197. }()
  198. return nil
  199. }
  200. func (m *ChildProcManager) Start() chan error {
  201. cherr := make(chan error, 1)
  202. if m.isMgrRunning {
  203. cherr <- fmt.Errorf("ChildProcManager is already running")
  204. }
  205. ctx, cncl := context.WithCancel(context.Background())
  206. m.mainCtx = ctx
  207. m.mainCancelFunc = cncl
  208. m.mainMsgLoop = NewMainMsgLoop(m, ctx)
  209. m.mainRunGroup = &run.Group{}
  210. m.mainRunGroup.Add(m.mainMsgLoop.RunMsgLoop, m.mainMsgLoop.StopMainLoop)
  211. m.mainRunGroup.Add(m.mainMsgLoop.RunInspectionLoop, m.mainMsgLoop.StopInspectionLoop)
  212. go func() {
  213. m.isMgrRunning = true
  214. m.freezed = false
  215. err := m.mainRunGroup.Run()
  216. m.mainCtx = nil
  217. m.mainCancelFunc = nil
  218. m.mainMsgLoop = nil
  219. m.mainRunGroup = nil
  220. m.isMgrRunning = false
  221. cherr <- err
  222. }()
  223. return cherr
  224. }
  225. func (m *ChildProcManager) Stop() error {
  226. if m.isMgrRunning {
  227. if m.mainCancelFunc == nil {
  228. return fmt.Errorf("ChildProcManager internal error: cancel func is nil")
  229. }
  230. m.freezed = true
  231. m.mainCancelFunc()
  232. } else {
  233. return fmt.Errorf("ChildProcManager not running")
  234. }
  235. return nil
  236. }
  237. func (m *ChildProcManager) IsRunning() bool {
  238. return m.isMgrRunning
  239. }