|
1 | 1 | """ |
2 | 2 | Zip utility functions |
3 | 3 | """ |
4 | | -import os |
5 | 4 | import zipfile |
| 5 | +from pathlib import Path |
6 | 6 |
|
7 | 7 |
|
8 | 8 | def _add_to_zip(zip_file, path, ignored=None, path_in_zip=None): |
9 | | - for name in os.listdir(path): |
10 | | - if ignored and name in ignored: |
| 9 | + for picked_path in Path(path).iterdir(): |
| 10 | + if ignored and picked_path.name in ignored: |
11 | 11 | continue |
12 | | - picked_path = os.path.join(path, name) |
13 | | - picked_path_in_zip = path_in_zip + '/' + name if path_in_zip else name |
14 | | - if os.path.isfile(picked_path): |
| 12 | + picked_path_in_zip = path_in_zip + '/' + picked_path.name if path_in_zip else picked_path.name |
| 13 | + if picked_path.is_file(): |
15 | 14 | zip_file.write(picked_path, picked_path_in_zip) |
16 | | - elif os.path.isdir(picked_path): |
| 15 | + elif picked_path.is_dir(): |
17 | 16 | _add_to_zip(zip_file, picked_path, ignored, picked_path_in_zip) |
18 | 17 |
|
19 | 18 |
|
20 | 19 | def add_to_zip(path, zip_path, ignored=None, prefix=None, append=True): |
21 | | - os.makedirs(os.path.dirname(zip_path), exist_ok=True) |
22 | | - |
23 | | - if append and os.path.exists(zip_path): |
24 | | - zip_file = zipfile.ZipFile(zip_path, 'a') |
25 | | - else: |
26 | | - zip_file = zipfile.ZipFile(zip_path, 'w') |
27 | | - try: |
| 20 | + zip_path = Path(zip_path) |
| 21 | + zip_path.parent.mkdir(parents=True, exist_ok=True) |
| 22 | + mode = 'a' if append and zip_path.exists() else 'w' |
| 23 | + with zipfile.ZipFile(zip_path, mode) as zip_file: |
28 | 24 | _add_to_zip(zip_file, path, ignored, path_in_zip=prefix) |
29 | | - except Exception: |
30 | | - zip_file.close() |
31 | | - if os.path.exists(zip_path): |
32 | | - os.remove(zip_path) |
33 | | - raise |
34 | | - else: |
35 | | - zip_file.close() |
36 | 25 |
|
37 | 26 |
|
38 | 27 | def create_zip(path, zip_path, ignored=None, prefix=None): |
39 | 28 | return add_to_zip(path, zip_path, ignored, prefix, append=False) |
40 | 29 |
|
41 | 30 |
|
42 | | -def unzip(path, zip_path=None, zip_file=None): |
43 | | - if zip_file: |
44 | | - used_zip_file = zip_file |
45 | | - else: |
46 | | - # open zip file |
47 | | - if not zip_path: |
48 | | - return False, 'No zip file specified.' |
49 | | - try: |
50 | | - used_zip_file = zipfile.ZipFile(zip_path, 'r') |
51 | | - except Exception as e: |
52 | | - return False, 'Cannot open zip file (%s). Error: %s.' % (zip_path, e) |
53 | | - try: |
54 | | - # CRC test |
55 | | - zip_test = used_zip_file.testzip() |
| 31 | +def get_zip_content_size(zip_path, extensions_filter=None): |
| 32 | + size = 0 |
| 33 | + with zipfile.ZipFile(zip_path, 'r') as zip_file: |
| 34 | + zip_test = zip_file.testzip() |
| 35 | + if zip_test: |
| 36 | + raise RuntimeError(f'CRC error on zip file: "{zip_test}"') |
| 37 | + |
| 38 | + for info in zip_file.infolist(): |
| 39 | + if extensions_filter is None or Path(info.filename).suffix.lower() in extensions_filter: |
| 40 | + size += info.file_size |
| 41 | + return size |
| 42 | + |
| 43 | + |
| 44 | +def unzip(path, zip_path, extensions_filter=None): |
| 45 | + Path(path).parent.mkdir(parents=True, exist_ok=True) |
| 46 | + with zipfile.ZipFile(zip_path, 'r') as zip_file: |
| 47 | + zip_test = zip_file.testzip() |
56 | 48 | if zip_test: |
57 | | - return False, 'CRC error on zip file. Error detected on: %s' % zip_test |
58 | | - # create destination path |
59 | | - try: |
60 | | - os.makedirs(os.path.dirname(path), exist_ok=True) |
61 | | - except Exception as e: |
62 | | - return False, 'Cannot create folder "%s". Error is: %s' % (path, e) |
63 | | - # extract files |
64 | | - used_zip_file.extractall(path) |
65 | | - return True, '' |
66 | | - except AttributeError: |
67 | | - # for python 2.5 |
68 | | - retcode = os.system('unzip -d %s %s' % (path, zip_path)) |
69 | | - if retcode != 0: |
70 | | - return False, 'Unable to unzip file. The "unzip" command returned code %s.' % retcode |
71 | | - return True, '' |
72 | | - except Exception: |
73 | | - return False, 'File is not a valid zip.' |
74 | | - finally: |
75 | | - if not zip_file: |
76 | | - used_zip_file.close() |
| 49 | + raise RuntimeError(f'CRC error on zip file: "{zip_test}"') |
| 50 | + |
| 51 | + members = None |
| 52 | + if extensions_filter: |
| 53 | + memebers = [] |
| 54 | + for name in zip_file.namelist(): |
| 55 | + ext = Path(name).suffix.lower() |
| 56 | + if ext in extensions_filter: |
| 57 | + memebers.append(name) |
| 58 | + |
| 59 | + if memebers is None or memebers: |
| 60 | + zip_file.extractall(path=path, members=members) |
0 commit comments