123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- package rpcore
- import (
- "context"
- "fmt"
- "github.com/GUAIK-ORG/go-snowflake/snowflake"
- "github.com/oklog/run"
- "sync"
- "time"
- )
- type ChildProcManager struct {
- cpt *ChildProcTable
- snowflaker *snowflake.Snowflake
- mainCtx context.Context
- mainCancelFunc context.CancelFunc
- isMgrRunning bool
- mainRunGroup *run.Group
- msgHdl ChildProcManagerMsgHandler
- mainMsgLoop *MainMsgLoop
- freezed bool
- }
- func NewChildProcManager(msgHandler ChildProcManagerMsgHandler) *ChildProcManager {
- flake, err := snowflake.NewSnowflake(int64(0), int64(0))
- if err != nil {
- panic(fmt.Errorf("failed init ran-proc ChildProcManager: snowflake error: %v", err))
- }
- cpm := &ChildProcManager{
- cpt: NewChildProcTable(),
- snowflaker: flake,
- mainCtx: nil,
- mainCancelFunc: nil,
- isMgrRunning: false,
- msgHdl: msgHandler,
- mainMsgLoop: nil,
- freezed: true,
- }
- if cpm.msgHdl == nil {
- cpm.msgHdl = &DefaultChildProcManagerMsgHandler{}
- }
- return cpm
- }
- func (m *ChildProcManager) CreateTask(cfg *NewTaskConfig) (cpid int64, rerr error) {
- cpid = m.snowflaker.NextVal()
- cpti := &ChildProcTableItem{
- ID: cpid,
- Name: cfg.Name,
- ProcessID: 0,
- IsDaemon: false,
- Status: PROCSTAT_PendingToRun,
- CmdInfo: cfg.CmdInfo,
- Cmd: nil,
- LastStartTime: time.Time{},
- Enabled: true,
- TaskDoneCallback: cfg.DoneCallback,
- TaskExecTimeout: cfg.ExecTimeout,
- ShutdownTimeout: cfg.ShutdownTimeout,
- ShutdownActor: cfg.ShutdownActor,
- Mutex: sync.Mutex{},
- }
- err := m.createProc(cpti)
- if err != nil {
- return -1, fmt.Errorf("failed create task: %v", err)
- }
- return cpid, nil
- }
- func (m *ChildProcManager) CreateDaemon(cfg *NewDaemonConfig) (cpid int64, rerr error) {
- cpid = m.snowflaker.NextVal()
- cpti := &ChildProcTableItem{
- ID: cpid,
- Name: cfg.Name,
- ProcessID: 0,
- IsDaemon: true,
- Status: PROCSTAT_ManuallyStopped,
- CmdInfo: cfg.CmdInfo,
- Cmd: nil,
- LastStartTime: time.Time{},
- Enabled: cfg.EnableAfterCreate,
- TaskDoneCallback: nil,
- TaskExecTimeout: 0,
- ShutdownTimeout: cfg.ShutdownTimeout,
- ShutdownActor: cfg.ShutdownActor,
- Mutex: sync.Mutex{},
- }
- err := m.createProc(cpti)
- if err != nil {
- return -1, fmt.Errorf("failed create daemon: %v", err)
- }
- return cpid, nil
- }
- func (m *ChildProcManager) EnableDaemonByID(CPID int64) error {
- if m.freezed {
- return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status")
- }
- inst, err := m.cpt.GetByID(CPID)
- if err != nil {
- return err
- }
- inst.Lock()
- inst.Enabled = true
- inst.Unlock()
- go func() {
- m.mainMsgLoop.AddMsg(&Msg{CPID: inst.ID})
- }()
- return nil
- }
- func (m *ChildProcManager) DisableDaemonByID(CPID int64) error {
- if m.freezed {
- return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status")
- }
- inst, err := m.cpt.GetByID(CPID)
- if err != nil {
- return err
- }
- inst.Lock()
- inst.Enabled = false
- inst.Unlock()
- go func() {
- m.mainMsgLoop.AddMsg(&Msg{CPID: inst.ID})
- }()
- return nil
- }
- func (m *ChildProcManager) ForceKill(CPID int64) error {
- inst, err := m.cpt.GetByID(CPID)
- if err != nil {
- return err
- }
- inst.Lock()
- err = inst.Cmd.Process.Kill()
- inst.Unlock()
- if err != nil {
- return err
- }
- return nil
- }
- func (m *ChildProcManager) GetStatusByID(CPID int64) (*DisplayStatusInfo, error) {
- inst, err := m.cpt.GetByID(CPID)
- if err != nil {
- return nil, err
- }
- return &DisplayStatusInfo{
- CPID: inst.ID,
- Name: inst.Name,
- Status: inst.Status,
- StatusText: inst.Status.ToString(),
- StatusAbbr: inst.Status.ToStringAbbr(),
- PID: inst.ProcessID,
- LastStartTime: inst.LastStartTime,
- }, nil
- }
- func (m *ChildProcManager) ListAllStatus() ([]*DisplayStatusInfo, error) {
- all, err := m.cpt.ListAll()
- if err != nil {
- return nil, err
- }
- ret := make([]*DisplayStatusInfo, len(all))
- for i, v := range all {
- ret[i] = &DisplayStatusInfo{
- CPID: v.ID,
- Name: v.Name,
- Status: v.Status,
- StatusText: v.Status.ToString(),
- StatusAbbr: v.Status.ToStringAbbr(),
- PID: v.ProcessID,
- LastStartTime: v.LastStartTime,
- }
- }
- return ret, nil
- }
- func (m *ChildProcManager) QueryStatus(index string, args ...interface{}) ([]*DisplayStatusInfo, error) {
- all, err := m.cpt.DoQuery(index, args...)
- if err != nil {
- return nil, err
- }
- ret := make([]*DisplayStatusInfo, len(all))
- for i, v := range all {
- ret[i] = &DisplayStatusInfo{
- CPID: v.ID,
- Name: v.Name,
- Status: v.Status,
- StatusText: v.Status.ToString(),
- StatusAbbr: v.Status.ToStringAbbr(),
- PID: v.ProcessID,
- LastStartTime: v.LastStartTime,
- }
- }
- return ret, nil
- }
- func (m *ChildProcManager) createProc(cpti *ChildProcTableItem) error {
- if m.freezed {
- return fmt.Errorf("ChildProcManager is not running or freezed for stopping, can not modify any status")
- }
- err := m.cpt.Add(cpti)
- if err != nil {
- return err
- }
- go func() {
- m.mainMsgLoop.AddMsg(&Msg{CPID: cpti.ID})
- }()
- return nil
- }
- func (m *ChildProcManager) Start() chan error {
- cherr := make(chan error, 1)
- if m.isMgrRunning {
- cherr <- fmt.Errorf("ChildProcManager is already running")
- }
- ctx, cncl := context.WithCancel(context.Background())
- m.mainCtx = ctx
- m.mainCancelFunc = cncl
- m.mainMsgLoop = NewMainMsgLoop(m, ctx)
- m.mainRunGroup = &run.Group{}
- m.mainRunGroup.Add(m.mainMsgLoop.RunMsgLoop, m.mainMsgLoop.StopMainLoop)
- m.mainRunGroup.Add(m.mainMsgLoop.RunInspectionLoop, m.mainMsgLoop.StopInspectionLoop)
- go func() {
- m.isMgrRunning = true
- m.freezed = false
- err := m.mainRunGroup.Run()
- m.mainCtx = nil
- m.mainCancelFunc = nil
- m.mainMsgLoop = nil
- m.mainRunGroup = nil
- m.isMgrRunning = false
- cherr <- err
- }()
- return cherr
- }
- func (m *ChildProcManager) Stop() error {
- if m.isMgrRunning {
- if m.mainCancelFunc == nil {
- return fmt.Errorf("ChildProcManager internal error: cancel func is nil")
- }
- m.freezed = true
- m.mainCancelFunc()
- } else {
- return fmt.Errorf("ChildProcManager not running")
- }
- return nil
- }
- func (m *ChildProcManager) IsRunning() bool {
- return m.isMgrRunning
- }
|