job.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. package job
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "path/filepath"
  8. "reflect"
  9. "strings"
  10. "time"
  11. "git.bazzel.dev/bmallen/helios/pkg/state"
  12. "git.bazzel.dev/bmallen/helios/pkg/symbols"
  13. "github.com/go-git/go-git/v6"
  14. "github.com/go-git/go-git/v6/plumbing"
  15. "github.com/go-git/go-git/v6/plumbing/transport/http"
  16. "github.com/otiai10/copy"
  17. "github.com/sirupsen/logrus"
  18. "github.com/traefik/yaegi/interp"
  19. "github.com/traefik/yaegi/stdlib"
  20. "golang.org/x/mod/modfile"
  21. "gopkg.in/yaml.v3"
  22. )
  23. type Job struct {
  24. state *state.State
  25. sourceDir string
  26. files []string
  27. source string
  28. ref string
  29. user string
  30. password string
  31. globals map[string]any
  32. timeout time.Duration
  33. autoimport bool
  34. autocleanup bool
  35. module string
  36. errors []error
  37. log *logrus.Logger
  38. logs []string
  39. workspaceCreated bool
  40. workspacedir string
  41. i *interp.Interpreter
  42. }
  43. type BlankFormatter struct{}
  44. // Format renders a log entry to only the message part.
  45. func (f *BlankFormatter) Format(entry *logrus.Entry) ([]byte, error) {
  46. // Return the message string as a byte slice, adding a newline character.
  47. return []byte(entry.Message + "\n"), nil
  48. }
  49. func New() *Job {
  50. j := &Job{
  51. state: state.New(),
  52. timeout: 10 * time.Second,
  53. log: logrus.New(),
  54. globals: map[string]any{},
  55. workspacedir: "workspace",
  56. }
  57. j.log.Formatter = &BlankFormatter{}
  58. j.log.AddHook(j)
  59. return j
  60. }
  61. func NewWithState(state *state.State) *Job {
  62. j := New()
  63. j.state = state
  64. return j
  65. }
  66. func (j *Job) Levels() []logrus.Level {
  67. return logrus.AllLevels
  68. }
  69. func (j *Job) Fire(e *logrus.Entry) error {
  70. // j.logs = append(j.logs, fmt.Sprintf("%s: %s", e.Level.String(), e.Message))
  71. j.logs = append(j.logs, e.Message)
  72. return nil
  73. }
  74. func (j *Job) error(err error) *Job {
  75. if err != nil {
  76. j.log.Error(err)
  77. if j.errors == nil {
  78. j.errors = []error{}
  79. }
  80. j.errors = append(j.errors, err)
  81. }
  82. return j
  83. }
  84. func (j *Job) SourceDir(dir string) *Job {
  85. j.sourceDir = dir
  86. return j
  87. }
  88. func (j *Job) AutoCleanup() *Job {
  89. j.autocleanup = true
  90. return j
  91. }
  92. func (j *Job) Module(module string) *Job {
  93. j.module = module
  94. return j
  95. }
  96. func (j *Job) Source(url, ref string) *Job {
  97. if j.source != "" {
  98. return j.error(errors.New("source has already been set"))
  99. }
  100. j.source = url
  101. j.ref = url
  102. err := os.MkdirAll(filepath.Join(j.workspacedir, j.ID()), 0770)
  103. if err != nil {
  104. return j.error(err)
  105. }
  106. j.log.Info("Cloning...")
  107. _, err = git.PlainClone(
  108. filepath.Join(j.workspacedir, j.ID(), "repo"),
  109. &git.CloneOptions{
  110. URL: j.source,
  111. ReferenceName: plumbing.ReferenceName(ref),
  112. Progress: j.log.WriterLevel(logrus.DebugLevel),
  113. Auth: &http.BasicAuth{
  114. Username: j.user, // yes, this can be anything except an empty string
  115. Password: j.password,
  116. },
  117. })
  118. j.error(err)
  119. j.sourceDir = filepath.Join(j.workspacedir, j.ID(), "repo")
  120. return j
  121. }
  122. func (j *Job) User(user string) *Job {
  123. j.user = user
  124. return j
  125. }
  126. func (j *Job) Password(password string) *Job {
  127. j.password = password
  128. return j
  129. }
  130. func (j *Job) Timeout(timeout time.Duration) *Job {
  131. j.timeout = timeout
  132. return j
  133. }
  134. func (j *Job) AutoImport() *Job {
  135. j.autoimport = true
  136. return j
  137. }
  138. func (j *Job) Run(f string) *Job {
  139. if j.sourceDir == "" {
  140. return j.error(errors.New("must provide source"))
  141. }
  142. start := time.Now()
  143. workdir := filepath.Join(j.workspacedir, j.state.ID(), "src", j.module)
  144. if !j.workspaceCreated {
  145. if j.module == "" {
  146. modfiledata, err := os.ReadFile(filepath.Join(j.sourceDir, "go.mod"))
  147. if err != nil {
  148. j.module = "pipeline"
  149. } else {
  150. fff, err := modfile.Parse("go.mod", modfiledata, nil)
  151. if err != nil {
  152. return j.error(err)
  153. }
  154. // for _, r := range fff.Require {
  155. // j.log.Info(" ", r.Mod.String())
  156. // }
  157. j.module = fff.Module.Mod.Path
  158. }
  159. }
  160. j.log.Info("Creating workspace...")
  161. err := os.MkdirAll(filepath.Join(j.workspacedir, j.state.ID(), "src"), 0770)
  162. if err != nil {
  163. return j.error(err)
  164. }
  165. workdir = filepath.Join(j.workspacedir, j.state.ID(), "src", j.module)
  166. err = copy.Copy(j.sourceDir, workdir)
  167. if err != nil {
  168. return j.error(err)
  169. }
  170. j.workspaceCreated = true
  171. }
  172. if j.autocleanup {
  173. defer func() {
  174. }()
  175. }
  176. j.log.Info("Setting up new interpreter...")
  177. workspace, err := filepath.Abs(filepath.Join(j.workspacedir, j.state.ID()))
  178. if err != nil {
  179. return j.error(err)
  180. }
  181. if j.autocleanup {
  182. defer func() {
  183. err := os.RemoveAll(workspace)
  184. if err != nil {
  185. j.error(err)
  186. }
  187. }()
  188. }
  189. j.i = interp.New(interp.Options{
  190. GoPath: workspace,
  191. Unrestricted: true,
  192. Stdin: nil,
  193. Stdout: j.log.WriterLevel(logrus.InfoLevel),
  194. Stderr: j.log.WriterLevel(logrus.WarnLevel),
  195. })
  196. err = j.i.Use(stdlib.Symbols)
  197. if err != nil {
  198. return j.error(err)
  199. }
  200. err = j.i.Use(symbols.Symbols)
  201. if err != nil {
  202. return j.error(err)
  203. }
  204. err = j.i.Use(j.state.Symbols())
  205. if err != nil {
  206. return j.error(err)
  207. }
  208. if j.autoimport {
  209. j.i.ImportUsed()
  210. }
  211. j.log.Info("Starting job...")
  212. err = os.Chdir(workdir)
  213. if err != nil {
  214. return j.error(err)
  215. }
  216. j.files = append(j.files, f)
  217. if strings.HasSuffix(f, ".go") {
  218. data, err := os.ReadFile(f)
  219. if err != nil {
  220. return j.error(err)
  221. }
  222. ctx, _ := context.WithTimeout(context.Background(), j.timeout)
  223. _, err = j.i.EvalWithContext(ctx, string(data))
  224. if err != nil {
  225. j.error(err)
  226. }
  227. } else {
  228. ctx, _ := context.WithTimeout(context.Background(), j.timeout)
  229. _, err := j.i.EvalPathWithContext(ctx, filepath.Join(j.module, f))
  230. if err != nil {
  231. j.error(err)
  232. }
  233. }
  234. // for p, vv := range j.i.Symbols("pipeline") {
  235. // for k, v := range vv {
  236. // fmt.Println(p, k, v.Type(), v)
  237. // if v.Type() == reflect.TypeOf(func() *helios.Workflow { return nil }) {
  238. // fmt.Println(p, k)
  239. // _, err := j.i.Eval(`import "log"`)
  240. // if err != nil {
  241. // j.error(err)
  242. // }
  243. // fmt.Println(vv)
  244. // vv, err := j.i.Eval("pipeline."+k + "().Logger(log.Writer()).Run()")
  245. // if err != nil {
  246. // j.error(err)
  247. // }
  248. // fmt.Println(vv)
  249. // }
  250. // }
  251. // }
  252. for k, v := range j.i.Globals() {
  253. if v.CanInterface() {
  254. if v.Kind() == reflect.Func {
  255. continue
  256. }
  257. j.globals[k] = v.Interface()
  258. }
  259. }
  260. j.log.Info("Job finished in: ", time.Now().Sub(start))
  261. return j
  262. }
  263. func (j *Job) Globals() map[string]any {
  264. return j.globals
  265. }
  266. func (j *Job) State() *state.State {
  267. return j.state
  268. }
  269. func (j *Job) ID() string {
  270. return j.state.ID()
  271. }
  272. func (j *Job) Error() error {
  273. if j.errors != nil {
  274. err := []string{}
  275. if len(j.errors) > 1 {
  276. err = append(err, fmt.Sprintf("!!! %d: errors in stack !!!", len(j.errors)))
  277. }
  278. for i, e := range j.errors {
  279. err = append(err, fmt.Sprintf("%d: %s", i, e.Error()))
  280. }
  281. return errors.New(strings.Join(err, "\n"))
  282. }
  283. return nil
  284. }
  285. type Report struct {
  286. Module string
  287. ID string
  288. Source string
  289. Files []string
  290. Globals map[string]any
  291. State map[string]any
  292. Errors []string
  293. Logs []string
  294. }
  295. func (j *Job) Report() *Job {
  296. report := Report{
  297. ID: j.ID(),
  298. Source: j.source,
  299. Files: j.files,
  300. Module: j.module,
  301. Globals: map[string]any{},
  302. State: map[string]any{},
  303. Logs: j.logs,
  304. }
  305. for k, v := range j.Globals() {
  306. report.Globals[k] = v
  307. }
  308. for _, v := range j.errors {
  309. report.Errors = append(report.Errors, v.Error())
  310. }
  311. j.state.Range(func(key string, value any) bool {
  312. report.State[key] = value
  313. return true
  314. })
  315. out, err := yaml.Marshal(report)
  316. if err != nil {
  317. j.log.Error(err)
  318. } else {
  319. fmt.Print(string(out))
  320. }
  321. return j
  322. }