Go言語におけるsync.Poolの実装解析

Go言語におけるsync.Poolの実装解析

syncパッケージのPoolは、一時的にオブジェクトを再利用するための仕組みを提供します。これにより、頻繁に作成・破棄されるオブジェクトのガベージコレクションオーバーヘッドを削減できます。

基本的な使い方

package main

import (
	"sync"
	"fmt"
)

// 再利用可能なオブジェクトの型
type Resource struct {
	ID string
}

func main() {
	// 新しいプールを作成
	resourcePool := sync.Pool{
		New: func() interface{} {
			return &Resource{ID: ""}
		},
	}
	
	// プールからリソースを取得
	item := resourcePool.Get().(*Resource)
	item.ID = "resource-1"
	fmt.Printf("取得したリソース: %s\n", item.ID)
	
	// 使用後にプールに戻す
	resourcePool.Put(item)
	
	// 再度取得して確認
	reusedItem := resourcePool.Get().(*Resource)
	fmt.Printf("再利用されたリソース: %s\n", reusedItem.ID)
}

内部構造

Pool構造体

type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // 各Pに対応する固定サイズのローカルプール、実際の型は[P]poolLocal
	localSize uintptr        // ローカル配列のサイズ

	victim     unsafe.Pointer // 前回のサイクルからのローカルプール
	victimSize uintptr        // ビクトム配列のサイズ

	// NewはGetがnilを返す場合に値を生成する関数を指定します
	// Getと並行して変更することはできません
	New func() any
}

Pool構造体は以下の要素から構成されます:

  • local: Pの数の長さを持つ配列で、要素の型はpoolLocalです。各Pに対応するローカルオブジェクトプールを格納します。
  • localSize: local配列の長さを表します。Pは実行時にruntime.GOMAXPROCSで変更可能なため、localSizeで配列の長さを対応させます。
  • New: ユーザーが提供するオブジェクト生成関数です。必須ではなく、指定しない場合はGetがnilを返す可能性があります。

poolLocal構造体

// 各P用のプール補助構造体
type poolLocalInternal struct {
	private any       // 対応するPのみが使用可能
	shared  poolChain // ローカルPはpushHead/popHead可能、任意のPがpopTail可能
}

type poolLocal struct {
	poolLocalInternal

	// キャッシュラインサイズが128のプラットフォームで偽共有を防ぐ
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

poolLocalは各Pにバインドされたストレージです:

  • private: 高速な処理のために使用されます。特にGet->Put->Get->Putのような頻繁な操作時に、双方向リストストレージへのアクセスを減らします。
  • shared: 2つの役割があります。1つは大容量ストレージとして、もう1つは他のPからの窃取用です。
  • pad: CPUキャッシュラインの速度向上のためのパディングです(注意して使用)。

注意点として、poolLocalは1つのP専用であるため、他の断片化されたデータがキャッシュラインに一緒に含まれるのを防ぎます。これにより、断片化データの頻繁な更新によるキャッシュミスを回避できます。

主要メソッド

Putメソッド

func (p *Pool) Put(x any) {
	// nilチェック
	if x == nil {
		return
	}

	// 現在のGをPに固定し、対応するpoolLocalを取得
	l, _ := p.pin()
	
	// 同じスレッド内で実行されるため、ミューテックスなどは不要

	// privateが空なら直接格納
	if l.private == nil {
		l.private = x
	} else {
		// そうでなければ共有キューに格納
		l.shared.pushHead(x)
	}
	
	// Pからの固定を解除
	runtime_procUnpin()
}

Putメソッドの主な手順:

  1. 現在のGをPにバインド
  2. PのIDを取得
  3. IDに対応するpoolLocalを取得
  4. privateが空ならそこに格納、そうでなければ共有リストに格納

Getメソッド

func (p *Pool) Get() any {
	// 現在のGをPに固定し、対応するpoolLocalを取得
	l, pid := p.pin()
	
	// privateを優先的に使用
	x := l.private
	l.private = nil

	// privateが空の場合
	if x == nil {
		// ローカル共有リストから取得を試みる
		x, _ = l.shared.popHead()
		// それでも取得できない場合
		if x == nil {
			// 他のPから取得を試みる、失敗すればvictimから取得
			x = p.getSlow(pid)
		}
	}
	
	// Pからの固定を解除
	runtime_procUnpin()
	
	// プールに利用可能なオブジェクトがなく、New関数がある場合は新規作成
	if x == nil && p.New != nil {
		x = p.New()
	}
	
	// xを返す(nilの可能性あり)
	return x
}

Getメソッドの主な手順:

  1. 現在のGをPにバインド
  2. PのIDを取得
  3. IDに対応するpoolLocalを取得
  4. privateを優先的に使用
  5. 次に自分の共有リストを使用
  6. 次に他のPのpoolLocalから取得を試みる
  7. それでもなければvictimから再利用
  8. プールに利用可能なオブジェクトがなければNew関数で生成

pinメソッド

func (p *Pool) pin() (*poolLocal, int) {
	// 現在のgoroutineをPに固定し、PのIDを取得
	pid := runtime_procPin()

	// p.localSizeとp.localを読み込む
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume

	// pidがp.localSizeより小さいかチェック
	if uintptr(pid) < s {
		// 対応するpoolLocalとpidを返す
		return indexLocal(l, pid), pid
	}
	// 初期化が必要な場合
	return p.pinSlow()
}

pinメソッドは、現在のgoroutineをPに固定し、PのpoolLocalプールとPのIDを返します。呼び出し側はプールを使用後にruntime_procUnpin()を呼び出す必要があります。

pinSlowメソッド

func (p *Pool) pinSlow() (*poolLocal, int) {
	// pinSlowを呼び出す前にpinが呼ばれているため、一旦固定を解除
	// ミューテックス操作のために必要
	runtime_procUnpin()
	
	// localの初期化中は他のアクセスを禁止
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	
	// 再度Pに固定
	pid := runtime_procPin()
	
	// double check
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	
	// すべてのプールをGCが管理できるようにキューに追加
	if p.local == nil {
		allPools = append(allPools, p)
	}
	
	// GOMAXPROCSがGC間で変更された場合、配列を再割り当て
	size := runtime.GOMAXPROCS(0)
	
	// プールを初期化
	local := make([]poolLocal, size)
	
	// 原子的にlocalとlocalSizeを更新
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	
	// 対応するpoolLocalとpidを返す
	return &local[pid], pid
}

getSlowメソッド

func (p *Pool) getSlow(pid int) any {
	// プールのサイズを取得
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	
	// 他のプロセスから要素を窃取を試みる
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// プライマリキャッシュから取得できなかった場合、ビクトムキャッシュを試す
	// 優先的にプライマリキャッシュから窃取するのは、ビクトムキャッシュのオブジェクトが可能な限り老化するため
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// ビクトムキャッシュが空であることを将来のGetが無駄にしないようにマーク
	atomic.StoreUintptr(&p.victimSize, 0)
	
	return nil
}

poolCleanupメソッド

func poolCleanup() {
	// この関数はガベージコレクションの開始時にワールドストップ状態で呼び出される
	// メモリを割り当てたり、ランタイム関数を呼び出したりしてはならない

	// ワールドがストップしているため、プールユーザーは固定されたセクションに存在しない

	// すべてのプールからビクトムキャッシュをドロップ
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// プライマリキャッシュをビクトムキャッシュに移動
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	// プライマリキャッシュが非空のプールは、今や非空のビクトムキャッシュを持ち、
	// プールはプライマリキャッシュを持たない
	oldPools, allPools = allPools, nil
}

poolCleanup関数はGC時に呼び出され、現在のlocalをvictimにフラッシュする役割を果たします。直接クリアしない理由は、大量のNew呼び出しの可能性があるためです。

poolChainの詳細

構造体

poolChain

type poolChain struct {
	// headはpushするためのpoolDequeue。プロデューサーのみがアクセスするため、同期は不要
	head *poolChainElt

	// tailはpopTailするためのpoolDequeue。コンシューマーがアクセスするため、読み書きはアトミックである必要がある
	tail *poolChainElt
}

poolDequeue

type poolDequeue struct {
	// headTailは32ビットのheadインデックスと32ビットのtailインデックスをまとめたもの
	// 両方ともvalsへのインデックス(len(vals)-1に対するmodulo)
	//
	// tail = キュー中最も古いデータのインデックス
	// head = 次に埋めるスロットのインデックス
	//
	// [tail, head)範囲のスロットはコンシューマーが所有
	// コンシューマーはtailインデックスがスロットを越えていて、かつtypがnilになるまでスロットを所有し続ける
	// コンシューマーがスロットをnilに設定すると、所有権はプロデューサーに移る
	//
	// headインデックスは最上位ビットに格納されているため、アトミックに加算でき、オーバーフローも問題にならない
	headTail uint64

	// このdequeueに格納されるinterface{}値のリングバッファ
	// サイズは2のべき乗でなければならない
	// vals[i].typがnilならスロットは空、そうでなければ非-nil
	// スロットはtailインデックスがそれを越え、かつtypがnilに設定されるまで使用中
	// これはコンシューマーによってアトミックにnilに設定され、プロデューサーによってアトミックに読み取られる
	vals []eface
}

poolChainElt

type poolChainElt struct {
	poolDequeue

	// このpoolChain内の隣接するpoolChainEltへのリンク
	//
	// nextはプロデューサーによってアトミックに書き込み、コンシューマーによってアトミックに読み取られる
	// nilから非-nilへのみ遷移する
	//
	// prevはコンシューマーによってアトミックに書き込み、プロデューサーによってアトミックに読み取られる
	// 非-nilからnilへのみ遷移する
	next, prev *poolChainElt
}

eface

type eface struct {
	typ, val unsafe.Pointer
}

efaceはGo言語の空インターフェースを表現する構造体で、値の型(typ)と値自体(val)を格納します。これにより、低レベルの同時実行データ構造(poolChainのロックフリー実装)において、メモリ使用と同期をより細かく制御できます。

メソッド

poolChain.pushHead

func (c *poolChain) pushHead(val any) {
	d := c.head
	// 初期化されていない場合
	if d == nil {
		// チェーンを初期化
		const initSize = 8 // 2のべき乗でなければならない
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		// headとtailをアトミックに設定
		storePoolChainElt(&c.tail, d)
	}
	// 現在のdequeueに書き込める場合は直接戻る
	if d.pushHead(val) {
		return
	}

	// 現在のdequeueは満杯。サイズを2倍に新しいdequeueを割り当て
	newSize := len(d.vals) * 2
	// 最大サイズ制限
	if newSize >= dequeueLimit {
		newSize = dequeueLimit
	}

	// 次のレイヤーを初期化
	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	// 初期化直後なので必ず書き込み可能
	d2.pushHead(val)
}

headが満杯の場合、新しいheadを生成し、現在のheadのprevに設定します。

poolChain.popHead

func (c *poolChain) popHead() (any, bool) {
	// 現在のhead poolChainEltを取得
	d := c.head
	for d != nil {
		// 現在のheadのdequeueからpopを試みる
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		// まだ前のdequeueに未消費の要素がある可能性があるため、一つ戻る
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}

poolChain.popTail

func (c *poolChain) popTail() (any, bool) {
	// tailを取得
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}

	for {
		// tailをpopする前にnextポインタを読み込むことが重要
		// 一般的にdは一時的に空になる可能性があるが、pop前にnextが非-nilでpopに失敗した場合、dは永続的に空であり、
		// チェーンからドロップする唯一の条件
		d2 := loadPoolChainElt(&d.next)
		
		// popTailを試みる
		if val, ok := d.popTail(); ok {
			return val, ok
		}
		
		// tailのdequeueが空の場合
		if d2 == nil {
			// これが唯一のdequeue。現在は空だが、将来に向けてpushされる可能性がある
			return nil, false
		}

		// チェーンのtailが空になったため、次のdequeueに移動
		// 次のpopで空のdequeueを再度見ないように、チェーンからドロップを試みる
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// 競合に勝った場合、空のdequeueをガベージコレクトできるようにprevポインタをクリア
			// またpopHeadが不必要に遡らないようにする
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}

poolChain.popTailは、キューの尾部から要素を削除して返すメソッドです。キューが空の場合、falseを返します。このメソッドは任意数のコンシューマーから呼び出すことができます。

重要な注意点

poolDequeueはロックフリーな固定サイズの単一プロデューサー、複数コンシューマーキューアプリケーションです。3つの主要なメソッドがあります:

  1. pushHead(val any) bool: 要素をキューの先頭に追加します。キューが満杯の場合、falseを返します。このメソッドは単一のプロデューサーのみが呼び出すことができます。
  2. popHead() (any, bool): キューの先頭から要素を削除して返します。キューが空の場合、falseを返します。このメソッドは単一のプロデューサーのみが呼び出すことができます。
  3. popTail() (any, bool): キューの末尾から要素を削除して返します。キューが空の場合、falseを返します。このメソッドは任意数のコンシューマーから呼び出すことができます。

重要な点として、popTailによるスロットの完全な解放はアトミック操作ではありません。そのため、pushHeadは以下の2つの操作を行う必要があります:

  1. スロットを取得できるか確認
  2. popTailがスロットを解放したか確認
  3. pushHeadとpopHeadは同時に1つだけ占有するため、競合問題を考慮する必要はありません

タグ: Go言語 sync.Pool メモリ管理 並行処理 ガベージコレクション

5月24日 03:09 投稿