본문 바로가기

락 프리(lock free)

2. 락프리 스택의 기본 구현 (참고코드 첨부 및 그 구현, 테스트 코드 작성)



 

 


락프리 스택의 참고코드와 그 구현

 

 

the art of multiprocessor programming 서적을 참고한 락프리 스택의 Java 코드는 다음과 같다.

public class Node {
	public T value;
	public Node next;
	public Node(T value) {
		value = value;
		next = null;
	}
}

public class LockFreeStack<T> {
	AtomicReference<Node> top = new AtomicReference<Node>(null);
	static final int MIN_DELAY = ...;
	static final int MAX_DELAY = ...;
	Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);

	protected boolean tryPush(Node node) {
		Node oldTop = top.get();
		node.next = oldTop;
		return(top.compareAndSet(oldTop, node));
	}
    
	public void push(T value) {
		Node node = new Node(value);
		while (true) {
			if (tryPush(node)) {
				return;
			}
			else {
				backoff.backoff();
			}
		}
	}

	protected Node tryPop() throws EmptyException {
		Node oldTop = top.get();
		if (oldTop == null) {
			throw new EmptyException();
		}
		Node newTop = oldTop.next;
		if (top.compareAndSet(oldTop, newTop)) {
			return oldTop;
		}
		else {
			return null;
		}
	}
    
	public T pop() throws EmptyException {
		while (true) {
			Node returnNode = tryPop();
			if (returnNode != null) {
				return returnNode.value;
			}
			else {
				backoff.backoff();
			}
		}
	}
}

 

위 java코드에서 backoff는 top을 대상으로 한 compareAndSet 동작에 실패했을 경우 top을 다른 스레드에서 변경한 상황 이므로 잠시간 대기하는 메소드이다.

 

backoff 기능은 lock free stack의 필수요소가 아니라, 시스템 전체의 자원을 조금 더 효율적으로 쓰기 위한 부가기능이므로 본 포스팅에서는 제외하기로 한다.

 

또한 AtomicReference는 C++의 Atomic, volatile + Interlocked 연산으로 대체된다.

compareAndSet은 CAS 연산이므로 compare_exchange, InterlockedCompareExcamge로 대체된다.

 

 

 

이를 c++ 코드로 작성한 코드이다. (try~ 함수의 래핑을 풀었다)

 

windows API ver

// "컴파일러 최적화 옵션을 OFF로 하여 volatile 키워드를 생략하였다."
#include <Windows.h>
#include <atomic>


namespace winAPI
{
	template <typename T>
	struct node
	{
		node(T v) : value(v), next(nullptr) {
		};

		T value;
		node* next;
	};

	template <typename T>
	class LockFreeStack {
	public:
		LockFreeStack() : _size(0), top(nullptr) {
		}
		~LockFreeStack() {};


		void push(T v)
		{
			node<T>* newNode = new node<T>(v);
			node<T>* oldTop;

			while (true)
			{
				oldTop = top;
				newNode->next = oldTop;

				if (InterlockedCompareExchangePointer((PVOID*)&top, newNode, oldTop) == oldTop)
				{
					InterlockedIncrement(&_size);
					break;
				}

			}
		}

		T pop()
		{
			node<T>* popNode;
			node<T>* oldTop;
			T ret;

			while (true)
			{
				oldTop = top;
				if (oldTop == nullptr)
					throw;

				popNode = oldTop;
				if (InterlockedCompareExchangePointer((PVOID*)&top, popNode->next, oldTop) == oldTop)
				{
					ret = popNode->value;
					InterlockedDecrement(&_size);
					break;
				}

			}

			delete popNode;
			return ret;
		}

		size_t size()
		{
			return _size;
		}

		size_t _size;
		node<T>* top;
	};
}

 

c++ 17 standard ver

#include <Windows.h>
#include <atomic>


namespace standard
{
	using namespace std;

	template <typename T>
	struct node
	{
		node(T v) : value(v), next(nullptr) {
		};
		
		T value;
		node* next;
	};

	template <typename T>
	class LockFreeStack {
	public:
		LockFreeStack() : _size(0), top(nullptr){
		}
		~LockFreeStack() {};


		void push(T v)
		{
			node<T>* newNode = new node<T>(v);
			node<T>* oldTop;

			newNode->next = top;

			while (true)
			{
				if (top.compare_exchange_weak(newNode->next, newNode))
				{
					_size++;
					break;
				}

			}
		}

		T pop()
		{
			node<T>* popNode = top;
			int ret;

			while (true)
			{
				if (popNode == nullptr)
					throw;

				if (top.compare_exchange_weak(popNode, popNode->next))
				{
					ret = popNode->value;
					_size--;
					break;
				}

			}

			delete popNode;
			return ret;
		}

		size_t size()
		{
			return _size;
		}

		atomic<size_t> _size;
		atomic<node<T>*> top;
	};
}

 


락프리 스택의 테스트

 

 

우선 락프리 스택이 올바르게(thread safety 하게) 작동하는지 테스트하기 위해서는 다음과 같은 항목들을 체크해야 한다.

1. 한번 push 된 노드(값)가 중복해서 pop 되지는 않는가.

2. 한번 push 된 노드(값)가 pop 되지 않는 경우는 없는가. (= 노드가 분실되지는 않는가)

3. 특정 스레드가 dead lock에 걸리거나, 루프에서 빠져나오지 못하거나(무한 대기), 런타임 에러가 발생하는 등 스레드의 비 정상적 동작은 없는가.

 

이를 확인하는 테스트를 설계하자면

  •  N개의 스레드가 X회 PUSH 후 X회 POP 하는 동작을 반복한다.
    • 이때 중복된 값을 POP 하는지 확인하기 위해 N개의 스레드가 PUSH 하는 값은 모두 유니크해야 한다.
    • 이때 POP 한 값은 따로 저장해 둔다. 중복 체크와 개수 체크를 위함이다.
    • 이때 POP 한 값을 저장할 공간은 각 스레드가 독점하는 컨테이너를 준비한다. 이 저장공간에서 발생하는 경합과 스레드 간 직렬화 때문에 테스트에 영향을 주면 안 되기 때문이다.
    • 값을 저장할 때, 파일 쓰기나 콘솔 출력을 사용하지 않는다. 콘솔과 파일 둘 다 각 스레드가 독점하는 자원은 아니기 때문이다. 각 스레드가 독점하는 메모리에 값을 저장하고, 그 메모리를 확인할 것이다.
  • 임의의 시점 (시간, 횟수, 입력 등)에 스레드의 동작을 중지하고 스택에 남은 노드의 개수를 확인한다.
    • 이때 스레드는 X회 POP을 마무리 한 뒤 종료되어야 한다. 모든 스레드가 X회 PUSH, X회 POP을 한 뒤 종료되었다면 정상적인 스택의 크기는 0이 되어야 한다.
  • 스택의 크기가 정상이라면 저장된 값을 확인한다.
    • 본 테스트에서는 스레드 2개를 이용할 것이며, 각 스레드의 초기 시작 번호를 1, 2로 지정한다.
    • 각 스레드는 (이전 번호 + 2)의 값을 X회 PUSH 할 것이다. 즉 1번 스레드는 (1, 3, 5, 7...)을 PUSH 할 것이다.
    • 각 스레드의 실행 시간 차이 때문에 1부터 연속적으로 증가하는 값이 PUSH 되지는 않을 것이다. 하지만 나중에 POP 된 값을 모아놓으면 1부터 연속적으로 증가하는 값이 1개씩 저장되어 있어야 한다.

 

이를 구현한 테스트 코드는 다음과 같다.

#include <iostream>
#include <thread>
#include <vector>

#include "lockFreeStack_windows.h"
#include "lockFreeStack_standard.h"

using namespace std;

winAPI::LockFreeStack<unsigned long long int> stack;
//standard::LockFreeStack<unsigned long long int> stack;


bool threadRun;
int threadCount;
unsigned long long int testCount;
vector<vector<unsigned long long int>> result;

void PushPop_process(int startNumber, int pushPopSize)
{
	while (!threadRun)
	{
		//wait for start signal
	};

	unsigned long long int currentNumber = startNumber;
	while (threadRun)
	{
		if (currentNumber > testCount)
			break;

		for (int i = 0; i < pushPopSize; i++)
		{
			stack.push(currentNumber);
			currentNumber += threadCount;
		}

		for (int i = 0; i < pushPopSize; i++)
		{
			result[startNumber].push_back(stack.pop());
		}
	}
}

void main()
{
	testCount = 10000;
	threadCount = 2;
	// 테스트 스레드 4개 생성

	vector<thread> threads;
	result.resize(threadCount);

	for (int i = 0; i < threadCount; i++)
		threads.push_back(thread(PushPop_process, i, 10));
	

	threadRun = true;
	Sleep(1000);
	threadRun = false;
	for (int i = 0; i < threadCount; i++)
		threads[i].join();

	// 이지점에서 break 후 result 확인
	cout << stack.size();
	cout << "break here";
}

 

code

 

GitHub - YoungWoo93/lockFreeStack-Queue-sample: 블로그 포스팅 내용과 연계

블로그 포스팅 내용과 연계. Contribute to YoungWoo93/lockFreeStack-Queue-sample development by creating an account on GitHub.

github.com


테스트 결과

 

테스트 결과 확인할 항목은 총 3종류이다.


1. 동작 종료 후 스택의 크기는 0인가

2. 동작 종료 후 result의 값을 확인했을 때, 1부터 목표치까지 모든 값이 1개씩 들어있는가

3. 동작 종료까지 런타임 에러는 없는가

 

result[0]과 [1]의 값을 모두 확인한다

 

 

 

 

testCount = 10000 테스트 결과 windows API 버전 코드에서 문제가 발견되었다.

다음 포스팅에서는 왜 이러한 결과가 발생했는지 사고실험을 진행하고, 그 증거를 찾기 위한 새로운 테스트와 수정방안을 적기로 한다.

 

 

lockFreeStack_test_1.xlsx
0.09MB

 

testCount = -1로 장시간 반복 결과 winAPI 버전과 standard 버전 모두에서 에러가 발생했다.

이 또한 다음 포스팅에서 다루도록 한다.

 

popNode == nullptr 조건에 걸렸다.

 

popNode를 delete 하는 과정에서 문제가 발생했다.