Spaces:
Sleeping
Sleeping
import streamlit as st | |
import libtorrent as lt | |
import os | |
import time | |
import threading | |
from pathlib import Path | |
import warnings | |
# 过滤libtorrent的弃用警告 | |
warnings.filterwarnings("ignore", category=DeprecationWarning, module="libtorrent") | |
class DownloadStatus: | |
def __init__(self): | |
self.progress = 0.0 | |
self.download_rate = 0.0 | |
self.remaining_time = 0.0 | |
self.total_size = 0 | |
self.downloaded_size = 0 | |
self.status = "等待中" | |
self.file_path = None | |
def human_readable_size(size): | |
units = ['B', 'KB', 'MB', 'GB', 'TB'] | |
index = 0 | |
while size >= 1024 and index < 4: | |
size /= 1024 | |
index += 1 | |
return f"{size:.2f} {units[index]}" | |
def configure_session(): | |
"""无警告的会话配置""" | |
try: | |
# 新版本配置方式 | |
settings = lt.settings_pack() | |
settings.set_str(lt.settings_pack.listen_interfaces, '0.0.0.0:6881') | |
return lt.session(settings) | |
except: | |
# 旧版本简化配置(不指定端口) | |
return lt.session() | |
def add_torrent_safely(ses, magnet_uri, download_path): | |
"""兼容的种子添加方法""" | |
try: | |
# 新版本解析方式 | |
params = lt.parse_magnet_uri(magnet_uri) | |
params.save_path = download_path | |
if hasattr(lt.storage_mode_t, 'storage_mode_sparse'): | |
params.storage_mode = lt.storage_mode_t.storage_mode_sparse | |
return ses.add_torrent(params) | |
except: | |
# 旧版本添加方式(已过滤警告) | |
return lt.add_magnet_uri(ses, magnet_uri, { | |
'save_path': download_path, | |
'storage_mode': lt.storage_mode_t(2) | |
}) | |
def download_task(magnet_uri, download_path, status): | |
try: | |
ses = configure_session() | |
status.status = "解析磁力链接..." | |
handle = add_torrent_safely(ses, magnet_uri, download_path) | |
status.status = "获取元数据..." | |
# 元数据等待(带进度反馈) | |
metadata_progress = st.progress(0) | |
timeout = 30 | |
start = time.time() | |
while not handle.has_metadata(): | |
if time.time() - start > timeout: | |
raise TimeoutError("元数据获取超时") | |
metadata_progress.progress(min((time.time() - start)/timeout, 1.0)) | |
time.sleep(0.5) | |
metadata_progress.empty() | |
torrent_info = handle.get_torrent_info() | |
status.total_size = torrent_info.total_size() | |
status.status = "下载中..." | |
# 下载主循环 | |
progress_bar = st.progress(0) | |
speed_chart = st.line_chart(pd.DataFrame(columns=['下载速度 (KB/s)'])) | |
start_time = time.time() | |
while not handle.is_seed(): | |
s = handle.status() | |
# 更新进度 | |
status.progress = s.progress | |
progress_bar.progress(status.progress) | |
# 计算下载速度 | |
current_time = time.time() | |
status.download_rate = s.download_rate / 1000 # KB/s | |
speed_chart.add_rows({'下载速度 (KB/s)': status.download_rate}) | |
# 更新剩余时间 | |
if status.download_rate > 0: | |
status.remaining_time = (status.total_size - s.total_done) / (status.download_rate * 1000) | |
else: | |
status.remaining_time = 0 | |
time.sleep(1) | |
status.status = "下载完成" | |
status.file_path = os.path.join(download_path, handle.name()) | |
except Exception as e: | |
status.status = f"错误: {str(e)}" | |
def main(): | |
st.title("🛠️ 磁力链接下载工具") | |
if 'download_status' not in st.session_state: | |
st.session_state.download_status = DownloadStatus() | |
with st.form("magnet_form"): | |
magnet_uri = st.text_input("输入磁力链接:", placeholder="magnet:?xt=urn:...") | |
submitted = st.form_submit_button("🚀 开始下载") | |
download_path = Path("downloads") | |
download_path.mkdir(exist_ok=True) | |
if submitted and magnet_uri: | |
if not magnet_uri.startswith("magnet:?"): | |
st.error("❌ 无效的磁力链接格式") | |
return | |
st.session_state.download_status = DownloadStatus() | |
threading.Thread( | |
target=download_task, | |
args=(magnet_uri, download_path, st.session_state.download_status) | |
).start() | |
status = st.session_state.download_status | |
status_container = st.empty() | |
while status.status not in ["下载完成", "错误"] and status.status != "等待中": | |
with status_container.container(): | |
st.subheader("实时下载状态") | |
# 进度展示 | |
cols = st.columns([2, 1, 1]) | |
cols[0].metric("总体进度", f"{status.progress*100:.1f}%") | |
cols[1].metric("传输速度", f"{status.download_rate:.2f} KB/s") | |
cols[2].metric("剩余时间", | |
f"{status.remaining_time:.1f}s" if status.remaining_time > 0 else "未知") | |
# 可视化图表 | |
st.progress(status.progress) | |
time.sleep(1) | |
if status.status == "下载完成": | |
st.balloons() | |
with open(status.file_path, "rb") as f: | |
st.download_button( | |
"💾 保存文件", | |
data=f, | |
file_name=os.path.basename(status.file_path), | |
mime="application/octet-stream" | |
) | |
if __name__ == "__main__": | |
main() |