// Package safeChan provides a generic channel wrapper whose Close method may
// be called more than once.
package safeChan
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
)
|
||
|
||
// ByteChan is a SafeChan whose elements are byte slices.
type ByteChan = SafeChan[[]byte]
|
||
|
||
// SafeChan wraps a channel of T together with the state needed to close it
// exactly once and to signal that closure through a context.
type SafeChan[T any] struct {
	// ch is the underlying channel that carries the values.
	ch chan T

	// ctx is cancelled by Close; Write checks it before sending.
	// cancel is the function that cancels ctx.
	ctx    context.Context
	cancel context.CancelFunc

	// once ensures the shutdown work in Close runs a single time.
	once sync.Once
}
|
||
|
||
// size为0会创建无缓冲队列,读chan时,没有数据会阻塞到有数据可读
|
||
func NewSafeChan[T any](size int) *SafeChan[T] {
|
||
ch := &SafeChan[T]{}
|
||
ch.ctx, ch.cancel = context.WithCancel(context.Background())
|
||
if size < 1 {
|
||
ch.ch = make(chan T)
|
||
} else {
|
||
ch.ch = make(chan T, size)
|
||
}
|
||
// ch.ch = make(chan T, size)
|
||
return ch
|
||
}
|
||
|
||
func (s *SafeChan[T]) Close() {
|
||
s.once.Do(func() {
|
||
s.cancel()
|
||
close(s.ch)
|
||
})
|
||
}
|
||
|
||
// 管道中剩余数量
|
||
func (s *SafeChan[T]) Size() int {
|
||
return len(s.ch)
|
||
}
|
||
|
||
func (s *SafeChan[T]) Reader() <-chan T {
|
||
//log.Debug("reader channel")
|
||
return s.ch
|
||
}
|
||
|
||
func (s *SafeChan[T]) Write(d T) error {
|
||
select {
|
||
case <-s.ctx.Done():
|
||
return fmt.Errorf("chan was closed")
|
||
default:
|
||
s.ch <- d
|
||
return nil
|
||
}
|
||
}
|