-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapp.py
More file actions
246 lines (213 loc) · 8.47 KB
/
app.py
File metadata and controls
246 lines (213 loc) · 8.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import time
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
try:
import streamlit as st
except ModuleNotFoundError:
from utils.deps import install_and_import
st = install_and_import("streamlit", "streamlit")
from services.processor import process_upload
from services.remote_api import send_to_remote
from services.mailer import send_admin_mail
from utils.fs import save_uploaded_file
APP_TITLE = "CodeDriver推文投稿入口(MD/ZIP 转公众号 HTML)"
MAX_UPLOAD_MB = 200
@st.cache_resource
def get_executor() -> ThreadPoolExecutor:
return ThreadPoolExecutor(max_workers=4)
def bytes_human(n: int) -> str:
for unit in ["B", "KB", "MB", "GB"]:
if n < 1024:
return f"{n:.1f}{unit}"
n /= 1024
return f"{n:.1f}TB"
def execute_job(
original_saved_path: str,
wechat: str,
email: str,
original_filename: str,
remote_cfg: dict,
smtp_cfg: dict,
):
"""执行任务并返回结果摘要,供同步调试或异步上报使用。"""
import traceback
start_ts = time.time()
child_id = None
folder_url = None
uploaded_ok = []
errors = []
result = None
try:
user_meta = {"wechat": wechat, "email": email}
# 1) 处理与转换
result = process_upload(original_saved_path, user_meta)
# 2) 读取 meta.json 以获取 timestamp,构建 manifest
try:
meta_obj = json.loads(open(result["meta_path"], "r", encoding="utf-8").read())
timestamp = meta_obj.get("timestamp")
except Exception:
timestamp = None
manifest = {
"wechat": wechat,
"email": email,
"original_filename": original_filename,
"folder_name": result["folder_name"],
"timestamp": timestamp or "",
# duration_ms 在下方最终填充或由服务端忽略
}
# 3) 打包并提交至远端 FastAPI(zip + manifest)
try:
remote_resp = send_to_remote(result, manifest, remote_cfg)
# 记录一些反馈信息
folder_url = remote_resp.get("folder") # 服务器返回的相对目录名
uploaded_ok.extend(result.get("html_files", []))
uploaded_ok.append(result.get("meta_path"))
uploaded_ok.append(result.get("original_file_path"))
except Exception as e:
errors.append(f"上报远端失败: {e!r}")
except Exception as e:
errors.append(f"处理/建链路失败: {e!r}\n{traceback.format_exc()}")
# 3) 发送管理员通知(成功或失败)
duration = int((time.time() - start_ts) * 1000)
try:
if errors:
subject = f"[推文提交失败告警] {email} / {wechat}"
else:
subject = f"[新推文提交] {email} / {wechat}"
lines = [
f"用户邮箱: {email}",
f"微信号: {wechat}",
f"原始文件: {original_filename}",
f"耗时: {duration}ms",
f"远端存储: {folder_url or '未创建'}",
f"成功上传: {len(uploaded_ok)} 个",
f"MD 数量: {len(result['md_files']) if result else 'N/A'}",
f"HTML 数量: {len(result['html_files']) if result else 'N/A'}",
]
if uploaded_ok:
lines.append("成功文件:\n" + "\n".join(uploaded_ok))
if errors:
lines.append("错误详情:\n" + "\n".join(errors))
body = "\n".join(lines)
send_admin_mail(subject, body, smtp_cfg=smtp_cfg)
except Exception as mail_e:
# 邮件失败仅打印日志
print("[管理员邮件发送失败]", mail_e)
print(traceback.format_exc())
return {
"child_id": child_id,
"folder_url": folder_url,
"uploaded_ok": uploaded_ok,
"errors": errors,
"result": result,
"duration_ms": duration,
}
def run_job(
original_saved_path: str,
wechat: str,
email: str,
original_filename: str,
remote_cfg: dict,
smtp_cfg: dict,
) -> None:
"""在后台线程执行处理、上传与通知(异步)。"""
execute_job(
original_saved_path,
wechat,
email,
original_filename,
remote_cfg,
smtp_cfg,
)
def main():
st.set_page_config(page_title=APP_TITLE, page_icon="📝", layout="centered")
st.title(APP_TITLE)
st.write("请上传 .md 或 .zip 文件(含 Markdown 与资源),并填写微信号与邮箱。单文件不超过 200MB。")
with st.form(key="submit_form", clear_on_submit=False):
uploaded = st.file_uploader(
"上传资料(.md 或 .zip)",
type=["md", "zip"],
accept_multiple_files=False,
help="支持 Markdown 文件或 ZIP 压缩包(压缩包内可包含图片等资源)"
)
wechat = st.text_input("微信号(必填)", placeholder="用于发送预览链接")
email = st.text_input("邮箱(必填)", placeholder="用于审核后的邮件通知")
submitted = st.form_submit_button("提交")
if submitted:
# 基础校验
if not uploaded:
st.error("请先选择要上传的文件。")
return
if not wechat.strip():
st.error("请填写微信号。")
return
if not email.strip():
st.error("请填写邮箱。")
return
# 大小校验(双保险)
size_bytes: Optional[int] = getattr(uploaded, "size", None)
if size_bytes is None:
size_bytes = uploaded.getbuffer().nbytes
if size_bytes > MAX_UPLOAD_MB * 1024 * 1024:
st.error("单文件大小超过 200MB,请压缩或拆分后再上传。")
return
# 落盘保存
work_dir, saved_path = save_uploaded_file(uploaded)
# 读取调试开关:secrets.debug.sync = true 时,同步执行以在页面展示错误
debug_sync = False
try:
debug_sync = bool(st.secrets.get("debug", {}).get("sync", False))
except Exception:
debug_sync = False
remote_cfg = dict(st.secrets["remote"]) # 复制为普通 dict(base_url, token, verify_ssl)
smtp_cfg = dict(st.secrets["smtp"]) # 复制为普通 dict
if debug_sync:
with st.status("正在处理(同步调试模式)…", expanded=True) as status:
st.write("1) 开始解压/转换/上传")
summary = execute_job(
saved_path,
wechat.strip(),
email.strip(),
uploaded.name,
remote_cfg,
smtp_cfg,
)
status.update(label="处理完成", state="complete")
if summary["errors"]:
st.error("发生错误,详情如下:")
st.code("\n".join(summary["errors"]))
else:
st.success("上传成功!")
if summary["folder_url"]:
st.write(f"远端存储: {summary['folder_url']}")
if summary.get("result"):
st.caption("处理摘要:")
st.json({
"md_files": summary["result"].get("md_files"),
"html_files": summary["result"].get("html_files"),
"meta_path": summary["result"].get("meta_path"),
})
else:
# 入队后台任务(将 secrets 在主线程读取并以纯 dict 传入子线程)
executor = get_executor()
executor.submit(
run_job,
saved_path,
wechat.strip(),
email.strip(),
uploaded.name,
remote_cfg,
smtp_cfg,
)
# 立即向用户反馈
with st.status("已接收上传,正在后台处理…", expanded=True) as status:
st.write("1) 文件已上传并入队处理")
st.write("2) 解压/转换/上报至远端存储将在后台进行")
st.write("3) 管理员审核后会通过邮件通知您")
time.sleep(0.8)
status.update(label="处理任务已入队", state="complete")
st.success("感谢您的推文贡献!待审核通过将会自动生成一份预览链接并将通过邮件通知您!")
st.info("如需再次提交,可刷新页面或直接再次上传。")
if __name__ == "__main__":
main()