반응형
Hazard Pointer(위험 포인터)는 Lock-free 자료구조에서 안전한 메모리 회수(memory reclamation)를 위해 고안된 기법입니다.
문제의 배경은 ABA 문제와 연결됩니다:
- 어떤 스레드가 노드를 참조 중일 때, 다른 스레드가 그 노드를 해제하고 재활용하면, 첫 번째 스레드는 더 이상 유효하지 않은 메모리를 건드리게 됩니다(Use-After-Free).
- Hazard Pointer는 각 스레드가 내가 지금 참조하고 있는, 아직 해제되면 안 되는 노드의 주소를 표시하는 방식입니다.
동작 방식
각 스레드마다 Hazard Pointer 배열(혹은 슬롯)을 둡니다.
어떤 스레드가 lock-free 구조에서 노드를 읽을 때:
- 노드 포인터를 읽기 전에 Hazard Pointer에 등록(= 보호 마킹).
- 이 포인터는 “아직 해제하면 안 됨”을 의미.
다른 스레드가 해당 노드를 삭제하려고 할 때:
- 먼저 모든 스레드의 Hazard Pointer를 검사.
- 그 노드가 어느 스레드에서도 Hazard Pointer에 등록되어 있지 않으면 안전하게 free 가능.
- 등록되어 있다면 당장 해제하지 않고 **Retire List(연기 해제 목록)**에 넣어둠.
주기적으로 Retire List를 확인하여, Hazard Pointer에 걸려 있지 않은 노드를 해제.
장점은?
- 락 불필요: Lock-free 자료구조의 특성을 유지하면서 안전한 메모리 해제를 보장.
- ABA 문제 완화: 포인터 재활용 시점에서 “누군가 참조 중인지” 확인 가능.
- 간단하고 직관적: RCU(읽기-복사-업데이트) 같은 방법보다 구현이 단순.
단점은?
- 추가 오버헤드: 매번 포인터를 Hazard Pointer에 등록하고, Retire List를 검사해야 함.
- 스레드별 메모리 사용량 증가: 스레드마다 Hazard Pointer 슬롯과 Retire List 필요.
- 지연된 메모리 회수: 즉시 free가 어려워 Retire List가 쌓일 수 있음.
// g++ -std=c++17 -O2 -pthread hp_stack_demo.cpp && ./a.out
#include <atomic>
#include <vector>
#include <thread>
#include <optional>
#include <functional>
#include <iostream>
#include <unordered_set>
#include <cassert>
// ====== Hazard Pointer: 간단 구현 ======
namespace hp {
constexpr size_t MAX_THREADS = 128; // 동시에 참여 가능한 최대 스레드 수
constexpr size_t HAZARDS_PER_THREAD = 1; // 본 예제에선 pop 보호용 1슬롯만 사용
struct HazardSlot {
std::atomic<void*> ptr{nullptr};
};
static HazardSlot g_hazards[MAX_THREADS * HAZARDS_PER_THREAD];
static std::atomic<size_t> g_next_slot{0};
struct Retired {
void* p;
void (*deleter)(void*);
};
// 스레드별 컨텍스트: 자신의 hazard 슬롯 범위와 retire 리스트 보유
class ThreadContext {
public:
ThreadContext() {
// 본 스레드에 고유 슬롯 배정
size_t base = g_next_slot.fetch_add(HAZARDS_PER_THREAD, std::memory_order_relaxed);
if (base + HAZARDS_PER_THREAD > MAX_THREADS * HAZARDS_PER_THREAD) {
std::cerr << "Too many threads for hazard pointers\n";
std::terminate();
}
start_index_ = base;
// 슬롯 초기화
for (size_t i = 0; i < HAZARDS_PER_THREAD; ++i) {
slot(i).ptr.store(nullptr, std::memory_order_relaxed);
}
}
~ThreadContext() {
// 스레드 종료 시 남은 retire들을 가능한 만큼 정리
scan_and_reclaim();
// 슬롯 비우기
for (size_t i = 0; i < HAZARDS_PER_THREAD; ++i) {
slot(i).ptr.store(nullptr, std::memory_order_release);
}
}
// i번째 hazard 슬롯을 보호 포인터로 세팅
void protect(size_t i, void* p) {
assert(i < HAZARDS_PER_THREAD);
slot(i).ptr.store(p, std::memory_order_release);
}
// i번째 슬롯 클리어
void clear(size_t i) {
assert(i < HAZARDS_PER_THREAD);
slot(i).ptr.store(nullptr, std::memory_order_release);
}
// 객체를 retire: deleter는 타입 지운 void* 삭제 함수
void retire(void* p, void (*deleter)(void*)) {
retired_.push_back({p, deleter});
// 임계치(적당히) 넘으면 스캔해서 안전한 것들 해제
if (retired_.size() >= SCAN_THRESHOLD) {
scan_and_reclaim();
}
}
// retire 리스트를 훑어보고, 어떤 hazard에도 잡혀있지 않은 포인터만 delete
void scan_and_reclaim() {
// 1) hazard 스냅샷 수집
std::unordered_set<void*> hazards_snapshot;
hazards_snapshot.reserve(MAX_THREADS * HAZARDS_PER_THREAD);
for (size_t i = 0; i < MAX_THREADS * HAZARDS_PER_THREAD; ++i) {
void* p = g_hazards[i].ptr.load(std::memory_order_acquire);
if (p) hazards_snapshot.insert(p);
}
// 2) retired 중 안전한 것만 실제 해제
std::vector<Retired> keep;
keep.reserve(retired_.size());
for (auto& r : retired_) {
if (hazards_snapshot.find(r.p) == hazards_snapshot.end()) {
// 어떤 스레드도 보호하지 않음 → 해제
r.deleter(r.p);
} else {
keep.push_back(r);
}
}
retired_.swap(keep);
}
private:
static constexpr size_t SCAN_THRESHOLD = 64; // 필요시 조절
size_t start_index_{0};
std::vector<Retired> retired_;
static HazardSlot& at(size_t i) { return g_hazards[i]; }
HazardSlot& slot(size_t i) { return at(start_index_ + i); }
};
// 스레드 로컬 컨텍스트
inline thread_local ThreadContext tls_ctx;
// 편의 래퍼
inline void protect(size_t i, void* p) { tls_ctx.protect(i, p); }
inline void clear(size_t i) { tls_ctx.clear(i); }
inline void retire(void* p, void (*deleter)(void*)) { tls_ctx.retire(p, deleter); }
inline void scan() { tls_ctx.scan_and_reclaim(); }
} // namespace hp
// ====== Lock-free Stack (Treiber) + Hazard Pointer 보호 ======
template <typename T>
class LockFreeStack {
struct Node {
T value;
Node* next;
explicit Node(T v) : value(std::move(v)), next(nullptr) {}
};
std::atomic<Node*> head_{nullptr};
static void delete_node_void(void* p) {
delete static_cast<Node*>(p);
}
public:
void push(T v) {
Node* n = new Node(std::move(v));
n->next = head_.load(std::memory_order_relaxed);
// 단순한 Treiber push 루프
while (!head_.compare_exchange_weak(
n->next, n,
std::memory_order_release,
std::memory_order_relaxed)) {}
}
// pop: 성공 시 값 반환, 실패(빈 스택) 시 nullopt
std::optional<T> pop() {
using namespace hp;
Node* h = head_.load(std::memory_order_acquire);
while (true) {
if (!h) return std::nullopt;
// 1) 현재 head를 Hazard로 보호
protect(0, h);
// 2) 보호 후에도 head가 같은지 재확인(TOCTTOU 방지)
if (h != head_.load(std::memory_order_acquire)) {
// head가 바뀌었으면 다시 시도
h = head_.load(std::memory_order_acquire);
continue;
}
Node* next = h->next;
// 3) CAS로 head를 next로 이동
if (head_.compare_exchange_weak(
h, next,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
// 4) Hazard 해제 후 안전한 retire
clear(0);
T out = std::move(h->value);
retire(static_cast<void*>(h), &delete_node_void);
return out;
}
// 실패 시 h는 최신 값으로 갱신되어 돌아오므로 루프 계속
}
}
// 테스트/정리 용: 남은 노드 모두 pop
std::vector<T> drain() {
std::vector<T> res;
for (;;) {
auto v = pop();
if (!v) break;
res.push_back(std::move(*v));
}
return res;
}
};
// ====== 간단한 테스트 ======
int main() {
LockFreeStack<int> s;
// 다중 프로듀서
const int producers = 4;
const int per_producer = 10000;
std::vector<std::thread> ths;
ths.reserve(producers);
for (int p = 0; p < producers; ++p) {
ths.emplace_back([&, base = p * per_producer] {
for (int i = 0; i < per_producer; ++i) {
s.push(base + i);
if ((i & 255) == 0) hp::scan(); // 가끔 스캔
}
});
}
// 소비자
std::atomic<bool> done{false};
std::thread consumer([&] {
size_t popped = 0;
while (!done.load(std::memory_order_acquire) || popped < size_t(producers * per_producer)) {
auto v = s.pop();
if (v) ++popped;
if ((popped & 1023) == 0) hp::scan(); // 가끔 스캔
}
// 마지막 한 번 더
hp::scan();
std::cout << "Popped total: " << popped << "\n";
});
for (auto& t : ths) t.join();
done.store(true, std::memory_order_release);
consumer.join();
// 스택이 비었는지 확인
auto rest = s.drain();
std::cout << "Remaining after drain: " << rest.size() << "\n";
// 마지막 전체 스캔(메모리 회수 마무리)
hp::scan();
return 0;
}
반응형
'서버 개발 > 멀티쓰레드 프로그래밍' 카테고리의 다른 글
ABA 문제와 사례 (0) | 2025.09.18 |
---|---|
대표적인 락/동기화 도구 : Event, Semaphore, Condition Variable, Interlocked 함수, Spin Lock, Slim Tools (0) | 2025.09.18 |
SRWLock / Critical Section / Mutex 장단점 비교 (0) | 2025.09.18 |
Epoch-based Reclamation (EBR, 시기 기반 메모리 회수) (0) | 2025.09.18 |