8888)
8989
// Expiry delays for the expiring caches held by cloud.
const (
	// cacheForgetDelay is the expiry used for the likelyBadDeviceNames and
	// latestClientTokens caches.
	cacheForgetDelay = 1 * time.Hour
	// volInitCacheForgetDelay is the expiry used for the volumeInitializations cache.
	volInitCacheForgetDelay = 6 * time.Hour
)
9394
9495// AWS provisioning limits.
@@ -106,6 +107,12 @@ const (
106107 MaxTagValueLength = 256
107108)
108109
// Initialization-state values reported by EC2 DescribeVolumeStatus for which
// the AWS SDK does not provide typed constants.
const (
	// VolumeStatusInitializingState is the status string of a volume that is still initializing.
	VolumeStatusInitializingState = "initializing"
	// VolumeStatusInitializedState is the status string of a volume whose initialization has completed.
	VolumeStatusInitializedState = "completed"
)
115+
109116// Defaults.
110117const (
111118 // DefaultVolumeSize represents the default volume size.
@@ -135,7 +142,13 @@ const (
135142 snapshotTagBatcher
136143
137144 batchDescribeTimeout = 30 * time .Second
138- batchMaxDelay = 500 * time .Millisecond // Minimizes RPC latency and EC2 API calls. Tuned via scalability tests.
145+
146+ // Minimizes RPC latency and EC2 API calls. Tuned via scalability tests.
147+ batchMaxDelay = 500 * time .Millisecond
148+
149+ // Tuned for EC2 DescribeVolumeStatus -- as of July 2025 it takes up to 5 min for initialization info to be updated.
150+ slowVolumeStatusBatchMaxDelay = 2 * time .Minute
151+ fastVolumeStatusBatchMaxDelay = 500 * time .Millisecond
139152)
140153
141154var (
@@ -315,17 +328,20 @@ type batcherManager struct {
315328 snapshotIDBatcher * batcher.Batcher [string , * types.Snapshot ]
316329 snapshotTagBatcher * batcher.Batcher [string , * types.Snapshot ]
317330 volumeModificationIDBatcher * batcher.Batcher [string , * types.VolumeModification ]
331+ volumeStatusIDBatcherSlow * batcher.Batcher [string , * types.VolumeStatusItem ]
332+ volumeStatusIDBatcherFast * batcher.Batcher [string , * types.VolumeStatusItem ]
318333}
319334
320335type cloud struct {
321- region string
322- ec2 EC2API
323- dm dm.DeviceManager
324- bm * batcherManager
325- rm * retryManager
326- vwp volumeWaitParameters
327- likelyBadDeviceNames expiringcache.ExpiringCache [string , sync.Map ]
328- latestClientTokens expiringcache.ExpiringCache [string , int ]
336+ region string
337+ ec2 EC2API
338+ dm dm.DeviceManager
339+ bm * batcherManager
340+ rm * retryManager
341+ vwp volumeWaitParameters
342+ likelyBadDeviceNames expiringcache.ExpiringCache [string , sync.Map ]
343+ latestClientTokens expiringcache.ExpiringCache [string , int ]
344+ volumeInitializations expiringcache.ExpiringCache [string , volumeInitialization ]
329345}
330346
331347var _ Cloud = & cloud {}
@@ -375,14 +391,15 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
375391 }
376392
377393 return & cloud {
378- region : region ,
379- dm : dm .NewDeviceManager (),
380- ec2 : svc ,
381- bm : bm ,
382- rm : newRetryManager (),
383- vwp : vwp ,
384- likelyBadDeviceNames : expiringcache.New [string , sync.Map ](cacheForgetDelay ),
385- latestClientTokens : expiringcache.New [string , int ](cacheForgetDelay ),
394+ region : region ,
395+ dm : dm .NewDeviceManager (),
396+ ec2 : svc ,
397+ bm : bm ,
398+ rm : newRetryManager (),
399+ vwp : vwp ,
400+ likelyBadDeviceNames : expiringcache.New [string , sync.Map ](cacheForgetDelay ),
401+ latestClientTokens : expiringcache.New [string , int ](cacheForgetDelay ),
402+ volumeInitializations : expiringcache.New [string , volumeInitialization ](volInitCacheForgetDelay ),
386403 }
387404}
388405
@@ -409,6 +426,12 @@ func newBatcherManager(svc EC2API) *batcherManager {
409426 volumeModificationIDBatcher : batcher .New (500 , batchMaxDelay , func (names []string ) (map [string ]* types.VolumeModification , error ) {
410427 return execBatchDescribeVolumesModifications (svc , names )
411428 }),
429+ volumeStatusIDBatcherSlow : batcher .New (1000 , slowVolumeStatusBatchMaxDelay , func (ids []string ) (map [string ]* types.VolumeStatusItem , error ) {
430+ return execBatchDescribeVolumeStatus (svc , ids )
431+ }),
432+ volumeStatusIDBatcherFast : batcher .New (1000 , fastVolumeStatusBatchMaxDelay , func (ids []string ) (map [string ]* types.VolumeStatusItem , error ) {
433+ return execBatchDescribeVolumeStatus (svc , ids )
434+ }),
412435 }
413436}
414437
@@ -1015,6 +1038,142 @@ func (c *cloud) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
10151038 return nil
10161039}
10171040
// volumeInitialization is the cached result of the most recent EC2
// DescribeVolumeStatus lookup for a single volume.
type volumeInitialization struct {
	// initialized is true once the volume was observed as no longer initializing.
	initialized bool
	// estimatedInitializationTime is the earliest time at which the volume is
	// expected to be initialized. It is the zero time when EC2 reported no estimate.
	estimatedInitializationTime time.Time
}
1045+
1046+ // IsVolumeInitialized calls EC2 DescribeVolumeStatus and returns whether the volume is initialized.
1047+ func (c * cloud ) IsVolumeInitialized (ctx context.Context , volumeID string ) (bool , error ) {
1048+ var volumeStatusItem * types.VolumeStatusItem
1049+ var err error
1050+
1051+ // Because volumes can take hours to initialize, we shouldn't poll DescribeVolumeStatus (DVS) as aggressively as we
1052+ // do for other EC2 APIs.
1053+ //
1054+ // We use a volumeInitializations cache to keep track of initializing volumes that we should poll at a slower rate.
1055+ //
1056+ // Furthermore, if initializationRate was set during volume creation, DVS returns an estimated initialization time.
1057+ // We cache that estimate and defer polling of DVS until we reach that time.
1058+ // We clamp to a minimum of 1 min because as of July 2025 it can take up to 5 min for volume initialization info to update.
1059+
1060+ // Check volumeInitializations cache to potentially delay EC2 DescribeVolumeStatus call
1061+ volInit , ok := c .volumeInitializations .Get (volumeID )
1062+ switch {
1063+ // Case 1: We've never called DVS for volume. Call DVS ASAP.
1064+ case ! ok :
1065+ volumeStatusItem , err = c .describeVolumeStatus (volumeID , true /* callASAP */ )
1066+ // Case 2: We already know volume is initialized. Don't call DVS.
1067+ case volInit .initialized :
1068+ return true , nil
1069+ // Case 3: We know volume is initializing, but there is no SLA. Call DVS eventually during next slow batch.
1070+ case volInit .estimatedInitializationTime .IsZero ():
1071+ volumeStatusItem , err = c .describeVolumeStatus (volumeID , false /* callASAP */ )
1072+ // Case 4: We have an estimated time for initialization. Wait to call DVS again until then unless RPC ctx is done.
1073+ case ! volInit .initialized :
1074+ util .WaitUntilTimeOrContext (ctx , volInit .estimatedInitializationTime )
1075+ if err := ctx .Err (); err != nil {
1076+ return false , err
1077+ }
1078+ volumeStatusItem , err = c .describeVolumeStatus (volumeID , true /* callASAP */ )
1079+ }
1080+ if err != nil {
1081+ return false , err
1082+ }
1083+
1084+ // Parse volume status
1085+ if volumeStatusItem == nil || volumeStatusItem .VolumeStatus == nil || volumeStatusItem .VolumeStatus .Details == nil {
1086+ return false , errors .New ("IsVolumeInitialized: EC2 DescribeVolumeStatus response missing volume status details" )
1087+ }
1088+ isVolInitializing := isVolumeStatusInitializing (* volumeStatusItem )
1089+
1090+ // Update cache
1091+ var newExpectedInitTime time.Time
1092+ if isVolInitializing && volumeStatusItem .InitializationStatusDetails != nil && volumeStatusItem .InitializationStatusDetails .EstimatedTimeToCompleteInSeconds != nil {
1093+ secondsLeft := * volumeStatusItem .InitializationStatusDetails .EstimatedTimeToCompleteInSeconds
1094+ klog .V (4 ).InfoS ("IsVolumeInitialized: volume still initializing according to EC2 DescribeVolumeStatus" , "volumeID" , volumeID , "estimatedTimeToCompleteInSeconds" , secondsLeft )
1095+ // Clamp to a minimum of 1 min because as of July 2025 it can take up to 5 min for volume initialization info to update.
1096+ if secondsLeft < 60 {
1097+ secondsLeft = 60
1098+ }
1099+ newExpectedInitTime = time .Now ().Add (time .Duration (secondsLeft ) * time .Second )
1100+ }
1101+ c .volumeInitializations .Set (volumeID , & volumeInitialization {initialized : ! isVolInitializing , estimatedInitializationTime : newExpectedInitTime })
1102+
1103+ if isVolInitializing {
1104+ klog .V (4 ).InfoS ("IsVolumeInitialized: volume not initialized yet" , "volumeID" , volumeID )
1105+ } else {
1106+ klog .V (4 ).InfoS ("IsVolumeInitialized: volume is initialized" , "volumeID" , volumeID )
1107+ }
1108+
1109+ return ! isVolInitializing , nil
1110+ }
1111+
1112+ func isVolumeStatusInitializing (vsi types.VolumeStatusItem ) bool {
1113+ for _ , detail := range vsi .VolumeStatus .Details {
1114+ if detail .Name == types .VolumeStatusNameInitializationState && detail .Status != nil && * detail .Status == VolumeStatusInitializingState {
1115+ return true
1116+ }
1117+ }
1118+ return false
1119+ }
1120+
1121+ func execBatchDescribeVolumeStatus (svc EC2API , input []string ) (map [string ]* types.VolumeStatusItem , error ) {
1122+ klog .V (7 ).InfoS ("execBatchDescribeVolumeStatus" , "volumeIds" , input )
1123+ request := & ec2.DescribeVolumeStatusInput {
1124+ VolumeIds : input ,
1125+ }
1126+
1127+ ctx , cancel := context .WithTimeout (context .Background (), batchDescribeTimeout )
1128+ defer cancel ()
1129+
1130+ var volumeStatusItems []types.VolumeStatusItem
1131+ var nextToken * string
1132+ for {
1133+ response , err := svc .DescribeVolumeStatus (ctx , request )
1134+ if err != nil {
1135+ return nil , err
1136+ }
1137+ volumeStatusItems = append (volumeStatusItems , response .VolumeStatuses ... )
1138+ nextToken = response .NextToken
1139+ if aws .ToString (nextToken ) == "" {
1140+ break
1141+ }
1142+ request .NextToken = nextToken
1143+ }
1144+
1145+ result := make (map [string ]* types.VolumeStatusItem )
1146+
1147+ for _ , m := range volumeStatusItems {
1148+ volumeStatus := m
1149+ result [* volumeStatus .VolumeId ] = & volumeStatus
1150+ }
1151+
1152+ klog .V (7 ).InfoS ("execBatchDescribeVolumeStatus: success" , "result" , result )
1153+ return result , nil
1154+ }
1155+
1156+ // describeVolumeStatus will return the VolumeStatusItem associated with volumeID from EC2 DescribeVolumeStatus
1157+ // Set callASAP to true if you need status within seconds (Otherwise it may take minutes).
1158+ func (c * cloud ) describeVolumeStatus (volumeID string , callASAP bool ) (* types.VolumeStatusItem , error ) {
1159+ ch := make (chan batcher.BatchResult [* types.VolumeStatusItem ])
1160+
1161+ var b * batcher.Batcher [string , * types.VolumeStatusItem ]
1162+ if callASAP {
1163+ b = c .bm .volumeStatusIDBatcherFast
1164+ } else {
1165+ b = c .bm .volumeStatusIDBatcherSlow
1166+ }
1167+ b .AddTask (volumeID , ch )
1168+
1169+ r := <- ch
1170+
1171+ if r .Err != nil {
1172+ return nil , r .Err
1173+ }
1174+ return r .Result , nil
1175+ }
1176+
10181177// WaitForAttachmentState polls until the attachment status is the expected value.
10191178func (c * cloud ) WaitForAttachmentState (ctx context.Context , expectedState types.VolumeAttachmentState , volumeID string , expectedInstance string , expectedDevice string , alreadyAssigned bool ) (* types.VolumeAttachment , error ) {
10201179 var attachment * types.VolumeAttachment
0 commit comments