서버 개발/멀티쓰레드 프로그래밍

위험 포인터(Hazard Pointer)

지노윈 2025. 9. 18. 10:50
반응형

 

Hazard Pointer(위험 포인터)Lock-free 자료구조에서 안전한 메모리 회수(memory reclamation)를 위해 고안된 기법입니다.

 

문제의 배경은 ABA 문제와 연결됩니다:

  • 어떤 스레드가 노드를 참조 중일 때, 다른 스레드가 그 노드를 해제하고 재활용하면, 첫 번째 스레드는 더 이상 유효하지 않은 메모리를 건드리게 됩니다(Use-After-Free).
  • Hazard Pointer는 각 스레드가 내가 지금 참조하고 있는, 아직 해제되면 안 되는 노드의 주소를 표시하는 방식입니다.

동작 방식

각 스레드마다 Hazard Pointer 배열(혹은 슬롯)을 둡니다.

 

어떤 스레드가 lock-free 구조에서 노드를 읽을 때:

  • 노드 포인터를 읽기 전에 Hazard Pointer에 등록(= 보호 마킹).
  • 이 포인터는 “아직 해제하면 안 됨”을 의미.

다른 스레드가 해당 노드를 삭제하려고 할 때:

  • 먼저 모든 스레드의 Hazard Pointer를 검사.
  • 그 노드가 어느 스레드에서도 Hazard Pointer에 등록되어 있지 않으면 안전하게 free 가능.
  • 등록되어 있다면 당장 해제하지 않고 **Retire List(연기 해제 목록)**에 넣어둠.

주기적으로 Retire List를 확인하여, Hazard Pointer에 걸려 있지 않은 노드를 해제.

장점은?

  • 락 불필요: Lock-free 자료구조의 특성을 유지하면서 안전한 메모리 해제를 보장.
  • ABA 문제 완화: 포인터 재활용 시점에서 “누군가 참조 중인지” 확인 가능.
  • 간단하고 직관적: RCU(읽기-복사-업데이트) 같은 방법보다 구현이 단순.

단점은?

  • 추가 오버헤드: 매번 포인터를 Hazard Pointer에 등록하고, Retire List를 검사해야 함.
  • 스레드별 메모리 사용량 증가: 스레드마다 Hazard Pointer 슬롯과 Retire List 필요.
  • 지연된 메모리 회수: 즉시 free가 어려워 Retire List가 쌓일 수 있음.

 

// g++ -std=c++17 -O2 -pthread hp_stack_demo.cpp && ./a.out
#include <atomic>
#include <vector>
#include <thread>
#include <optional>
#include <functional>
#include <iostream>
#include <unordered_set>
#include <cassert>

// ====== Hazard Pointer: 간단 구현 ======
namespace hp {

constexpr size_t MAX_THREADS = 128;      // 동시에 참여 가능한 최대 스레드 수
constexpr size_t HAZARDS_PER_THREAD = 1; // 본 예제에선 pop 보호용 1슬롯만 사용

struct HazardSlot {
    std::atomic<void*> ptr{nullptr};
};

static HazardSlot g_hazards[MAX_THREADS * HAZARDS_PER_THREAD];
static std::atomic<size_t> g_next_slot{0};

struct Retired {
    void* p;
    void (*deleter)(void*);
};

// 스레드별 컨텍스트: 자신의 hazard 슬롯 범위와 retire 리스트 보유
class ThreadContext {
public:
    ThreadContext() {
        // 본 스레드에 고유 슬롯 배정
        size_t base = g_next_slot.fetch_add(HAZARDS_PER_THREAD, std::memory_order_relaxed);
        if (base + HAZARDS_PER_THREAD > MAX_THREADS * HAZARDS_PER_THREAD) {
            std::cerr << "Too many threads for hazard pointers\n";
            std::terminate();
        }
        start_index_ = base;
        // 슬롯 초기화
        for (size_t i = 0; i < HAZARDS_PER_THREAD; ++i) {
            slot(i).ptr.store(nullptr, std::memory_order_relaxed);
        }
    }

    ~ThreadContext() {
        // 스레드 종료 시 남은 retire들을 가능한 만큼 정리
        scan_and_reclaim();
        // 슬롯 비우기
        for (size_t i = 0; i < HAZARDS_PER_THREAD; ++i) {
            slot(i).ptr.store(nullptr, std::memory_order_release);
        }
    }

    // i번째 hazard 슬롯을 보호 포인터로 세팅
    void protect(size_t i, void* p) {
        assert(i < HAZARDS_PER_THREAD);
        slot(i).ptr.store(p, std::memory_order_release);
    }

    // i번째 슬롯 클리어
    void clear(size_t i) {
        assert(i < HAZARDS_PER_THREAD);
        slot(i).ptr.store(nullptr, std::memory_order_release);
    }

    // 객체를 retire: deleter는 타입 지운 void* 삭제 함수
    void retire(void* p, void (*deleter)(void*)) {
        retired_.push_back({p, deleter});
        // 임계치(적당히) 넘으면 스캔해서 안전한 것들 해제
        if (retired_.size() >= SCAN_THRESHOLD) {
            scan_and_reclaim();
        }
    }

    // retire 리스트를 훑어보고, 어떤 hazard에도 잡혀있지 않은 포인터만 delete
    void scan_and_reclaim() {
        // 1) hazard 스냅샷 수집
        std::unordered_set<void*> hazards_snapshot;
        hazards_snapshot.reserve(MAX_THREADS * HAZARDS_PER_THREAD);
        for (size_t i = 0; i < MAX_THREADS * HAZARDS_PER_THREAD; ++i) {
            void* p = g_hazards[i].ptr.load(std::memory_order_acquire);
            if (p) hazards_snapshot.insert(p);
        }

        // 2) retired 중 안전한 것만 실제 해제
        std::vector<Retired> keep;
        keep.reserve(retired_.size());
        for (auto& r : retired_) {
            if (hazards_snapshot.find(r.p) == hazards_snapshot.end()) {
                // 어떤 스레드도 보호하지 않음 → 해제
                r.deleter(r.p);
            } else {
                keep.push_back(r);
            }
        }
        retired_.swap(keep);
    }

private:
    static constexpr size_t SCAN_THRESHOLD = 64; // 필요시 조절
    size_t start_index_{0};
    std::vector<Retired> retired_;

    static HazardSlot& at(size_t i) { return g_hazards[i]; }
    HazardSlot& slot(size_t i) { return at(start_index_ + i); }
};

// 스레드 로컬 컨텍스트
inline thread_local ThreadContext tls_ctx;

// 편의 래퍼
inline void protect(size_t i, void* p) { tls_ctx.protect(i, p); }
inline void clear(size_t i) { tls_ctx.clear(i); }
inline void retire(void* p, void (*deleter)(void*)) { tls_ctx.retire(p, deleter); }
inline void scan() { tls_ctx.scan_and_reclaim(); }

} // namespace hp

// ====== Lock-free Stack (Treiber) + Hazard Pointer 보호 ======
template <typename T>
class LockFreeStack {
    struct Node {
        T value;
        Node* next;
        explicit Node(T v) : value(std::move(v)), next(nullptr) {}
    };

    std::atomic<Node*> head_{nullptr};

    static void delete_node_void(void* p) {
        delete static_cast<Node*>(p);
    }

public:
    void push(T v) {
        Node* n = new Node(std::move(v));
        n->next = head_.load(std::memory_order_relaxed);
        // 단순한 Treiber push 루프
        while (!head_.compare_exchange_weak(
            n->next, n,
            std::memory_order_release,
            std::memory_order_relaxed)) {}
    }

    // pop: 성공 시 값 반환, 실패(빈 스택) 시 nullopt
    std::optional<T> pop() {
        using namespace hp;

        Node* h = head_.load(std::memory_order_acquire);
        while (true) {
            if (!h) return std::nullopt;

            // 1) 현재 head를 Hazard로 보호
            protect(0, h);

            // 2) 보호 후에도 head가 같은지 재확인(TOCTTOU 방지)
            if (h != head_.load(std::memory_order_acquire)) {
                // head가 바뀌었으면 다시 시도
                h = head_.load(std::memory_order_acquire);
                continue;
            }

            Node* next = h->next;
            // 3) CAS로 head를 next로 이동
            if (head_.compare_exchange_weak(
                    h, next,
                    std::memory_order_acq_rel,
                    std::memory_order_acquire)) {
                // 4) Hazard 해제 후 안전한 retire
                clear(0);
                T out = std::move(h->value);
                retire(static_cast<void*>(h), &delete_node_void);
                return out;
            }
            // 실패 시 h는 최신 값으로 갱신되어 돌아오므로 루프 계속
        }
    }

    // 테스트/정리 용: 남은 노드 모두 pop
    std::vector<T> drain() {
        std::vector<T> res;
        for (;;) {
            auto v = pop();
            if (!v) break;
            res.push_back(std::move(*v));
        }
        return res;
    }
};

// ====== 간단한 테스트 ======
int main() {
    LockFreeStack<int> s;

    // 다중 프로듀서
    const int producers = 4;
    const int per_producer = 10000;

    std::vector<std::thread> ths;
    ths.reserve(producers);

    for (int p = 0; p < producers; ++p) {
        ths.emplace_back([&, base = p * per_producer] {
            for (int i = 0; i < per_producer; ++i) {
                s.push(base + i);
                if ((i & 255) == 0) hp::scan(); // 가끔 스캔
            }
        });
    }

    // 소비자
    std::atomic<bool> done{false};
    std::thread consumer([&] {
        size_t popped = 0;
        while (!done.load(std::memory_order_acquire) || popped < size_t(producers * per_producer)) {
            auto v = s.pop();
            if (v) ++popped;
            if ((popped & 1023) == 0) hp::scan(); // 가끔 스캔
        }
        // 마지막 한 번 더
        hp::scan();
        std::cout << "Popped total: " << popped << "\n";
    });

    for (auto& t : ths) t.join();
    done.store(true, std::memory_order_release);
    consumer.join();

    // 스택이 비었는지 확인
    auto rest = s.drain();
    std::cout << "Remaining after drain: " << rest.size() << "\n";

    // 마지막 전체 스캔(메모리 회수 마무리)
    hp::scan();
    return 0;
}
반응형