brpcのbthreadについて

概要

bthreadは非同期タスクを実行するための協程(コルーチン)機能を提供します

構造

TaskControl :すべてのタスクグループTaskGroupを管理します TaskGroup:スレッド数と同じ数存在し、各スレッドに1つずつ割り当てられます。定義はtls_task_groupで、キーワード__threadが使用されています。worker_thread作成時にTaskGroupが生成され、tls_task_groupに割り当てられます。メンバーには2つのキューが含まれ、1つは_rq(WorkStealingQueue型の作業窃取キュー)、もう1つは_remote_rqです。_rqはworker_threadが追加するタスク用、_remote_rqは非作業グループスレッドが作成するタスク用です TaskMeta:TaskGroup内の1つのタスクに相当し、bthread_start_backgroundで作成され、同時にtid(bthread_t型)が取得されます RemoteTaskQueue構造は以下の通りです

WorkStealingQueue構造は以下の通りです

_topはキューの先頭、_bottomはキューの末尾を表します。capacity容量サイズは2のべき乗である必要があります。pushpopはbottom側で、stealはtop側で行われます

協程スタック

定義は以下の通りです

struct ContextualStack {
    bthread_fcontext_t context;
    StackType stacktype;
    StackStorage storage;
};

context:協程スタックの先頭を指します stacktype:スタックタイプを表す列挙型です

enum StackType {
    STACK_TYPE_MAIN = 0,
    STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD,
    STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
    STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL,
    STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
};

storage:スタック領域で、メンバーのbottomはスタックの底を表します 定義は以下の通りです

struct StackStorage {
     int stacksize;
     int guardsize;
    // スタックが上方向に成長すると仮定します。
    // http://www.boost.org/doc/libs/1_55_0/libs/context/doc/html/context/stack.html
    void* bottom;
    unsigned valgrind_stack_id;

    // すべてのメンバーをクリアします。
    void zeroize() {
        stacksize = 0;
        guardsize = 0;
        bottom = NULL;
        valgrind_stack_id = 0;
    }
};

ヒープ領域の割り当て

StackFactoryテンプレートクラスを通じて4種類のスタックを作成します

  • MainStackClass
  • SmallStackClass
  • NormalStackClass
  • LargeStackClass 後の3つは汎用テンプレートクラスを使用しています
template <typename StackClass> struct StackFactory {
    struct Wrapper : public ContextualStack {
        explicit Wrapper(void (*entry)(intptr_t)) {
            if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
                                       FLAGS_guard_page_size) != 0) {
                storage.zeroize();
                context = NULL;
                return;
            }
            context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
            stacktype = (StackType)StackClass::stacktype;
        }
        ~Wrapper() {
            if (context) {
                context = NULL;
                deallocate_stack_storage(&storage);
                storage.zeroize();
            }
        }
    };
    
    static ContextualStack* get_stack(void (*entry)(intptr_t)) {
        return butil::get_object<Wrapper>(entry);
    }
    
    static void return_stack(ContextualStack* sc) {
        butil::return_object(static_cast<Wrapper*>(sc));
    }
};

MainStackClassタイプはテンプレート特殊化クラスを通じて実装されています

template <> struct StackFactory<MainStackClass> {
    static ContextualStack* get_stack(void (*)(intptr_t)) {
        ContextualStack* s = new (std::nothrow) ContextualStack;
        if (NULL == s) {
            return NULL;
        }
        s->context = NULL;
        s->stacktype = STACK_TYPE_MAIN;
        s->storage.zeroize();
        return s;
    }
    
    static void return_stack(ContextualStack* s) {
        delete s;
    }
};

allocate_stack_storage

ヒープ領域を割り当て、bottomアドレスは割り当てられたヒープ領域の最後のアドレスを指し、先頭アドレスではありません

int allocate_stack_storage(StackStorage* s, int stacksize_in, int guardsize_in) {
    const static int PAGESIZE = getpagesize();
    const int PAGESIZE_M1 = PAGESIZE - 1;
    const int MIN_STACKSIZE = PAGESIZE * 2;
    const int MIN_GUARDSIZE = PAGESIZE;

    // スタックサイズをアラインします
    const int stacksize =
        (std::max(stacksize_in, MIN_STACKSIZE) + PAGESIZE_M1) &
        ~PAGESIZE_M1;

    if (guardsize_in <= 0) {
        void* mem = malloc(stacksize);
        if (NULL == mem) {
            PLOG_EVERY_SECOND(ERROR) << "mallocに失敗しました (size="
                                     << stacksize << ")";
            return -1;
        }
        s_stack_count.fetch_add(1, butil::memory_order_relaxed);
        s->bottom = (char*)mem + stacksize;
        s->stacksize = stacksize;
        s->guardsize = 0;
        if (RunningOnValgrind()) {
            s->valgrind_stack_id = VALGRIND_STACK_REGISTER(
                s->bottom, (char*)s->bottom - stacksize);
        } else {
            s->valgrind_stack_id = 0;
        }
        return 0;
    } else {
        // ガードサイズをアラインします
        const int guardsize =
            (std::max(guardsize_in, MIN_GUARDSIZE) + PAGESIZE_M1) &
            ~PAGESIZE_M1;

        const int memsize = stacksize + guardsize;
        void* const mem = mmap(NULL, memsize, (PROT_READ | PROT_WRITE),
                               (MAP_PRIVATE | MAP_ANONYMOUS), -1, 0);

        if (MAP_FAILED == mem) {
            PLOG_EVERY_SECOND(ERROR) 
                << "mmapに失敗しました size=" << memsize << " stack_count="
                << s_stack_count.load(butil::memory_order_relaxed)
                << ", /proc/sys/vm/max_map_countの制限による可能性があります";
            // max_map_countの制限(デフォルト65536)による失敗の可能性があります
            return -1;
        }

        void* aligned_mem = (void*)(((intptr_t)mem + PAGESIZE_M1) & ~PAGESIZE_M1);
        if (aligned_mem != mem) {
            LOG_ONCE(ERROR) << "addr=" << mem << " mmapによって返されたアドレスは"
                "pagesize=" << PAGESIZE "でアラインされていません";
        }
        const int offset = (char*)aligned_mem - (char*)mem;
        if (guardsize <= offset ||
            mprotect(aligned_mem, guardsize - offset, PROT_NONE) != 0) {
            munmap(mem, memsize);
            PLOG_EVERY_SECOND(ERROR) 
                << "mprotectに失敗しました " << (void*)aligned_mem << " length="
                << guardsize - offset; 
            return -1;
        }

        s_stack_count.fetch_add(1, butil::memory_order_relaxed);
        s->bottom = (char*)mem + memsize;
        s->stacksize = stacksize;
        s->guardsize = guardsize;
        if (RunningOnValgrind()) {
            s->valgrind_stack_id = VALGRIND_STACK_REGISTER(
                s->bottom, (char*)s->bottom - stacksize);
        } else {
            s->valgrind_stack_id = 0;
        }
        return 0;
    }
}

協程スタックの作成

bthread_fcontext_t BTHREAD_CONTEXT_CALL_CONVENTION
bthread_make_fcontext(void* sp, size_t size, void (* fn)( intptr_t));

実装はアセンブリコードを使用しており、アセンブリではまず関数パラメータとレジスタの関係を見てみましょう:

%rdi 第1パラメータ
%rsi 第2パラメータ
%rdx 第3パラメータ
%rcx 第4パラメータ

Linux x86_64のアセンブリは以下の通りです

#if defined(BTHREAD_CONTEXT_PLATFORM_linux_x86_64) && defined(BTHREAD_CONTEXT_COMPILER_gcc)
__asm (
".text\n"
".globl bthread_make_fcontext\n"
".type bthread_make_fcontext,@function\n"
".align 16\n"
"bthread_make_fcontext:\n"
"    movq  %rdi, %rax\n"
"    andq  $-16, %rax\n"
"    leaq  -0x48(%rax), %rax\n"
"    movq  %rdx, 0x38(%rax)\n"
"    stmxcsr  (%rax)\n"
"    fnstcw   0x4(%rax)\n"
"    leaq  finish(%rip), %rcx\n"
"    movq  %rcx, 0x40(%rax)\n"
"    ret \n"
"finish:\n"
"    xorq  %rdi, %rdi\n"
"    call  _exit@PLT\n"
"    hlt\n"
".size bthread_make_fcontext,.-bthread_make_fcontext\n"
".section .note.GNU-stack,\"\",%progbits\n"
".previous\n"
);

rdiはbottomに対応し、まず16バイトをアラインし、その後72バイトを下げます。(rax+56)アドレスにentry関数アドレスが格納されます 図示形式は以下の通りです

bottom:StackStorageのbottom、つまり割り当てられたヒープの最後のアドレス(mem+stack_size) bthread_make_fcontextが返すアドレスはヒープの最後のアドレス-72

協程スタックのジャンプ

intptr_t BTHREAD_CONTEXT_CALL_CONVENTION
bthread_jump_fcontext(bthread_fcontext_t * ofc, bthread_fcontext_t nfc,
                      intptr_t vp, bool preserve_fpu = false);

この関数は以下のように呼び出されます

inline void jump_stack(ContextualStack* from, ContextualStack* to) {
    bthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}

bthread_jump_fcontextの実装はアセンブリです

#if defined(BTHREAD_CONTEXT_PLATFORM_linux_x86_64) && defined(BTHREAD_CONTEXT_COMPILER_gcc)
__asm (
".text\n"
".globl bthread_jump_fcontext\n"
".type bthread_jump_fcontext,@function\n"
".align 16\n"
"bthread_jump_fcontext:\n"
"    pushq  %rbp  \n"
"    pushq  %rbx  \n"
"    pushq  %r15  \n"
"    pushq  %r14  \n"
"    pushq  %r13  \n"
"    pushq  %r12  \n"
"    leaq  -0x8(%rsp), %rsp\n"
"    cmp  $0, %rcx\n"
"    je  1f\n"
"    stmxcsr  (%rsp)\n"
"    fnstcw   0x4(%rsp)\n"
"1:\n"
"    movq  %rsp, (%rdi)\n"  //rdiは関数の第1パラメータofc、rspをofcに保存
"    movq  %rsi, %rsp\n" //rsiは第2パラメータ、nfcをrspに保存
"    cmp  $0, %rcx\n"
"    je  2f\n"
"    ldmxcsr  (%rsp)\n"
"    fldcw  0x4(%rsp)\n"
"2:\n"
"    leaq  0x8(%rsp), %rsp\n"
"    popq  %r12  \n"
"    popq  %r13  \n"
"    popq  %r14  \n"
"    popq  %r15  \n"
"    popq  %rbx  \n"
"    popq  %rbp  \n"
"    popq  %r8\n"
"    movq  %rdx, %rax\n"
"    movq  %rdx, %rdi\n"
"    jmp  *%r8\n"
".size bthread_jump_fcontext,.-bthread_jump_fcontext\n"
".section .note.GNU-stack,\"\",%progbits\n"
".previous\n"
);

ジャンプの図は以下の通りです

r8レジスタは協程コンテキスト作成時に指定されたfn関数、つまりtask_runnerです

スタックの取得

get_stack関数を通じて、5種類のタイプがあります

  • STACK_TYPE_PTHREAD:ネイティブのスレッドスタック
  • STACK_TYPE_SMALL
  • STACK_TYPE_NORMAL
  • STACK_TYPE_LARGE
  • STACK_TYPE_MAIN
inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
    switch (type) {
    case STACK_TYPE_PTHREAD:
        return NULL;
    case STACK_TYPE_SMALL:
        return StackFactory<SmallStackClass>::get_stack(entry);
    case STACK_TYPE_NORMAL:
        return StackFactory<NormalStackClass>::get_stack(entry);
    case STACK_TYPE_LARGE:
        return StackFactory<LargeStackClass>::get_stack(entry);
    case STACK_TYPE_MAIN:
        return StackFactory<MainStackClass>::get_stack(entry);
    }
    return NULL;
}

タスクの待機

時系列は以下の通りです

bool TaskGroup::wait_task(bthread_t* tid) {
    do {
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
        if (_last_pl_state.stopped()) {
            return false;
        }
        _pl->wait(_last_pl_state);
        if (steal_task(tid)) {
            return true;
        }
#else
        const ParkingLot::State st = _pl->get_state();
        if (st.stopped()) {
            return false;
        }
        if (steal_task(tid)) {
            return true;
        }
        _pl->wait(st);
#endif
    } while (true);
}

bool steal_task(bthread_t* tid) {
        if (_remote_rq.pop(tid)) {
            return true;
        }
#ifndef BTHREAD_DONT_SAVE_PARKING_STATE
        _last_pl_state = _pl->get_state();
#endif
        return _control->steal_task(tid, &_steal_seed, _steal_offset);
    }

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    auto tag = tls_task_group->tag();
    // 1: Acquiring fenceは_add_groupのreleasing fenceとペアになっており、
    // _groupsの初期化されていないスロットへのアクセスを避けるために使用されます。
    const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }

    // 注意:forループ内でreturnしないようにしてください。seedを更新する必要があるためです
    bool stolen = false;
    size_t s = *seed;
    auto& groups = tag_group(tag);
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = groups[s % ngroup];
        // gは_concurrent _destroy_groupによる可能性があるためNULLになる可能性があります
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}

メインスタックの初期化

TaskGroup::initで初期化が完了します

int TaskGroup::init(size_t runqueue_capacity) {
    if (_rq.init(runqueue_capacity) != 0) {
        LOG(FATAL) << "Fail to init _rq";
        return -1;
    }
    if (_remote_rq.init(runqueue_capacity / 2) != 0) {
        LOG(FATAL) << "Fail to init _remote_rq";
        return -1;
    }
    ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
    if (NULL == stk) {
        LOG(FATAL) << "Fail to get main stack container";
        return -1;
    }
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
    if (NULL == m) {
        LOG(FATAL) << "Fail to get TaskMeta";
        return -1;
    }
    m->sleep_failed = false;
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = NULL;
    m->arg = NULL;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = butil::cpuwide_time_ns();
    m->stat = EMPTY_STAT;
    m->attr = BTHREAD_ATTR_TASKGROUP;
    m->tid = make_tid(*m->version_butex, slot);
    m->set_stack(stk);

    _cur_meta = m;
    _main_tid = m->tid;
    _main_stack = stk;
    _last_run_ns = butil::cpuwide_time_ns();
    return 0;
}

タスクスケジューリング

sched_toを通じて行われます。協程スタックが割り当てられていない場合は、まず協程スタックを割り当てます。現在のタスクメタデータTaskMetaと実行対象のタスクメタデータが異なる場合は、実行対象のタスクメタデータを現在のタスクグループTaskGroupの現在のタスクメタデータに割り当て、対応するタスクにスケジュールしますtask_runnertask_runnerは実行時に、前のタスク実行後の_last_context_remainedを実行してから、タスクに設定された実行関数を実行し、set_remainedを設定します。ending_schedでスケジュールを終了するとき、キューにタスクがあるかどうかを確認し、あれば引き続きsched_toします

static void sched_to(TaskGroup** pg, bthread_t next_tid);

inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
        if (stk) {
            next_meta->set_stack(stk);
        } else {
            // stack_typeはBTHREAD_STACKTYPE_PTHREADまたはメモリ不足の場合、
            // 後者の場合、attrは強制的にBTHREAD_STACKTYPE_PTHREADになります。
            // これは基本的にスタックを割り当てられない場合、タスクをpthreadで直接実行することを意味します。
            next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
            next_meta->set_stack((*pg)->_main_stack);
        }
    }
    // wait_taskがyieldした場合にのみnow_nsを更新します。
    sched_to(pg, next_meta);
}


void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
    TaskGroup* g = *pg;
#ifndef NDEBUG
    if ((++g->_sched_recursive_guard) > 1) {
        LOG(FATAL) << "Recursively(" << g->_sched_recursive_guard - 1
                   << ") call sched_to(" << g << ")";
    }
#endif
    // errnoを保存して、errnoがbthread固有になるようにします。
    const int saved_errno = errno;
    void* saved_unique_user_ptr = tls_unique_user_ptr;

    TaskMeta* const cur_meta = g->_cur_meta;
    const int64_t now = butil::cpuwide_time_ns();
    const int64_t elp_ns = now - g->_last_run_ns;
    g->_last_run_ns = now;
    cur_meta->stat.cputime_ns += elp_ns;
    if (cur_meta->tid != g->main_tid()) {
        g->_cumulated_cputime_ns += elp_ns;
    }
    ++cur_meta->stat.nswitch;
    ++ g->_nswitch;
    // タスクに切り替えます
    if (__builtin_expect(next_meta != cur_meta, 1)) {
        g->_cur_meta = next_meta;
        // tls_blsを切り替えます
        cur_meta->local_storage = tls_bls;
        tls_bls = next_meta->local_storage;

        // ロギングはローカルストレージを切り替えた後に実行する必要があります。ロギングライブラリは
        // 内部的にbthreadローカルストレージを使用しているため、そうしないとメモリリークが発生します。
        if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
            (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
            LOG(INFO) << "bthreadを切り替えます: " << cur_meta->tid << " -> "
                      << next_meta->tid;
        }

        if (cur_meta->stack != NULL) {
            if (next_meta->stack != cur_meta->stack) {
                jump_stack(cur_meta->stack, next_meta->stack);
                // 別のグループに移動した可能性があるため、gを再割り当てする必要があります。
                g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
            }
#ifndef NDEBUG
            else {
                // そうでない場合、pthread_taskは別のpthread_taskに切り替わります。sc
                // は両方が_main_stackの場合にのみ等しくなります
                CHECK(cur_meta->stack == g->_main_stack);
            }
#endif
        }
        // そうでない場合、pthread_taskからpthread_taskへの切り替えは、
        // ending_sched(pthread_task->pthread_taskを含む)によるものです
    } else {
        LOG(FATAL) << "bthread=" << g->current_tid() << " 自身にsched_to!";
    }

    while (g->_last_context_remained) {
        RemainedFn fn = g->_last_context_remained;
        g->_last_context_remained = NULL;
        fn(g->_last_context_remained_arg);
        g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
    }

    // errnoを復元します
    errno = saved_errno;
    // tls_unique_user_ptrは変更されている可能性があります。
    BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_unique_user_ptr, saved_unique_user_ptr);

#ifndef NDEBUG
    --g->_sched_recursive_guard;
#endif
    *pg = g;
}

void TaskGroup::task_runner(intptr_t skip_remained) {
    // 注意:tls_task_groupは揮発的です。タスクが異なるグループ間で移動されるためです。
    TaskGroup* g = tls_task_group;

    if (!skip_remained) {
        while (g->_last_context_remained) {
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
        }

#ifndef NDEBUG
        --g->_sched_recursive_guard;
#endif
    }

    do {
        // タスクが実行される前に停止される可能性があり、その場合
        // ユーザー関数をスキップするかもしれませんが、ユーザーを混乱させる可能性があります:
        // ほとんどのタスクには、タスクの実行結果を記憶する変数があり、
        // これは通常成功を示す値で初期化されます。ユーザー関数が
        // 呼び出されない場合、変数は変更されませんが、タスクが異常終了した場合
        // 失敗を反映する必要があります。

        // この実行中、タスクのメタデータと識別子は永続的です。
        TaskMeta* const m = g->_cur_meta;

        if (FLAGS_show_bthread_creation_in_vars) {
            // 注意:公開遅延時間をトリガーするスレッドは、
            // 単一のbvar::LatencyRecorderが多くのbvarを含むため、
            // 相当な時間を費やす可能性があります。
            g->_control->exposed_pending_time() <<
                (butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
        }

        // ExitExceptionを除いて例外をキャッチしません。これはbthread_exit()を実装するためです。
        // ユーザーコードは明示的にキャッチされない例外が発生した場合にクラッシュするように設計されています。
        // これは他のスレッディングライブラリと一貫しています。
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }

        // TODO: thread_returnを保存します
        (void)thread_return;

        // ロギングはkeytableを返す前に実行する必要があります。ロギングライブラリは
        // 内部的にbthreadローカルストレージを使用しているため、そうしないとメモリリークが発生します。
        // FIXME: fnを終了してからここに到達するまでの時間はcputimeにカウントされません
        if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
            LOG(INFO) << "bthread " << m->tid << "が終了しました, cputime="
                      << m->stat.cputime_ns / 1000000.0 << "ms";
        }

        // tls変数をクリーンアップします。version_butexを変更する前に実行する必要があります。
        // そうしないと、このスレッドに参加した他のスレッドは、
        // tls変数の破棄による副作用を見ることができません。
        KeyTable* kt = tls_bls.keytable;
        if (kt != NULL) {
            return_keytable(m->attr.keytable_pool, kt);
            // 削除後:tlsは削除中に設定される可能性があります。
            tls_bls.keytable = NULL;
            m->local_storage.keytable = NULL; // オプション
        }

        // 関数の実行中とKeyTableの削除中に、グループが変更される可能性があります。
        g =  BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);

        // バージョンをインクリメントし、すべてのjoinerをウェイクアップします。結果のバージョンが
        // 0の場合、bthread_tが0にならないように1に変更します。バージョン変更後の
        // bthreadへのアクセスまたはjoinは拒否されます。
        // TaskGroup::get_attrの可視性のためにスピンロックを使用します。
        {
            BAIDU_SCOPED_LOCK(m->version_lock);
            if (0 == ++*m->version_butex) {
                ++*m->version_butex;
            }
        }
        butex_wake_except(m->version_butex, 0);

        g->_control->_nbthreads << -1;
        g->_control->tag_nbthreads(g->tag()) << -1;
        g->set_remained(TaskGroup::_release_last_context, m);
        ending_sched(&g);

    } while (g->_cur_meta->tid != g->_main_tid);

    // pthreadから呼び出され、BTHREAD_STACKTYPE_PTHREADタスクを実行するものがない場合、
    // もっとタスクを待ちます。
}

void TaskGroup::ending_sched(TaskGroup** pg) {
    TaskGroup* g = *pg;
    bthread_t next_tid = 0;
    // 次に実行するタスクを見つけます。なければ、グループのアイドルスレッドに切り替えます。
#ifndef BTHREAD_FAIR_WSQ
    // BTHREAD_FAIR_WSQが定義されている場合、プロファイリングはexample/multi_threaded_echo_c++の
    // WSQ::steal()のCPUコストが1.9%から2.9%に変化することを示しています
    const bool popped = g->_rq.pop(&next_tid);
#else
    const bool popped = g->_rq.steal(&next_tid);
#endif
    if (!popped && !g->steal_task(&next_tid)) {
        // 実行するタスクがない場合は、メインタスクにジャンプします。
        next_tid = g->_main_tid;
    }

    TaskMeta* const cur_meta = g->_cur_meta;
    TaskMeta* next_meta = address_meta(next_tid);
    if (next_meta->stack == NULL) {
        if (next_meta->stack_type() == cur_meta->stack_type()) {
            // pthread_taskからpthread_taskへのスケジュールにも機能します。転送されたスタックは
            // 単に_main_stackです。
            next_meta->set_stack(cur_meta->release_stack());
        } else {
            ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
            if (stk) {
                next_meta->set_stack(stk);
            } else {
                // stack_typeはBTHREAD_STACKTYPE_PTHREADまたはメモリ不足の場合、
                // 後者の場合、attrは強制的にBTHREAD_STACKTYPE_PTHREADになります。
                // これは基本的にスタックを割り当てられない場合、タスクをpthreadで直接実行することを意味します。
                next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
                next_meta->set_stack(g->_main_stack);
            }
        }
    }
    sched_to(pg, next_meta);
}

タグ: bRPC bthread coroutine Asynchronous task-scheduling

6月28日 20:29 投稿