Skip to content

Commit 2f2ba41

Browse files
andyzhangx and k8s-infra-cherrypick-robot
authored and
k8s-infra-cherrypick-robot
committed
fix: cleanup azcopy jobs after job complete
fix comments
1 parent 412e0a5 commit 2f2ba41

File tree

4 files changed

+60
-28
lines changed

4 files changed

+60
-28
lines changed

pkg/azurefile/azurefile.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -321,7 +321,7 @@ func NewDriver(options *DriverOptions) *Driver {
321321
driver.volLockMap = newLockMap()
322322
driver.subnetLockMap = newLockMap()
323323
driver.volumeLocks = newVolumeLocks()
324-
driver.azcopy = &fileutil.Azcopy{}
324+
driver.azcopy = &fileutil.Azcopy{ExecCmd: &fileutil.ExecCommand{}}
325325
driver.kubeconfig = options.KubeConfig
326326
driver.endpoint = options.Endpoint
327327
driver.resolver = new(NetResolver)

pkg/azurefile/controllerserver.go

Lines changed: 14 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ import (
2626
"strings"
2727
"time"
2828

29-
volumehelper "sigs.k8s.io/azurefile-csi-driver/pkg/util"
29+
"sigs.k8s.io/azurefile-csi-driver/pkg/util"
3030

3131
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
3232
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas"
@@ -96,7 +96,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
9696
}
9797

9898
capacityBytes := req.GetCapacityRange().GetRequiredBytes()
99-
requestGiB := volumehelper.RoundUpGiB(capacityBytes)
99+
requestGiB := util.RoundUpGiB(capacityBytes)
100100
if requestGiB == 0 {
101101
requestGiB = defaultAzureFileQuota
102102
klog.Warningf("no quota specified, set as default value(%d GiB)", defaultAzureFileQuota)
@@ -634,7 +634,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
634634
// use uuid as vhd disk name if file share specified
635635
diskName = uuid.NewString() + vhdSuffix
636636
}
637-
diskSizeBytes := volumehelper.GiBToBytes(requestGiB)
637+
diskSizeBytes := util.GiBToBytes(requestGiB)
638638
klog.V(2).Infof("begin to create vhd file(%s) size(%d) on share(%s) on account(%s) type(%s) rg(%s) location(%s)",
639639
diskName, diskSizeBytes, validFileShareName, account, sku, resourceGroup, location)
640640
if err := createDisk(ctx, accountName, accountKey, d.getStorageEndPointSuffix(), validFileShareName, diskName, diskSizeBytes); err != nil {
@@ -906,7 +906,7 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
906906
klog.V(2).Infof("snapshot(%s) already exists", snapshotName)
907907
return &csi.CreateSnapshotResponse{
908908
Snapshot: &csi.Snapshot{
909-
SizeBytes: volumehelper.GiBToBytes(int64(itemSnapshotQuota)),
909+
SizeBytes: util.GiBToBytes(int64(itemSnapshotQuota)),
910910
SnapshotId: sourceVolumeID + "#" + itemSnapshot,
911911
SourceVolumeId: sourceVolumeID,
912912
CreationTime: timestamppb.New(itemSnapshotTime),
@@ -977,7 +977,7 @@ func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequ
977977

978978
createResp := &csi.CreateSnapshotResponse{
979979
Snapshot: &csi.Snapshot{
980-
SizeBytes: volumehelper.GiBToBytes(int64(itemSnapshotQuota)),
980+
SizeBytes: util.GiBToBytes(int64(itemSnapshotQuota)),
981981
SnapshotId: sourceVolumeID + "#" + itemSnapshot,
982982
SourceVolumeId: sourceVolumeID,
983983
CreationTime: timestamppb.New(itemSnapshotTime),
@@ -1108,21 +1108,21 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa
11081108
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
11091109

11101110
switch jobState {
1111-
case volumehelper.AzcopyJobError, volumehelper.AzcopyJobCompleted:
1111+
case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped:
11121112
return err
1113-
case volumehelper.AzcopyJobRunning:
1113+
case util.AzcopyJobRunning:
11141114
err = wait.PollImmediate(20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() (bool, error) {
11151115
jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
11161116
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
11171117
if err != nil {
11181118
return false, err
11191119
}
1120-
if jobState == volumehelper.AzcopyJobRunning {
1120+
if jobState == util.AzcopyJobRunning {
11211121
return false, nil
11221122
}
11231123
return true, nil
11241124
})
1125-
case volumehelper.AzcopyJobNotFound:
1125+
case util.AzcopyJobNotFound:
11261126
klog.V(2).Infof("copy fileshare %s:%s to %s:%s", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName)
11271127
execAzcopyJob := func() error {
11281128
if out, err := d.execAzcopyCopy(srcPathAuth, dstPath, azcopyCopyOptions, authAzcopyEnv); err != nil {
@@ -1134,13 +1134,16 @@ func (d *Driver) copyFileShareByAzcopy(srcFileShareName, dstFileShareName, srcPa
11341134
jobState, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv)
11351135
return fmt.Errorf("azcopy job status: %s, timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%", jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent)
11361136
}
1137-
err = volumehelper.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc)
1137+
err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc)
11381138
}
11391139

11401140
if err != nil {
11411141
klog.Warningf("CopyFileShare(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstFileShareName, err)
11421142
} else {
11431143
klog.V(2).Infof("copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName)
1144+
if out, err := d.azcopy.CleanJobs(); err != nil {
1145+
klog.Warningf("clean azcopy jobs failed with error: %v, output: %s", err, string(out))
1146+
}
11441147
}
11451148
return err
11461149
}
@@ -1165,7 +1168,7 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.Controller
11651168
if capacityBytes == 0 {
11661169
return nil, status.Error(codes.InvalidArgument, "volume capacity range missing in request")
11671170
}
1168-
requestGiB := volumehelper.RoundUpGiB(capacityBytes)
1171+
requestGiB := util.RoundUpGiB(capacityBytes)
11691172
if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
11701173
return nil, status.Errorf(codes.InvalidArgument, "invalid expand volume request: %v", req)
11711174
}

pkg/util/util.go

Lines changed: 9 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -34,10 +34,13 @@ const (
3434
type AzcopyJobState string
3535

3636
const (
37-
AzcopyJobError AzcopyJobState = "Error"
38-
AzcopyJobNotFound AzcopyJobState = "NotFound"
39-
AzcopyJobRunning AzcopyJobState = "Running"
40-
AzcopyJobCompleted AzcopyJobState = "Completed"
37+
AzcopyJobError AzcopyJobState = "Error"
38+
AzcopyJobNotFound AzcopyJobState = "NotFound"
39+
AzcopyJobRunning AzcopyJobState = "Running"
40+
AzcopyJobCompleted AzcopyJobState = "Completed"
41+
AzcopyJobCompletedWithErrors AzcopyJobState = "CompletedWithErrors"
42+
AzcopyJobCompletedWithSkipped AzcopyJobState = "CompletedWithSkipped"
43+
AzcopyJobCompletedWithErrorsAndSkipped AzcopyJobState = "CompletedWithErrorsAndSkipped"
4144
)
4245

4346
// control the number of concurrent powershell commands running on Windows node
@@ -122,9 +125,6 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc
122125
// Start Time: Wednesday, 09-Aug-23 09:09:03 UTC
123126
// Status: Cancelled
124127
// Command: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false
125-
if ac.ExecCmd == nil {
126-
ac.ExecCmd = &ExecCommand{}
127-
}
128128
out, err := ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv)
129129
// if grep command returns nothing, the exec will return exit status 1 error, so filter this error
130130
if err != nil && err.Error() != "exit status 1" {
@@ -158,13 +158,8 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc
158158
return jobState, percent, nil
159159
}
160160

161-
// TestListJobs test azcopy jobs list command with authAzcopyEnv
162-
func (ac *Azcopy) TestListJobs(accountName, storageEndpointSuffix string, authAzcopyEnv []string) (string, error) {
163-
cmdStr := fmt.Sprintf("azcopy list %s", fmt.Sprintf("https://%s.file.%s", accountName, storageEndpointSuffix))
164-
if ac.ExecCmd == nil {
165-
ac.ExecCmd = &ExecCommand{}
166-
}
167-
return ac.ExecCmd.RunCommand(cmdStr, authAzcopyEnv)
161+
func (ac *Azcopy) CleanJobs() (string, error) {
162+
return ac.ExecCmd.RunCommand("azcopy jobs clean", nil)
168163
}
169164

170165
// parseAzcopyJobList parse command azcopy jobs list, get jobid and state from joblist

pkg/util/util_test.go

Lines changed: 36 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -161,15 +161,49 @@ func TestGetAzcopyJob(t *testing.T) {
161161
m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstFileshare -B 3"), []string{}).Return(test.showStr, test.showErr)
162162
}
163163

164-
azcopyFunc := &Azcopy{}
165-
azcopyFunc.ExecCmd = m
164+
azcopyFunc := &Azcopy{ExecCmd: m}
166165
jobState, percent, err := azcopyFunc.GetAzcopyJob(dstFileshare, []string{})
167166
if jobState != test.expectedJobState || percent != test.expectedPercent || !reflect.DeepEqual(err, test.expectedErr) {
168167
t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, err: %v, expected jobState: %v, percent: %v, err: %v", test.desc, jobState, percent, err, test.expectedJobState, test.expectedPercent, test.expectedErr)
169168
}
170169
}
171170
}
172171

172+
func TestCleanJobs(t *testing.T) {
173+
tests := []struct {
174+
desc string
175+
execStr string
176+
execErr error
177+
expectedErr error
178+
}{
179+
{
180+
desc: "run exec get error",
181+
execStr: "",
182+
execErr: fmt.Errorf("error"),
183+
expectedErr: fmt.Errorf("error"),
184+
},
185+
{
186+
desc: "run exec succeed",
187+
execStr: "cleaned",
188+
execErr: nil,
189+
expectedErr: nil,
190+
},
191+
}
192+
for _, test := range tests {
193+
ctrl := gomock.NewController(t)
194+
defer ctrl.Finish()
195+
196+
m := NewMockEXEC(ctrl)
197+
m.EXPECT().RunCommand(gomock.Eq("azcopy jobs clean"), nil).Return(test.execStr, test.execErr)
198+
199+
azcopyFunc := &Azcopy{ExecCmd: m}
200+
_, err := azcopyFunc.CleanJobs()
201+
if !reflect.DeepEqual(err, test.expectedErr) {
202+
t.Errorf("test[%s]: unexpected err: %v, expected err: %v", test.desc, err, test.expectedErr)
203+
}
204+
}
205+
}
206+
173207
func TestParseAzcopyJobList(t *testing.T) {
174208
tests := []struct {
175209
desc string

0 commit comments

Comments
 (0)