File size: 2,106 Bytes
7107f0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package local

import "context"

type TokenBucket interface {
	Take() <-chan struct{}
	Put()
	Do(context.Context, func() error) error
}

// StaticTokenBucket is a bucket with a fixed number of tokens,
// where the retrieval and return of tokens are manually controlled.
// In the initial state, the bucket is full.
type StaticTokenBucket struct {
	bucket chan struct{}
}

func NewStaticTokenBucket(size int) StaticTokenBucket {
	bucket := make(chan struct{}, size)
	for range size {
		bucket <- struct{}{}
	}
	return StaticTokenBucket{bucket: bucket}
}

func NewStaticTokenBucketWithMigration(oldBucket TokenBucket, size int) StaticTokenBucket {
	if oldBucket != nil {
		oldStaticBucket, ok := oldBucket.(StaticTokenBucket)
		if ok {
			oldSize := cap(oldStaticBucket.bucket)
			migrateSize := oldSize
			if size < migrateSize {
				migrateSize = size
			}

			bucket := make(chan struct{}, size)
			for range size - migrateSize {
				bucket <- struct{}{}
			}

			if migrateSize != 0 {
				go func() {
					for range migrateSize {
						<-oldStaticBucket.bucket
						bucket <- struct{}{}
					}
					close(oldStaticBucket.bucket)
				}()
			}
			return StaticTokenBucket{bucket: bucket}
		}
	}
	return NewStaticTokenBucket(size)
}

// Take channel maybe closed when local driver is modified.
// don't call Put method after the channel is closed.
func (b StaticTokenBucket) Take() <-chan struct{} {
	return b.bucket
}

func (b StaticTokenBucket) Put() {
	b.bucket <- struct{}{}
}

func (b StaticTokenBucket) Do(ctx context.Context, f func() error) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case _, ok := <-b.Take():
		if ok {
			defer b.Put()
		}
	}
	return f()
}

// NopTokenBucket all function calls to this bucket will success immediately
type NopTokenBucket struct {
	nop chan struct{}
}

func NewNopTokenBucket() NopTokenBucket {
	nop := make(chan struct{})
	close(nop)
	return NopTokenBucket{nop}
}

func (b NopTokenBucket) Take() <-chan struct{} {
	return b.nop
}

func (b NopTokenBucket) Put() {}

func (b NopTokenBucket) Do(_ context.Context, f func() error) error { return f() }