22
22
23
23
'''Provides functions for build artifacts'''
24
24
25
- import copy
25
+ from __future__ import annotations
26
+
26
27
import datetime
27
28
import hashlib
28
29
import json
34
35
import stat
35
36
import zipfile
36
37
from pathlib import Path
38
+ from typing import TYPE_CHECKING
39
+ from typing import TypedDict
37
40
38
41
import requests
39
42
52
55
torf = None
53
56
54
57
55
- def _is_http_url (string ):
58
+ if TYPE_CHECKING :
59
+ from typing import Any
60
+ from typing import Iterable
61
+ from typing import Mapping
62
+
63
+ StrPathLike = str | os .PathLike [str ]
64
+
65
+
66
class Artifact(TypedDict):
    '''Description of a single artifact listed from the artifact store.'''

    # Revision string extracted from the artifact file name pattern.
    revision: str
    # Full location of the artifact: an HTTP(S) URL or a filesystem path.
    uri: str
69
+
70
+
71
+ def _is_http_url (string : str ) -> bool :
56
72
return re .match (r'^http[s]?:\/\/.*$' , string ) is not None
57
73
58
74
59
- def list_artifacts (artifact_pattern : str , format_arguments , api_context ):
75
+ def list_artifacts (
76
+ artifact_pattern : str ,
77
+ format_arguments : Mapping [str , Any ],
78
+ api_context_ : nimp .utils .git .GitApiContext | None ,
79
+ ) -> list [Artifact ]:
60
80
'''List all artifacts and their revision using the provided pattern after formatting'''
61
81
62
- format_arguments = copy .deepcopy (format_arguments )
63
- format_arguments ['revision' ] = '{revision}'
64
- artifact_pattern = artifact_pattern .format (** format_arguments )
82
+ artifact_pattern = artifact_pattern .format_map (
83
+ {
84
+ ** format_arguments ,
85
+ 'revision' : '{revision}' ,
86
+ }
87
+ )
65
88
66
89
if not _is_http_url (artifact_pattern ):
67
90
artifact_source = nimp .system .sanitize_path (os .path .dirname (artifact_pattern ))
@@ -74,27 +97,25 @@ def list_artifacts(artifact_pattern: str, format_arguments, api_context):
74
97
)
75
98
76
99
all_files = _list_files (artifact_source , False )
77
- all_artifacts = []
100
+ all_artifacts : list [ Artifact ] = []
78
101
for file_uri in all_files :
79
102
file_name = os .path .basename (file_uri .rstrip ('/' ))
80
103
artifact_match = artifact_regex .match (file_name )
81
- if artifact_match :
82
- group_revision = artifact_match .group ('revision' )
83
- sortable_revision = copy .deepcopy (group_revision )
84
- if api_context :
85
- sortable_revision = nimp .utils .git .get_gitea_commit_timestamp (api_context , group_revision )
86
- if sortable_revision is not None :
87
- artifact = {
88
- 'revision' : group_revision ,
89
- 'sortable_revision' : sortable_revision ,
90
- 'uri' : file_uri ,
91
- }
92
- all_artifacts .append (artifact )
104
+ if not artifact_match :
105
+ continue
106
+
107
+ group_revision = artifact_match .group ('revision' )
108
+ all_artifacts .append (
109
+ {
110
+ 'revision' : group_revision ,
111
+ 'uri' : file_uri ,
112
+ }
113
+ )
93
114
return all_artifacts
94
115
95
116
96
- def _list_files (source : str , recursive ) :
97
- all_files = []
117
+ def _list_files (source : str , recursive : bool ) -> list [ str ] :
118
+ all_files : list [ str ] = []
98
119
99
120
if _is_http_url (source ):
100
121
if not source .endswith ('/' ):
@@ -128,7 +149,7 @@ def _list_files(source: str, recursive):
128
149
return all_files
129
150
130
151
131
- def download_artifact (workspace_directory , artifact_uri ) :
152
+ def download_artifact (workspace_directory : str , artifact_uri : str ) -> str :
132
153
'''Download an artifact to the workspace'''
133
154
134
155
download_directory = os .path .join (workspace_directory , '.nimp' , 'downloads' )
@@ -158,7 +179,7 @@ def download_artifact(workspace_directory, artifact_uri):
158
179
return local_artifact_path
159
180
160
181
161
- def _download_file (file_uri , output_path ) :
182
+ def _download_file (file_uri : str , output_path : StrPathLike ) -> None :
162
183
if os .path .exists (output_path ):
163
184
os .remove (output_path )
164
185
output_directory = os .path .dirname (output_path )
@@ -174,7 +195,7 @@ def _download_file(file_uri, output_path):
174
195
shutil .copyfile (file_uri , output_path )
175
196
176
197
177
- def _extract_archive (archive_path , output_path ) :
198
+ def _extract_archive (archive_path : StrPathLike , output_path : StrPathLike ) -> None :
178
199
if os .path .exists (output_path ):
179
200
shutil .rmtree (output_path )
180
201
@@ -192,7 +213,7 @@ def _extract_archive(archive_path, output_path):
192
213
os .remove (inner_archive_path )
193
214
194
215
195
- def install_artifact (artifact_path , destination_directory ) :
216
+ def install_artifact (artifact_path : str , destination_directory : StrPathLike ) -> None :
196
217
'''Install an artifact in the workspace'''
197
218
198
219
if not os .path .exists (artifact_path ):
@@ -213,7 +234,7 @@ def install_artifact(artifact_path, destination_directory):
213
234
_try_make_executable (destination )
214
235
215
236
216
- def _try_make_executable (file_path ) :
237
+ def _try_make_executable (file_path : str ) -> None :
217
238
if platform .system () == 'Windows' :
218
239
return
219
240
@@ -231,14 +252,20 @@ def _try_make_executable(file_path):
231
252
logging .warning ('Failed to make file executable: %s (FilePath: %s)' , exception , file_path )
232
253
233
254
234
def _try_rename(src: StrPathLike, dst: StrPathLike, max_attempts: int = 5, retry_delay: int = 2) -> None:
    '''Rename src to dst, retrying on OSError a bounded number of times.'''

    def _do_rename() -> None:
        os.rename(src, dst)

    nimp.system.try_execute(
        _do_rename,
        OSError,
        attempt_maximum=max_attempts,
        retry_delay=retry_delay,
    )
239
260
240
261
241
- def create_artifact (artifact_path , file_collection , archive , compress , dry_run ):
262
+ def create_artifact (
263
+ artifact_path : str ,
264
+ file_collection : Iterable [tuple [StrPathLike , StrPathLike ]],
265
+ archive : bool ,
266
+ compress : bool ,
267
+ dry_run : bool ,
268
+ ) -> None :
242
269
'''Create an artifact'''
243
270
244
271
if os .path .isfile (artifact_path + '.zip' ) or os .path .isdir (artifact_path ):
@@ -292,7 +319,7 @@ def create_artifact(artifact_path, file_collection, archive, compress, dry_run):
292
319
shutil .move (artifact_path_tmp , artifact_path )
293
320
294
321
295
- def create_torrent (artifact_path , announce , dry_run ) :
322
+ def create_torrent (artifact_path : StrPathLike , announce : str | None , dry_run : bool ) -> None :
296
323
'''Create a torrent for an existing artifact'''
297
324
298
325
if torf is None :
@@ -329,7 +356,7 @@ def create_torrent(artifact_path, announce, dry_run):
329
356
tmp_torrent_path .rename (torrent_path )
330
357
331
358
332
- def create_hash (artifact_path , hash_method , dry_run ) :
359
+ def create_hash (artifact_path : str , hash_method , dry_run : bool ) -> None :
333
360
artifact_full_path = _find_artifact (artifact_path )
334
361
if not artifact_full_path :
335
362
raise FileNotFoundError (f'Artifact not found: { artifact_path } ' )
@@ -343,26 +370,23 @@ def create_hash(artifact_path, hash_method, dry_run):
343
370
json .dump ({hash_method : file_hash }, fh )
344
371
345
372
346
def get_file_hash(file_path: StrPathLike, hash_method: str) -> str:
    '''Compute the hex digest of a file without loading it fully into memory.

    The file is read in fixed-size 64 KiB blocks so arbitrarily large
    artifacts can be hashed with constant memory usage.

    :param file_path: path of the file to hash
    :param hash_method: any algorithm name accepted by hashlib.new (e.g. 'sha256')
    :return: the hexadecimal digest of the file contents
    :raises ValueError: if hash_method names an unsupported algorithm
    '''
    hasher = hashlib.new(hash_method)

    _BLOCK_SIZE = 65536

    with open(file_path, 'rb') as fh:
        while buffer := fh.read(_BLOCK_SIZE):
            hasher.update(buffer)
    file_hash = hasher.hexdigest()

    logging.debug("%s %s: %s", file_path, hash_method, file_hash)
    return file_hash
362
386
363
387
364
388
# TODO (l.cahour): this is workaround the fact we don't use artifact objects containing the info we need
365
- def _find_artifact (artifact_path ) :
389
+ def _find_artifact (artifact_path : str ) -> str | None :
366
390
if os .path .isfile (artifact_path + '.zip' ):
367
391
return artifact_path + '.zip'
368
392
elif os .path .isdir (artifact_path ):
@@ -373,8 +397,8 @@ def _find_artifact(artifact_path):
373
397
374
398
# TODO (l.cahour): this is a naive first attempt at using a wrapper class to clean how we handle artifacts saving
375
399
# Try to make this better and use it everywhere else in the future
376
- class TempArtifact ( object ) :
377
- def __init__ (self , file_path , mode , force = False ):
400
+ class TempArtifact :
401
+ def __init__ (self , file_path : StrPathLike , mode , force : bool = False ) -> None :
378
402
self .name = file_path
379
403
self .temp = f'{ file_path } .tmp'
380
404
self .mode = mode
0 commit comments