락프리 스택의 참고코드와 그 구현
the art of multiprocessor programming 서적을 참고한 락프리 스택의 Java 코드는 다음과 같다.
public class Node {
public T value;
public Node next;
public Node(T value) {
value = value;
next = null;
}
}
public class LockFreeStack<T> {
AtomicReference<Node> top = new AtomicReference<Node>(null);
static final int MIN_DELAY = ...;
static final int MAX_DELAY = ...;
Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
protected boolean tryPush(Node node) {
Node oldTop = top.get();
node.next = oldTop;
return(top.compareAndSet(oldTop, node));
}
public void push(T value) {
Node node = new Node(value);
while (true) {
if (tryPush(node)) {
return;
}
else {
backoff.backoff();
}
}
}
protected Node tryPop() throws EmptyException {
Node oldTop = top.get();
if (oldTop == null) {
throw new EmptyException();
}
Node newTop = oldTop.next;
if (top.compareAndSet(oldTop, newTop)) {
return oldTop;
}
else {
return null;
}
}
public T pop() throws EmptyException {
while (true) {
Node returnNode = tryPop();
if (returnNode != null) {
return returnNode.value;
}
else {
backoff.backoff();
}
}
}
}
위 java코드에서 backoff는 top을 대상으로 한 compareAndSet 동작에 실패했을 경우 top을 다른 스레드에서 변경한 상황 이므로 잠시간 대기하는 메소드이다.
backoff 기능은 lock free stack의 필수요소가 아니라, 시스템 전체의 자원을 조금 더 효율적으로 쓰기 위한 부가기능이므로 본 포스팅에서는 제외하기로 한다.
또한 AtomicReference는 C++의 Atomic, volatile + Interlocked 연산으로 대체된다.
compareAndSet은 CAS 연산이므로 compare_exchange, InterlockedCompareExcamge로 대체된다.
이를 c++ 코드로 작성한 코드이다. (try~ 함수의 래핑을 풀었다)
windows API ver
// "컴파일러 최적화 옵션을 OFF로 하여 volatile 키워드를 생략하였다."
#include <Windows.h>
#include <atomic>
namespace winAPI
{
template <typename T>
struct node
{
node(T v) : value(v), next(nullptr) {
};
T value;
node* next;
};
template <typename T>
class LockFreeStack {
public:
LockFreeStack() : _size(0), top(nullptr) {
}
~LockFreeStack() {};
void push(T v)
{
node<T>* newNode = new node<T>(v);
node<T>* oldTop;
while (true)
{
oldTop = top;
newNode->next = oldTop;
if (InterlockedCompareExchangePointer((PVOID*)&top, newNode, oldTop) == oldTop)
{
InterlockedIncrement(&_size);
break;
}
}
}
T pop()
{
node<T>* popNode;
node<T>* oldTop;
T ret;
while (true)
{
oldTop = top;
if (oldTop == nullptr)
throw;
popNode = oldTop;
if (InterlockedCompareExchangePointer((PVOID*)&top, popNode->next, oldTop) == oldTop)
{
ret = popNode->value;
InterlockedDecrement(&_size);
break;
}
}
delete popNode;
return ret;
}
size_t size()
{
return _size;
}
size_t _size;
node<T>* top;
};
}
c++ 17 standard ver
#include <Windows.h>
#include <atomic>
namespace standard
{
using namespace std;
template <typename T>
struct node
{
node(T v) : value(v), next(nullptr) {
};
T value;
node* next;
};
template <typename T>
class LockFreeStack {
public:
LockFreeStack() : _size(0), top(nullptr){
}
~LockFreeStack() {};
void push(T v)
{
node<T>* newNode = new node<T>(v);
node<T>* oldTop;
newNode->next = top;
while (true)
{
if (top.compare_exchange_weak(newNode->next, newNode))
{
_size++;
break;
}
}
}
T pop()
{
node<T>* popNode = top;
int ret;
while (true)
{
if (popNode == nullptr)
throw;
if (top.compare_exchange_weak(popNode, popNode->next))
{
ret = popNode->value;
_size--;
break;
}
}
delete popNode;
return ret;
}
size_t size()
{
return _size;
}
atomic<size_t> _size;
atomic<node<T>*> top;
};
}
락프리 스택의 테스트
우선 락프리 스택이 올바르게(thread safety 하게) 작동하는지 테스트하기 위해서는 다음과 같은 항목들을 체크해야 한다.
1. 한번 push 된 노드(값)가 중복해서 pop 되지는 않는가.
2. 한번 push 된 노드(값)가 pop 되지 않는 경우는 없는가. (= 노드가 분실되지는 않는가)
3. 특정 스레드가 dead lock에 걸리거나, 루프에서 빠져나오지 못하거나(무한 대기), 런타임 에러가 발생하는 등 스레드의 비 정상적 동작은 없는가.
이를 확인하는 테스트를 설계하자면
- N개의 스레드가 X회 PUSH 후 X회 POP 하는 동작을 반복한다.
- 이때 중복된 값을 POP 하는지 확인하기 위해 N개의 스레드가 PUSH 하는 값은 모두 유니크해야 한다.
- 이때 POP 한 값은 따로 저장해 둔다. 중복 체크와 개수 체크를 위함이다.
- 이때 POP 한 값을 저장할 공간은 각 스레드가 독점하는 컨테이너를 준비한다. 이 저장공간에서 발생하는 경합과 스레드 간 직렬화 때문에 테스트에 영향을 주면 안 되기 때문이다.
- 값을 저장할 때, 파일 쓰기나 콘솔 출력을 사용하지 않는다. 콘솔과 파일 둘 다 각 스레드가 독점하는 자원은 아니기 때문이다. 각 스레드가 독점하는 메모리에 값을 저장하고, 그 메모리를 확인할 것이다.
- 임의의 시점 (시간, 횟수, 입력 등)에 스레드의 동작을 중지하고 스택에 남은 노드의 개수를 확인한다.
- 이때 스레드는 X회 POP을 마무리 한 뒤 종료되어야 한다. 모든 스레드가 X회 PUSH, X회 POP을 한 뒤 종료되었다면 정상적인 스택의 크기는 0이 되어야 한다.
- 스택의 크기가 정상이라면 저장된 값을 확인한다.
- 본 테스트에서는 스레드 2개를 이용할 것이며, 각 스레드의 초기 시작 번호를 1, 2로 지정한다.
- 각 스레드는 (이전 번호 + 2)의 값을 X회 PUSH 할 것이다. 즉 1번 스레드는 (1, 3, 5, 7...)을 PUSH 할 것이다.
- 각 스레드의 실행 시간 차이 때문에 1부터 연속적으로 증가하는 값이 PUSH 되지는 않을 것이다. 하지만 나중에 POP 된 값을 모아놓으면 1부터 연속적으로 증가하는 값이 1개씩 저장되어 있어야 한다.
이를 구현한 테스트 코드는 다음과 같다.
#include <iostream>
#include <thread>
#include <vector>
#include "lockFreeStack_windows.h"
#include "lockFreeStack_standard.h"
using namespace std;
winAPI::LockFreeStack<unsigned long long int> stack;
//standard::LockFreeStack<unsigned long long int> stack;
bool threadRun;
int threadCount;
unsigned long long int testCount;
vector<vector<unsigned long long int>> result;
void PushPop_process(int startNumber, int pushPopSize)
{
while (!threadRun)
{
//wait for start signal
};
unsigned long long int currentNumber = startNumber;
while (threadRun)
{
if (currentNumber > testCount)
break;
for (int i = 0; i < pushPopSize; i++)
{
stack.push(currentNumber);
currentNumber += threadCount;
}
for (int i = 0; i < pushPopSize; i++)
{
result[startNumber].push_back(stack.pop());
}
}
}
void main()
{
testCount = 10000;
threadCount = 2;
// 테스트 스레드 4개 생성
vector<thread> threads;
result.resize(threadCount);
for (int i = 0; i < threadCount; i++)
threads.push_back(thread(PushPop_process, i, 10));
threadRun = true;
Sleep(1000);
threadRun = false;
for (int i = 0; i < threadCount; i++)
threads[i].join();
// 이지점에서 break 후 result 확인
cout << stack.size();
cout << "break here";
}
code
테스트 결과
테스트 결과 확인할 항목은 총 3종류이다.
1. 동작 종료 후 스택의 크기는 0인가
2. 동작 종료 후 result의 값을 확인했을 때, 1부터 목표치까지 모든 값이 1개씩 들어있는가
3. 동작 종료까지 런타임 에러는 없는가
testCount = 10000 테스트 결과 windows API 버전 코드에서 문제가 발견되었다.
다음 포스팅에서는 왜 이러한 결과가 발생했는지 사고실험을 진행하고, 그 증거를 찾기 위한 새로운 테스트와 수정방안을 적기로 한다.
testCount = -1로 장시간 반복 결과 winAPI 버전과 standard 버전 모두에서 에러가 발생했다.
이 또한 다음 포스팅에서 다루도록 한다.
'락 프리(lock free)' 카테고리의 다른 글
4. 락프리 스택 기본 구현체의 문제 1 (ABA 문제 확인, 수정) (0) | 2023.09.21 |
---|---|
3. 락프리 스택의 문제 추론 (문제점 정리와 사고실험, 로깅) (1) | 2023.09.21 |
1. 락 프리(lock free) 카테고리 설명 및 목차 (0) | 2023.09.21 |