vgarvardt/gue

gue

Gue is a Go queue on top of PostgreSQL that uses transaction-level locks.

This project started as a fork of bgentry/que-go. Because of backward-incompatible changes, and because the original library author was slow to respond to PRs, I turned the fork into a standalone project. Version 2 breaks internal backward compatibility with the original project: the DB table and all the internal logic (queries, algorithms) were completely rewritten.

The name Gue is yet another silly word transformation: Queue -> Que, Go + Que -> Gue.

Install

go get -u github.com/vgarvardt/gue/v6

Additionally, you need to apply the DB migration.

Usage Example

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "os"
  "time"

  "github.com/jackc/pgx/v5/pgxpool"
  "github.com/jackc/pgx/v5/stdlib"
  "golang.org/x/sync/errgroup"

  "github.com/vgarvardt/gue/v6"
)

const (
  printerQueue   = "name_printer"
  jobTypePrinter = "PrintName"
)

type printNameArgs struct {
  Name string
}

func main() {
  printName := func(ctx context.Context, j *gue.Job) error {
    var args printNameArgs
    if err := json.Unmarshal(j.Args, &args); err != nil {
      return err
    }
    fmt.Printf("Hello %s!\n", args.Name)
    return nil
  }

  pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
  if err != nil {
    log.Fatal(err)
  }

  pgxPool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg)
  if err != nil {
    log.Fatal(err)
  }
  defer pgxPool.Close()

  db := stdlib.OpenDBFromPool(pgxPool)

  gc, err := gue.NewClient(db)
  if err != nil {
    log.Fatal(err)
  }
  wm := gue.WorkMap{
    jobTypePrinter: printName,
  }

  finishedJobsLog := func(ctx context.Context, j *gue.Job, err error) {
    if err != nil {
      return
    }

    j.Tx().Exec(
      ctx,
      "INSERT INTO finished_jobs_log (queue, type, run_at) VALUES ($1, $2, now())",
      j.Queue,
      j.Type,
    )
  }

  // create a pool w/ 2 workers
  workers, err := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue(printerQueue), gue.WithPoolHooksJobDone(finishedJobsLog))
  if err != nil {
    log.Fatal(err)
  }

  ctx, shutdown := context.WithCancel(context.Background())

  // work jobs in goroutine
  g, gctx := errgroup.WithContext(ctx)
  g.Go(func() error {
    err := workers.Run(gctx)
    if err != nil {
      // In real-world applications, use a better way to shut down
      // application on unrecoverable error. E.g. fx.Shutdowner from
      // go.uber.org/fx module.
      log.Fatal(err)
    }
    return err
  })

  args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
  if err != nil {
    log.Fatal(err)
  }

  j := &gue.Job{
    Type:  jobTypePrinter,
    Queue: printerQueue,
    Args:  args,
  }
  if err := gc.Enqueue(context.Background(), j); err != nil {
    log.Fatal(err)
  }

  j = &gue.Job{
    Type:  jobTypePrinter,
    Queue: printerQueue,
    RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
    Args:  args,
  }
  if err := gc.Enqueue(context.Background(), j); err != nil {
    log.Fatal(err)
  }

  time.Sleep(30 * time.Second) // wait for a while

  // send shutdown signal to worker
  shutdown()
  if err := g.Wait(); err != nil {
    log.Fatal(err)
  }
}
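
The handler above decodes `j.Args` with `json.Unmarshal` on every call. As a minimal sketch of one way to factor that step out, the hypothetical helper `decodeArgs` below (not part of gue) turns the raw JSON bytes into a typed value and wraps the error with context. It only assumes what the example shows: job arguments travel as JSON-encoded bytes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// printNameArgs mirrors the argument struct from the usage example.
type printNameArgs struct {
	Name string
}

// decodeArgs is a hypothetical helper (not part of gue): it unmarshals
// raw JSON job arguments into a value of type T.
func decodeArgs[T any](raw []byte) (T, error) {
	var v T
	if err := json.Unmarshal(raw, &v); err != nil {
		return v, fmt.Errorf("decode job args: %w", err)
	}
	return v, nil
}

func main() {
	// Encode the arguments the same way the usage example does before enqueueing.
	raw, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
	if err != nil {
		log.Fatal(err)
	}

	// Decode them the way a job handler would, given the raw bytes.
	args, err := decodeArgs[printNameArgs](raw)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Hello %s!\n", args.Name)
}
```

Inside a handler, the call would take the job's raw arguments in place of `raw`. Returning the wrapped error lets the caller decide how to treat malformed arguments.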

PostgreSQL drivers

The package uses the standard library's database/sql types internally and is tested with the following drivers:

pgx/v5

package main

import (
  "context"
  "log"
  "os"

  "github.com/jackc/pgx/v5/pgxpool"
  "github.com/jackc/pgx/v5/stdlib"

  "github.com/vgarvardt/gue/v6"
)

func main() {
  pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
  if err != nil {
    log.Fatal(err)
  }

  pgxPool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg)
  if err != nil {
    log.Fatal(err)
  }
  defer pgxPool.Close()

  db := stdlib.OpenDBFromPool(pgxPool)

  gc, err := gue.NewClient(db)
  ...
}

lib/pq

package main

import (
  "database/sql"
  "log"
  "os"

  _ "github.com/lib/pq" // register postgres driver

  "github.com/vgarvardt/gue/v6"
)

func main() {
  db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
  if err != nil {
    log.Fatal(err)
  }
  defer db.Close()

  gc, err := gue.NewClient(db)
  ...
}

Logging

The package uses the standard library's log/slog logger internally. You can wrap the logger your application already uses with an existing adapter, such as go.uber.org/zap/exp/zapslog, or implement your own.
