| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- package helios
- import (
- "io"
- "time"
- "git.bazzel.dev/bmallen/helios/pkg/state"
- "github.com/aws/smithy-go/ptr"
- "github.com/sirupsen/logrus"
- )
- type (
- Workflow struct {
- name string
- jobs jobs
- c Context
- start *time.Time
- end *time.Time
- log *logrus.Logger
- }
- job struct {
- name string
- steps steps
- needs []string
- workflow *Workflow
- start *time.Time
- end *time.Time
- err error
- }
- jobs []*job
- jobOption interface {
- Job(j *job)
- }
- step struct {
- name string
- f StepFunc
- start *time.Time
- end *time.Time
- workflow *Workflow
- job *job
- err error
- }
- steps []*step
- needs struct {
- name string
- }
- Context interface {
- Get(key string) any
- Set(key string, value any)
- Delete(key string)
- Keys() []string
- Range(f func(key string, value any) bool)
- ID() string
- }
- StepFunc func(ctx Context) error
- )
- func NewWorkflow(name string) *Workflow {
- w := &Workflow{
- name: name,
- jobs: jobs{},
- c: state.New(),
- log: logrus.New(),
- }
- return w
- }
- func (w *Workflow) Logger(writer io.Writer) *Workflow {
- w.log.SetOutput(writer)
- return w
- }
- func (w *Workflow) Run() (err error) {
- w.start = ptr.Time(time.Now())
- defer func() {
- w.end = ptr.Time(time.Now())
- }()
- for _, j := range w.jobs {
- w.log.Printf("# Job: %s\n", j.name)
- skip := false
- for _, n := range j.needs {
- for _, jj := range w.jobs {
- if jj.name == n {
- if jj.end == nil || jj.err != nil {
- skip = true
- break
- }
- }
- }
- if skip {
- break
- }
- }
- if skip {
- w.log.Printf("- skipping\n")
- continue
- }
- j.start = ptr.Time(time.Now())
- for _, step := range j.steps {
- w.log.Printf("- Step: %s / %s\n", j.name, step.name)
- j.err = step.Run()
- if j.err != nil {
- w.log.Printf("- Error: %s\n", j.err.Error())
- break
- }
- w.log.Printf(" Duration: %s\n", step.Duration())
- }
- j.end = ptr.Time(time.Now())
- }
- return err
- }
- func (w *Workflow) Job(name string, opts ...jobOption) *Workflow {
- job := &job{
- name: name,
- steps: steps{},
- needs: []string{},
- workflow: w,
- }
- for _, opt := range opts {
- opt.Job(job)
- }
- w.jobs = append(w.jobs, job)
- return w
- }
- func Step(name string, f StepFunc) jobOption {
- return &step{
- name: name,
- f: f,
- }
- }
- func (s *step) Job(j *job) {
- s.job = j
- s.workflow = j.workflow
- j.steps = append(j.steps, s)
- }
- func (s *step) Run() error {
- s.start = ptr.Time(time.Now())
- err := s.f(s.workflow.c)
- s.end = ptr.Time(time.Now())
- s.err = err
- return err
- }
- func (s *step) Duration() time.Duration {
- if s.end == nil || s.start == nil {
- return 0
- }
- return s.end.Sub(ptr.ToTime(s.start))
- }
- func Needs(name string) jobOption {
- return &needs{
- name: name,
- }
- }
- func (o *needs) Job(j *job) {
- j.needs = append(j.needs, o.name)
- }
|