File size: 4,270 Bytes
9360743
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1082c60
9360743
 
1082c60
9360743
 
1082c60
9360743
 
1082c60
9360743
 
1082c60
 
 
 
9360743
 
 
1082c60
9360743
 
1082c60
 
 
 
9360743
 
 
1082c60
9360743
 
 
1082c60
 
 
9360743
 
 
 
 
 
 
 
1082c60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import time

from threading import Thread, Lock


class Listener:
    task_queue = []
    lock = Lock()
    thread = None
    
    @classmethod
    def _process_tasks(cls):
        while True:
            task = None
            with cls.lock:
                if cls.task_queue:
                    task = cls.task_queue.pop(0)
                    
            if task is None:
                time.sleep(0.001)
                continue
                
            func, args, kwargs = task
            try:
                func(*args, **kwargs)
            except Exception as e:
                print(f"Error in listener thread: {e}")
    
    @classmethod
    def add_task(cls, func, *args, **kwargs):
        with cls.lock:
            cls.task_queue.append((func, args, kwargs))

        if cls.thread is None:
            cls.thread = Thread(target=cls._process_tasks, daemon=True)
            cls.thread.start()


def async_run(func, *args, **kwargs):
    Listener.add_task(func, *args, **kwargs)


class FIFOQueue:
    def __init__(self):
        self.queue = []
        self.lock = Lock()
        print("【调试】创建新的FIFOQueue")

    def push(self, item):
        print(f"【调试】FIFOQueue.push: 准备添加项目: {item}")
        with self.lock:
            self.queue.append(item)
            print(f"【调试】FIFOQueue.push: 成功添加项目: {item}, 当前队列长度: {len(self.queue)}")

    def pop(self):
        print("【调试】FIFOQueue.pop: 准备弹出队列首项")
        with self.lock:
            if self.queue:
                item = self.queue.pop(0)
                print(f"【调试】FIFOQueue.pop: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}")
                return item
            print("【调试】FIFOQueue.pop: 队列为空,返回None")
            return None

    def top(self):
        print("【调试】FIFOQueue.top: 准备查看队列首项")
        with self.lock:
            if self.queue:
                item = self.queue[0]
                print(f"【调试】FIFOQueue.top: 队列首项为: {item}, 当前队列长度: {len(self.queue)}")
                return item
            print("【调试】FIFOQueue.top: 队列为空,返回None")
            return None

    def next(self):
        print("【调试】FIFOQueue.next: 等待弹出队列首项")
        while True:
            with self.lock:
                if self.queue:
                    item = self.queue.pop(0)
                    print(f"【调试】FIFOQueue.next: 成功弹出项目: {item}, 剩余队列长度: {len(self.queue)}")
                    return item

            time.sleep(0.001)


class AsyncStream:
    def __init__(self):
        self.input_queue = FIFOQueue()
        self.output_queue = FIFOQueue()


class InterruptibleStreamData:
    def __init__(self):
        self.input_queue = FIFOQueue()
        self.output_queue = FIFOQueue()
        print("【调试】创建新的InterruptibleStreamData,初始化输入输出队列")
        
    # 推送数据至输出队列
    def push_output(self, item):
        print(f"【调试】InterruptibleStreamData.push_output: 准备推送输出: {type(item)}")
        self.output_queue.push(item)
        print(f"【调试】InterruptibleStreamData.push_output: 成功推送输出")
        
    # 获取下一个输出数据
    def get_output(self):
        print("【调试】InterruptibleStreamData.get_output: 准备获取下一个输出数据")
        item = self.output_queue.next()
        print(f"【调试】InterruptibleStreamData.get_output: 获取到输出数据: {type(item)}")
        return item
    
    # 推送数据至输入队列
    def push_input(self, item):
        print(f"【调试】InterruptibleStreamData.push_input: 准备推送输入: {type(item)}")
        self.input_queue.push(item)
        print(f"【调试】InterruptibleStreamData.push_input: 成功推送输入")
    
    # 获取下一个输入数据
    def get_input(self):
        print("【调试】InterruptibleStreamData.get_input: 准备获取下一个输入数据")
        item = self.input_queue.next()
        print(f"【调试】InterruptibleStreamData.get_input: 获取到输入数据: {type(item)}")
        return item