Go言語におけるチャンネルの安全なクローズ戦略

Go言語では、既にクローズされたチャンネルへの送信や、クローズ操作自体を目的の影响力外で実行すると panic が発生します。したがって、チャンネルのクローズは明確な責任を持つ単一の担当者によってのみ実行されるべきです。

以下に、状況に応じた安全なチャンネル制御の実装パターンを示します。

  1. 同時書き込みを防ぐラッパーとクローズ管理

mutex + 状態フラグによる冪等性の保証:

type SafeChan[T any] struct {
	data   chan T
	closed bool
	mu     sync.Mutex
}

func NewSafeChan[T any](buf int) *SafeChan[T] {
	return &SafeChan[T]{data: make(chan T, buf)}
}

func (sc *SafeChan[T]) TryClose() bool {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if !sc.closed {
		close(sc.data)
		sc.closed = true
		return true
	}
	return false
}

func (sc *SafeChan[T]) IsClosed() bool {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	return sc.closed
}
  1. 送信操作の安全化と select 式への適用

標準的な select と互換性を持つような送信処理:

type SendResult[T any] struct {
	Data   T
	Done   bool
	Closed bool
}

func TrySend[T any](ch chan<- T, val T, stop <-chan struct{}) SendResult[T] {
	select {
	case <-stop:
		return SendResult[T]{Closed: true}
	case ch <- val:
		return SendResult[T]{Done: true}
	default:
		return SendResult[T]{}
	}
}

func NonBlockingSend[T any](ch chan<- T, val T) (bool, bool) {
	select {
	case ch <- val:
		return true, false
	default:
		return false, false
	}
}
  1. シンブルな送信者 × 複数受信者

送信者が唯一であれば、その送信者がチャンネルをクローズできます:

const limit = 100000

dataCh := make(chan int, 64)
var wg sync.WaitGroup
wg.Add(1)

// send goroutine
go func() {
	defer wg.Done()
	defer close(dataCh)
	for {
		r := rand.Intn(limit)
		if r == 0 {
			return
		}
		dataCh <- r
	}
}()

// recv goroutines (multiple)
for i := 0; i < 10; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for v := range dataCh {
			// 処理
		}
	}()
}

wg.Wait()
  1. 複数送信者 × シンブルな受信者

受信者が全体の停止を制御し、専用停止チャネル stopCh を通知手段として用います:

const limit = 100000
dataCh := make(chan int, 64)
stopCh := make(chan struct{})

// stopCh sender: receiver
go func() {
	for v := range dataCh {
		if v == limit-1 {
			close(stopCh)
			return
		}
		//println(v)
	}
}()

// stopCh receivers: multiple senders
for i := 0; i < 100; i++ {
	go func() {
		for {
			select {
			case <-stopCh:
				return
			default:
			}
			select {
			case <-stopCh:
				return
			case dataCh <- rand.Intn(limit):
			}
		}
	}()
}
  1. 複数送信者 × 複数受信者

全協調者間の同期を仲介する「モデレータ」プロセスを導入:

const limit = 100000
stopCh := make(chan struct{})
dataCh := make(chan int, 64)
notifyCh := make(chan string, 2)
var (
	stoppedBy string
	wg        sync.WaitGroup
)

// moderator
wg.Add(1)
go func() {
	defer wg.Done()
	stoppedBy = <-notifyCh
	close(stopCh)
}()

// senders (multiple)
for i := 0; i < 1000; i++ {
	wg.Add(1)
	go func(id int) {
		defer wg.Done()
		for {
			val := rand.Intn(limit)
			if val == 0 {
				select {
				case notifyCh <- fmt.Sprintf("sender-%d", id):
				default:
				}
				return
			}
			select {
			case <-stopCh:
				return
			case dataCh <- val:
			}
		}
	}(i)
}

// receivers (multiple)
for i := 0; i < 100; i++ {
	wg.Add(1)
	go func(id int) {
		defer wg.Done()
		for {
			select {
			case <-stopCh:
				return
			default:
			}
			select {
			case <-stopCh:
				return
			case val := <-dataCh:
				if val == limit-1 {
					select {
					case notifyCh <- fmt.Sprintf("receiver-%d", id):
					default:
					}
					return
				}
			}
		}
	}(i)
}

wg.Wait()

この設計では、チャンネルの「クローズ責任」を明確に分離しつつ、select 句との自然な統合を実現しています。また、 try-receive/default はコンパイラの最適化により効率的です。

タグ: go-channel safe-close select-pattern goroutine-synchronization

6月2日 21:40 投稿