Spaces:
Sleeping
Sleeping
File size: 5,631 Bytes
31d1f71 ae5d853 31d1f71 ceae804 45da914 e4a61fc 463f386 45da914 ac7717b e4a61fc f7afc9d 6e4f272 e4a61fc f7afc9d e4a61fc 6e4f272 e4a61fc 6e4f272 ac7717b 45da914 ac7717b 45da914 e4a61fc 45da914 e4a61fc 6e4f272 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 463f386 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 e4a61fc 45da914 463f386 e4a61fc 463f386 45da914 463f386 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
import streamlit as st
import libtorrent as lt
import os
import time
import threading
from pathlib import Path
import warnings
# Silence DeprecationWarning messages attributed to the libtorrent module.
# NOTE(review): `module` is a regex matched against the module a warning is
# attributed to; confirm it really matches warnings raised via the bindings.
warnings.filterwarnings("ignore", category=DeprecationWarning, module="libtorrent")
class DownloadStatus:
    """Mutable progress record for one download.

    A single instance is handed to the download worker, which writes to it,
    and is read by the page code to render the current state.

    Attributes:
        progress: Fraction of the download completed (starts at 0.0).
        download_rate: Current transfer speed in KB/s.
        remaining_time: Estimated seconds left; 0.0 when unknown.
        total_size: Total size reported by the torrent metadata.
        downloaded_size: Amount downloaded so far (starts at 0).
        status: Human-readable state label shown to the user.
        file_path: Location of the finished download, ``None`` until done.
    """

    def __init__(self):
        # Chained targets are bound left to right, so the attributes are
        # created in the same order as listed in the class docstring.
        self.progress = self.download_rate = self.remaining_time = 0.0
        self.total_size = self.downloaded_size = 0
        self.status, self.file_path = "等待中", None
def human_readable_size(size):
    """Format a byte count as a short human-readable string.

    The magnitude is divided by 1024 until it drops below 1024 or the
    largest unit (TB) is reached, then rendered with two decimals.

    Args:
        size: Number of bytes (int or float). Negative values keep their
            sign and are scaled by magnitude.

    Returns:
        A string such as ``"1.50 MB"``.
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    sign = '-' if size < 0 else ''
    magnitude = abs(size)
    index = 0
    # Stop at the last unit so very large values are still expressed in TB.
    while magnitude >= 1024 and index < len(units) - 1:
        magnitude /= 1024
        index += 1
    return f"{sign}{magnitude:.2f} {units[index]}"
def configure_session():
    """Create a libtorrent session, preferring an explicit listen interface.

    First tries to build a settings pack that listens on ``0.0.0.0:6881``.
    If that fails for any ordinary reason (for example the installed
    bindings do not expose this API), a default session is created instead.

    Returns:
        A ``lt.session`` instance.
    """
    try:
        # Settings-pack style configuration.
        settings = lt.settings_pack()
        settings.set_str(lt.settings_pack.listen_interfaces, '0.0.0.0:6881')
        return lt.session(settings)
    except Exception:
        # Only ordinary errors trigger the fallback; KeyboardInterrupt and
        # SystemExit are no longer swallowed as they were by a bare except.
        # NOTE(review): confirm which binding versions accept the
        # settings-pack form above; the fallback does not pin a port.
        return lt.session()
def add_torrent_safely(ses, magnet_uri, download_path):
    """Add a magnet link to a session, with a fallback for older bindings.

    Args:
        ses: The libtorrent session to add the torrent to.
        magnet_uri: The magnet link to download.
        download_path: Directory to save into; ``str`` or ``os.PathLike``.

    Returns:
        The torrent handle returned by libtorrent.
    """
    # Callers pass a pathlib.Path; hand the bindings a plain path string so
    # the primary code path does not fail on the argument type.
    save_path = os.fspath(download_path)
    try:
        # Preferred: parse into add-torrent parameters, then add.
        params = lt.parse_magnet_uri(magnet_uri)
        params.save_path = save_path
        if hasattr(lt.storage_mode_t, 'storage_mode_sparse'):
            params.storage_mode = lt.storage_mode_t.storage_mode_sparse
        return ses.add_torrent(params)
    except Exception:
        # Fallback through the older helper. Only ordinary errors reach it;
        # KeyboardInterrupt/SystemExit now propagate.
        # NOTE(review): the meaning of storage_mode_t(2) depends on the
        # installed libtorrent version - confirm it is the intended mode.
        return lt.add_magnet_uri(ses, magnet_uri, {
            'save_path': save_path,
            'storage_mode': lt.storage_mode_t(2)
        })
def download_task(magnet_uri, download_path, status):
    """Download a magnet link and publish progress through ``status``.

    Meant to run in a background thread. It deliberately makes no Streamlit
    calls: widgets created from a thread without a script-run context are
    not rendered, so all reporting goes through ``status`` and the page
    code draws it.

    Args:
        magnet_uri: The magnet link to download.
        download_path: Directory the torrent content is saved into.
        status: ``DownloadStatus``-like object updated in place.

    Returns:
        None. Any failure is reported by setting ``status.status`` to a
        string starting with ``"错误: "`` instead of raising.
    """
    metadata_timeout = 30  # seconds to wait for torrent metadata
    try:
        ses = configure_session()
        status.status = "解析磁力链接..."
        handle = add_torrent_safely(ses, magnet_uri, download_path)

        # Wait for metadata, giving up after the timeout.
        status.status = "获取元数据..."
        deadline = time.monotonic() + metadata_timeout
        while not handle.has_metadata():
            if time.monotonic() > deadline:
                raise TimeoutError("元数据获取超时")
            time.sleep(0.5)

        torrent_info = handle.get_torrent_info()
        status.total_size = torrent_info.total_size()
        status.status = "下载中..."

        # Main download loop: sample the torrent once per second.
        while not handle.is_seed():
            s = handle.status()
            status.progress = s.progress
            status.downloaded_size = s.total_done
            status.download_rate = s.download_rate / 1000  # KB/s
            if s.download_rate > 0:
                # Remaining amount divided by the raw (per-second) rate.
                status.remaining_time = (
                    (status.total_size - s.total_done) / s.download_rate
                )
            else:
                status.remaining_time = 0
            time.sleep(1)

        status.progress = 1.0
        status.downloaded_size = status.total_size
        status.download_rate = 0.0
        status.remaining_time = 0
        # Publish the path BEFORE the terminal status: the page thread opens
        # file_path as soon as it observes "下载完成".
        status.file_path = os.path.join(download_path, handle.name())
        status.status = "下载完成"
    except Exception as e:
        status.status = f"错误: {str(e)}"
def main():
    """Render the page: magnet form, live progress, and the result.

    Submitting a valid magnet link starts ``download_task`` in a background
    thread. The script then polls the shared status object once per second
    and redraws the metrics until the download finishes or fails.
    """
    st.title("🛠️ 磁力链接下载工具")
    if 'download_status' not in st.session_state:
        st.session_state.download_status = DownloadStatus()

    with st.form("magnet_form"):
        magnet_uri = st.text_input("输入磁力链接:", placeholder="magnet:?xt=urn:...")
        submitted = st.form_submit_button("🚀 开始下载")

    download_path = Path("downloads")
    download_path.mkdir(exist_ok=True)

    if submitted and magnet_uri:
        if not magnet_uri.startswith("magnet:?"):
            st.error("❌ 无效的磁力链接格式")
            return
        new_status = DownloadStatus()
        # Leave the idle state before the worker is scheduled; otherwise the
        # polling loop below can see "等待中" and exit immediately.
        new_status.status = "解析磁力链接..."
        st.session_state.download_status = new_status
        threading.Thread(
            target=download_task,
            # Pass a plain string path; the torrent bindings expect str.
            args=(magnet_uri, str(download_path), new_status)
        ).start()

    status = st.session_state.download_status
    status_container = st.empty()

    def finished():
        # The worker reports failures as "错误: <detail>", so match by prefix.
        return status.status == "下载完成" or status.status.startswith("错误")

    while status.status != "等待中" and not finished():
        with status_container.container():
            st.subheader("实时下载状态")
            # Headline metrics.
            cols = st.columns([2, 1, 1])
            cols[0].metric("总体进度", f"{status.progress*100:.1f}%")
            cols[1].metric("传输速度", f"{status.download_rate:.2f} KB/s")
            cols[2].metric(
                "剩余时间",
                f"{status.remaining_time:.1f}s" if status.remaining_time > 0 else "未知"
            )
            # Progress bar; clamp so an out-of-range sample cannot raise.
            st.progress(min(max(status.progress, 0.0), 1.0))
        time.sleep(1)

    if status.status.startswith("错误"):
        st.error(f"❌ {status.status}")
    elif status.status == "下载完成":
        st.balloons()
        if os.path.isfile(status.file_path):
            with open(status.file_path, "rb") as f:
                st.download_button(
                    "💾 保存文件",
                    data=f,
                    file_name=os.path.basename(status.file_path),
                    mime="application/octet-stream"
                )
        else:
            # Multi-file torrents are saved as a directory, which cannot be
            # opened and served as a single download.
            st.info(f"📁 下载内容已保存到: {status.file_path}")
# Script entry point: build the page when the file is executed directly.
if __name__ == "__main__":
    main()