package rpcore import ( "context" "fmt" "github.com/GUAIK-ORG/go-snowflake/snowflake" "github.com/oklog/run" "sync" "time" ) type ChildProcManager struct { cpt *ChildProcTable snowflaker *snowflake.Snowflake mainCtx context.Context mainCancelFunc context.CancelFunc isMgrRunning bool mainRunGroup *run.Group msgHdl ChildProcManagerMsgHandler mainMsgLoop *MainMsgLoop freezed bool } func NewChildProcManager(msgHandler ChildProcManagerMsgHandler) *ChildProcManager { flake, err := snowflake.NewSnowflake(int64(0), int64(0)) if err != nil { panic(fmt.Errorf("failed init ran-proc ChildProcManager: snowflake error: %v", err)) } cpm := &ChildProcManager{ cpt: NewChildProcTable(), snowflaker: flake, mainCtx: nil, mainCancelFunc: nil, isMgrRunning: false, msgHdl: msgHandler, mainMsgLoop: nil, freezed: true, } if cpm.msgHdl == nil { cpm.msgHdl = &DefaultChildProcManagerMsgHandler{} } return cpm } func (m *ChildProcManager) CreateTask(cfg *NewTaskConfig) (cpid int64, rerr error) { cpid = m.snowflaker.NextVal() cpti := &ChildProcTableItem{ ID: cpid, Name: cfg.Name, ProcessID: 0, IsDaemon: false, Status: PROCSTAT_PendingToRun, CmdInfo: cfg.CmdInfo, Cmd: nil, LastStartTime: time.Time{}, Enabled: true, TaskDoneCallback: cfg.DoneCallback, TaskExecTimeout: cfg.ExecTimeout, ShutdownTimeout: cfg.ShutdownTimeout, ShutdownActor: cfg.ShutdownActor, Mutex: sync.Mutex{}, } err := m.createProc(cpti) if err != nil { return -1, fmt.Errorf("failed create task: %v", err) } return cpid, nil } func (m *ChildProcManager) CreateDaemon(cfg *NewDaemonConfig) (cpid int64, rerr error) { cpid = m.snowflaker.NextVal() cpti := &ChildProcTableItem{ ID: cpid, Name: cfg.Name, ProcessID: 0, IsDaemon: true, Status: PROCSTAT_ManuallyStopped, CmdInfo: cfg.CmdInfo, Cmd: nil, LastStartTime: time.Time{}, Enabled: cfg.EnableAfterCreate, TaskDoneCallback: nil, TaskExecTimeout: 0, ShutdownTimeout: cfg.ShutdownTimeout, ShutdownActor: cfg.ShutdownActor, Mutex: sync.Mutex{}, } err := m.createProc(cpti) if err != nil { return -1, fmt.Errorf("failed create daemon: %v", err) } return cpid, nil } func (m *ChildProcManager) EnableDaemonByID(CPID int64) error { if m.freezed { return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status") } inst, err := m.cpt.GetByID(CPID) if err != nil { return err } inst.Lock() inst.Enabled = true inst.Unlock() go func() { m.mainMsgLoop.AddMsg(&Msg{CPID: inst.ID}) }() return nil } func (m *ChildProcManager) DisableDaemonByID(CPID int64) error { if m.freezed { return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status") } inst, err := m.cpt.GetByID(CPID) if err != nil { return err } inst.Lock() inst.Enabled = false inst.Unlock() go func() { m.mainMsgLoop.AddMsg(&Msg{CPID: inst.ID}) }() return nil } func (m *ChildProcManager) ForceKill(CPID int64) error { inst, err := m.cpt.GetByID(CPID) if err != nil { return err } inst.Lock() err = inst.Cmd.Process.Kill() inst.Unlock() if err != nil { return err } return nil } func (m *ChildProcManager) GetStatusByID(CPID int64) (*DisplayStatusInfo, error) { inst, err := m.cpt.GetByID(CPID) if err != nil { return nil, err } return &DisplayStatusInfo{ CPID: inst.ID, Name: inst.Name, Status: inst.Status, StatusText: inst.Status.ToString(), StatusAbbr: inst.Status.ToStringAbbr(), PID: inst.ProcessID, LastStartTime: inst.LastStartTime, }, nil } func (m *ChildProcManager) ListAllStatus() ([]*DisplayStatusInfo, error) { all, err := m.cpt.ListAll() if err != nil { return nil, err } ret := make([]*DisplayStatusInfo, len(all)) for i, v := range all { ret[i] = &DisplayStatusInfo{ CPID: v.ID, Name: v.Name, Status: v.Status, StatusText: v.Status.ToString(), StatusAbbr: v.Status.ToStringAbbr(), PID: v.ProcessID, LastStartTime: v.LastStartTime, } } return ret, nil } func (m *ChildProcManager) QueryStatus(index string, args ...interface{}) ([]*DisplayStatusInfo, error) { all, err := m.cpt.DoQuery(index, args...) if err != nil { return nil, err } ret := make([]*DisplayStatusInfo, len(all)) for i, v := range all { ret[i] = &DisplayStatusInfo{ CPID: v.ID, Name: v.Name, Status: v.Status, StatusText: v.Status.ToString(), StatusAbbr: v.Status.ToStringAbbr(), PID: v.ProcessID, LastStartTime: v.LastStartTime, } } return ret, nil } func (m *ChildProcManager) createProc(cpti *ChildProcTableItem) error { if m.freezed { return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status") } err := m.cpt.Add(cpti) if err != nil { return err } go func() { m.mainMsgLoop.AddMsg(&Msg{CPID: cpti.ID}) }() return nil } func (m *ChildProcManager) Start() chan error { cherr := make(chan error, 1) if m.isMgrRunning { cherr <- fmt.Errorf("ChildProcManager is already running") } ctx, cncl := context.WithCancel(context.Background()) m.mainCtx = ctx m.mainCancelFunc = cncl m.mainMsgLoop = NewMainMsgLoop(m, ctx) m.mainRunGroup = &run.Group{} m.mainRunGroup.Add(m.mainMsgLoop.RunMsgLoop, m.mainMsgLoop.StopMainLoop) m.mainRunGroup.Add(m.mainMsgLoop.RunInspectionLoop, m.mainMsgLoop.StopInspectionLoop) go func() { m.isMgrRunning = true m.freezed = false err := m.mainRunGroup.Run() m.mainCtx = nil m.mainCancelFunc = nil m.mainMsgLoop = nil m.mainRunGroup = nil m.isMgrRunning = false cherr <- err }() return cherr } func (m *ChildProcManager) Stop() error { if m.isMgrRunning { if m.mainCancelFunc == nil { return fmt.Errorf("ChildProcManager internal error: cancel func is nil") } m.freezed = true m.mainCancelFunc() } else { return fmt.Errorf("ChildProcManager not running") } return nil } func (m *ChildProcManager) IsRunning() bool { return m.isMgrRunning }