diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 49178823..851d53c7 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -22,6 +22,7 @@ import ( "fmt" "os" "strings" + "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -45,6 +46,8 @@ const ( // creation time is around 5 minutes, and update time varies depending on // target file system values PollCheckTimeout = 10 * time.Minute + // CachePollInterval specifies the interval to poll for filesystem changes to update the cache + CachePollInterval = 1 * time.Minute ) // Tags @@ -124,8 +127,10 @@ type Cloud interface { } type cloud struct { - region string - fsx FSx + region string + fsx FSx + volumeCache map[string]*FileSystem + cacheMutex sync.RWMutex } // NewCloud returns a new instance of AWS cloud @@ -141,10 +146,13 @@ func NewCloud(region string) (Cloud, error) { awsConfig.RetryMaxAttempts = 8 svc := fsx.NewFromConfig(awsConfig) - return &cloud{ - region: region, - fsx: svc, - }, nil + c := &cloud{ + region: region, + fsx: svc, + volumeCache: make(map[string]*FileSystem), + } + go c.pollFileSystems() + return c, nil } func (c *cloud) CreateFileSystem(ctx context.Context, volumeName string, fileSystemOptions *FileSystemOptions) (fs *FileSystem, err error) { @@ -262,7 +270,7 @@ func (c *cloud) CreateFileSystem(ctx context.Context, volumeName string, fileSys perUnitStorageThroughput = *output.FileSystem.LustreConfiguration.PerUnitStorageThroughput } - return &FileSystem{ + fs = &FileSystem{ FileSystemId: *output.FileSystem.FileSystemId, CapacityGiB: *output.FileSystem.StorageCapacity, DnsName: *output.FileSystem.DNSName, @@ -270,7 +278,14 @@ func (c *cloud) CreateFileSystem(ctx context.Context, volumeName string, fileSys StorageType: string(output.FileSystem.StorageType), DeploymentType: string(output.FileSystem.LustreConfiguration.DeploymentType), PerUnitStorageThroughput: perUnitStorageThroughput, - }, nil + } + + c.cacheMutex.Lock() + c.volumeCache[volumeName] = fs + c.cacheMutex.Unlock() + klog.V(4).InfoS("CreateFileSystem: added to cache", "volumeName", volumeName) + + return fs, nil } // ResizeFileSystem makes a request to the FSx API to update the storage capacity of the filesystem. @@ -313,6 +328,17 @@ func (c *cloud) DeleteFileSystem(ctx context.Context, fileSystemId string) (err } return fmt.Errorf("DeleteFileSystem failed: %v", err) } + + c.cacheMutex.Lock() + defer c.cacheMutex.Unlock() + for volName, fs := range c.volumeCache { + if fs.FileSystemId == fileSystemId { + delete(c.volumeCache, volName) + klog.V(4).InfoS("DeleteFileSystem: removed from cache", "volumeName", volName, "fileSystemId", fileSystemId) + break + } + } + return nil } @@ -448,42 +474,53 @@ func isBadRequestUpdateInProgress(err error) bool { } func (c *cloud) FindFileSystemByVolumeName(ctx context.Context, volumeName string) (*FileSystem, error) { - var nextToken *string - const maxResults = 100 + c.cacheMutex.RLock() + defer c.cacheMutex.RUnlock() - klog.V(4).InfoS("Searching for existing filesystem", "volumeName", volumeName) + if fs, ok := c.volumeCache[volumeName]; ok { + klog.V(4).InfoS("FindFileSystemByVolumeName: found in cache", "volumeName", volumeName) + return fs, nil + } + + klog.V(4).InfoS("FindFileSystemByVolumeName: not found in cache", "volumeName", volumeName) + return nil, ErrNotFound +} - // AWS FSx DescribeFileSystems API doesn't support filtering by tags, - // so we paginate through all filesystems and filter client-side +func (c *cloud) pollFileSystems() { for { - input := &fsx.DescribeFileSystemsInput{ - MaxResults: aws.Int32(maxResults), - NextToken: nextToken, - } + newCache := make(map[string]*FileSystem) + var nextToken *string + const maxResults = 1000 - output, err := c.fsx.DescribeFileSystems(ctx, input) - if err != nil { - return nil, fmt.Errorf("failed to describe filesystems: %v", err) - } + ctx := context.Background() - klog.V(5).InfoS("Checking batch of filesystems", "count", len(output.FileSystems)) + for { + input := &fsx.DescribeFileSystemsInput{ + MaxResults: aws.Int32(maxResults), + NextToken: nextToken, + } - // Search current batch - for _, fs := range output.FileSystems { - // Skip if filesystem is being deleted - if fs.Lifecycle != types.FileSystemLifecycleAvailable && - fs.Lifecycle != types.FileSystemLifecycleCreating { - continue + output, err := c.fsx.DescribeFileSystems(ctx, input) + if err != nil { + klog.ErrorS(err, "pollFileSystems: failed to describe filesystems") + break // break inner loop, sleep and retry } - // Check tags for volume name match - for _, tag := range fs.Tags { - if *tag.Key == VolumeNameTagKey && *tag.Value == volumeName { - klog.V(2).InfoS("Found existing filesystem", - "volumeName", volumeName, - "fileSystemId", *fs.FileSystemId, - "lifecycle", string(fs.Lifecycle)) + for _, fs := range output.FileSystems { + if fs.Lifecycle != types.FileSystemLifecycleAvailable && + fs.Lifecycle != types.FileSystemLifecycleCreating { + continue + } + + var volumeName string + for _, tag := range fs.Tags { + if *tag.Key == VolumeNameTagKey { + volumeName = *tag.Value + break + } + } + if volumeName != "" { mountName := "fsx" if fs.LustreConfiguration.MountName != nil { mountName = *fs.LustreConfiguration.MountName @@ -494,7 +531,7 @@ func (c *cloud) FindFileSystemByVolumeName(ctx context.Context, volumeName strin perUnitStorageThroughput = *fs.LustreConfiguration.PerUnitStorageThroughput } - return &FileSystem{ + newCache[volumeName] = &FileSystem{ FileSystemId: *fs.FileSystemId, CapacityGiB: *fs.StorageCapacity, DnsName: *fs.DNSName, @@ -502,18 +539,21 @@ func (c *cloud) FindFileSystemByVolumeName(ctx context.Context, volumeName strin StorageType: string(fs.StorageType), DeploymentType: string(fs.LustreConfiguration.DeploymentType), PerUnitStorageThroughput: perUnitStorageThroughput, - }, nil + } } } - } - // Check if more results exist - if output.NextToken == nil { - break + if output.NextToken == nil { + break + } + nextToken = output.NextToken } - nextToken = output.NextToken - } - klog.V(2).InfoS("No existing filesystem found", "volumeName", volumeName) - return nil, ErrNotFound + c.cacheMutex.Lock() + c.volumeCache = newCache + c.cacheMutex.Unlock() + klog.V(4).InfoS("pollFileSystems: cache updated", "itemCount", len(newCache)) + + time.Sleep(CachePollInterval) + } } diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index c0da8f92..f5c239b9 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -70,7 +70,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -132,7 +133,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -190,7 +192,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -249,7 +252,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -310,7 +314,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -336,7 +341,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -404,7 +410,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -430,7 +437,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -458,7 +466,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -485,7 +494,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -512,7 +522,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -535,7 +546,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -560,7 +572,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -626,7 +639,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -685,7 +699,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -744,7 +759,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -770,7 +786,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -837,7 +854,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -906,7 +924,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -975,7 +994,8 @@ func TestCreateFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } req := &FileSystemOptions{ @@ -1019,7 +1039,8 @@ func TestDeleteFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } output := &fsx.DeleteFileSystemOutput{} @@ -1039,7 +1060,8 @@ func TestDeleteFileSystem(t *testing.T) { mockCtl := gomock.NewController(t) mockFSx := mocks.NewMockFSx(mockCtl) c := &cloud{ - fsx: mockFSx, + fsx: mockFSx, + volumeCache: make(map[string]*FileSystem), } ctx := context.Background()