File size: 1,045 Bytes
7107f0b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
package mq
import (
"sync"
"github.com/alist-org/alist/v3/pkg/generic"
)
// Message is the unit carried through an MQ: a thin envelope around a
// single payload value of type T.
type Message[T any] struct {
	// Content is the payload of the message.
	Content T
}
// BasicConsumer is a callback that receives one Message at a time.
// It is the callback type accepted by MQ.Consume.
type BasicConsumer[T any] func(Message[T])
// AllConsumer is a callback that receives a batch of Messages as one slice.
// It is the callback type accepted by MQ.ConsumeAll.
type AllConsumer[T any] func([]Message[T])
// MQ is a generic message queue carrying Message[T] values.
//
// The per-method notes below describe what the in-file implementation
// (inMemoryMQ) does.
type MQ[T any] interface {
	// Publish adds a message to the queue.
	Publish(Message[T])
	// Consume removes queued messages one at a time, passing each to the
	// consumer, until the queue is empty.
	Consume(BasicConsumer[T])
	// ConsumeAll removes the queued messages in one step and passes them
	// to the consumer as a single slice.
	ConsumeAll(AllConsumer[T])
	// Clear empties the queue without invoking any consumer.
	Clear()
	// Len reports the number of messages in the queue.
	Len() int
}
// inMemoryMQ is the MQ implementation returned by NewInMemoryMQ. It keeps
// all messages in process memory.
type inMemoryMQ[T any] struct {
	// queue holds the messages that have been published but not yet consumed.
	queue generic.Queue[Message[T]]
	// The embedded mutex is taken by the methods that mutate queue.
	// Because it is embedded, the struct must not be copied after first use;
	// all methods use pointer receivers.
	sync.Mutex
}
// NewInMemoryMQ creates an in-memory message queue for payloads of type T
// and returns it behind the MQ interface. The queue is initialised from a
// fresh generic.NewQueue value.
func NewInMemoryMQ[T any]() MQ[T] {
	impl := new(inMemoryMQ[T])
	// NewQueue returns a pointer; the struct stores the queue by value.
	impl.queue = *generic.NewQueue[Message[T]]()
	return impl
}
// Publish pushes msg onto the queue while holding the queue's mutex.
func (mq *inMemoryMQ[T]) Publish(msg Message[T]) {
	mq.Mutex.Lock()
	defer mq.Mutex.Unlock()

	mq.queue.Push(msg)
}
// Consume drains the queue, popping one message at a time and handing each
// to consumer, and returns once the queue is empty.
//
// The mutex is held for the entire drain, including while consumer runs.
// sync.Mutex is not reentrant, so consumer must not call methods of this
// queue that take the lock (e.g. Publish), or it will deadlock.
func (mq *inMemoryMQ[T]) Consume(consumer BasicConsumer[T]) {
	mq.Mutex.Lock()
	defer mq.Mutex.Unlock()

	for {
		if mq.queue.IsEmpty() {
			return
		}
		next := mq.queue.Pop()
		consumer(next)
	}
}
// ConsumeAll pops everything currently queued in one step and passes the
// resulting slice to consumer in a single call. consumer is invoked even
// when nothing was queued; it then receives whatever PopAll returns for an
// empty queue.
//
// The mutex is held while consumer runs, so consumer must not call methods
// of this queue that take the lock (sync.Mutex is not reentrant).
func (mq *inMemoryMQ[T]) ConsumeAll(consumer AllConsumer[T]) {
	mq.Mutex.Lock()
	defer mq.Mutex.Unlock()

	batch := mq.queue.PopAll()
	consumer(batch)
}
// Clear resets the underlying queue while holding the mutex. No consumer is
// invoked for the messages that were queued.
func (mq *inMemoryMQ[T]) Clear() {
	mq.Mutex.Lock()
	defer mq.Mutex.Unlock()

	mq.queue.Clear()
}
// Len reports the number of messages currently in the queue.
//
// The mutex is taken for the read: Publish, Consume, ConsumeAll and Clear
// all mutate the queue under that same mutex, so reading the length without
// it would be a data race when Len is called from another goroutine.
//
// sync.Mutex is not reentrant, so Len must not be called from inside a
// callback passed to Consume or ConsumeAll; those callbacks run with the
// lock already held and the call would deadlock.
func (mq *inMemoryMQ[T]) Len() int {
	mq.Lock()
	defer mq.Unlock()
	return mq.queue.Len()
}
|