Skip to content

Commit 2c7a0d1

Browse files
authored
Merge pull request #702 from AndyXiangLi/node-concurrent-issue
Refactor inFlight key to add lock per volumeId
2 parents 5828d42 + 8155fe8 commit 2c7a0d1

File tree

6 files changed

+157
-153
lines changed

6 files changed

+157
-153
lines changed

pkg/driver/controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,11 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
205205
}
206206

207207
// check if a request is already in-flight because the CreateVolume API is not idempotent
208-
if ok := d.inFlight.Insert(req); !ok {
208+
if ok := d.inFlight.Insert(req.String()); !ok {
209209
msg := fmt.Sprintf("Create volume request for %s is already in progress", volName)
210210
return nil, status.Error(codes.Aborted, msg)
211211
}
212-
defer d.inFlight.Delete(req)
212+
defer d.inFlight.Delete(req.String())
213213

214214
// create a new volume
215215
zone := pickAvailabilityZone(req.GetAccessibilityRequirements())

pkg/driver/controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,8 +1534,8 @@ func TestCreateVolume(t *testing.T) {
15341534
mockCloud.EXPECT().GetDiskByName(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(stdVolSize)).Return(nil, cloud.ErrNotFound)
15351535

15361536
inFlight := internal.NewInFlight()
1537-
inFlight.Insert(req)
1538-
defer inFlight.Delete(req)
1537+
inFlight.Insert(req.String())
1538+
defer inFlight.Delete(req.String())
15391539

15401540
awsDriver := controllerService{
15411541
cloud: mockCloud,

pkg/driver/internal/inflight.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package internal
1818

1919
import (
20+
"k8s.io/klog"
2021
"sync"
2122
)
2223

@@ -29,7 +30,7 @@ type Idempotent interface {
2930
String() string
3031
}
3132

32-
// InFlight is a struct used to manage in flight requests.
33+
// InFlight is a struct used to manage in flight requests per volumeId.
3334
type InFlight struct {
3435
mux *sync.Mutex
3536
inFlight map[string]bool
@@ -43,28 +44,27 @@ func NewInFlight() *InFlight {
4344
}
4445
}
4546

46-
// Insert inserts the entry to the current list of inflight requests.
47+
// Insert inserts the entry to the current list of inflight request key is volumeId for node and req hash for controller .
4748
// Returns false when the key already exists.
48-
func (db *InFlight) Insert(entry Idempotent) bool {
49+
func (db *InFlight) Insert(key string) bool {
4950
db.mux.Lock()
5051
defer db.mux.Unlock()
5152

52-
hash := entry.String()
53-
54-
_, ok := db.inFlight[hash]
53+
_, ok := db.inFlight[key]
5554
if ok {
5655
return false
5756
}
5857

59-
db.inFlight[hash] = true
58+
db.inFlight[key] = true
6059
return true
6160
}
6261

6362
// Delete removes the entry from the inFlight entries map.
6463
// It doesn't return anything, and will do nothing if the specified key doesn't exist.
65-
func (db *InFlight) Delete(h Idempotent) {
64+
func (db *InFlight) Delete(key string) {
6665
db.mux.Lock()
6766
defer db.mux.Unlock()
6867

69-
delete(db.inFlight, h.String())
68+
delete(db.inFlight, key)
69+
klog.V(4).Infof("Node Service: volume=%q operation finished", key)
7070
}

pkg/driver/internal/inflight_test.go

Lines changed: 26 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,14 @@ package internal
1818

1919
import (
2020
"testing"
21-
22-
"github.com/container-storage-interface/spec/lib/go/csi"
23-
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util"
2421
)
2522

2623
type testRequest struct {
27-
request *csi.CreateVolumeRequest
28-
expResp bool
29-
delete bool
24+
volumeId string
25+
expResp bool
26+
delete bool
3027
}
3128

32-
var stdVolCap = []*csi.VolumeCapability{
33-
{
34-
AccessType: &csi.VolumeCapability_Mount{
35-
Mount: &csi.VolumeCapability_MountVolume{},
36-
},
37-
AccessMode: &csi.VolumeCapability_AccessMode{
38-
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
39-
},
40-
},
41-
}
42-
43-
var (
44-
stdVolSize = int64(5 * util.GiB)
45-
stdCapRange = &csi.CapacityRange{RequiredBytes: stdVolSize}
46-
stdParams = map[string]string{
47-
"key1": "value1",
48-
"key2": "value2",
49-
}
50-
)
51-
5229
func TestInFlight(t *testing.T) {
5330
testCases := []struct {
5431
name string
@@ -58,137 +35,54 @@ func TestInFlight(t *testing.T) {
5835
name: "success normal",
5936
requests: []testRequest{
6037
{
61-
request: &csi.CreateVolumeRequest{
62-
Name: "random-vol-name",
63-
CapacityRange: stdCapRange,
64-
VolumeCapabilities: stdVolCap,
65-
Parameters: stdParams,
66-
},
67-
expResp: true,
68-
},
69-
},
70-
},
71-
{
72-
name: "success adding request with different name",
73-
requests: []testRequest{
74-
{
75-
request: &csi.CreateVolumeRequest{
76-
Name: "random-vol-foobar",
77-
CapacityRange: stdCapRange,
78-
VolumeCapabilities: stdVolCap,
79-
Parameters: stdParams,
80-
},
81-
expResp: true,
82-
},
83-
{
84-
request: &csi.CreateVolumeRequest{
85-
Name: "random-vol-name-foobar",
86-
CapacityRange: stdCapRange,
87-
VolumeCapabilities: stdVolCap,
88-
Parameters: stdParams,
89-
},
90-
expResp: true,
91-
},
92-
},
93-
},
94-
{
95-
name: "success adding request with different parameters",
96-
requests: []testRequest{
97-
{
98-
request: &csi.CreateVolumeRequest{
99-
Name: "random-vol-name-foobar",
100-
CapacityRange: stdCapRange,
101-
VolumeCapabilities: stdVolCap,
102-
Parameters: map[string]string{"foo": "bar"},
103-
},
104-
expResp: true,
105-
},
106-
{
107-
request: &csi.CreateVolumeRequest{
108-
Name: "random-vol-name-foobar",
109-
CapacityRange: stdCapRange,
110-
VolumeCapabilities: stdVolCap,
111-
},
112-
expResp: true,
38+
39+
volumeId: "random-vol-name",
40+
expResp: true,
11341
},
11442
},
11543
},
11644
{
117-
name: "success adding request with different parameters",
45+
name: "success adding request with different volumeId",
11846
requests: []testRequest{
11947
{
120-
request: &csi.CreateVolumeRequest{
121-
Name: "random-vol-name-foobar",
122-
CapacityRange: stdCapRange,
123-
VolumeCapabilities: stdVolCap,
124-
Parameters: map[string]string{"foo": "bar"},
125-
},
126-
expResp: true,
48+
volumeId: "random-vol-foobar",
49+
expResp: true,
12750
},
12851
{
129-
request: &csi.CreateVolumeRequest{
130-
Name: "random-vol-name-foobar",
131-
CapacityRange: stdCapRange,
132-
VolumeCapabilities: stdVolCap,
133-
Parameters: map[string]string{"foo": "baz"},
134-
},
135-
expResp: true,
52+
volumeId: "random-vol-name-foobar",
53+
expResp: true,
13654
},
13755
},
13856
},
13957
{
140-
name: "failure adding copy of request",
58+
name: "failed adding request with same volumeId",
14159
requests: []testRequest{
14260
{
143-
request: &csi.CreateVolumeRequest{
144-
Name: "random-vol-name",
145-
CapacityRange: stdCapRange,
146-
VolumeCapabilities: stdVolCap,
147-
Parameters: stdParams,
148-
},
149-
expResp: true,
61+
volumeId: "random-vol-name-foobar",
62+
expResp: true,
15063
},
15164
{
152-
request: &csi.CreateVolumeRequest{
153-
Name: "random-vol-name",
154-
CapacityRange: stdCapRange,
155-
VolumeCapabilities: stdVolCap,
156-
Parameters: stdParams,
157-
},
158-
expResp: false,
65+
volumeId: "random-vol-name-foobar",
66+
expResp: false,
15967
},
16068
},
16169
},
70+
16271
{
16372
name: "success add, delete, add copy",
16473
requests: []testRequest{
16574
{
166-
request: &csi.CreateVolumeRequest{
167-
Name: "random-vol-name",
168-
CapacityRange: stdCapRange,
169-
VolumeCapabilities: stdVolCap,
170-
Parameters: stdParams,
171-
},
172-
expResp: true,
75+
volumeId: "random-vol-name",
76+
expResp: true,
17377
},
17478
{
175-
request: &csi.CreateVolumeRequest{
176-
Name: "random-vol-name",
177-
CapacityRange: stdCapRange,
178-
VolumeCapabilities: stdVolCap,
179-
Parameters: stdParams,
180-
},
181-
expResp: false,
182-
delete: true,
79+
volumeId: "random-vol-name",
80+
expResp: false,
81+
delete: true,
18382
},
18483
{
185-
request: &csi.CreateVolumeRequest{
186-
Name: "random-vol-name",
187-
CapacityRange: stdCapRange,
188-
VolumeCapabilities: stdVolCap,
189-
Parameters: stdParams,
190-
},
191-
expResp: true,
84+
volumeId: "random-vol-name",
85+
expResp: true,
19286
},
19387
},
19488
},
@@ -200,9 +94,9 @@ func TestInFlight(t *testing.T) {
20094
for _, r := range tc.requests {
20195
var resp bool
20296
if r.delete {
203-
db.Delete(r.request)
97+
db.Delete(r.volumeId)
20498
} else {
205-
resp = db.Insert(r.request)
99+
resp = db.Insert(r.volumeId)
206100
}
207101
if r.expResp != resp {
208102
t.Fatalf("expected insert to be %+v, got %+v", r.expResp, resp)

pkg/driver/node.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ const (
5757

5858
// defaultMaxEBSNitroVolumes is the limit of volumes for some smaller instances, like c5 and m5.
5959
defaultMaxEBSNitroVolumes = 25
60+
61+
// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID
62+
VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress"
6063
)
6164

6265
var (
@@ -141,13 +144,12 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
141144
}
142145
}
143146

144-
if ok := d.inFlight.Insert(req); !ok {
145-
msg := fmt.Sprintf("request to stage volume=%q is already in progress", volumeID)
146-
return nil, status.Error(codes.Aborted, msg)
147+
if ok := d.inFlight.Insert(volumeID); !ok {
148+
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
147149
}
148150
defer func() {
149-
klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", req.GetVolumeId())
150-
d.inFlight.Delete(req)
151+
klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", volumeID)
152+
d.inFlight.Delete(volumeID)
151153
}()
152154

153155
devicePath, ok := req.PublishContext[DevicePathKey]
@@ -217,6 +219,14 @@ func (d *nodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
217219
return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
218220
}
219221

222+
if ok := d.inFlight.Insert(volumeID); !ok {
223+
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
224+
}
225+
defer func() {
226+
klog.V(4).Infof("NodeUnStageVolume: volume=%q operation finished", volumeID)
227+
d.inFlight.Delete(volumeID)
228+
}()
229+
220230
// Check if target directory is a mount point. GetDeviceNameFromMount
221231
// given a mnt point, finds the device from /proc/mounts
222232
// returns the device name, reference count, and error code
@@ -343,6 +353,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis
343353
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
344354
}
345355

356+
if ok := d.inFlight.Insert(volumeID); !ok {
357+
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
358+
}
359+
defer func() {
360+
klog.V(4).Infof("NodePublishVolume: volume=%q operation finished", volumeID)
361+
d.inFlight.Delete(volumeID)
362+
}()
363+
346364
mountOptions := []string{"bind"}
347365
if req.GetReadonly() {
348366
mountOptions = append(mountOptions, "ro")
@@ -373,6 +391,13 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
373391
if len(target) == 0 {
374392
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
375393
}
394+
if ok := d.inFlight.Insert(volumeID); !ok {
395+
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
396+
}
397+
defer func() {
398+
klog.V(4).Infof("NodeUnPublishVolume: volume=%q operation finished", volumeID)
399+
d.inFlight.Delete(volumeID)
400+
}()
376401

377402
klog.V(5).Infof("NodeUnpublishVolume: unmounting %s", target)
378403
err := d.mounter.Unmount(target)

0 commit comments

Comments
 (0)