-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzipfile.py
More file actions
122 lines (104 loc) · 4.47 KB
/
zipfile.py
File metadata and controls
122 lines (104 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""Utility functions for handling and fetching repo archives in zip format."""
from __future__ import annotations
import os
import tempfile
from pathlib import Path
from zipfile import BadZipFile, ZipFile
import requests
from cookiecutter.exceptions import InvalidZipRepository
from cookiecutter.prompt import prompt_and_delete, read_repo_password
from cookiecutter.utils import make_sure_path_exists
def unzip(
zip_uri: str,
is_url: bool,
clone_to_dir: Path | str = ".",
no_input: bool = False,
password: str | None = None,
) -> str:
"""Download and unpack a zipfile at a given URI.
This will download the zipfile to the cookiecutter repository,
and unpack into a temporary directory.
:param zip_uri: The URI for the zipfile.
:param is_url: Is the zip URI a URL or a file?
:param clone_to_dir: The cookiecutter repository directory
to put the archive into.
:param no_input: Do not prompt for user input and eventually force a refresh of
cached resources.
:param password: The password to use when unpacking the repository.
"""
# Ensure that clone_to_dir exists
clone_to_dir = Path(clone_to_dir).expanduser()
make_sure_path_exists(clone_to_dir)
if is_url:
# Build the name of the cached zipfile,
# and prompt to delete if it already exists.
identifier = zip_uri.rsplit('/', 1)[1]
zip_path = os.path.join(clone_to_dir, identifier)
if os.path.exists(zip_path):
download = prompt_and_delete(zip_path, no_input=no_input)
else:
download = True
if download:
# (Re) download the zipfile
r = requests.get(zip_uri, stream=True, timeout=100)
with open(zip_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
else:
# Just use the local zipfile as-is.
zip_path = os.path.abspath(zip_uri)
# Now unpack the repository. The zipfile will be unpacked
# into a temporary directory
try:
zip_file = ZipFile(zip_path)
if len(zip_file.namelist()) == 0:
raise InvalidZipRepository(f'Zip repository {zip_uri} is empty')
# The first record in the zipfile should be the directory entry for
# the archive. If it isn't a directory, there's a problem.
first_filename = zip_file.namelist()[0]
if not first_filename.endswith('/'):
raise InvalidZipRepository(
f"Zip repository {zip_uri} does not include a top-level directory"
)
# Construct the final target directory
project_name = first_filename[:-1]
unzip_base = tempfile.mkdtemp()
unzip_path = os.path.join(unzip_base, project_name)
# Extract the zip file into the temporary directory
try:
zip_file.extractall(path=unzip_base)
except RuntimeError as runtime_err:
# File is password protected; try to get a password from the
# environment; if that doesn't work, ask the user.
if password is not None:
try:
zip_file.extractall(path=unzip_base, pwd=password.encode('utf-8'))
except RuntimeError as e:
raise InvalidZipRepository(
'Invalid password provided for protected repository'
) from e
elif no_input:
raise InvalidZipRepository(
'Unable to unlock password protected repository'
) from runtime_err
else:
retry: int | None = 0
while retry is not None:
try:
password = read_repo_password('Repo password')
zip_file.extractall(
path=unzip_base, pwd=password.encode('utf-8')
)
retry = None
except RuntimeError as e: # noqa: PERF203
retry += 1 # type: ignore[operator]
if retry == 3:
raise InvalidZipRepository(
'Invalid password provided for protected repository'
) from e
except BadZipFile as e:
raise InvalidZipRepository(
f'Zip repository {zip_uri} is not a valid zip archive:'
) from e
return unzip_path