# Spaces: Sleeping
import streamlit as st | |
import libtorrent as lt | |
import os | |
import time | |
import threading | |
from pathlib import Path | |
# Download state container
class DownloadStatus:
    """Mutable record of one download's progress.

    The background download task writes these attributes and the UI loop
    reads them to render progress.
    """

    def __init__(self):
        # Fraction complete, transfer rate and estimated time left.
        self.progress = self.download_rate = self.remaining_time = 0.0
        # Total and already-downloaded byte counts.
        self.total_size = self.downloaded_size = 0
        # User-facing state text; starts as "waiting".
        self.status = "等待中"
        # Location of the finished download; unset until completion.
        self.file_path = None
def human_readable_size(size):
    """Format a byte count as a two-decimal string with a unit suffix.

    The value is divided by 1024 until it is no longer >= 1024 or the
    largest unit (TB) has been reached, e.g. ``1536`` -> ``"1.50 KB"``.
    """
    suffixes = ('B', 'KB', 'MB', 'GB', 'TB')
    for suffix in suffixes[:-1]:
        # Stop at the first unit in which the value is not >= 1024.
        if not size >= 1024:
            return f"{size:.2f} {suffix}"
        size /= 1024
    # Anything still this large is reported in the biggest unit.
    return f"{size:.2f} {suffixes[-1]}"
def configure_session():
    """Create a libtorrent session listening on port 6881.

    Several configuration styles are tried so the function works across
    versions of the Python bindings, in this order:

    1. A settings dict passed to ``lt.session(...)`` -- the form shown in the
       libtorrent Python binding documentation.
    2. A ``settings_pack`` object (kept from the original implementation).
    3. A bare session followed by the legacy ``listen_on(6881, 6891)`` call.

    Returns:
        The configured ``lt.session`` instance.
    """
    listen_interfaces = '0.0.0.0:6881'

    # Documented configuration style: settings passed as a plain dict.
    try:
        return lt.session({'listen_interfaces': listen_interfaces})
    except (TypeError, KeyError, AttributeError):
        # Presumably an older binding that does not accept a settings dict or
        # does not know this setting name; try the other styles below.
        pass

    try:
        # settings_pack object style
        settings = lt.settings_pack()
        settings.set_str(lt.settings_pack.listen_interfaces, listen_interfaces)
        return lt.session(settings)
    except AttributeError:
        # Legacy style: create the session first, then bind a port range.
        ses = lt.session()
        ses.listen_on(6881, 6891)
        return ses
def parse_magnet_compat(magnet_uri):
    """Parse a magnet link, with a fallback when the parser API is missing.

    Returns whatever ``lt.parse_magnet_uri`` returns when that call succeeds.
    If it raises ``AttributeError``, returns a ``(magnet_uri, params)`` tuple
    instead, where ``params`` is a dict with ``save_path`` and
    ``storage_mode`` entries. Callers must cope with both shapes.
    """
    try:
        parsed = lt.parse_magnet_uri(magnet_uri)
    except AttributeError:
        # Legacy handling: hand back the raw URI with default parameters.
        fallback_params = {'save_path': '.', 'storage_mode': lt.storage_mode_t(2)}
        return (magnet_uri, fallback_params)
    else:
        return parsed
def _post_torrent_updates(ses):
    """Ask the session to post torrent updates, ignoring failures."""
    try:
        ses.post_torrent_updates()
    except Exception:
        # Deliberately best-effort: the original code tolerated any failure
        # here (e.g. bindings without this method) and kept downloading.
        pass


def download_task(magnet_uri, download_path, status, metadata_timeout=30):
    """Download the torrent behind a magnet link, reporting through *status*.

    Meant to run in a background thread while another thread polls *status*.
    Ordinary failures are not raised; they are recorded as
    ``status.status = "错误: <message>"``.

    Args:
        magnet_uri: Magnet link to download.
        download_path: Directory to save into (``str`` or path-like).
        status: Object whose ``status``, ``total_size``, ``downloaded_size``,
            ``download_rate`` (bytes per second), ``remaining_time``
            (seconds), ``progress`` and ``file_path`` attributes are updated
            in place.
        metadata_timeout: Seconds to wait for torrent metadata before
            failing with a timeout error.
    """
    try:
        # The bindings are given a plain string path rather than a Path object.
        save_dir = str(download_path)

        ses = configure_session()
        status.status = "解析磁力链接..."

        # Parse the magnet link, falling back to the legacy helper only when
        # the newer API shape is unavailable. Genuine parse errors propagate
        # to the outer handler so the user sees the real message.
        try:
            params = lt.parse_magnet_uri(magnet_uri)
            params.save_path = save_dir
            params.storage_mode = lt.storage_mode_t.storage_mode_sparse
            handle = ses.add_torrent(params)
        except (AttributeError, TypeError):
            # Legacy handling
            handle = lt.add_magnet_uri(ses, magnet_uri, {'save_path': save_dir})

        status.status = "获取元数据..."

        # Wait for metadata, giving up after metadata_timeout seconds.
        deadline = time.monotonic() + metadata_timeout
        while not handle.has_metadata():
            if time.monotonic() > deadline:
                raise TimeoutError("获取元数据超时")
            time.sleep(0.5)
            _post_torrent_updates(ses)

        torrent_info = handle.get_torrent_info()
        status.total_size = torrent_info.total_size()
        status.status = "下载中..."

        # Main download loop. The rate is the change in bytes done between
        # two consecutive samples divided by the time between them.
        last_sample_time = time.monotonic()
        # Start from the bytes already present so that pre-existing data does
        # not show up as an enormous first-sample speed.
        last_done = handle.status().total_done
        while not handle.is_seed():
            _post_torrent_updates(ses)
            s = handle.status()

            # Download speed
            now = time.monotonic()
            dt = now - last_sample_time
            status.downloaded_size = s.total_done
            status.download_rate = (s.total_done - last_done) / dt if dt > 0 else 0
            last_done = s.total_done
            last_sample_time = now

            # Progress and estimated time remaining
            status.progress = s.progress
            if status.download_rate > 0:
                status.remaining_time = (
                    (status.total_size - s.total_done) / status.download_rate
                )
            else:
                status.remaining_time = 0
            time.sleep(1)

        # Publish the final numbers and the file path BEFORE flipping the
        # state to "done": the polling thread reads file_path as soon as it
        # sees the completed state.
        status.downloaded_size = handle.status().total_done
        status.progress = 1.0
        status.download_rate = 0.0
        status.remaining_time = 0
        status.file_path = os.path.join(save_dir, handle.name())
        status.status = "下载完成"
    except Exception as e:
        status.status = f"错误: {str(e)}"
def main():
    """Render the magnet-link downloader page.

    Shows an input form, starts ``download_task`` in a background thread when
    a magnet link is submitted, then polls the shared ``DownloadStatus``
    object to draw progress until the task reports completion or an error.
    """
    st.title("🕹️ 磁力链接下载器")

    # Initialise per-session state
    if 'download_status' not in st.session_state:
        st.session_state.download_status = DownloadStatus()

    # Input area
    with st.form("magnet_form"):
        magnet_uri = st.text_input("磁力链接:", placeholder="magnet:?xt=urn:...")
        submitted = st.form_submit_button("开始下载")

    # Make sure the download directory exists
    download_path = Path("downloads")
    download_path.mkdir(exist_ok=True)

    # Tolerate stray whitespace around a pasted link.
    magnet_uri = magnet_uri.strip() if magnet_uri else magnet_uri

    if submitted and magnet_uri:
        if not magnet_uri.startswith("magnet:?"):
            st.error("无效的磁力链接格式")
            return

        # Start the download thread with a fresh status object. The save
        # directory is passed as a plain string for the torrent library.
        st.session_state.download_status = DownloadStatus()
        thread = threading.Thread(
            target=download_task,
            args=(magnet_uri, str(download_path), st.session_state.download_status),
        )
        thread.start()

    # Live status display
    status = st.session_state.download_status
    status_container = st.empty()
    while True:
        current = status.status
        # Stop polling when idle, finished, or failed. Failures are written
        # as "错误: <message>", so they must be matched by prefix.
        if current == "等待中" or current == "下载完成" or current.startswith("错误"):
            break

        with status_container.container():
            # Progress bar (value clamped to the 0..1 range it accepts)
            progress = min(max(status.progress, 0.0), 1.0)
            st.progress(progress, text=f"进度: {progress*100:.1f}%")

            # Download details
            cols = st.columns(4)
            cols[0].metric(
                "总大小",
                human_readable_size(status.total_size) if status.total_size > 0 else "--",
            )
            cols[1].metric("已下载", human_readable_size(status.downloaded_size))

            # download_rate is in bytes per second, so format it with the
            # same unit scaling used for sizes.
            if status.download_rate > 0:
                speed_display = f"{human_readable_size(status.download_rate)}/s"
            else:
                speed_display = "--"
            cols[2].metric("速度", speed_display)

            time_display = (
                f"{status.remaining_time:.1f}秒" if status.remaining_time > 0 else "--"
            )
            cols[3].metric("剩余时间", time_display)
            st.caption(f"状态: {current}")
        time.sleep(0.5)

    # After the download ends
    if status.status == "下载完成":
        st.success("🎉 下载完成!")
        file_path = status.file_path
        if file_path and os.path.isfile(file_path):
            with open(file_path, "rb") as f:
                st.download_button(
                    label="保存文件",
                    data=f,
                    file_name=os.path.basename(file_path),
                    mime="application/octet-stream",
                )
        elif file_path:
            # Not a regular file (presumably a directory for a multi-file
            # torrent), so it cannot be offered as a single download.
            st.info(f"文件已保存到: {file_path}")
    elif status.status.startswith("错误"):
        st.error(status.status)
# Script entry point: run the app only when this file is executed directly.
if __name__ == "__main__":
    main()