Spaces:
Running
on
Zero
Running
on
Zero
File size: 4,270 Bytes
9360743 1082c60 9360743 1082c60 9360743 1082c60 9360743 1082c60 9360743 1082c60 9360743 1082c60 9360743 1082c60 9360743 1082c60 9360743 1082c60 9360743 1082c60 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
import time
from threading import Thread, Lock
class Listener:
task_queue = []
lock = Lock()
thread = None
@classmethod
def _process_tasks(cls):
while True:
task = None
with cls.lock:
if cls.task_queue:
task = cls.task_queue.pop(0)
if task is None:
time.sleep(0.001)
continue
func, args, kwargs = task
try:
func(*args, **kwargs)
except Exception as e:
print(f"Error in listener thread: {e}")
@classmethod
def add_task(cls, func, *args, **kwargs):
with cls.lock:
cls.task_queue.append((func, args, kwargs))
if cls.thread is None:
cls.thread = Thread(target=cls._process_tasks, daemon=True)
cls.thread.start()
def async_run(func, *args, **kwargs):
Listener.add_task(func, *args, **kwargs)
class FIFOQueue:
def __init__(self):
self.queue = []
self.lock = Lock()
print("【调试】创建新的FIFOQueue")
def push(self, item):
print(f"【调试】FIFOQueue.push: 准备添加项目: {item}")
with self.lock:
self.queue.append(item)
print(f"【调试】FIFOQueue.push: 成功添加项目: {item}, 当前队列长度: {len(self.queue)}")
def pop(self):
print("【调试】FIFOQueue.pop: 准备弹出队列首项")
with self.lock:
if self.queue:
item = self.queue.pop(0)
print(f"【调试】FIFOQueue.pop: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}")
return item
print("【调试】FIFOQueue.pop: 队列为空,返回None")
return None
def top(self):
print("【调试】FIFOQueue.top: 准备查看队列首项")
with self.lock:
if self.queue:
item = self.queue[0]
print(f"【调试】FIFOQueue.top: 队列首项为: {item}, 当前队列长度: {len(self.queue)}")
return item
print("【调试】FIFOQueue.top: 队列为空,返回None")
return None
def next(self):
print("【调试】FIFOQueue.next: 等待弹出队列首项")
while True:
with self.lock:
if self.queue:
item = self.queue.pop(0)
print(f"【调试】FIFOQueue.next: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}")
return item
time.sleep(0.001)
class AsyncStream:
def __init__(self):
self.input_queue = FIFOQueue()
self.output_queue = FIFOQueue()
class InterruptibleStreamData:
def __init__(self):
self.input_queue = FIFOQueue()
self.output_queue = FIFOQueue()
print("【调试】创建新的InterruptibleStreamData,初始化输入输出队列")
# 推送数据至输出队列
def push_output(self, item):
print(f"【调试】InterruptibleStreamData.push_output: 准备推送输出: {type(item)}")
self.output_queue.push(item)
print(f"【调试】InterruptibleStreamData.push_output: 成功推送输出")
# 获取下一个输出数据
def get_output(self):
print("【调试】InterruptibleStreamData.get_output: 准备获取下一个输出数据")
item = self.output_queue.next()
print(f"【调试】InterruptibleStreamData.get_output: 获取到输出数据: {type(item)}")
return item
# 推送数据至输入队列
def push_input(self, item):
print(f"【调试】InterruptibleStreamData.push_input: 准备推送输入: {type(item)}")
self.input_queue.push(item)
print(f"【调试】InterruptibleStreamData.push_input: 成功推送输入")
# 获取下一个输入数据
def get_input(self):
print("【调试】InterruptibleStreamData.get_input: 准备获取下一个输入数据")
item = self.input_queue.next()
print(f"【调试】InterruptibleStreamData.get_input: 获取到输入数据: {type(item)}")
return item
|