Go言語におけるsync.Poolの実装解析
syncパッケージのPoolは、一時的にオブジェクトを再利用するための仕組みを提供します。これにより、頻繁に作成・破棄されるオブジェクトのガベージコレクションオーバーヘッドを削減できます。
基本的な使い方
package main
import (
"sync"
"fmt"
)
// 再利用可能なオブジェクトの型
type Resource struct {
ID string
}
func main() {
// 新しいプールを作成
resourcePool := sync.Pool{
New: func() interface{} {
return &Resource{ID: ""}
},
}
// プールからリソースを取得
item := resourcePool.Get().(*Resource)
item.ID = "resource-1"
fmt.Printf("取得したリソース: %s\n", item.ID)
// 使用後にプールに戻す
resourcePool.Put(item)
// 再度取得して確認
reusedItem := resourcePool.Get().(*Resource)
fmt.Printf("再利用されたリソース: %s\n", reusedItem.ID)
}
内部構造
Pool構造体
type Pool struct {
noCopy noCopy
local unsafe.Pointer // 各Pに対応する固定サイズのローカルプール、実際の型は[P]poolLocal
localSize uintptr // ローカル配列のサイズ
victim unsafe.Pointer // 前回のサイクルからのローカルプール
victimSize uintptr // ビクトム配列のサイズ
// NewはGetがnilを返す場合に値を生成する関数を指定します
// Getと並行して変更することはできません
New func() any
}
Pool構造体は以下の要素から構成されます:
local: Pの数の長さを持つ配列で、要素の型はpoolLocalです。各Pに対応するローカルオブジェクトプールを格納します。localSize:local配列の長さを表します。Pは実行時にruntime.GOMAXPROCSで変更可能なため、localSizeで配列の長さを対応させます。New: ユーザーが提供するオブジェクト生成関数です。必須ではなく、指定しない場合はGetがnilを返す可能性があります。
poolLocal構造体
// 各P用のプール補助構造体
type poolLocalInternal struct {
private any // 対応するPのみが使用可能
shared poolChain // ローカルPはpushHead/popHead可能、任意のPがpopTail可能
}
type poolLocal struct {
poolLocalInternal
// キャッシュラインサイズが128のプラットフォームで偽共有を防ぐ
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
poolLocalは各Pにバインドされたストレージです:
private: 高速な処理のために使用されます。特にGet->Put->Get->Putのような頻繁な操作時に、双方向リストストレージへのアクセスを減らします。shared: 2つの役割があります。1つは大容量ストレージとして、もう1つは他のPからの窃取用です。pad: CPUキャッシュラインの速度向上のためのパディングです(注意して使用)。
注意点として、poolLocalは1つのP専用であるため、他の断片化されたデータがキャッシュラインに一緒に含まれるのを防ぎます。これにより、断片化データの頻繁な更新によるキャッシュミスを回避できます。
主要メソッド
Putメソッド
func (p *Pool) Put(x any) {
// nilチェック
if x == nil {
return
}
// 現在のGをPに固定し、対応するpoolLocalを取得
l, _ := p.pin()
// 同じスレッド内で実行されるため、ミューテックスなどは不要
// privateが空なら直接格納
if l.private == nil {
l.private = x
} else {
// そうでなければ共有キューに格納
l.shared.pushHead(x)
}
// Pからの固定を解除
runtime_procUnpin()
}
Putメソッドの主な手順:
- 現在のGをPにバインド
- PのIDを取得
- IDに対応するpoolLocalを取得
- privateが空ならそこに格納、そうでなければ共有リストに格納
Getメソッド
func (p *Pool) Get() any {
// 現在のGをPに固定し、対応するpoolLocalを取得
l, pid := p.pin()
// privateを優先的に使用
x := l.private
l.private = nil
// privateが空の場合
if x == nil {
// ローカル共有リストから取得を試みる
x, _ = l.shared.popHead()
// それでも取得できない場合
if x == nil {
// 他のPから取得を試みる、失敗すればvictimから取得
x = p.getSlow(pid)
}
}
// Pからの固定を解除
runtime_procUnpin()
// プールに利用可能なオブジェクトがなく、New関数がある場合は新規作成
if x == nil && p.New != nil {
x = p.New()
}
// xを返す(nilの可能性あり)
return x
}
Getメソッドの主な手順:
- 現在のGをPにバインド
- PのIDを取得
- IDに対応するpoolLocalを取得
- privateを優先的に使用
- 次に自分の共有リストを使用
- 次に他のPのpoolLocalから取得を試みる
- それでもなければvictimから再利用
- プールに利用可能なオブジェクトがなければNew関数で生成
pinメソッド
func (p *Pool) pin() (*poolLocal, int) {
// 現在のgoroutineをPに固定し、PのIDを取得
pid := runtime_procPin()
// p.localSizeとp.localを読み込む
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
// pidがp.localSizeより小さいかチェック
if uintptr(pid) < s {
// 対応するpoolLocalとpidを返す
return indexLocal(l, pid), pid
}
// 初期化が必要な場合
return p.pinSlow()
}
pinメソッドは、現在のgoroutineをPに固定し、PのpoolLocalプールとPのIDを返します。呼び出し側はプールを使用後にruntime_procUnpin()を呼び出す必要があります。
pinSlowメソッド
func (p *Pool) pinSlow() (*poolLocal, int) {
// pinSlowを呼び出す前にpinが呼ばれているため、一旦固定を解除
// ミューテックス操作のために必要
runtime_procUnpin()
// localの初期化中は他のアクセスを禁止
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 再度Pに固定
pid := runtime_procPin()
// double check
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// すべてのプールをGCが管理できるようにキューに追加
if p.local == nil {
allPools = append(allPools, p)
}
// GOMAXPROCSがGC間で変更された場合、配列を再割り当て
size := runtime.GOMAXPROCS(0)
// プールを初期化
local := make([]poolLocal, size)
// 原子的にlocalとlocalSizeを更新
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
// 対応するpoolLocalとpidを返す
return &local[pid], pid
}
getSlowメソッド
func (p *Pool) getSlow(pid int) any {
// プールのサイズを取得
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 他のプロセスから要素を窃取を試みる
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// プライマリキャッシュから取得できなかった場合、ビクトムキャッシュを試す
// 優先的にプライマリキャッシュから窃取するのは、ビクトムキャッシュのオブジェクトが可能な限り老化するため
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// ビクトムキャッシュが空であることを将来のGetが無駄にしないようにマーク
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
poolCleanupメソッド
func poolCleanup() {
// この関数はガベージコレクションの開始時にワールドストップ状態で呼び出される
// メモリを割り当てたり、ランタイム関数を呼び出したりしてはならない
// ワールドがストップしているため、プールユーザーは固定されたセクションに存在しない
// すべてのプールからビクトムキャッシュをドロップ
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// プライマリキャッシュをビクトムキャッシュに移動
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// プライマリキャッシュが非空のプールは、今や非空のビクトムキャッシュを持ち、
// プールはプライマリキャッシュを持たない
oldPools, allPools = allPools, nil
}
poolCleanup関数はGC時に呼び出され、現在のlocalをvictimにフラッシュする役割を果たします。直接クリアしない理由は、大量のNew呼び出しの可能性があるためです。
poolChainの詳細
構造体
poolChain
type poolChain struct {
// headはpushするためのpoolDequeue。プロデューサーのみがアクセスするため、同期は不要
head *poolChainElt
// tailはpopTailするためのpoolDequeue。コンシューマーがアクセスするため、読み書きはアトミックである必要がある
tail *poolChainElt
}
poolDequeue
type poolDequeue struct {
// headTailは32ビットのheadインデックスと32ビットのtailインデックスをまとめたもの
// 両方ともvalsへのインデックス(len(vals)-1に対するmodulo)
//
// tail = キュー中最も古いデータのインデックス
// head = 次に埋めるスロットのインデックス
//
// [tail, head)範囲のスロットはコンシューマーが所有
// コンシューマーはtailインデックスがスロットを越えていて、かつtypがnilになるまでスロットを所有し続ける
// コンシューマーがスロットをnilに設定すると、所有権はプロデューサーに移る
//
// headインデックスは最上位ビットに格納されているため、アトミックに加算でき、オーバーフローも問題にならない
headTail uint64
// このdequeueに格納されるinterface{}値のリングバッファ
// サイズは2のべき乗でなければならない
// vals[i].typがnilならスロットは空、そうでなければ非-nil
// スロットはtailインデックスがそれを越え、かつtypがnilに設定されるまで使用中
// これはコンシューマーによってアトミックにnilに設定され、プロデューサーによってアトミックに読み取られる
vals []eface
}
poolChainElt
type poolChainElt struct {
poolDequeue
// このpoolChain内の隣接するpoolChainEltへのリンク
//
// nextはプロデューサーによってアトミックに書き込み、コンシューマーによってアトミックに読み取られる
// nilから非-nilへのみ遷移する
//
// prevはコンシューマーによってアトミックに書き込み、プロデューサーによってアトミックに読み取られる
// 非-nilからnilへのみ遷移する
next, prev *poolChainElt
}
eface
type eface struct {
typ, val unsafe.Pointer
}
efaceはGo言語の空インターフェースを表現する構造体で、値の型(typ)と値自体(val)を格納します。これにより、低レベルの同時実行データ構造(poolChainのロックフリー実装)において、メモリ使用と同期をより細かく制御できます。
メソッド
poolChain.pushHead
func (c *poolChain) pushHead(val any) {
d := c.head
// 初期化されていない場合
if d == nil {
// チェーンを初期化
const initSize = 8 // 2のべき乗でなければならない
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
// headとtailをアトミックに設定
storePoolChainElt(&c.tail, d)
}
// 現在のdequeueに書き込める場合は直接戻る
if d.pushHead(val) {
return
}
// 現在のdequeueは満杯。サイズを2倍に新しいdequeueを割り当て
newSize := len(d.vals) * 2
// 最大サイズ制限
if newSize >= dequeueLimit {
newSize = dequeueLimit
}
// 次のレイヤーを初期化
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
// 初期化直後なので必ず書き込み可能
d2.pushHead(val)
}
headが満杯の場合、新しいheadを生成し、現在のheadのprevに設定します。
poolChain.popHead
func (c *poolChain) popHead() (any, bool) {
// 現在のhead poolChainEltを取得
d := c.head
for d != nil {
// 現在のheadのdequeueからpopを試みる
if val, ok := d.popHead(); ok {
return val, ok
}
// まだ前のdequeueに未消費の要素がある可能性があるため、一つ戻る
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
poolChain.popTail
func (c *poolChain) popTail() (any, bool) {
// tailを取得
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// tailをpopする前にnextポインタを読み込むことが重要
// 一般的にdは一時的に空になる可能性があるが、pop前にnextが非-nilでpopに失敗した場合、dは永続的に空であり、
// チェーンからドロップする唯一の条件
d2 := loadPoolChainElt(&d.next)
// popTailを試みる
if val, ok := d.popTail(); ok {
return val, ok
}
// tailのdequeueが空の場合
if d2 == nil {
// これが唯一のdequeue。現在は空だが、将来に向けてpushされる可能性がある
return nil, false
}
// チェーンのtailが空になったため、次のdequeueに移動
// 次のpopで空のdequeueを再度見ないように、チェーンからドロップを試みる
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 競合に勝った場合、空のdequeueをガベージコレクトできるようにprevポインタをクリア
// またpopHeadが不必要に遡らないようにする
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
poolChain.popTailは、キューの尾部から要素を削除して返すメソッドです。キューが空の場合、falseを返します。このメソッドは任意数のコンシューマーから呼び出すことができます。
重要な注意点
poolDequeueはロックフリーな固定サイズの単一プロデューサー、複数コンシューマーキューアプリケーションです。3つの主要なメソッドがあります:
pushHead(val any) bool: 要素をキューの先頭に追加します。キューが満杯の場合、falseを返します。このメソッドは単一のプロデューサーのみが呼び出すことができます。popHead() (any, bool): キューの先頭から要素を削除して返します。キューが空の場合、falseを返します。このメソッドは単一のプロデューサーのみが呼び出すことができます。popTail() (any, bool): キューの末尾から要素を削除して返します。キューが空の場合、falseを返します。このメソッドは任意数のコンシューマーから呼び出すことができます。
重要な点として、popTailによるスロットの完全な解放はアトミック操作ではありません。そのため、pushHeadは以下の2つの操作を行う必要があります:
- スロットを取得できるか確認
- popTailがスロットを解放したか確認
- pushHeadとpopHeadは同時に1つだけ占有するため、競合問題を考慮する必要はありません