Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
defaultServerResultStreamMaxWait = 20 * time.Second
defaultServerMaxRequestBodySize int64 = 8 << 10 // 8KiB
defaultServerCascadeLabels string = "" // 8KiB
defaultServerShutdownTimeout = 40 * time.Second

defaultCircuitHalfOpenSuccesses = 10
defaultCircuitOpenTimeout = 0
Expand Down Expand Up @@ -56,6 +57,7 @@ var config struct {
ResultStreamMaxWait time.Duration
MaxRequestBodySize int64
CascadeLabels string
ShutdownTimeout time.Duration
TopProviderCardinality int
TopProviderReportInterval time.Duration
}
Expand Down Expand Up @@ -83,6 +85,7 @@ func init() {
config.Server.ResultStreamMaxWait = getEnvOrDefault[time.Duration]("SERVER_RESULT_STREAM_MAX_WAIT", defaultServerResultStreamMaxWait)
config.Server.MaxRequestBodySize = getEnvOrDefault[int64]("SERVER_MAX_REQUEST_BODY_SIZE", defaultServerMaxRequestBodySize)
config.Server.CascadeLabels = getEnvOrDefault[string]("SERVER_CASCADE_LABELS", defaultServerCascadeLabels)
config.Server.ShutdownTimeout = getEnvOrDefault[time.Duration]("SERVER_SHUTDOWN_TIMEOUT", defaultServerShutdownTimeout)

config.Server.TopProviderCardinality = getEnvOrDefault[int]("SERVER_TOP_PROVIDER_CARDINALITY", defaultStatMaxProviders)
config.Server.TopProviderReportInterval = getEnvOrDefault[time.Duration]("SERVER_TOP_PROVIDER_REPORT_INVERVAL", defaultStatProviderReportUpdate)
Expand Down
83 changes: 54 additions & 29 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"errors"
"fmt"
"os"
Expand All @@ -17,6 +18,29 @@ import (
const configCheckInterval = 5 * time.Second

func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer stop()

reloadSig := make(chan os.Signal, 1)
signal.Notify(reloadSig, syscall.SIGHUP)
defer signal.Stop(reloadSig)

err := runApp(ctx, os.Args, reloadSig, configCheckInterval, NewServer)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}

os.Exit(0)
}

func runApp(
ctx context.Context,
args []string,
reloadSig chan os.Signal,
configCheckInterval time.Duration,
newServer func(c *cli.Context) (serverInterface, error),
) error {
app := &cli.App{
Name: "indexstar",
Usage: "indexstar is a point in the content routing galaxy - routes requests in a star topology",
Expand Down Expand Up @@ -64,17 +88,15 @@ func main() {
},
},
Action: func(c *cli.Context) error {
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
ctx, ctxCancel := context.WithCancel(c.Context)
defer ctxCancel()
c.Context = ctx

s, err := NewServer(c)
s, err := newServer(c)
if err != nil {
return err
}

sighup := make(chan os.Signal, 1)
signal.Notify(sighup, syscall.SIGHUP)

done := s.Serve()

var (
Expand All @@ -84,7 +106,7 @@ func main() {
timeChan <-chan time.Time
)
if configCheckInterval != 0 {
cfgPath = s.cfgBase
cfgPath = s.GetCfgBase()
if cfgPath == "" {
cfgPath, err = Path("", "")
if err != nil {
Expand All @@ -104,46 +126,49 @@ func main() {
}
}

reloadSig := make(chan struct{}, 1)
for {
select {
case <-sighup:
select {
case reloadSig <- struct{}{}:
default:
}
case <-exit:
return nil
case err := <-done:
return err
case <-reloadSig:
err := s.Reload(c)
if err != nil {
log.Warnf("couldn't reload servers: %s", err)
// Ensure we've started the shutdown sequence
ctxCancel()

// All errors must be collected to ensure the shutdown sequence is complete
allErrs := []error{err}
for err = range done {
allErrs = append(allErrs, err)
}

return errors.Join(allErrs...)

case <-timeChan:
// Detect config file changes and reload config if needed.
var changed bool
modTime, changed, err = fileChanged(s.cfgBase, modTime)
modTime, changed, err = fileChanged(s.GetCfgBase(), modTime)
if err != nil {
log.Errorw("Cannot stat config file", "err", err, "path", cfgPath)
ticker.Stop()
ticker = nil
timeChan = nil // reading from nil channel blocks forever
timeChan = nil // disable timeChan from the select statement
continue
}
if changed {
reloadSig <- struct{}{}
select {
case reloadSig <- syscall.SIGHUP:
default:
}
}

case <-reloadSig:
err := s.Reload(c)
if err != nil {
log.Warnf("couldn't reload servers: %s", err)
}
}
}
},
}
err := app.Run(os.Args)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
os.Exit(0)
err := app.RunContext(ctx, args)
return err
}

func fileChanged(filePath string, modTime time.Time) (time.Time, bool, error) {
Expand Down
Loading
Loading