14
14
15
15
"""Use command-line tools to check for audio file corruption."""
16
16
17
+ import concurrent .futures
17
18
import errno
18
19
import os
19
20
import shlex
20
21
import sys
21
22
from subprocess import STDOUT , CalledProcessError , check_output , list2cmdline
23
+ from typing import Callable , Optional , Union
22
24
23
25
import confuse
24
26
25
- from beets import importer , ui
27
+ from beets import importer , library , ui
26
28
from beets .plugins import BeetsPlugin
27
29
from beets .ui import Subcommand
28
- from beets .util import displayable_path , par_map
30
+ from beets .util import displayable_path
29
31
30
32
31
33
class CheckerCommandError (Exception ):
@@ -45,6 +47,13 @@ def __init__(self, cmd, oserror):
45
47
self .msg = str (oserror )
46
48
47
49
50
+ # CheckResult is a tuple of 1. status code, 2. how many errors there were, and 3.
51
+ # a list of error output messages.
52
+ CheckResult = tuple [int , int , list [str ]]
53
+
54
+ CheckMethod = Callable [[str ], CheckResult ]
55
+
56
+
48
57
class BadFiles (BeetsPlugin ):
49
58
def __init__ (self ):
50
59
super ().__init__ ()
@@ -55,12 +64,12 @@ def __init__(self):
55
64
"import_task_before_choice" , self .on_import_task_before_choice
56
65
)
57
66
58
- def run_command (self , cmd ) :
67
+ def run_command (self , cmd : list [ str ]) -> CheckResult :
59
68
self ._log .debug (
60
69
"running command: {}" , displayable_path (list2cmdline (cmd ))
61
70
)
62
71
try :
63
- output = check_output (cmd , stderr = STDOUT )
72
+ output : bytes = check_output (cmd , stderr = STDOUT )
64
73
errors = 0
65
74
status = 0
66
75
except CalledProcessError as e :
@@ -69,28 +78,28 @@ def run_command(self, cmd):
69
78
status = e .returncode
70
79
except OSError as e :
71
80
raise CheckerCommandError (cmd , e )
72
- output = output .decode (sys .getdefaultencoding (), "replace" )
73
- return status , errors , [line for line in output .split ("\n " ) if line ]
81
+ output_dec = output .decode (sys .getdefaultencoding (), "replace" )
82
+ return status , errors , [line for line in output_dec .split ("\n " ) if line ]
74
83
75
- def check_mp3val (self , path ) :
84
+ def check_mp3val (self , path : str ) -> CheckResult :
76
85
status , errors , output = self .run_command (["mp3val" , path ])
77
86
if status == 0 :
78
87
output = [line for line in output if line .startswith ("WARNING:" )]
79
88
errors = len (output )
80
89
return status , errors , output
81
90
82
- def check_flac (self , path ) :
91
+ def check_flac (self , path : str ) -> CheckResult :
83
92
return self .run_command (["flac" , "-wst" , path ])
84
93
85
- def check_custom (self , command ) :
94
+ def check_custom (self , command : str ) -> Callable [[ str ], CheckResult ] :
86
95
def checker (path ):
87
96
cmd = shlex .split (command )
88
97
cmd .append (path )
89
98
return self .run_command (cmd )
90
99
91
100
return checker
92
101
93
- def get_checker (self , ext ) :
102
+ def get_checker (self , ext : str ) -> Optional [ CheckMethod ] :
94
103
ext = ext .lower ()
95
104
try :
96
105
command = self .config ["commands" ].get (dict ).get (ext )
@@ -102,8 +111,9 @@ def get_checker(self, ext):
102
111
return self .check_mp3val
103
112
if ext == "flac" :
104
113
return self .check_flac
114
+ return None
105
115
106
- def check_item (self , item ) :
116
+ def check_item (self , item : library . Item ) -> tuple [ bool , list [ str ]] :
107
117
# First, check whether the path exists. If not, the user
108
118
# should probably run `beet update` to cleanup your library.
109
119
dpath = displayable_path (item .path )
@@ -118,8 +128,8 @@ def check_item(self, item):
118
128
checker = self .get_checker (ext )
119
129
if not checker :
120
130
self ._log .error ("no checker specified in the config for {}" , ext )
121
- return []
122
- path = item .path
131
+ return False , []
132
+ path : Union [ bytes , str ] = item .path
123
133
if not isinstance (path , str ):
124
134
path = item .path .decode (sys .getfilesystemencoding ())
125
135
try :
@@ -132,11 +142,13 @@ def check_item(self, item):
132
142
)
133
143
else :
134
144
self ._log .error ("error invoking {0.checker}: {0.msg}" , e )
135
- return []
145
+ return False , []
136
146
147
+ success = True
137
148
error_lines = []
138
149
139
150
if status > 0 :
151
+ success = False
140
152
error_lines .append (
141
153
f"{ ui .colorize ('text_error' , dpath )} : checker exited with"
142
154
f" status { status } "
@@ -145,6 +157,7 @@ def check_item(self, item):
145
157
error_lines .append (f" { line } " )
146
158
147
159
elif errors > 0 :
160
+ success = False
148
161
error_lines .append (
149
162
f"{ ui .colorize ('text_warning' , dpath )} : checker found"
150
163
f" { status } errors or warnings"
@@ -154,23 +167,24 @@ def check_item(self, item):
154
167
elif self .verbose :
155
168
error_lines .append (f"{ ui .colorize ('text_success' , dpath )} : ok" )
156
169
157
- return error_lines
170
+ return success , error_lines
158
171
159
- def on_import_task_start (self , task , session ):
172
+ def on_import_task_start (self , task , session ) -> None :
160
173
if not self .config ["check_on_import" ].get (False ):
161
174
return
162
175
163
176
checks_failed = []
164
177
165
178
for item in task .items :
166
- error_lines = self .check_item (item )
167
- if error_lines :
168
- checks_failed .append (error_lines )
179
+ _ , error_lines = self .check_item (item )
180
+ checks_failed .append (error_lines )
169
181
170
182
if checks_failed :
171
183
task ._badfiles_checks_failed = checks_failed
172
184
173
- def on_import_task_before_choice (self , task , session ):
185
+ def on_import_task_before_choice (
186
+ self , task , session
187
+ ) -> Optional [importer .Action ]:
174
188
if hasattr (task , "_badfiles_checks_failed" ):
175
189
ui .print_ (
176
190
f"{ ui .colorize ('text_warning' , 'BAD' )} one or more files failed"
@@ -194,16 +208,27 @@ def on_import_task_before_choice(self, task, session):
194
208
else :
195
209
raise Exception (f"Unexpected selection: { sel } " )
196
210
197
- def command (self , lib , opts , args ):
211
+ return None
212
+
213
+ def command (self , lib , opts , args ) -> None :
198
214
# Get items from arguments
199
215
items = lib .items (args )
200
216
self .verbose = opts .verbose
201
217
202
218
def check_and_print (item ):
203
- for error_line in self .check_item (item ):
204
- ui .print_ (error_line )
205
-
206
- par_map (check_and_print , items )
219
+ success , error_lines = self .check_item (item )
220
+ if not success :
221
+ for line in error_lines :
222
+ ui .print_ (line )
223
+
224
+ with concurrent .futures .ThreadPoolExecutor () as executor :
225
+ for _ in ui .iprogress_bar (
226
+ executor .map (check_and_print , items ),
227
+ desc = "Checking" ,
228
+ unit = "item" ,
229
+ total = len (items ),
230
+ ):
231
+ pass
207
232
208
233
def commands (self ):
209
234
bad_command = Subcommand (
0 commit comments