builtin_fn_flow.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package eval
  2. import (
  3. "errors"
  4. "sync"
  5. "sync/atomic"
  6. "src.elv.sh/pkg/diag"
  7. "src.elv.sh/pkg/eval/vals"
  8. )
  9. // Flow control.
  10. // TODO(xiaq): Document "multi-error".
  11. func init() {
  12. addBuiltinFns(map[string]any{
  13. "run-parallel": runParallel,
  14. // Exception and control
  15. "fail": fail,
  16. "multi-error": multiErrorFn,
  17. "return": returnFn,
  18. "break": breakFn,
  19. "continue": continueFn,
  20. "defer": deferFn,
  21. // Iterations.
  22. "each": each,
  23. "peach": peach,
  24. })
  25. }
  26. func runParallel(fm *Frame, functions ...Callable) error {
  27. var wg sync.WaitGroup
  28. wg.Add(len(functions))
  29. exceptions := make([]Exception, len(functions))
  30. for i, function := range functions {
  31. go func(fm2 *Frame, function Callable, pexc *Exception) {
  32. err := function.Call(fm2, NoArgs, NoOpts)
  33. if err != nil {
  34. *pexc = err.(Exception)
  35. }
  36. wg.Done()
  37. }(fm.Fork("[run-parallel function]"), function, &exceptions[i])
  38. }
  39. wg.Wait()
  40. return MakePipelineError(exceptions)
  41. }
  42. func each(fm *Frame, f Callable, inputs Inputs) error {
  43. broken := false
  44. var err error
  45. inputs(func(v any) {
  46. if broken {
  47. return
  48. }
  49. newFm := fm.Fork("closure of each")
  50. ex := f.Call(newFm, []any{v}, NoOpts)
  51. newFm.Close()
  52. if ex != nil {
  53. switch Reason(ex) {
  54. case nil, Continue:
  55. // nop
  56. case Break:
  57. broken = true
  58. default:
  59. broken = true
  60. err = ex
  61. }
  62. }
  63. })
  64. return err
  65. }
  66. func peach(fm *Frame, f Callable, inputs Inputs) error {
  67. var wg sync.WaitGroup
  68. var broken int32
  69. var errMu sync.Mutex
  70. var err error
  71. inputs(func(v any) {
  72. if atomic.LoadInt32(&broken) != 0 {
  73. return
  74. }
  75. wg.Add(1)
  76. go func() {
  77. newFm := fm.Fork("closure of peach")
  78. newFm.ports[0] = DummyInputPort
  79. ex := f.Call(newFm, []any{v}, NoOpts)
  80. newFm.Close()
  81. if ex != nil {
  82. switch Reason(ex) {
  83. case nil, Continue:
  84. // nop
  85. case Break:
  86. atomic.StoreInt32(&broken, 1)
  87. default:
  88. errMu.Lock()
  89. err = diag.Errors(err, ex)
  90. defer errMu.Unlock()
  91. atomic.StoreInt32(&broken, 1)
  92. }
  93. }
  94. wg.Done()
  95. }()
  96. })
  97. wg.Wait()
  98. return err
  99. }
  100. // FailError is an error returned by the "fail" command.
  101. type FailError struct{ Content any }
  102. // Error returns the string representation of the cause.
  103. func (e FailError) Error() string { return vals.ToString(e.Content) }
  104. // Fields returns a structmap for accessing fields from Elvish.
  105. func (e FailError) Fields() vals.StructMap { return failFields{e} }
  106. type failFields struct{ e FailError }
  107. func (failFields) IsStructMap() {}
  108. func (f failFields) Type() string { return "fail" }
  109. func (f failFields) Content() any { return f.e.Content }
  110. func fail(v any) error {
  111. if e, ok := v.(error); ok {
  112. // MAYBE TODO: if v is an exception, attach a "rethrown" stack trace,
  113. // like Java
  114. return e
  115. }
  116. return FailError{v}
  117. }
  118. func multiErrorFn(excs ...Exception) error {
  119. return PipelineError{excs}
  120. }
  121. func returnFn() error {
  122. return Return
  123. }
  124. func breakFn() error {
  125. return Break
  126. }
  127. func continueFn() error {
  128. return Continue
  129. }
  130. var errDeferNotInClosure = errors.New("defer must be called from within a closure")
  131. func deferFn(fm *Frame, fn Callable) error {
  132. if fm.defers == nil {
  133. return errDeferNotInClosure
  134. }
  135. deferTraceback := fm.traceback
  136. fm.addDefer(func(fm *Frame) Exception {
  137. err := fn.Call(fm, NoArgs, NoOpts)
  138. if exc, ok := err.(Exception); ok {
  139. return exc
  140. }
  141. return &exception{err, deferTraceback}
  142. })
  143. return nil
  144. }