"""Streamlit app that downloads a magnet link with libtorrent.

The download itself runs in a background thread (``download_task``) that only
mutates a shared ``DownloadStatus`` object.  All Streamlit rendering happens in
``main``, on the script thread, by polling that object once per second.
"""

import os
import threading
import time
import warnings
from pathlib import Path

import libtorrent as lt
import streamlit as st

# Silence libtorrent's deprecation warnings.
warnings.filterwarnings("ignore", category=DeprecationWarning, module="libtorrent")

# User-visible status strings.  They double as the state machine shared between
# the worker thread and the UI loop, so they are defined exactly once.
STATUS_WAITING = "等待中"
STATUS_PARSING = "解析磁力链接..."
STATUS_METADATA = "获取元数据..."
STATUS_DOWNLOADING = "下载中..."
STATUS_DONE = "下载完成"
STATUS_ERROR_PREFIX = "错误"

METADATA_TIMEOUT_SECONDS = 30
SPEED_CHART_LABEL = "下载速度 (KB/s)"
MAX_SPEED_SAMPLES = 300  # cap the chart history so long downloads stay cheap


class DownloadStatus:
    """Mutable progress record shared by the worker thread and the UI.

    The worker thread is the only writer; the UI only reads.
    """

    def __init__(self):
        self.progress = 0.0          # fraction in [0, 1] as reported by libtorrent
        self.download_rate = 0.0     # KB/s (libtorrent bytes/s divided by 1000)
        self.remaining_time = 0.0    # seconds; 0 means "unknown"
        self.total_size = 0          # bytes, known once metadata has arrived
        self.downloaded_size = 0     # bytes downloaded so far
        self.status = STATUS_WAITING
        self.file_path = None        # set just before status becomes STATUS_DONE
        self.speed_history = []      # recent download_rate samples for the chart


def human_readable_size(size):
    """Format a byte count with a binary unit suffix, e.g. ``1536 -> '1.50 KB'``."""
    units = ['B', 'KB', 'MB', 'GB', 'TB']
    index = 0
    # Stop at the largest unit instead of running off the end of the list.
    while size >= 1024 and index < len(units) - 1:
        size /= 1024
        index += 1
    return f"{size:.2f} {units[index]}"


def configure_session():
    """Create a libtorrent session, preferring an explicit listen interface.

    Falls back to a default-constructed session when the settings-based
    construction is not available in the installed libtorrent build.
    """
    try:
        settings = lt.settings_pack()
        settings.set_str(lt.settings_pack.listen_interfaces, '0.0.0.0:6881')
        return lt.session(settings)
    except Exception:
        # Deliberate best-effort fallback: no explicit port is configured.
        return lt.session()


def add_torrent_safely(ses, magnet_uri, download_path):
    """Add ``magnet_uri`` to ``ses`` and return the torrent handle.

    ``download_path`` may be a ``str`` or any ``os.PathLike``; it is converted
    to ``str`` because the libtorrent bindings do not accept ``Path`` objects.
    Tries ``parse_magnet_uri`` + ``add_torrent`` first and falls back to the
    legacy ``add_magnet_uri`` helper if that fails.
    """
    save_path = os.fspath(download_path)
    try:
        params = lt.parse_magnet_uri(magnet_uri)
        params.save_path = save_path
        if hasattr(lt.storage_mode_t, 'storage_mode_sparse'):
            params.storage_mode = lt.storage_mode_t.storage_mode_sparse
        return ses.add_torrent(params)
    except Exception:
        # Legacy path; an invalid magnet link raises again here and the
        # exception propagates to the caller.
        return lt.add_magnet_uri(ses, magnet_uri, {
            'save_path': save_path,
            'storage_mode': lt.storage_mode_t(2)
        })


def _record_progress(status, snapshot):
    """Copy one libtorrent status snapshot into the shared ``status`` object."""
    status.progress = snapshot.progress
    status.downloaded_size = snapshot.total_done
    status.download_rate = snapshot.download_rate / 1000  # KB/s

    if snapshot.download_rate > 0:
        remaining_bytes = max(status.total_size - snapshot.total_done, 0)
        status.remaining_time = remaining_bytes / snapshot.download_rate
    else:
        status.remaining_time = 0

    # Tolerate status objects that do not carry a history list.
    history = getattr(status, "speed_history", None)
    if history is not None:
        history.append(status.download_rate)
        if len(history) > MAX_SPEED_SAMPLES:
            del history[:-MAX_SPEED_SAMPLES]


def download_task(magnet_uri, download_path, status):
    """Download ``magnet_uri`` into ``download_path``, reporting via ``status``.

    Intended to run in a background thread.  It never calls Streamlit: the
    script thread renders from ``status`` instead.  Any failure is reported by
    setting ``status.status`` to a string starting with ``"错误"``; nothing is
    raised to the caller.
    """
    try:
        save_path = os.fspath(download_path)
        ses = configure_session()
        status.status = STATUS_PARSING
        handle = add_torrent_safely(ses, magnet_uri, save_path)
        status.status = STATUS_METADATA

        # Wait for the metadata, giving up after a fixed timeout.
        deadline = time.monotonic() + METADATA_TIMEOUT_SECONDS
        while not handle.has_metadata():
            if time.monotonic() > deadline:
                raise TimeoutError("元数据获取超时")
            time.sleep(0.5)

        status.total_size = handle.get_torrent_info().total_size()
        status.status = STATUS_DOWNLOADING

        # Main download loop: publish one snapshot per second.
        while not handle.is_seed():
            _record_progress(status, handle.status())
            time.sleep(1)

        status.progress = 1.0
        status.downloaded_size = status.total_size
        status.download_rate = 0.0
        status.remaining_time = 0
        # Publish the path BEFORE flipping the status: the UI opens file_path
        # as soon as it observes STATUS_DONE.
        status.file_path = os.path.join(save_path, handle.name())
        status.status = STATUS_DONE
    except Exception as e:
        status.status = f"{STATUS_ERROR_PREFIX}: {str(e)}"


def _is_running(status):
    """Return True while a download is in progress (not idle, done or failed)."""
    current = status.status
    if current == STATUS_WAITING or current == STATUS_DONE:
        return False
    return not current.startswith(STATUS_ERROR_PREFIX)


def _render_status(container, status):
    """Draw the live status panel into ``container`` (replacing its content)."""
    with container.container():
        st.subheader("实时下载状态")

        summary = status.status
        if status.total_size > 0:
            summary += (
                f" · {human_readable_size(status.downloaded_size)}"
                f" / {human_readable_size(status.total_size)}"
            )
        st.caption(summary)

        cols = st.columns([2, 1, 1])
        cols[0].metric("总体进度", f"{status.progress*100:.1f}%")
        cols[1].metric("传输速度", f"{status.download_rate:.2f} KB/s")
        cols[2].metric(
            "剩余时间",
            f"{status.remaining_time:.1f}s" if status.remaining_time > 0 else "未知",
        )

        # st.progress only accepts floats in [0.0, 1.0].
        st.progress(min(max(float(status.progress), 0.0), 1.0))

        samples = list(getattr(status, "speed_history", ()))
        if samples:
            st.line_chart({SPEED_CHART_LABEL: samples})


def _offer_download(file_path):
    """Show a save button for a single file, or the location for a directory."""
    if file_path and os.path.isfile(file_path):
        with open(file_path, "rb") as f:
            st.download_button(
                "💾 保存文件",
                data=f,
                file_name=os.path.basename(file_path),
                mime="application/octet-stream"
            )
    else:
        # Multi-file torrents are saved as a directory, which cannot be
        # streamed through a single download button.
        st.info(f"📁 文件已保存到: {file_path}")


def main():
    """Render the page: magnet-link form, live progress, and the final result."""
    st.title("🛠️ 磁力链接下载工具")

    if 'download_status' not in st.session_state:
        st.session_state.download_status = DownloadStatus()

    with st.form("magnet_form"):
        magnet_uri = st.text_input("输入磁力链接:", placeholder="magnet:?xt=urn:...")
        submitted = st.form_submit_button("🚀 开始下载")

    download_path = Path("downloads")
    download_path.mkdir(exist_ok=True)

    # Pasted links often carry stray whitespace.
    magnet_uri = (magnet_uri or "").strip()

    if submitted and magnet_uri:
        if not magnet_uri.startswith("magnet:?"):
            st.error("❌ 无效的磁力链接格式")
            return

        new_status = DownloadStatus()
        # Leave the idle state before the thread starts; otherwise the polling
        # loop below can observe "waiting" and exit immediately.
        new_status.status = STATUS_PARSING
        st.session_state.download_status = new_status
        threading.Thread(
            target=download_task,
            args=(magnet_uri, download_path, new_status)
        ).start()

    status = st.session_state.download_status
    status_container = st.empty()

    while _is_running(status):
        _render_status(status_container, status)
        time.sleep(1)

    if status.status == STATUS_WAITING:
        return

    # Draw the final state once so the panel does not freeze on a stale frame.
    _render_status(status_container, status)

    if status.status == STATUS_DONE:
        st.balloons()
        _offer_download(status.file_path)
    else:
        st.error(status.status)


if __name__ == "__main__":
    main()