반응형
멀티스레드 환경에서 lock-free 자료구조를 쓰면, 메모리 해제가 큰 문제가 됩니다.
어떤 스레드가 노드를 제거했다고 해도, 다른 스레드가 여전히 그 노드를 참조 중일 수 있습니다. 이때 free()를 즉시 해버리면, 참조 중인 스레드가 잘못된 메모리에 접근(Use-After-Free)해서 프로그램이 망가집니다.
이를 해결하기 위해 EBR(Epoch-based Reclamation)은 모든 스레드가 특정 "안전 시점(Epoch)"을 지나야 메모리를 해제할 수 있도록 관리합니다.
시스템 전체에 Epoch(시기)라는 전역 카운터를 둡니다.
각 스레드는 자신이 현재 어떤 Epoch에서 작업 중인지 기록합니다.
메모리를 해제할 때 즉시 free하지 않고, “현재 Epoch에 묶어서” Retire List에 넣어둡니다.
모든 스레드가 해당 Epoch을 지나서 더 이상 그 노드를 참조할 수 없는 상태가 되면, 그제서야 안전하게 free.
Epoch 관리
- 전역 Epoch 변수를 운영(예: 0, 1, 2...).
- 스레드가 lock-free 자료구조에 들어갈 때 “나 지금 Epoch N에 있음”을 기록.
노드 제거 시
- free하지 않고 “Epoch N에서 제거된 노드”를 Retire List에 넣음.
안전 확인
- Retire List에 있는 노드의 Epoch를 확인.
- 모든 스레드가 이미 Epoch N을 벗어났다면 → 그 노드를 free.
장점
- 간단하고 빠름: Hazard Pointer처럼 매번 포인터 보호 등록을 하지 않아도 됨.
- 락 불필요: lock-free 자료구조의 장점을 그대로 유지.
- 일괄 해제 가능: 특정 시점(Epoch)이 끝나면 여러 노드를 한꺼번에 해제.
단점
- 지연된 메모리 해제: 스레드가 오랫동안 한 Epoch에 머무르면 Retire List가 계속 쌓임.
- 긴 꼬리 문제: 특정 스레드가 느리게 동작하면(예: preemption), 메모리 회수가 늦어짐.
- Hazard Pointer보다 메모리 사용량이 커질 수 있음 (Retire List가 커짐).
// g++ -std=c++17 -O2 -pthread ebr_stack_demo.cpp && ./a.out
#include <atomic>
#include <thread>
#include <vector>
#include <unordered_set>
#include <optional>
#include <iostream>
#include <cassert>
namespace ebr {
// ===== 설정 =====
constexpr size_t MAX_THREADS = 128; // 참여 가능한 최대 스레드 수
constexpr size_t RECLAIM_THRESHOLD = 64; // retire 누적 후 스캔 임계치
// ===== 전역 Epoch & 스레드 등록 =====
static std::atomic<size_t> g_epoch{0}; // 전역 epoch (단조 증가)
static std::atomic<size_t> g_next_id{0}; // 스레드 슬롯 할당기
// 각 스레드의 활동/선언 epoch 테이블
static std::atomic<bool> g_active[MAX_THREADS];
static std::atomic<size_t> g_announced[MAX_THREADS]; // active일 때 의미
struct Retired {
void* p;
void (*deleter)(void*);
size_t retire_epoch;
};
// 스레드별 컨텍스트
class ThreadCtx {
public:
ThreadCtx() {
id_ = g_next_id.fetch_add(1, std::memory_order_relaxed);
if (id_ >= MAX_THREADS) {
std::cerr << "Too many threads for EBR\n";
std::terminate();
}
g_active[id_].store(false, std::memory_order_relaxed);
g_announced[id_].store(0, std::memory_order_relaxed);
}
~ThreadCtx() {
// 남은 것 최대한 회수
scan_and_reclaim();
// 비활성 표기
g_active[id_].store(false, std::memory_order_release);
}
// ===== EBR 진입/이탈 =====
inline void enter() {
g_active[id_].store(true, std::memory_order_release);
// 현재 전역 epoch을 읽어 선언
size_t e = g_epoch.load(std::memory_order_acquire);
g_announced[id_].store(e, std::memory_order_release);
}
inline void exit() {
// 이 스레드는 더 이상 보호 구간에 없음
g_active[id_].store(false, std::memory_order_release);
}
// ===== retire / reclaim =====
inline void retire(void* p, void (*deleter)(void*)) {
size_t e = g_epoch.load(std::memory_order_acquire);
retired_.push_back({p, deleter, e});
if (retired_.size() >= RECLAIM_THRESHOLD) scan_and_reclaim();
}
// 모든 활동 스레드가 최소 어느 epoch 이상을 **지나갔는지** 확인
// retire_epoch < safe_epoch 이면 삭제 가능
void scan_and_reclaim() {
try_advance_global_epoch(); // 가능하면 전역 epoch 전진
size_t safe_epoch = min_active_epoch();
// 관례상 retire_epoch < safe_epoch 인 것들을 해제
std::vector<Retired> keep;
keep.reserve(retired_.size());
for (auto& r : retired_) {
if (r.retire_epoch < safe_epoch) {
r.deleter(r.p);
} else {
keep.push_back(r);
}
}
retired_.swap(keep);
}
private:
size_t id_{0};
std::vector<Retired> retired_;
// 현재 활동(active=true) 중인 스레드들의 announced epoch 중 최솟값
static size_t min_active_epoch() {
size_t min_e = SIZE_MAX;
bool any_active = false;
for (size_t i = 0; i < MAX_THREADS; ++i) {
if (g_active[i].load(std::memory_order_acquire)) {
any_active = true;
size_t e = g_announced[i].load(std::memory_order_acquire);
if (e < min_e) min_e = e;
}
}
if (!any_active) {
// 아무도 활동 중이 아니면 전부 지나감 → 전역 epoch+1을 안전선으로
return g_epoch.load(std::memory_order_acquire) + 1;
}
return min_e;
}
// 모든 활동 스레드가 **현재 전역 epoch 이상**을 선언했으면 전역 epoch 한 칸 전진
static void try_advance_global_epoch() {
size_t cur = g_epoch.load(std::memory_order_acquire);
for (size_t i = 0; i < MAX_THREADS; ++i) {
if (g_active[i].load(std::memory_order_acquire)) {
size_t e = g_announced[i].load(std::memory_order_acquire);
if (e < cur) {
return; // 아직 못 따라온 스레드가 있음
}
}
}
// 모두 따라왔으면 전진
g_epoch.fetch_add(1, std::memory_order_acq_rel);
}
};
// 스레드 로컬
inline thread_local ThreadCtx tls;
// 편의 래퍼
inline void enter() { tls.enter(); }
inline void exit() { tls.exit(); }
inline void retire(void* p, void (*deleter)(void*)) { tls.retire(p, deleter); }
inline void scan() { tls.scan_and_reclaim(); }
} // namespace ebr
// ===== Treiber Stack with EBR =====
template <typename T>
class LockFreeStack {
struct Node {
T value;
Node* next;
explicit Node(T v) : value(std::move(v)), next(nullptr) {}
};
std::atomic<Node*> head_{nullptr};
static void delete_node_void(void* p) {
delete static_cast<Node*>(p);
}
public:
void push(T v) {
auto* n = new Node(std::move(v));
n->next = head_.load(std::memory_order_relaxed);
while (!head_.compare_exchange_weak(
n->next, n,
std::memory_order_release,
std::memory_order_relaxed)) {}
}
// pop: 성공 시 값, 실패(빈 스택) 시 nullopt
std::optional<T> pop() {
ebr::enter(); // === 보호 구간 진입 ===
Node* h = head_.load(std::memory_order_acquire);
while (true) {
if (!h) { ebr::exit(); return std::nullopt; }
Node* next = h->next;
if (head_.compare_exchange_weak(
h, next,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
// 이 시점부터 h는 스택에서 분리됨 → retire로 지연 해제
T out = std::move(h->value);
ebr::exit(); // === 보호 구간 이탈 ===
ebr::retire(static_cast<void*>(h), &delete_node_void);
return out;
}
// CAS 실패 시 h 가 업데이트되어 돌아오므로 루프 계속
}
}
// 테스트용: 가능한 한 다 꺼내기
std::vector<T> drain() {
std::vector<T> res;
for (;;) {
auto v = pop();
if (!v) break;
res.push_back(std::move(*v));
}
return res;
}
};
// ===== 간단 테스트 =====
int main() {
LockFreeStack<int> s;
const int producers = 4;
const int per_producer = 10000;
std::vector<std::thread> ths;
ths.reserve(producers);
for (int p = 0; p < producers; ++p) {
ths.emplace_back([&, base = p * per_producer] {
for (int i = 0; i < per_producer; ++i) {
s.push(base + i);
if ((i & 255) == 0) ebr::scan(); // 가끔 회수 시도
}
});
}
std::atomic<bool> done{false};
std::thread consumer([&] {
size_t popped = 0;
const size_t total = size_t(producers) * per_producer;
while (!done.load(std::memory_order_acquire) || popped < total) {
auto v = s.pop();
if (v) ++popped;
if ((popped & 1023) == 0) ebr::scan();
}
ebr::scan();
std::cout << "Popped total: " << popped << "\n";
});
for (auto& t : ths) t.join();
done.store(true, std::memory_order_release);
consumer.join();
auto rest = s.drain();
std::cout << "Remaining after drain: " << rest.size() << "\n";
ebr::scan(); // 마지막 정리
return 0;
}
반응형
'서버 개발 > 멀티쓰레드 프로그래밍' 카테고리의 다른 글
ABA 문제와 사례 (0) | 2025.09.18 |
---|---|
대표적인 락/동기화 도구 : Event, Semaphore, Condition Variable, Interlocked 함수, Spin Lock, Slim Tools (0) | 2025.09.18 |
SRWLock / Critical Section / Mutex 장단점 비교 (0) | 2025.09.18 |
위험 포인터(Hazard Pointer) (0) | 2025.09.18 |