package safeChan import ( "context" "fmt" "sync" ) type ByteChan = SafeChan[[]byte] type SafeChan[T any] struct { ch chan T ctx context.Context cancel context.CancelFunc once sync.Once } func NewSafeChan[T any](size int) *SafeChan[T] { ch := &SafeChan[T]{} ch.ctx, ch.cancel = context.WithCancel(context.Background()) if size < 1 { ch.ch = make(chan T) } else { ch.ch = make(chan T, size) } // ch.ch = make(chan T, size) return ch } func (s *SafeChan[T]) Close() { s.once.Do(func() { s.cancel() close(s.ch) }) } // 管道中剩余数量 func (s *SafeChan[T]) Size() int { return len(s.ch) } func (s *SafeChan[T]) Reader() <-chan T { return s.ch } func (s *SafeChan[T]) Write(d T) error { select { case <-s.ctx.Done(): return fmt.Errorf("chan was closed") default: s.ch <- d return nil } }