workflow.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package workflow
  2. import (
  3. "time"
  4. "git.bazzel.dev/bmallen/helios/pkg/state"
  5. "github.com/sirupsen/logrus"
  6. )
  7. type (
  8. Workflow struct {
  9. name string
  10. jobs jobs
  11. c Context
  12. start time.Time
  13. end time.Time
  14. log *logrus.Logger
  15. }
  16. job struct {
  17. name string
  18. steps steps
  19. needs []string
  20. workflow *Workflow
  21. start time.Time
  22. end time.Time
  23. }
  24. jobs []*job
  25. jobOption interface {
  26. Job(j *job)
  27. }
  28. step struct {
  29. name string
  30. f StepFunc
  31. start time.Time
  32. end time.Time
  33. workflow *Workflow
  34. job *job
  35. }
  36. steps []*step
  37. needs struct {
  38. name string
  39. }
  40. Context interface {
  41. Get(key string) any
  42. Set(key string, value any)
  43. Delete(key string)
  44. Keys() []string
  45. Range(f func(key string, value any) bool)
  46. ID() string
  47. }
  48. StepFunc func(ctx Context) error
  49. )
  50. func NewWorkflow(name string) *Workflow {
  51. return &Workflow{
  52. name: name,
  53. jobs: jobs{},
  54. c: state.New(),
  55. log: logrus.New(),
  56. }
  57. }
  58. func (w *Workflow) Run() (err error) {
  59. w.start = time.Now()
  60. defer func() {
  61. w.end = time.Now()
  62. }()
  63. for _, j := range w.jobs {
  64. j.start = time.Now()
  65. w.log.Printf("# Job: %s\n", j.name)
  66. for _, step := range j.steps {
  67. w.log.Printf("- Step: %s / %s\n", j.name, step.name)
  68. err := step.Run()
  69. w.log.Printf(" Duration: %s\n", step.Duration())
  70. if err != nil {
  71. break
  72. }
  73. }
  74. j.end = time.Now()
  75. if err != nil {
  76. break
  77. }
  78. }
  79. return err
  80. }
  81. func (w *Workflow) Job(name string, opts ...jobOption) *Workflow {
  82. job := &job{
  83. name: name,
  84. steps: steps{},
  85. needs: []string{},
  86. workflow: w,
  87. }
  88. for _, opt := range opts {
  89. opt.Job(job)
  90. }
  91. w.jobs = append(w.jobs, job)
  92. return w
  93. }
  94. func Step(name string, f StepFunc) jobOption {
  95. return &step{
  96. name: name,
  97. f: f,
  98. }
  99. }
  100. func (s *step) Job(j *job) {
  101. s.job = j
  102. s.workflow = j.workflow
  103. j.steps = append(j.steps, s)
  104. }
  105. func (s *step) Run() error {
  106. s.start = time.Now()
  107. err := s.f(s.workflow.c)
  108. s.end = time.Now()
  109. return err
  110. }
  111. func (s *step) Duration() time.Duration {
  112. return s.end.Sub(s.start)
  113. }
  114. func Needs(name string) jobOption {
  115. return &needs{
  116. name: name,
  117. }
  118. }
  119. func (o *needs) Job(j *job) {
  120. j.needs = append(j.needs, o.name)
  121. }