workflow.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package helios
  2. import (
  3. "io"
  4. "time"
  5. "git.bazzel.dev/bmallen/helios/pkg/state"
  6. "github.com/aws/smithy-go/ptr"
  7. "github.com/sirupsen/logrus"
  8. )
  9. type (
  10. Workflow struct {
  11. name string
  12. jobs jobs
  13. c Context
  14. start *time.Time
  15. end *time.Time
  16. log *logrus.Logger
  17. }
  18. job struct {
  19. name string
  20. steps steps
  21. needs []string
  22. workflow *Workflow
  23. start *time.Time
  24. end *time.Time
  25. err error
  26. }
  27. jobs []*job
  28. jobOption interface {
  29. Job(j *job)
  30. }
  31. step struct {
  32. name string
  33. f StepFunc
  34. start *time.Time
  35. end *time.Time
  36. workflow *Workflow
  37. job *job
  38. err error
  39. }
  40. steps []*step
  41. needs struct {
  42. name string
  43. }
  44. Context interface {
  45. Get(key string) any
  46. Set(key string, value any)
  47. Delete(key string)
  48. Keys() []string
  49. Range(f func(key string, value any) bool)
  50. ID() string
  51. }
  52. StepFunc func(ctx Context) error
  53. )
  54. func NewWorkflow(name string) *Workflow {
  55. w := &Workflow{
  56. name: name,
  57. jobs: jobs{},
  58. c: state.New(),
  59. log: logrus.New(),
  60. }
  61. return w
  62. }
  63. func (w *Workflow) WithState(c Context) *Workflow {
  64. w.c = c
  65. return w
  66. }
  67. func (w *Workflow) Logger(writer io.Writer) *Workflow {
  68. w.log.SetOutput(writer)
  69. return w
  70. }
  71. func (w *Workflow) Run() (err error) {
  72. w.start = ptr.Time(time.Now())
  73. defer func() {
  74. w.end = ptr.Time(time.Now())
  75. }()
  76. for _, j := range w.jobs {
  77. w.log.Printf("# Job: %s\n", j.name)
  78. skip := false
  79. for _, n := range j.needs {
  80. for _, jj := range w.jobs {
  81. if jj.name == n {
  82. if jj.end == nil || jj.err != nil {
  83. skip = true
  84. break
  85. }
  86. }
  87. }
  88. if skip {
  89. break
  90. }
  91. }
  92. if skip {
  93. w.log.Printf("- skipping\n")
  94. continue
  95. }
  96. j.start = ptr.Time(time.Now())
  97. for _, step := range j.steps {
  98. w.log.Printf("- Step: %s / %s\n", j.name, step.name)
  99. j.err = step.Run()
  100. if j.err != nil {
  101. w.log.Printf("- Error: %s\n", j.err.Error())
  102. break
  103. }
  104. w.log.Printf(" Duration: %s\n", step.Duration())
  105. }
  106. j.end = ptr.Time(time.Now())
  107. }
  108. return err
  109. }
  110. func (w *Workflow) Job(name string, opts ...jobOption) *Workflow {
  111. job := &job{
  112. name: name,
  113. steps: steps{},
  114. needs: []string{},
  115. workflow: w,
  116. }
  117. for _, opt := range opts {
  118. opt.Job(job)
  119. }
  120. w.jobs = append(w.jobs, job)
  121. return w
  122. }
  123. func Step(name string, f StepFunc) jobOption {
  124. return &step{
  125. name: name,
  126. f: f,
  127. }
  128. }
  129. func (s *step) Job(j *job) {
  130. s.job = j
  131. s.workflow = j.workflow
  132. j.steps = append(j.steps, s)
  133. }
  134. func (s *step) Run() error {
  135. s.start = ptr.Time(time.Now())
  136. err := s.f(s.workflow.c)
  137. s.end = ptr.Time(time.Now())
  138. s.err = err
  139. return err
  140. }
  141. func (s *step) Duration() time.Duration {
  142. if s.end == nil || s.start == nil {
  143. return 0
  144. }
  145. return s.end.Sub(ptr.ToTime(s.start))
  146. }
  147. func Needs(name string) jobOption {
  148. return &needs{
  149. name: name,
  150. }
  151. }
  152. func (o *needs) Job(j *job) {
  153. j.needs = append(j.needs, o.name)
  154. }