Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 85 additions & 45 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"os"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -45,6 +46,8 @@ const (
// creation time is around 5 minutes, and update time varies depending on
// target file system values
PollCheckTimeout = 10 * time.Minute
// CachePollInterval specifies the interval to poll for filesystem changes to update the cache
CachePollInterval = 1 * time.Minute
)

// Tags
Expand Down Expand Up @@ -124,8 +127,10 @@ type Cloud interface {
}

// cloud is the AWS-backed implementation returned by NewCloud.
type cloud struct {
	// region is the AWS region passed to NewCloud.
	region string
	// fsx is the FSx API client used for filesystem operations.
	fsx FSx
	// volumeCache maps a volume name to its known filesystem. Entries are
	// added by CreateFileSystem, removed by DeleteFileSystem, and the whole
	// map is replaced by pollFileSystems.
	volumeCache map[string]*FileSystem
	// cacheMutex guards volumeCache.
	cacheMutex sync.RWMutex
}

// NewCloud returns a new instance of AWS cloud
Expand All @@ -141,10 +146,13 @@ func NewCloud(region string) (Cloud, error) {
awsConfig.RetryMaxAttempts = 8

svc := fsx.NewFromConfig(awsConfig)
return &cloud{
region: region,
fsx: svc,
}, nil
c := &cloud{
region: region,
fsx: svc,
volumeCache: make(map[string]*FileSystem),
}
go c.pollFileSystems()
return c, nil
}

func (c *cloud) CreateFileSystem(ctx context.Context, volumeName string, fileSystemOptions *FileSystemOptions) (fs *FileSystem, err error) {
Expand Down Expand Up @@ -262,15 +270,22 @@ func (c *cloud) CreateFileSystem(ctx context.Context, volumeName string, fileSys
perUnitStorageThroughput = *output.FileSystem.LustreConfiguration.PerUnitStorageThroughput
}

return &FileSystem{
fs = &FileSystem{
FileSystemId: *output.FileSystem.FileSystemId,
CapacityGiB: *output.FileSystem.StorageCapacity,
DnsName: *output.FileSystem.DNSName,
MountName: mountName,
StorageType: string(output.FileSystem.StorageType),
DeploymentType: string(output.FileSystem.LustreConfiguration.DeploymentType),
PerUnitStorageThroughput: perUnitStorageThroughput,
}, nil
}

c.cacheMutex.Lock()
c.volumeCache[volumeName] = fs
c.cacheMutex.Unlock()
klog.V(4).InfoS("CreateFileSystem: added to cache", "volumeName", volumeName)

return fs, nil
}

// ResizeFileSystem makes a request to the FSx API to update the storage capacity of the filesystem.
Expand Down Expand Up @@ -313,6 +328,17 @@ func (c *cloud) DeleteFileSystem(ctx context.Context, fileSystemId string) (err
}
return fmt.Errorf("DeleteFileSystem failed: %v", err)
}

c.cacheMutex.Lock()
defer c.cacheMutex.Unlock()
for volName, fs := range c.volumeCache {
if fs.FileSystemId == fileSystemId {
delete(c.volumeCache, volName)
klog.V(4).InfoS("DeleteFileSystem: removed from cache", "volumeName", volName, "fileSystemId", fileSystemId)
break
}
}

return nil
}

Expand Down Expand Up @@ -448,42 +474,53 @@ func isBadRequestUpdateInProgress(err error) bool {
}

// FindFileSystemByVolumeName returns the cached filesystem recorded for
// volumeName, or ErrNotFound when the volume cache has no entry for it.
//
// The lookup is served from c.volumeCache under a read lock and makes no FSx
// API call, so ctx is not used here. The returned pointer is the cached value
// itself, not a copy.
func (c *cloud) FindFileSystemByVolumeName(ctx context.Context, volumeName string) (*FileSystem, error) {
	c.cacheMutex.RLock()
	defer c.cacheMutex.RUnlock()

	if fs, ok := c.volumeCache[volumeName]; ok {
		klog.V(4).InfoS("FindFileSystemByVolumeName: found in cache", "volumeName", volumeName)
		return fs, nil
	}

	// A miss only means the cache has no entry for this name at the moment.
	klog.V(4).InfoS("FindFileSystemByVolumeName: not found in cache", "volumeName", volumeName)
	return nil, ErrNotFound
}

// AWS FSx DescribeFileSystems API doesn't support filtering by tags,
// so we paginate through all filesystems and filter client-side
func (c *cloud) pollFileSystems() {
for {
input := &fsx.DescribeFileSystemsInput{
MaxResults: aws.Int32(maxResults),
NextToken: nextToken,
}
newCache := make(map[string]*FileSystem)
var nextToken *string
const maxResults = 1000

output, err := c.fsx.DescribeFileSystems(ctx, input)
if err != nil {
return nil, fmt.Errorf("failed to describe filesystems: %v", err)
}
ctx := context.Background()

klog.V(5).InfoS("Checking batch of filesystems", "count", len(output.FileSystems))
for {
input := &fsx.DescribeFileSystemsInput{
MaxResults: aws.Int32(maxResults),
NextToken: nextToken,
}

// Search current batch
for _, fs := range output.FileSystems {
// Skip if filesystem is being deleted
if fs.Lifecycle != types.FileSystemLifecycleAvailable &&
fs.Lifecycle != types.FileSystemLifecycleCreating {
continue
output, err := c.fsx.DescribeFileSystems(ctx, input)
if err != nil {
klog.ErrorS(err, "pollFileSystems: failed to describe filesystems")
break // break inner loop, sleep and retry
}

// Check tags for volume name match
for _, tag := range fs.Tags {
if *tag.Key == VolumeNameTagKey && *tag.Value == volumeName {
klog.V(2).InfoS("Found existing filesystem",
"volumeName", volumeName,
"fileSystemId", *fs.FileSystemId,
"lifecycle", string(fs.Lifecycle))
for _, fs := range output.FileSystems {
if fs.Lifecycle != types.FileSystemLifecycleAvailable &&
fs.Lifecycle != types.FileSystemLifecycleCreating {
continue
}

var volumeName string
for _, tag := range fs.Tags {
if *tag.Key == VolumeNameTagKey {
volumeName = *tag.Value
break
}
}

if volumeName != "" {
mountName := "fsx"
if fs.LustreConfiguration.MountName != nil {
mountName = *fs.LustreConfiguration.MountName
Expand All @@ -494,26 +531,29 @@ func (c *cloud) FindFileSystemByVolumeName(ctx context.Context, volumeName strin
perUnitStorageThroughput = *fs.LustreConfiguration.PerUnitStorageThroughput
}

return &FileSystem{
newCache[volumeName] = &FileSystem{
FileSystemId: *fs.FileSystemId,
CapacityGiB: *fs.StorageCapacity,
DnsName: *fs.DNSName,
MountName: mountName,
StorageType: string(fs.StorageType),
DeploymentType: string(fs.LustreConfiguration.DeploymentType),
PerUnitStorageThroughput: perUnitStorageThroughput,
}, nil
}
}
}
}

// Check if more results exist
if output.NextToken == nil {
break
if output.NextToken == nil {
break
}
nextToken = output.NextToken
}
nextToken = output.NextToken
}

klog.V(2).InfoS("No existing filesystem found", "volumeName", volumeName)
return nil, ErrNotFound
c.cacheMutex.Lock()
c.volumeCache = newCache
c.cacheMutex.Unlock()
klog.V(4).InfoS("pollFileSystems: cache updated", "itemCount", len(newCache))

time.Sleep(CachePollInterval)
}
}
Loading