Skip to content

fadhelmurphy/dagsflow-go

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

22 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

dagsflow-go

dagsflow-go adalah tools CLI sederhana untuk membuat dan menjalankan DAG scheduler mirip seperti Airflow menggunakan Go.


Fitur

  • Schedule berbasis cron (robfig/cron)
  • Support DAG dengan dependency dan branching
  • Detach process (background)
  • List status DAG (Is Running / Schedule)
  • Rerun DAG / job (dengan upstream / downstream)
  • Trigger DAG lain (blocking / non-blocking)
  • Configurable DAG (mirip Airflow)
  • Cross-platform tanpa HTTP server / REST API

Instalasi

git clone <repo-url>
cd dagsflow-go
go build -o dagsflow-go

Cara Pakai

Jalankan 1 DAG

./dagsflow-go run <dagName>

Jalankan semua DAG

./dagsflow-go run-all

Stop 1 DAG

./dagsflow-go stop <dagName>

Stop semua DAG

./dagsflow-go stop-all

Lihat status DAG

./dagsflow-go list
DAG Name        | Is Running | Schedule
----------------|------------|---------------------
dag1            | true       | */1 * * * *
dag2            | false      | */2 * * * *

Lihat Graph DAG

./dagsflow-go graph <dagName>

Struktur PID / Marker

  • PID file: dagsflow-pid/{dagName}.pid

  • Running marker: dagsflow-pid/{dagName}.running

Contoh DAG

package dags

import (
	"fmt"
	"dagsflow-go/dag"
)

func init() {
	d := dag.NewDAG("dag1", "*/1 * * * *")
	a := d.NewJob("a", func(ctx *dag.Context) {
		fmt.Println("[DAG1] Run A")
	})
	b := d.NewJob("b", func(ctx *dag.Context) {
		fmt.Println("[DAG1] Run B")
	})
	a.Then(b)
	dag.Register(d)
}

Rerun DAG

./dagsflow-go rerun-dag <dagName>

Rerun Job

./dagsflow-go rerun-job <dagName> <jobID>

Rerun Job dengan upstream / downstream

./dagsflow-go rerun-job <dagName> <jobID> --upstream
./dagsflow-go rerun-job <dagName> <jobID> --downstream
./dagsflow-go rerun-job <dagName> <jobID> --upstream --downstream

Config

d := dag.NewDAG("custom_dag", "*/1 * * * *", map[string]interface{}{
	"threshold": 50,
	"region":    "APAC",
})

di dalam job confignya dapat diakses :

val := ctx.DAG.Config["threshold"].(int)
fmt.Println("Threshold is", val)

Trigger DAG lain sebagai job

Non-blocking trigger

	triggerJob := d.NewJob("trigger_dag_branch", func(ctx *dag.Context) {
	config := map[string]interface{}{
		"param1": "value1",
		"param2": 42,
	}

	// ctx.DAG.TriggerDAG("dag_branch") // tanpa config
	ctx.DAG.TriggerDAGWithConfig("dag_branch",config, false) // dengan config 
	}) // Non Blocking example (tidak perlu nunggu dag nya kelar)

Blocking trigger

	triggerBlockingJob := d.NewJob("trigger_dag1", func(ctx *dag.Context) {
			config := map[string]any{
		"param1": "value1",
		"param2": 42,
	}
	// ctx.DAG.TriggerDAGBlocking("dag1") // tanpa config
	ctx.DAG.TriggerDAGWithConfig("dag1", config, true) // dengan config 
	}) // Blocking example (perlu nunggu dag nya kelar)

About

a workflow orchestrator inspired by Airflow #SoftwareXDataEngineering

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages