Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/go-chi/render v1.0.1
github.com/go-git/go-git/v5 v5.2.0
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.1.2
github.com/gorilla/mux v1.8.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OI
github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down
55 changes: 55 additions & 0 deletions pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"errors"
"github.com/go-git/go-git/v5/plumbing"
log "github.com/sirupsen/logrus"
"net/http"
"time"
)
Expand Down Expand Up @@ -69,3 +70,57 @@ func (u *User) Bind(*http.Request) error {

return nil
}

type Job struct {
PackageName string
Status BuildStatus
Logs BuildLog `json:",omitempty"`
Uuid string
Time time.Time
}

// logsToKeep are the number of log lines to keep when sending a job.
const logsToKeep = 10

func (j Job) Render(w http.ResponseWriter, r *http.Request) error {
// Remove everything but the last 10 log lines. To get all
// logs the /job/{uuid}/logs route can be used. This is because
// the logs can get quite large, and if you want information about a single
// job it's not really useful to get all the logs. This is especially true
// when retrieving *all* jobs. In that case you really don't want all logs to
// be sent over as well
if len(j.Logs) > logsToKeep {
j.Logs = j.Logs[len(j.Logs)-logsToKeep:]
}

return nil
}

type BuildStatus int

const (
BuildStatusPending BuildStatus = iota
BuildStatusPullingRepo
BuildStatusRunning
BuildStatusUploading
BuildStatusDone

BuildStatusErrored
)

type LogLine struct {
Time time.Time
Level log.Level
message string
}

func (j LogLine) Bind(r *http.Request) error {
return nil
}

type BuildLog []LogLine

func (j LogLine) Render(w http.ResponseWriter, r *http.Request) error {

return nil
}
217 changes: 217 additions & 0 deletions pkg/store/jobstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package store

import (
"bytes"
"encoding/gob"
"github.com/dgraph-io/badger/v2"
"github.com/finitum/AAAAA/pkg/models"
"github.com/google/uuid"
"github.com/pkg/errors"
"sync"
"time"
)

type JobStoreWrapper struct {
*Badger

sync.Mutex
callbacks map[string][]func(line *models.LogLine)
}

func NewJobStore(badger *Badger) JobStore {
return &JobStoreWrapper{
Badger: badger,
callbacks: make(map[string][]func(line *models.LogLine)),
}
}

func (b *JobStoreWrapper) NewJob(name string) (*models.Job, error) {
jid, err := uuid.NewUUID()
if err != nil {
return nil, errors.Wrap(err, "uuid")
}

job := models.Job{
PackageName: name,
Status: models.BuildStatusPending,
Logs: nil,
Uuid: jid.String(),
Time: time.Now(),
}

err = b.db.Update(func(txn *badger.Txn) error {
var value bytes.Buffer

enc := gob.NewEncoder(&value)
err := enc.Encode(job)
if err != nil {
return errors.Wrap(err, "gob encode")
}

entry := badger.NewEntry([]byte(jobPrefix+jid.String()), value.Bytes()).WithTTL(jobTTL)
return errors.Wrap(txn.SetEntry(entry), "badger transaction")
})

return &job, err
}

func (b *JobStoreWrapper) AppendToJobLog(jid string, l *models.LogLine) error {
for _, cb := range b.callbacks[jid] {
cb(l)
}

return b.db.Update(func(txn *badger.Txn) error {
var job models.Job

// Get the job
item, err := txn.Get([]byte(jobPrefix + jid))
if err == badger.ErrKeyNotFound {
return ErrNotExists
} else if err != nil {
return errors.Wrap(err, "badger get")
}
err = item.Value(func(val []byte) error {
buf := bytes.NewBuffer(val)

dec := gob.NewDecoder(buf)
return errors.Wrap(dec.Decode(&job), "gob decode")
})
if err != nil {
return err
}

// Update the job
job.Logs = append(job.Logs, *l)

// Put the job back
var value bytes.Buffer
enc := gob.NewEncoder(&value)
err = enc.Encode(job)
if err != nil {
return errors.Wrap(err, "gob encode")
}

entry := badger.NewEntry([]byte(jobPrefix+jid), value.Bytes()).WithTTL(jobTTL)
return errors.Wrap(txn.SetEntry(entry), "badger transaction")
})
}

func (b *JobStoreWrapper) SetJobStatus(jid string, status models.BuildStatus) error {
return b.db.Update(func(txn *badger.Txn) error {
var job models.Job

// Get the job
item, err := txn.Get([]byte(jobPrefix + jid))
if err == badger.ErrKeyNotFound {
return ErrNotExists
} else if err != nil {
return errors.Wrap(err, "badger get")
}
err = item.Value(func(val []byte) error {
buf := bytes.NewBuffer(val)

dec := gob.NewDecoder(buf)
return errors.Wrap(dec.Decode(&job), "gob decode")
})
if err != nil {
return err
}

// Update the job
job.Status = status

// Put the job back
var value bytes.Buffer
enc := gob.NewEncoder(&value)
err = enc.Encode(job)
if err != nil {
return errors.Wrap(err, "gob encode")
}

entry := badger.NewEntry([]byte(jobPrefix+jid), value.Bytes()).WithTTL(jobTTL)
return errors.Wrap(txn.SetEntry(entry), "badger transaction")
})
}

func (b *JobStoreWrapper) GetLogs(jid string) (logs []models.LogLine, _ error) {
return logs, b.db.View(func(txn *badger.Txn) error {
var job models.Job

// Get the job
item, err := txn.Get([]byte(jobPrefix + jid))
if err == badger.ErrKeyNotFound {
return ErrNotExists
} else if err != nil {
return errors.Wrap(err, "badger get")
}
err = item.Value(func(val []byte) error {
buf := bytes.NewBuffer(val)

dec := gob.NewDecoder(buf)
return errors.Wrap(dec.Decode(&job), "gob decode")
})
if err != nil {
return err
}

logs = job.Logs

return nil
})
}

func (b *JobStoreWrapper) GetJobs() (jobs []models.Job, _ error) {
return jobs, b.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
prefix := []byte(jobPrefix)
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
var job models.Job
err := item.Value(func(val []byte) error {
buf := bytes.NewBuffer(val)

dec := gob.NewDecoder(buf)
return errors.Wrap(dec.Decode(&job), "gob decode")
})
jobs = append(jobs, job)
if err != nil {
return errors.Wrap(err, "badger iteration")
}
}
return nil
})
}

func (b *JobStoreWrapper) AddLogListener(uuid string, cb func(line *models.LogLine)) {
b.Lock()
defer b.Unlock()

b.callbacks[uuid] = append(b.callbacks[uuid], cb)
}

func (b *JobStoreWrapper) GetJob(jid string) (*models.Job, error) {
var job models.Job

return &job, b.db.View(func(txn *badger.Txn) error {

// Get the job
item, err := txn.Get([]byte(jobPrefix + jid))
if err == badger.ErrKeyNotFound {
return ErrNotExists
} else if err != nil {
return errors.Wrap(err, "badger get")
}
err = item.Value(func(val []byte) error {
buf := bytes.NewBuffer(val)

dec := gob.NewDecoder(buf)
return errors.Wrap(dec.Decode(&job), "gob decode")
})
if err != nil {
return err
}

return nil
})
}
32 changes: 32 additions & 0 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Store interface {
}

const pkgPrefix = "pkg_"

type PackageStore interface {
// GetPackage gets a package definition from the store MUST return ErrNotExists if the package does not exist
GetPackage(name string) (*models.Pkg, error)
Expand All @@ -32,6 +33,7 @@ type PackageStore interface {
}

const userPrefix = "user_"

type UserStore interface {
// GetUser gets a user from the store MUST return ErrNotExists if the user does not exist
GetUser(name string) (*models.User, error)
Expand Down Expand Up @@ -88,3 +90,33 @@ func GetPartialCacheEntry(cache Cache, term string) (aur.Results, bool, error) {

return nil, false, ErrNotExists
}

const jobPrefix = "job_"

// Keep job logs for 10 days
const jobTTL = 10 * 24 * time.Hour

type JobStore interface {
// NewJob creates a new job. It returns the newly created job, with in it the
// uuid of the job which can be used for further lookup.
NewJob(name string) (*models.Job, error)

// AppendToJobLog appends a line to a job's log
AppendToJobLog(uuid string, l *models.LogLine) error

// SetJobStatus updates the status of this job
SetJobStatus(uuid string, status models.BuildStatus) error

// GetLogs returns the entire log of this job
GetLogs(uuid string) ([]models.LogLine, error)

// GetJobs returns all jobs
GetJobs() ([]models.Job, error)

// AddLogListener takes a function which will be called every time a new logline is
// added the job targeted with the uuid
AddLogListener(uuid string, cb func(line *models.LogLine))

// GetJob gets a job by uuid
GetJob(uuid string) (*models.Job, error)
}
Loading