Skip to content

Upload image to dataset poc #5089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,5 @@ require (
github.com/ziutek/mymysql v1.5.4 // indirect
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
)

replace go.viam.com/api => ../api
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1511,8 +1511,6 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.viam.com/api v0.1.450 h1:AiWmIJeFlr0TnrtTg2KVEd7JJVgp+aALUxdMAlRVTrc=
go.viam.com/api v0.1.450/go.mod h1:gwJriv6EVWe97uFzzzWjzP3NPfpCrKtRAdWtYglUpqs=
go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug=
go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI=
go.viam.com/utils v0.1.147 h1:1z2mo/8GVR1jRRckr4qC8FLTps5kpI9fkCeHL4P1yw4=
Expand Down
14 changes: 14 additions & 0 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ func (b *builtIn) Sync(ctx context.Context, extra map[string]interface{}) error
return b.sync.Sync(ctx, extra)
}

// UploadImageToDataset hands image, datasetIDs, and tags to the sync
// subsystem's UploadImageToDataset while holding b.mu, and returns whatever
// error that call reports.
// extra is part of the Service interface signature and is not used here.
func (b *builtIn) UploadImageToDataset(ctx context.Context,
	image []byte,
	datasetIDs []string,
	tags []string,
	extra map[string]interface{},
) error {
	b.logger.Info("UploadImageToDataset START")
	defer b.logger.Info("UploadImageToDataset END")
	b.mu.Lock()
	defer b.mu.Unlock()
	// Propagate the error instead of dropping it, so callers (and the gRPC
	// server) learn when the image could not be staged for upload.
	return b.sync.UploadImageToDataset(ctx, image, datasetIDs, tags)
}

// Reconfigure updates the data manager service when the config has changed.
// At time of writing Reconfigure only returns an error in one of the following unrecoverable error cases:
// 1. There is some static (aka compile time) error which we currently are only able to detect at runtime:
Expand Down
25 changes: 21 additions & 4 deletions services/datamanager/builtin/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/benbjohnson/clock"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/multierr"
v1 "go.viam.com/api/app/datasync/v1"
Expand Down Expand Up @@ -362,10 +363,26 @@ func (s *Sync) syncFile(config Config, filePath string) {
if data.IsDataCaptureFile(f) {
s.syncDataCaptureFile(f, config.CaptureDir, s.logger)
} else {
s.syncArbitraryFile(f, config.Tags, config.FileLastModifiedMillis, s.logger)
s.syncArbitraryFile(f, config.Tags, []string{}, config.FileLastModifiedMillis, s.logger)
}
}

func (s *Sync) UploadImageToDataset(ctx context.Context, image []byte, datasetIDs, tags []string) error {
filename := uuid.NewString()
err := os.WriteFile(filename, image, os.ModeAppend)
if err != nil {
s.logger.Errorw("error writing file", "err", err)
return err
}
f, err := os.Open(filename)
if err != nil {
s.logger.Errorw("error reading file", "err", err)
return err
}
s.syncArbitraryFile(f, tags, datasetIDs, 0, s.logger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this sync or async?
if we're writing to disk anyway, we can make async.
otherwise, let's not write to disk

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be async. The reason to write to disk is because all of our functionality is written to take in a os.File and in the name of not rewriting everything just to account for this, thought it would be easier just to write to disk and delete it afterwards.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, just mark it as TODO please.

return nil
}

func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging.Logger) {
captureFile, err := data.ReadCaptureFile(f)
// if you can't read the capture file's metadata field, close & move it to the failed directory
Expand Down Expand Up @@ -434,10 +451,10 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging
}
}

func (s *Sync) syncArbitraryFile(f *os.File, tags []string, fileLastModifiedMillis int, logger logging.Logger) {
func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLastModifiedMillis int, logger logging.Logger) {
retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) {
errMetadata := fmt.Sprintf("error uploading arbitrary file %s", f.Name())
bytesUploaded, err := uploadArbitraryFile(ctx, f, s.cloudConn, tags, fileLastModifiedMillis, s.clock, logger)
bytesUploaded, err := uploadArbitraryFile(ctx, f, s.cloudConn, tags, datasetIDs, fileLastModifiedMillis, s.clock, logger)
if err != nil {
return 0, errors.Wrap(err, errMetadata)
}
Expand All @@ -448,7 +465,7 @@ func (s *Sync) syncArbitraryFile(f *os.File, tags []string, fileLastModifiedMill
bytesUploaded, err := retry.run()
if err != nil {
if closeErr := f.Close(); closeErr != nil {
logger.Error(errors.Wrap(closeErr, "error closing data capture file").Error())
logger.Error(errors.Wrap(closeErr, "error closing arbitrary file").Error())
}

// if we stopped due to a cancelled context,
Expand Down
22 changes: 13 additions & 9 deletions services/datamanager/builtin/sync/upload_arbitrary_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ var (
errFileModifiedTooRecently = errors.New("file modified too recently")
)

// uploadArbitraryFile uploads files which were not writted by the builtin datamanager's data capture package.
// uploadArbitraryFile uploads files which were not written by the builtin datamanager's data capture package.
// They are frequently files written by 3rd party programs such as images, videos, logs, written to
// the capture directory or a subdirectory or to additional sync paths (or their sub directories).
// Note: the bytes size returned is the size of the input file. It only returns a non 0 value in the success case.
func uploadArbitraryFile(
ctx context.Context,
f *os.File,
conn cloudConn,
tags []string,
tags, datasetIDs []string,
fileLastModifiedMillis int,
clock clock.Clock,
logger logging.Logger,
Expand Down Expand Up @@ -78,15 +78,19 @@ func uploadArbitraryFile(

// Send metadata FileUploadRequest.
logger.Debugf("datasync.FileUpload request sending metadata for arbitrary file: %s", path)
md := &v1.UploadMetadata{
PartId: conn.partID,
Type: v1.DataType_DATA_TYPE_FILE,
FileName: path,
FileExtension: filepath.Ext(f.Name()),
Tags: tags,
}
if len(datasetIDs) > 0 {
md.DatasetIds = datasetIDs
}
if err := stream.Send(&v1.FileUploadRequest{
UploadPacket: &v1.FileUploadRequest_Metadata{
Metadata: &v1.UploadMetadata{
PartId: conn.partID,
Type: v1.DataType_DATA_TYPE_FILE,
FileName: path,
FileExtension: filepath.Ext(f.Name()),
Tags: tags,
},
Metadata: md,
},
}); err != nil {
return 0, errors.Wrap(err, "FileUpload failed sending metadata")
Expand Down
17 changes: 17 additions & 0 deletions services/datamanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,23 @@ func (c *client) Sync(ctx context.Context, extra map[string]interface{}) error {
return nil
}

// UploadImageToDataset sends image, datasetIDs, tags, and extra to the remote
// data manager service through the UploadImageToDataset RPC.
//
// It returns an error if extra cannot be converted to a protobuf struct or if
// the RPC itself fails.
func (c *client) UploadImageToDataset(ctx context.Context, image []byte, datasetIDs, tags []string, extra map[string]interface{}) error {
	ext, err := protoutils.StructToStructPb(extra)
	if err != nil {
		return err
	}
	// Name must be set: the server looks up the target service by req.Name,
	// so a request without it cannot be routed to this resource.
	_, err = c.client.UploadImageToDataset(ctx, &pb.UploadImageToDatasetRequest{
		Name:       c.name,
		Image:      image,
		DatasetIds: datasetIDs,
		Tags:       tags,
		Extra:      ext,
	})
	return err
}

func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) {
return rprotoutils.DoFromResourceClient(ctx, c.client, c.name, cmd)
}
2 changes: 2 additions & 0 deletions services/datamanager/data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Service interface {
resource.Resource
// Sync will sync data stored on the machine to the cloud.
Sync(ctx context.Context, extra map[string]interface{}) error
UploadImageToDataset(ctx context.Context, image []byte, datasetIDs, tags []string,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need the mime type?
can we also have an image.Image version even if just for the go sdk?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes! we definitely should have the MIME type. Lemme add that.

Do you want the image.Image version instead or in addition? If we pass in MIME type then it’s not an issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'd like both.

extra map[string]interface{}) error
}

// SubtypeName is the name of the type of service.
Expand Down
14 changes: 14 additions & 0 deletions services/datamanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ func (server *serviceServer) Sync(ctx context.Context, req *pb.SyncRequest) (*pb
return &pb.SyncResponse{}, nil
}

// UploadImageToDataset serves the UploadImageToDataset RPC: it resolves the
// data manager service identified by req.Name from the server's resource
// collection and forwards the request's image, dataset IDs, tags, and extra
// parameters to it. Any lookup or upload error is returned with a nil response.
func (server *serviceServer) UploadImageToDataset(
	ctx context.Context,
	req *pb.UploadImageToDatasetRequest,
) (*pb.UploadImageToDatasetResponse, error) {
	dataManager, lookupErr := server.coll.Resource(req.Name)
	if lookupErr != nil {
		return nil, lookupErr
	}

	uploadErr := dataManager.UploadImageToDataset(ctx, req.Image, req.DatasetIds, req.Tags, req.Extra.AsMap())
	if uploadErr != nil {
		return nil, uploadErr
	}

	return &pb.UploadImageToDatasetResponse{}, nil
}

// DoCommand receives arbitrary commands.
func (server *serviceServer) DoCommand(ctx context.Context,
req *commonpb.DoCommandRequest,
Expand Down
Loading