Go言語では、既にクローズされたチャンネルへの送信や、クローズ操作自体を目的の影响力外で実行すると panic が発生します。したがって、チャンネルのクローズは明確な責任を持つ単一の担当者によってのみ実行されるべきです。
以下に、状況に応じた安全なチャンネル制御の実装パターンを示します。
- 同時書き込みを防ぐラッパーとクローズ管理
mutex + 状態フラグによる冪等性の保証:
type SafeChan[T any] struct {
data chan T
closed bool
mu sync.Mutex
}
func NewSafeChan[T any](buf int) *SafeChan[T] {
return &SafeChan[T]{data: make(chan T, buf)}
}
func (sc *SafeChan[T]) TryClose() bool {
sc.mu.Lock()
defer sc.mu.Unlock()
if !sc.closed {
close(sc.data)
sc.closed = true
return true
}
return false
}
func (sc *SafeChan[T]) IsClosed() bool {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.closed
}
- 送信操作の安全化と select 式への適用
標準的な select と互換性を持つような送信処理:
type SendResult[T any] struct {
Data T
Done bool
Closed bool
}
func TrySend[T any](ch chan<- T, val T, stop <-chan struct{}) SendResult[T] {
select {
case <-stop:
return SendResult[T]{Closed: true}
case ch <- val:
return SendResult[T]{Done: true}
default:
return SendResult[T]{}
}
}
func NonBlockingSend[T any](ch chan<- T, val T) (bool, bool) {
select {
case ch <- val:
return true, false
default:
return false, false
}
}
- シンブルな送信者 × 複数受信者
送信者が唯一であれば、その送信者がチャンネルをクローズできます:
const limit = 100000
dataCh := make(chan int, 64)
var wg sync.WaitGroup
wg.Add(1)
// send goroutine
go func() {
defer wg.Done()
defer close(dataCh)
for {
r := rand.Intn(limit)
if r == 0 {
return
}
dataCh <- r
}
}()
// recv goroutines (multiple)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for v := range dataCh {
// 処理
}
}()
}
wg.Wait()
- 複数送信者 × シンブルな受信者
受信者が全体の停止を制御し、専用停止チャネル stopCh を通知手段として用います:
const limit = 100000
dataCh := make(chan int, 64)
stopCh := make(chan struct{})
// stopCh sender: receiver
go func() {
for v := range dataCh {
if v == limit-1 {
close(stopCh)
return
}
//println(v)
}
}()
// stopCh receivers: multiple senders
for i := 0; i < 100; i++ {
go func() {
for {
select {
case <-stopCh:
return
default:
}
select {
case <-stopCh:
return
case dataCh <- rand.Intn(limit):
}
}
}()
}
- 複数送信者 × 複数受信者
全協調者間の同期を仲介する「モデレータ」プロセスを導入:
const limit = 100000
stopCh := make(chan struct{})
dataCh := make(chan int, 64)
notifyCh := make(chan string, 2)
var (
stoppedBy string
wg sync.WaitGroup
)
// moderator
wg.Add(1)
go func() {
defer wg.Done()
stoppedBy = <-notifyCh
close(stopCh)
}()
// senders (multiple)
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
val := rand.Intn(limit)
if val == 0 {
select {
case notifyCh <- fmt.Sprintf("sender-%d", id):
default:
}
return
}
select {
case <-stopCh:
return
case dataCh <- val:
}
}
}(i)
}
// receivers (multiple)
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
select {
case <-stopCh:
return
default:
}
select {
case <-stopCh:
return
case val := <-dataCh:
if val == limit-1 {
select {
case notifyCh <- fmt.Sprintf("receiver-%d", id):
default:
}
return
}
}
}
}(i)
}
wg.Wait()
この設計では、チャンネルの「クローズ責任」を明確に分離しつつ、select 句との自然な統合を実現しています。また、 try-receive/default はコンパイラの最適化により効率的です。