Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 78 additions & 58 deletions internal/cache/file/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package downloader
import (
"context"
"os"
"path"
"sync"
"testing"
"time"
Expand All @@ -32,16 +31,18 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
testutil "github.com/googlecloudplatform/gcsfuse/v3/internal/util"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations"
. "github.com/jacobsa/ogletest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

var cacheDir = path.Join(os.Getenv("HOME"), "cache/dir")

func TestDownloader(t *testing.T) { RunTests(t) }
func TestDownloaderSuite(t *testing.T) {
suite.Run(t, new(downloaderTest))
}

type downloaderTest struct {
suite.Suite
defaultFileCacheConfig *cfg.FileCacheConfig
job *Job
bucket gcs.Bucket
Expand All @@ -50,70 +51,86 @@ type downloaderTest struct {
fakeStorage storage.FakeStorage
fileSpec data.FileSpec
jm *JobManager
cacheDir string
}

func init() { RegisterTestSuite(&downloaderTest{}) }

func (dt *downloaderTest) setupHelper() {
locker.EnableInvariantsCheck()
operations.RemoveDir(cacheDir)
// Create unique temp directory per test
var err error
dt.cacheDir, err = os.MkdirTemp("", "gcsfuse-test-*")
require.NoError(dt.T(), err)

// Create bucket in fake storage.
var err error
mockClient := new(storage.MockStorageControlClient)
dt.fakeStorage = storage.NewFakeStorageWithMockClient(mockClient, cfg.HTTP2)
storageHandle := dt.fakeStorage.CreateStorageHandle()
mockClient.On("GetStorageLayout", mock.Anything, mock.Anything, mock.Anything).
Return(&controlpb.StorageLayout{}, nil)
ctx := context.Background()
dt.bucket, err = storageHandle.BucketHandle(ctx, storage.TestBucketName, "", false)
ExpectEq(nil, err)
require.NoError(dt.T(), err)

dt.initJobTest(DefaultObjectName, []byte("taco"), DefaultSequentialReadSizeMb, CacheMaxSize, func() {})
dt.jm = NewJobManager(dt.cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, DefaultSequentialReadSizeMb, dt.defaultFileCacheConfig, metrics.NewNoopMetrics())
dt.jm = NewJobManager(dt.cache, util.DefaultFilePerm, util.DefaultDirPerm, dt.cacheDir, DefaultSequentialReadSizeMb, dt.defaultFileCacheConfig, metrics.NewNoopMetrics())
}

func (dt *downloaderTest) SetUp(*TestInfo) {
func (dt *downloaderTest) SetupTest() {
dt.defaultFileCacheConfig = &cfg.FileCacheConfig{EnableCrc: true, ExperimentalParallelDownloadsDefaultOn: true}
dt.setupHelper()
}

func (dt *downloaderTest) TearDown() {
func (dt *downloaderTest) TearDownTest() {
dt.fakeStorage.ShutDown()
operations.RemoveDir(cacheDir)
// Clean up temp dir
if dt.cacheDir != "" {
os.RemoveAll(dt.cacheDir)
}
}

func (dt *downloaderTest) waitForCrcCheckToBeCompleted() {
// Last notification is sent after the entire file is downloaded and before the CRC check is done.
// Hence, explicitly waiting till the CRC check is done.
timeout := time.After(5 * time.Second)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()

for {
dt.job.mu.Lock()
if dt.job.status.Name == Completed || dt.job.status.Name == Failed || dt.job.status.Name == Invalid {
select {
case <-timeout:
dt.job.mu.Lock()
status := dt.job.status
dt.job.mu.Unlock()
require.Failf(dt.T(), "Timeout waiting for CRC check", "Current status: %v", status.Name)
return
case <-ticker.C:
dt.job.mu.Lock()
if dt.job.status.Name == Completed || dt.job.status.Name == Failed || dt.job.status.Name == Invalid {
dt.job.mu.Unlock()
return
}
dt.job.mu.Unlock()
break
}
dt.job.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
}

func (dt *downloaderTest) verifyJob(job *Job, object *gcs.MinObject, bucket gcs.Bucket, sequentialReadSizeMb int32) {
job.mu.Lock()
defer job.mu.Unlock()
ExpectEq(object.Generation, job.object.Generation)
ExpectEq(object.Name, job.object.Name)
ExpectEq(bucket.Name(), job.bucket.Name())
assert.Equal(dt.T(), object.Generation, job.object.Generation)
assert.Equal(dt.T(), object.Name, job.object.Name)
assert.Equal(dt.T(), bucket.Name(), job.bucket.Name())
downloadPath := util.GetDownloadPath(dt.jm.cacheDir, util.GetObjectPath(bucket.Name(), object.Name))
ExpectEq(downloadPath, job.fileSpec.Path)
ExpectEq(sequentialReadSizeMb, job.sequentialReadSizeMb)
ExpectNe(nil, job.removeJobCallback)
assert.Equal(dt.T(), downloadPath, job.fileSpec.Path)
assert.Equal(dt.T(), sequentialReadSizeMb, job.sequentialReadSizeMb)
assert.NotNil(dt.T(), job.removeJobCallback)
}

func (dt *downloaderTest) Test_CreateJobIfNotExists_NotExisting() {
dt.jm.mu.Lock()
objectPath := util.GetObjectPath(dt.bucket.Name(), dt.object.Name)
_, ok := dt.jm.jobs[objectPath]
AssertFalse(ok)
assert.False(dt.T(), ok)
dt.jm.mu.Unlock()

// Call CreateJobIfNotExists for job which doesn't exist.
Expand All @@ -123,8 +140,8 @@ func (dt *downloaderTest) Test_CreateJobIfNotExists_NotExisting() {
defer dt.jm.mu.Unlock()
dt.verifyJob(job, &dt.object, dt.bucket, dt.jm.sequentialReadSizeMb)
actualJob, ok := dt.jm.jobs[objectPath]
AssertTrue(ok)
AssertEq(job, actualJob)
assert.True(dt.T(), ok)
assert.Equal(dt.T(), job, actualJob)
}

func (dt *downloaderTest) Test_CreateJobIfNotExists_Existing() {
Expand All @@ -137,44 +154,44 @@ func (dt *downloaderTest) Test_CreateJobIfNotExists_Existing() {
// Call CreateJobIfNotExists for existing job.
job := dt.jm.CreateJobIfNotExists(&dt.object, dt.bucket)

AssertEq(dt.job, job)
assert.Equal(dt.T(), dt.job, job)
dt.jm.mu.Lock()
defer dt.jm.mu.Unlock()
dt.verifyJob(job, &dt.object, dt.bucket, dt.jm.sequentialReadSizeMb)
actualJob, ok := dt.jm.jobs[objectPath]
AssertTrue(ok)
AssertEq(job, actualJob)
assert.True(dt.T(), ok)
assert.Equal(dt.T(), job, actualJob)
}

func (dt *downloaderTest) Test_CreateJobIfNotExists_NotExisting_WithDefaultFileAndDirPerm() {
dt.jm.mu.Lock()
objectPath := util.GetObjectPath(dt.bucket.Name(), dt.object.Name)
_, ok := dt.jm.jobs[objectPath]
AssertFalse(ok)
assert.False(dt.T(), ok)
dt.jm.mu.Unlock()

// Call CreateJobIfNotExists for job which doesn't exist.
job := dt.jm.CreateJobIfNotExists(&dt.object, dt.bucket)

ExpectEq(0700, job.fileSpec.DirPerm.Perm())
ExpectEq(0600, job.fileSpec.FilePerm.Perm())
assert.Equal(dt.T(), os.FileMode(0700), job.fileSpec.DirPerm.Perm())
assert.Equal(dt.T(), os.FileMode(0600), job.fileSpec.FilePerm.Perm())
}

func (dt *downloaderTest) Test_GetJob_NotExisting() {
dt.jm.mu.Lock()
objectPath := util.GetObjectPath(dt.bucket.Name(), dt.object.Name)
_, ok := dt.jm.jobs[objectPath]
AssertFalse(ok)
assert.False(dt.T(), ok)
dt.jm.mu.Unlock()

// Call GetJob for job which doesn't exist.
job := dt.jm.GetJob(dt.object.Name, dt.bucket.Name())

AssertEq(nil, job)
assert.Nil(dt.T(), job)
dt.jm.mu.Lock()
defer dt.jm.mu.Unlock()
_, ok = dt.jm.jobs[objectPath]
AssertFalse(ok)
assert.False(dt.T(), ok)
}

func (dt *downloaderTest) Test_GetJob_Existing() {
Expand All @@ -187,7 +204,7 @@ func (dt *downloaderTest) Test_GetJob_Existing() {
// Call GetJob for existing job.
job := dt.jm.GetJob(dt.object.Name, dt.bucket.Name())

AssertEq(dt.job, job)
assert.Equal(dt.T(), dt.job, job)
dt.verifyJob(job, &dt.object, dt.bucket, dt.jm.sequentialReadSizeMb)
}

Expand All @@ -197,12 +214,15 @@ func (dt *downloaderTest) Test_GetJob_Concurrent() {
objectPath := util.GetObjectPath(dt.bucket.Name(), dt.object.Name)
dt.jm.jobs[objectPath] = dt.job
dt.jm.mu.Unlock()
jobs := [5]*Job{}
jobs := make([]*Job, 5)
var jobsMu sync.Mutex
wg := sync.WaitGroup{}
getFunc := func(i int) {
defer wg.Done()
job := dt.jm.GetJob(dt.object.Name, dt.bucket.Name())
jobsMu.Lock()
jobs[i] = job
jobsMu.Unlock()
}

// make concurrent requests
Expand All @@ -215,19 +235,19 @@ func (dt *downloaderTest) Test_GetJob_Concurrent() {
dt.verifyJob(dt.job, &dt.object, dt.bucket, dt.jm.sequentialReadSizeMb)
// Verify all jobs
for i := range 5 {
ExpectEq(dt.job, jobs[i])
assert.Equal(dt.T(), dt.job, jobs[i])
}
}

func (dt *downloaderTest) Test_InvalidateAndRemoveJob_NotExisting() {
expectedJob := dt.jm.GetJob(dt.object.Name, dt.bucket.Name())
AssertEq(nil, expectedJob)
assert.Nil(dt.T(), expectedJob)

dt.jm.InvalidateAndRemoveJob(dt.object.Name, dt.bucket.Name())

// Verify that job is invalidated and removed from job manager.
expectedJob = dt.jm.GetJob(dt.object.Name, dt.bucket.Name())
AssertEq(nil, expectedJob)
assert.Nil(dt.T(), expectedJob)
}

func (dt *downloaderTest) Test_InvalidateAndRemoveJob_Existing() {
Expand All @@ -238,15 +258,15 @@ func (dt *downloaderTest) Test_InvalidateAndRemoveJob_Existing() {
dt.jm.mu.Unlock()
// Start the job
_, err := expectedJob.Download(context.Background(), 0, false)
AssertEq(nil, err)
assert.Nil(dt.T(), err)

// InvalidateAndRemove the job
dt.jm.InvalidateAndRemoveJob(dt.object.Name, dt.bucket.Name())

// Verify no job existing
AssertEq(Invalid, expectedJob.GetStatus().Name)
assert.Equal(dt.T(), Invalid, expectedJob.GetStatus().Name)
expectedJob = dt.jm.GetJob(dt.object.Name, dt.bucket.Name())
AssertEq(nil, expectedJob)
assert.Nil(dt.T(), expectedJob)
}

func (dt *downloaderTest) Test_InvalidateAndRemoveJob_Concurrent() {
Expand All @@ -257,7 +277,7 @@ func (dt *downloaderTest) Test_InvalidateAndRemoveJob_Concurrent() {
dt.jm.mu.Unlock()
// Start the job
_, err := expectedJob.Download(context.Background(), 0, false)
AssertEq(nil, err)
assert.Nil(dt.T(), err)
wg := sync.WaitGroup{}

// Make concurrent requests
Expand All @@ -272,9 +292,9 @@ func (dt *downloaderTest) Test_InvalidateAndRemoveJob_Concurrent() {
wg.Wait()

// Verify job in invalidated and removed from job manager.
AssertEq(Invalid, expectedJob.GetStatus().Name)
assert.Equal(dt.T(), Invalid, expectedJob.GetStatus().Name)
expectedJob = dt.jm.GetJob(dt.object.Name, dt.bucket.Name())
AssertEq(nil, expectedJob)
assert.Nil(dt.T(), expectedJob)
}

func (dt *downloaderTest) Test_Destroy() {
Expand All @@ -291,32 +311,32 @@ func (dt *downloaderTest) Test_Destroy() {
job2 := dt.jm.CreateJobIfNotExists(&object2, dt.bucket)
// Start the job
_, err := job2.Download(context.Background(), 2, false)
AssertEq(nil, err)
assert.Nil(dt.T(), err)
objectName3 := "path/in/gcs/foo3.txt"
dt.initJobTest(objectName3, objectContent, DefaultSequentialReadSizeMb, uint64(objectSize), func() {})
object3 := dt.object
job3 := dt.jm.CreateJobIfNotExists(&object3, dt.bucket)
// Start the job
_, err = job3.Download(context.Background(), 2, false)
AssertEq(nil, err)
assert.Nil(dt.T(), err)

dt.jm.Destroy()

// Verify all jobs are invalidated
AssertEq(Invalid, job1.GetStatus().Name)
AssertEq(Invalid, job2.GetStatus().Name)
AssertEq(Invalid, job3.GetStatus().Name)
assert.Equal(dt.T(), Invalid, job1.GetStatus().Name)
assert.Equal(dt.T(), Invalid, job2.GetStatus().Name)
assert.Equal(dt.T(), Invalid, job3.GetStatus().Name)
// Verify all jobs are removed
AssertEq(nil, dt.jm.GetJob(objectName1, dt.bucket.Name()))
AssertEq(nil, dt.jm.GetJob(objectName2, dt.bucket.Name()))
AssertEq(nil, dt.jm.GetJob(objectName3, dt.bucket.Name()))
assert.Nil(dt.T(), dt.jm.GetJob(objectName1, dt.bucket.Name()))
assert.Nil(dt.T(), dt.jm.GetJob(objectName2, dt.bucket.Name()))
assert.Nil(dt.T(), dt.jm.GetJob(objectName3, dt.bucket.Name()))
}

func (dt *downloaderTest) Test_CreateJobIfNotExists_InvalidateAndRemoveJob_Concurrent() {
wg := sync.WaitGroup{}
createNewJob := func() {
job := dt.jm.CreateJobIfNotExists(&dt.object, dt.bucket)
AssertNe(nil, job)
assert.NotNil(dt.T(), job)
wg.Done()
}
invalidateJob := func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/cache/file/downloader/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (dt *downloaderTest) getFileInfo() lru.ValueType {
}

func (dt *downloaderTest) fileCachePath(bucketName string, objectName string) string {
return path.Join(cacheDir, bucketName, objectName)
return path.Join(dt.cacheDir, bucketName, objectName)
}

func (dt *downloaderTest) Test_init() {
Expand Down
Loading