@@ -16,7 +16,102 @@ limitations under the License.
16
16
17
17
package restic
18
18
19
+ import (
20
+ "bytes"
21
+ "context"
22
+ "fmt"
23
+ "time"
24
+
25
+ corev1 "k8s.io/api/core/v1"
26
+ "k8s.io/apimachinery/pkg/api/errors"
27
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
+ "k8s.io/apimachinery/pkg/util/wait"
29
+ "k8s.io/client-go/kubernetes"
30
+ "k8s.io/klog/v2"
31
+ kutil "kmodules.xyz/client-go"
32
+ )
33
+
19
34
func (w * ResticWrapper ) UnlockRepository () error {
20
35
_ , err := w .unlock ()
21
36
return err
22
37
}
38
+
39
+ // getLockIDs lists every lock ID currently held in the repository.
40
+ func (w * ResticWrapper ) getLockIDs () ([]string , error ) {
41
+ w .sh .ShowCMD = true
42
+ out , err := w .listLocks ()
43
+ if err != nil {
44
+ return nil , err
45
+ }
46
+ return extractLockIDs (bytes .NewReader (out ))
47
+ }
48
+
49
+ // getLockStats returns the decoded JSON for a single lock.
50
+ func (w * ResticWrapper ) getLockStats (lockID string ) (* LockStats , error ) {
51
+ w .sh .ShowCMD = true
52
+ out , err := w .lockStats (lockID )
53
+ if err != nil {
54
+ return nil , err
55
+ }
56
+ return extractLockStats (out )
57
+ }
58
+
59
+ // getPodNameIfAnyExclusiveLock scans every lock and returns the hostname aka (Pod name) of the first exclusive lock it finds, or "" if none exist.
60
+ func (w * ResticWrapper ) getPodNameIfAnyExclusiveLock () (string , error ) {
61
+ klog .Infoln ("Checking for exclusive locks in the repository..." )
62
+ ids , err := w .getLockIDs ()
63
+ if err != nil {
64
+ return "" , fmt .Errorf ("failed to list locks: %w" , err )
65
+ }
66
+ for _ , id := range ids {
67
+ st , err := w .getLockStats (id )
68
+ if err != nil {
69
+ return "" , fmt .Errorf ("failed to inspect lock %s: %w" , id , err )
70
+ }
71
+ if st .Exclusive { // There's no chances to get multiple exclusive locks, so we can return the first one we find.
72
+ return st .Hostname , nil
73
+ }
74
+ }
75
+ return "" , nil
76
+ }
77
+
78
+ // EnsureNoExclusiveLock blocks until any exclusive lock is released.
79
+ // If a lock is held by a Running Pod, it waits; otherwise it unlocks.
80
+ func (w * ResticWrapper ) EnsureNoExclusiveLock (k8sClient kubernetes.Interface , namespace string ) error {
81
+ klog .Infoln ("Ensuring no exclusive lock is held in the repository..." )
82
+ podName , err := w .getPodNameIfAnyExclusiveLock ()
83
+ if err != nil {
84
+ return fmt .Errorf ("failed to query exclusive lock: %w" , err )
85
+ }
86
+ if podName == "" {
87
+ klog .Infoln ("No exclusive lock found, nothing to do." )
88
+ return nil // nothing to do
89
+ }
90
+
91
+ return wait .PollUntilContextTimeout (
92
+ context .Background (),
93
+ 5 * time .Second ,
94
+ kutil .ReadinessTimeout ,
95
+ true ,
96
+ func (ctx context.Context ) (bool , error ) {
97
+ klog .Infoln ("Getting Pod:" , podName , "to check if it's finished..." )
98
+ pod , err := k8sClient .CoreV1 ().Pods (namespace ).Get (ctx , podName , metav1.GetOptions {})
99
+ switch {
100
+ case errors .IsNotFound (err ): // Pod gone → unlock
101
+ klog .Infoln ("Pod:" , podName , "not found, unlocking repository..." )
102
+ _ , err := w .unlock ()
103
+ return true , err
104
+ case err != nil : // API error → stop
105
+ return false , err
106
+ case pod .Status .Phase == corev1 .PodSucceeded ||
107
+ pod .Status .Phase == corev1 .PodFailed : // Pod finished → unlock
108
+ klog .Infoln ("Pod:" , podName , "finished with phase" , pod .Status .Phase , ", unlocking repository..." )
109
+ _ , err := w .unlock ()
110
+ return true , err
111
+ default : // Not finished yet → keep waiting
112
+ klog .Infoln ("Pod:" , podName , "is in phase" , pod .Status .Phase , ", waiting for it to finish..." )
113
+ return false , nil
114
+ }
115
+ },
116
+ )
117
+ }
0 commit comments