// workflow.go (2.8 KB)
  1. package helios
  import (
  	"fmt"
  	"io"
  	"time"

  	"git.bazzel.dev/bmallen/helios/pkg/state"
  	"github.com/aws/smithy-go/ptr"
  	"github.com/sirupsen/logrus"
  )
  9. type (
  10. Workflow struct {
  11. name string
  12. jobs jobs
  13. c Context
  14. start *time.Time
  15. end *time.Time
  16. log *logrus.Logger
  17. }
  18. job struct {
  19. name string
  20. steps steps
  21. needs []string
  22. workflow *Workflow
  23. start *time.Time
  24. end *time.Time
  25. err error
  26. }
  27. jobs []*job
  28. jobOption interface {
  29. Job(j *job)
  30. }
  31. step struct {
  32. name string
  33. f StepFunc
  34. start *time.Time
  35. end *time.Time
  36. workflow *Workflow
  37. job *job
  38. err error
  39. }
  40. steps []*step
  41. needs struct {
  42. name string
  43. }
  44. Context interface {
  45. Get(key string) any
  46. Set(key string, value any)
  47. Delete(key string)
  48. Keys() []string
  49. Range(f func(key string, value any) bool)
  50. ID() string
  51. }
  52. StepFunc func(ctx Context) error
  53. )
  54. func NewWorkflow(name string) *Workflow {
  55. w := &Workflow{
  56. name: name,
  57. jobs: jobs{},
  58. c: state.New(),
  59. log: logrus.New(),
  60. }
  61. return w
  62. }
  63. func (w *Workflow) Logger(writer io.Writer) *Workflow {
  64. w.log.SetOutput(writer)
  65. return w
  66. }
  67. func (w *Workflow) Run() (err error) {
  68. w.start = ptr.Time(time.Now())
  69. defer func() {
  70. w.end = ptr.Time(time.Now())
  71. }()
  72. for _, j := range w.jobs {
  73. w.log.Printf("# Job: %s\n", j.name)
  74. skip := false
  75. for _, n := range j.needs {
  76. for _, jj := range w.jobs {
  77. if jj.name == n {
  78. if jj.end == nil || jj.err != nil {
  79. skip = true
  80. break
  81. }
  82. }
  83. }
  84. if skip {
  85. break
  86. }
  87. }
  88. if skip {
  89. w.log.Printf("- skipping\n")
  90. continue
  91. }
  92. j.start = ptr.Time(time.Now())
  93. for _, step := range j.steps {
  94. w.log.Printf("- Step: %s / %s\n", j.name, step.name)
  95. j.err = step.Run()
  96. if j.err != nil {
  97. w.log.Printf("- Error: %s\n", j.err.Error())
  98. break
  99. }
  100. w.log.Printf(" Duration: %s\n", step.Duration())
  101. }
  102. j.end = ptr.Time(time.Now())
  103. }
  104. return err
  105. }
  106. func (w *Workflow) Job(name string, opts ...jobOption) *Workflow {
  107. job := &job{
  108. name: name,
  109. steps: steps{},
  110. needs: []string{},
  111. workflow: w,
  112. }
  113. for _, opt := range opts {
  114. opt.Job(job)
  115. }
  116. w.jobs = append(w.jobs, job)
  117. return w
  118. }
  119. func Step(name string, f StepFunc) jobOption {
  120. return &step{
  121. name: name,
  122. f: f,
  123. }
  124. }
  125. func (s *step) Job(j *job) {
  126. s.job = j
  127. s.workflow = j.workflow
  128. j.steps = append(j.steps, s)
  129. }
  130. func (s *step) Run() error {
  131. s.start = ptr.Time(time.Now())
  132. err := s.f(s.workflow.c)
  133. s.end = ptr.Time(time.Now())
  134. s.err = err
  135. return err
  136. }
  137. func (s *step) Duration() time.Duration {
  138. if s.end == nil || s.start == nil {
  139. return 0
  140. }
  141. return s.end.Sub(ptr.ToTime(s.start))
  142. }
  143. func Needs(name string) jobOption {
  144. return &needs{
  145. name: name,
  146. }
  147. }
  148. func (o *needs) Job(j *job) {
  149. j.needs = append(j.needs, o.name)
  150. }