cpmgr.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package rpcore
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/GUAIK-ORG/go-snowflake/snowflake"
  6. "github.com/oklog/run"
  7. "sync"
  8. "time"
  9. )
  10. type ChildProcManager struct {
  11. cpt *ChildProcTable
  12. snowflaker *snowflake.Snowflake
  13. mainCtx context.Context
  14. mainCancelFunc context.CancelFunc
  15. isMgrRunning bool
  16. mainRunGroup *run.Group
  17. msgHdl ChildProcManagerMsgHandler
  18. mainMsgLoop *MainMsgLoop
  19. freezed bool
  20. }
  21. func NewChildProcManager(msgHandler ChildProcManagerMsgHandler) *ChildProcManager {
  22. flake, err := snowflake.NewSnowflake(int64(0), int64(0))
  23. if err != nil {
  24. panic(fmt.Errorf("failed init ran-proc ChildProcManager: snowflake error: %v", err))
  25. }
  26. cpm := &ChildProcManager{
  27. cpt: NewChildProcTable(),
  28. snowflaker: flake,
  29. mainCtx: nil,
  30. mainCancelFunc: nil,
  31. isMgrRunning: false,
  32. msgHdl: msgHandler,
  33. mainMsgLoop: nil,
  34. freezed: true,
  35. }
  36. if cpm.msgHdl == nil {
  37. cpm.msgHdl = &DefaultChildProcManagerMsgHandler{}
  38. }
  39. return cpm
  40. }
  41. func (m *ChildProcManager) CreateTask(cfg *NewTaskConfig) (cpid int64, rerr error) {
  42. cpid = m.snowflaker.NextVal()
  43. cpti := &ChildProcTableItem{
  44. ID: cpid,
  45. Name: cfg.Name,
  46. ProcessID: 0,
  47. IsDaemon: false,
  48. Status: PROCSTAT_PendingToRun,
  49. CmdInfo: cfg.CmdInfo,
  50. Cmd: nil,
  51. LastStartTime: time.Time{},
  52. Enabled: true,
  53. TaskDoneCallback: cfg.DoneCallback,
  54. TaskExecTimeout: cfg.ExecTimeout,
  55. ShutdownTimeout: cfg.ShutdownTimeout,
  56. ShutdownActor: cfg.ShutdownActor,
  57. Mutex: sync.Mutex{},
  58. }
  59. err := m.createProc(cpti)
  60. if err != nil {
  61. return -1, fmt.Errorf("failed create task: %v", err)
  62. }
  63. return cpid, nil
  64. }
  65. func (m *ChildProcManager) CreateDaemon(cfg *NewDaemonConfig) (cpid int64, rerr error) {
  66. cpid = m.snowflaker.NextVal()
  67. cpti := &ChildProcTableItem{
  68. ID: cpid,
  69. Name: cfg.Name,
  70. ProcessID: 0,
  71. IsDaemon: true,
  72. Status: PROCSTAT_ManuallyStopped,
  73. CmdInfo: cfg.CmdInfo,
  74. Cmd: nil,
  75. LastStartTime: time.Time{},
  76. Enabled: cfg.EnableAfterCreate,
  77. TaskDoneCallback: nil,
  78. TaskExecTimeout: 0,
  79. ShutdownTimeout: cfg.ShutdownTimeout,
  80. ShutdownActor: cfg.ShutdownActor,
  81. Mutex: sync.Mutex{},
  82. }
  83. err := m.createProc(cpti)
  84. if err != nil {
  85. return -1, fmt.Errorf("failed create daemon: %v", err)
  86. }
  87. return cpid, nil
  88. }
  89. func (m *ChildProcManager) EnableDaemonByID(CPID int64) error {
  90. if m.freezed {
  91. return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status")
  92. }
  93. inst, err := m.cpt.GetByID(CPID)
  94. if err != nil {
  95. return err
  96. }
  97. inst.Lock()
  98. inst.Enabled = true
  99. inst.Unlock()
  100. go func() {
  101. m.mainMsgLoop.AddMsg(&Msg{CPID: inst.ID})
  102. }()
  103. return nil
  104. }
  105. func (m *ChildProcManager) DisableDaemonByID(CPID int64) error {
  106. if m.freezed {
  107. return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status")
  108. }
  109. inst, err := m.cpt.GetByID(CPID)
  110. if err != nil {
  111. return err
  112. }
  113. inst.Lock()
  114. inst.Enabled = false
  115. inst.Unlock()
  116. go func() {
  117. m.mainMsgLoop.AddMsg(&Msg{CPID: inst.ID})
  118. }()
  119. return nil
  120. }
  121. func (m *ChildProcManager) ForceKill(CPID int64) error {
  122. inst, err := m.cpt.GetByID(CPID)
  123. if err != nil {
  124. return err
  125. }
  126. if inst == nil {
  127. return fmt.Errorf("can not force kill process (CPID=%16X): instance is nil", CPID)
  128. }
  129. inst.Lock()
  130. defer inst.Unlock()
  131. if inst.Cmd == nil {
  132. return fmt.Errorf("can not force kill process (CPID=%16X): Cmd is nil", CPID)
  133. }
  134. if inst.Cmd.Process == nil {
  135. return fmt.Errorf("can not force kill process (CPID=%16X): Cmd.Process is nil", CPID)
  136. }
  137. err = inst.Cmd.Process.Kill()
  138. if err != nil {
  139. return err
  140. }
  141. return nil
  142. }
  143. func (m *ChildProcManager) GetStatusByID(CPID int64) (*DisplayStatusInfo, error) {
  144. inst, err := m.cpt.GetByID(CPID)
  145. if err != nil {
  146. return nil, err
  147. }
  148. return &DisplayStatusInfo{
  149. CPID: inst.ID,
  150. Name: inst.Name,
  151. Status: inst.Status,
  152. StatusText: inst.Status.ToString(),
  153. StatusAbbr: inst.Status.ToStringAbbr(),
  154. PID: inst.ProcessID,
  155. LastStartTime: inst.LastStartTime,
  156. }, nil
  157. }
  158. func (m *ChildProcManager) ListAllStatus() ([]*DisplayStatusInfo, error) {
  159. all, err := m.cpt.ListAll()
  160. if err != nil {
  161. return nil, err
  162. }
  163. ret := make([]*DisplayStatusInfo, len(all))
  164. for i, v := range all {
  165. ret[i] = &DisplayStatusInfo{
  166. CPID: v.ID,
  167. Name: v.Name,
  168. Status: v.Status,
  169. StatusText: v.Status.ToString(),
  170. StatusAbbr: v.Status.ToStringAbbr(),
  171. PID: v.ProcessID,
  172. LastStartTime: v.LastStartTime,
  173. }
  174. }
  175. return ret, nil
  176. }
  177. func (m *ChildProcManager) QueryStatus(index string, args ...interface{}) ([]*DisplayStatusInfo, error) {
  178. all, err := m.cpt.DoQuery(index, args...)
  179. if err != nil {
  180. return nil, err
  181. }
  182. ret := make([]*DisplayStatusInfo, len(all))
  183. for i, v := range all {
  184. ret[i] = &DisplayStatusInfo{
  185. CPID: v.ID,
  186. Name: v.Name,
  187. Status: v.Status,
  188. StatusText: v.Status.ToString(),
  189. StatusAbbr: v.Status.ToStringAbbr(),
  190. PID: v.ProcessID,
  191. LastStartTime: v.LastStartTime,
  192. }
  193. }
  194. return ret, nil
  195. }
  196. func (m *ChildProcManager) createProc(cpti *ChildProcTableItem) error {
  197. if m.freezed {
  198. return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status")
  199. }
  200. err := m.cpt.Add(cpti)
  201. if err != nil {
  202. return err
  203. }
  204. go func() {
  205. m.mainMsgLoop.AddMsg(&Msg{CPID: cpti.ID})
  206. }()
  207. return nil
  208. }
  209. func (m *ChildProcManager) Start() chan error {
  210. cherr := make(chan error, 1)
  211. if m.isMgrRunning {
  212. cherr <- fmt.Errorf("ChildProcManager is already running")
  213. }
  214. ctx, cncl := context.WithCancel(context.Background())
  215. m.mainCtx = ctx
  216. m.mainCancelFunc = cncl
  217. m.mainMsgLoop = NewMainMsgLoop(m, ctx)
  218. m.mainRunGroup = &run.Group{}
  219. m.mainRunGroup.Add(m.mainMsgLoop.RunMsgLoop, m.mainMsgLoop.StopMainLoop)
  220. m.mainRunGroup.Add(m.mainMsgLoop.RunInspectionLoop, m.mainMsgLoop.StopInspectionLoop)
  221. go func() {
  222. m.isMgrRunning = true
  223. m.freezed = false
  224. err := m.mainRunGroup.Run()
  225. m.mainCtx = nil
  226. m.mainCancelFunc = nil
  227. m.mainMsgLoop = nil
  228. m.mainRunGroup = nil
  229. m.isMgrRunning = false
  230. cherr <- err
  231. }()
  232. return cherr
  233. }
  234. func (m *ChildProcManager) Stop() error {
  235. if m.isMgrRunning {
  236. if m.mainCancelFunc == nil {
  237. return fmt.Errorf("ChildProcManager internal error: cancel func is nil")
  238. }
  239. m.freezed = true
  240. m.mainCancelFunc()
  241. } else {
  242. return fmt.Errorf("ChildProcManager not running")
  243. }
  244. return nil
  245. }
  246. func (m *ChildProcManager) IsRunning() bool {
  247. return m.isMgrRunning
  248. }