서버 개발/멀티쓰레드 프로그래밍

Epoch-based Reclamation (EBR, 시기 기반 메모리 회수)

지노윈 2025. 9. 18. 10:58
반응형

멀티스레드 환경에서 lock-free 자료구조를 쓰면, 메모리 해제가 큰 문제가 됩니다.

어떤 스레드가 노드를 제거했다고 해도, 다른 스레드가 여전히 그 노드를 참조 중일 수 있습니다. 이때 free()를 즉시 해버리면, 참조 중인 스레드가 잘못된 메모리에 접근(Use-After-Free)해서 프로그램이 망가집니다.

이를 해결하기 위해 EBR(Epoch-based Reclamation)모든 스레드가 특정 "안전 시점(Epoch)"을 지나야 메모리를 해제할 수 있도록 관리합니다.

 

시스템 전체에 Epoch(시기)라는 전역 카운터를 둡니다.

각 스레드는 자신이 현재 어떤 Epoch에서 작업 중인지 기록합니다.

메모리를 해제할 때 즉시 free하지 않고, “현재 Epoch에 묶어서” Retire List에 넣어둡니다.

모든 스레드가 해당 Epoch을 지나서 더 이상 그 노드를 참조할 수 없는 상태가 되면, 그제서야 안전하게 free.

 

Epoch 관리

  • 전역 Epoch 변수를 운영(예: 0, 1, 2...).
  • 스레드가 lock-free 자료구조에 들어갈 때 “나 지금 Epoch N에 있음”을 기록.

노드 제거 시

  • free하지 않고 “Epoch N에서 제거된 노드”를 Retire List에 넣음.

안전 확인

  • Retire List에 있는 노드의 Epoch를 확인.
  • 모든 스레드가 이미 Epoch N을 벗어났다면 → 그 노드를 free.

장점

  • 간단하고 빠름: Hazard Pointer처럼 매번 포인터 보호 등록을 하지 않아도 됨.
  • 락 불필요: lock-free 자료구조의 장점을 그대로 유지.
  • 일괄 해제 가능: 특정 시점(Epoch)이 끝나면 여러 노드를 한꺼번에 해제.

단점

  • 지연된 메모리 해제: 스레드가 오랫동안 한 Epoch에 머무르면 Retire List가 계속 쌓임.
  • 긴 꼬리 문제: 특정 스레드가 느리게 동작하면(예: preemption), 메모리 회수가 늦어짐.
  • Hazard Pointer보다 메모리 사용량이 커질 수 있음 (Retire List가 커짐).
// g++ -std=c++17 -O2 -pthread ebr_stack_demo.cpp && ./a.out
#include <atomic>
#include <thread>
#include <vector>
#include <unordered_set>
#include <optional>
#include <iostream>
#include <cassert>

namespace ebr {

// ===== 설정 =====
constexpr size_t MAX_THREADS = 128;     // 참여 가능한 최대 스레드 수
constexpr size_t RECLAIM_THRESHOLD = 64; // retire 누적 후 스캔 임계치

// ===== 전역 Epoch & 스레드 등록 =====
static std::atomic<size_t> g_epoch{0};      // 전역 epoch (단조 증가)
static std::atomic<size_t> g_next_id{0};    // 스레드 슬롯 할당기

// 각 스레드의 활동/선언 epoch 테이블
static std::atomic<bool>   g_active[MAX_THREADS];
static std::atomic<size_t> g_announced[MAX_THREADS]; // active일 때 의미

struct Retired {
    void* p;
    void (*deleter)(void*);
    size_t retire_epoch;
};

// 스레드별 컨텍스트
class ThreadCtx {
public:
    ThreadCtx() {
        id_ = g_next_id.fetch_add(1, std::memory_order_relaxed);
        if (id_ >= MAX_THREADS) {
            std::cerr << "Too many threads for EBR\n";
            std::terminate();
        }
        g_active[id_].store(false, std::memory_order_relaxed);
        g_announced[id_].store(0, std::memory_order_relaxed);
    }

    ~ThreadCtx() {
        // 남은 것 최대한 회수
        scan_and_reclaim();
        // 비활성 표기
        g_active[id_].store(false, std::memory_order_release);
    }

    // ===== EBR 진입/이탈 =====
    inline void enter() {
        g_active[id_].store(true, std::memory_order_release);
        // 현재 전역 epoch을 읽어 선언
        size_t e = g_epoch.load(std::memory_order_acquire);
        g_announced[id_].store(e, std::memory_order_release);
    }

    inline void exit() {
        // 이 스레드는 더 이상 보호 구간에 없음
        g_active[id_].store(false, std::memory_order_release);
    }

    // ===== retire / reclaim =====
    inline void retire(void* p, void (*deleter)(void*)) {
        size_t e = g_epoch.load(std::memory_order_acquire);
        retired_.push_back({p, deleter, e});
        if (retired_.size() >= RECLAIM_THRESHOLD) scan_and_reclaim();
    }

    // 모든 활동 스레드가 최소 어느 epoch 이상을 **지나갔는지** 확인
    // retire_epoch < safe_epoch 이면 삭제 가능
    void scan_and_reclaim() {
        try_advance_global_epoch(); // 가능하면 전역 epoch 전진

        size_t safe_epoch = min_active_epoch();
        // 관례상 retire_epoch < safe_epoch 인 것들을 해제
        std::vector<Retired> keep;
        keep.reserve(retired_.size());
        for (auto& r : retired_) {
            if (r.retire_epoch < safe_epoch) {
                r.deleter(r.p);
            } else {
                keep.push_back(r);
            }
        }
        retired_.swap(keep);
    }

private:
    size_t id_{0};
    std::vector<Retired> retired_;

    // 현재 활동(active=true) 중인 스레드들의 announced epoch 중 최솟값
    static size_t min_active_epoch() {
        size_t min_e = SIZE_MAX;
        bool any_active = false;
        for (size_t i = 0; i < MAX_THREADS; ++i) {
            if (g_active[i].load(std::memory_order_acquire)) {
                any_active = true;
                size_t e = g_announced[i].load(std::memory_order_acquire);
                if (e < min_e) min_e = e;
            }
        }
        if (!any_active) {
            // 아무도 활동 중이 아니면 전부 지나감 → 전역 epoch+1을 안전선으로
            return g_epoch.load(std::memory_order_acquire) + 1;
        }
        return min_e;
    }

    // 모든 활동 스레드가 **현재 전역 epoch 이상**을 선언했으면 전역 epoch 한 칸 전진
    static void try_advance_global_epoch() {
        size_t cur = g_epoch.load(std::memory_order_acquire);
        for (size_t i = 0; i < MAX_THREADS; ++i) {
            if (g_active[i].load(std::memory_order_acquire)) {
                size_t e = g_announced[i].load(std::memory_order_acquire);
                if (e < cur) {
                    return; // 아직 못 따라온 스레드가 있음
                }
            }
        }
        // 모두 따라왔으면 전진
        g_epoch.fetch_add(1, std::memory_order_acq_rel);
    }
};

// 스레드 로컬
inline thread_local ThreadCtx tls;

// 편의 래퍼
inline void enter() { tls.enter(); }
inline void exit()  { tls.exit(); }
inline void retire(void* p, void (*deleter)(void*)) { tls.retire(p, deleter); }
inline void scan() { tls.scan_and_reclaim(); }

} // namespace ebr

// ===== Treiber Stack with EBR =====
template <typename T>
class LockFreeStack {
    struct Node {
        T value;
        Node* next;
        explicit Node(T v) : value(std::move(v)), next(nullptr) {}
    };

    std::atomic<Node*> head_{nullptr};

    static void delete_node_void(void* p) {
        delete static_cast<Node*>(p);
    }

public:
    void push(T v) {
        auto* n = new Node(std::move(v));
        n->next = head_.load(std::memory_order_relaxed);
        while (!head_.compare_exchange_weak(
            n->next, n,
            std::memory_order_release,
            std::memory_order_relaxed)) {}
    }

    // pop: 성공 시 값, 실패(빈 스택) 시 nullopt
    std::optional<T> pop() {
        ebr::enter(); // === 보호 구간 진입 ===
        Node* h = head_.load(std::memory_order_acquire);
        while (true) {
            if (!h) { ebr::exit(); return std::nullopt; }
            Node* next = h->next;
            if (head_.compare_exchange_weak(
                    h, next,
                    std::memory_order_acq_rel,
                    std::memory_order_acquire)) {
                // 이 시점부터 h는 스택에서 분리됨 → retire로 지연 해제
                T out = std::move(h->value);
                ebr::exit(); // === 보호 구간 이탈 ===
                ebr::retire(static_cast<void*>(h), &delete_node_void);
                return out;
            }
            // CAS 실패 시 h 가 업데이트되어 돌아오므로 루프 계속
        }
    }

    // 테스트용: 가능한 한 다 꺼내기
    std::vector<T> drain() {
        std::vector<T> res;
        for (;;) {
            auto v = pop();
            if (!v) break;
            res.push_back(std::move(*v));
        }
        return res;
    }
};

// ===== 간단 테스트 =====
int main() {
    LockFreeStack<int> s;

    const int producers = 4;
    const int per_producer = 10000;

    std::vector<std::thread> ths;
    ths.reserve(producers);

    for (int p = 0; p < producers; ++p) {
        ths.emplace_back([&, base = p * per_producer] {
            for (int i = 0; i < per_producer; ++i) {
                s.push(base + i);
                if ((i & 255) == 0) ebr::scan(); // 가끔 회수 시도
            }
        });
    }

    std::atomic<bool> done{false};
    std::thread consumer([&] {
        size_t popped = 0;
        const size_t total = size_t(producers) * per_producer;
        while (!done.load(std::memory_order_acquire) || popped < total) {
            auto v = s.pop();
            if (v) ++popped;
            if ((popped & 1023) == 0) ebr::scan();
        }
        ebr::scan();
        std::cout << "Popped total: " << popped << "\n";
    });

    for (auto& t : ths) t.join();
    done.store(true, std::memory_order_release);
    consumer.join();

    auto rest = s.drain();
    std::cout << "Remaining after drain: " << rest.size() << "\n";

    ebr::scan(); // 마지막 정리
    return 0;
}
반응형