Skip to content

Commit 26c98a2

Browse files
authored
feat: support tls (#179)
1 parent fd9100a commit 26c98a2

4 files changed

Lines changed: 30 additions & 4 deletions

File tree

fs/contube/contube.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import (
2525
)
2626

2727
var (
28-
ErrSinkTubeNotImplemented = errors.New("sink tube not implemented")
28+
ErrTubeNotImplemented = errors.New("tube not implemented")
29+
ErrSinkTubeNotImplemented = errors.Wrap(ErrTubeNotImplemented, "sink tube not implemented")
30+
ErrSourceTubeNotImplemented = errors.Wrap(ErrTubeNotImplemented, "source tube not implemented")
2931
)
3032

3133
type Record interface {

server/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ type Config struct {
5353

5454
// FunctionStore is the path to the function store
5555
FunctionStore string `mapstructure:"function_store"`
56+
57+
EnableTLS bool `mapstructure:"enable_tls"`
58+
TLSCertFile string `mapstructure:"tls_cert_file"`
59+
TLSKeyFile string `mapstructure:"tls_key_file"`
5660
}
5761

5862
func init() {

server/function_store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ func (f *FunctionStoreImpl) Load() error {
5050
f.loadingFunctions = make(map[string]*model.Function)
5151
info, err := os.Stat(f.path)
5252
if err != nil {
53+
if os.IsNotExist(err) {
54+
slog.Info("the path to the function store does not exist. skip loading functions")
55+
return nil
56+
}
5357
return errors.Wrapf(err, "the path to the function store %s is invalid", f.path)
5458
}
5559
if !info.IsDir() {

server/server.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ type serverOptions struct {
6565
runtimeLoader RuntimeLoaderType
6666
stateStoreLoader StateStoreLoaderType
6767
functionStore string
68+
enableTls bool
69+
tlsCertFile string
70+
tlsKeyFile string
6871
}
6972

7073
type ServerOption interface {
@@ -205,6 +208,14 @@ func WithConfig(config *Config) ServerOption {
205208
return nil, err
206209
}
207210
o.httpListener = ln
211+
o.enableTls = config.EnableTLS
212+
if o.enableTls {
213+
if config.TLSCertFile == "" || config.TLSKeyFile == "" {
214+
return nil, errors.New("TLS certificate and key file must be provided")
215+
}
216+
o.tlsCertFile = config.TLSCertFile
217+
o.tlsKeyFile = config.TLSKeyFile
218+
}
208219
err = initFactories[contube.TubeFactory](config.TubeFactory, o.tubeLoader, func(n string, f contube.TubeFactory) {
209220
o.managerOpts = append(o.managerOpts, fs.WithTubeFactory(n, f))
210221
})
@@ -326,7 +337,8 @@ func (s *Server) Run(context context.Context) {
326337
func (s *Server) startRESTHandlers() error {
327338

328339
statusSvr := new(restful.WebService)
329-
statusSvr.Route(statusSvr.GET("/api/v1/status").To(func(request *restful.Request, response *restful.Response) {
340+
statusSvr.Path("/api/v1/status")
341+
statusSvr.Route(statusSvr.GET("/").To(func(request *restful.Request, response *restful.Response) {
330342
response.WriteHeader(http.StatusOK)
331343
}).
332344
Doc("Get the status of the Function Stream").
@@ -360,7 +372,11 @@ func (s *Server) startRESTHandlers() error {
360372
}
361373
s.httpSvr.Store(httpSvr)
362374

363-
return httpSvr.Serve(s.options.httpListener)
375+
if s.options.enableTls {
376+
return httpSvr.ServeTLS(s.options.httpListener, s.options.tlsCertFile, s.options.tlsKeyFile)
377+
} else {
378+
return httpSvr.Serve(s.options.httpListener)
379+
}
364380
}
365381

366382
func enrichSwaggerObject(swo *spec.Swagger) {
@@ -432,6 +448,7 @@ func (s *Server) WaitForReady(ctx context.Context) <-chan struct{} {
432448
if err != nil {
433449
s.log.InfoContext(ctx, "Detect connection to server failed", slog.Any("error", err))
434450
}
451+
s.log.Info("Server is ready", slog.String("address", s.options.httpListener.Addr().String()))
435452
return true
436453
}
437454
go func() {
@@ -447,7 +464,6 @@ func (s *Server) WaitForReady(ctx context.Context) <-chan struct{} {
447464
return
448465
case <-time.After(1 * time.Second):
449466
if detect() {
450-
s.log.Info("Server is ready", slog.String("address", s.options.httpListener.Addr().String()))
451467
return
452468
}
453469
}

0 commit comments

Comments
 (0)