본문 바로가기

STUDY/linux

Synchronization in LINUX - POSIX Semaphore로 producer-consumer 해결하기

동기화 기법 중 semaphore를 사용하는 방법을 알아보자

 

 

synchronization 2

지난 포스트에서 동기화 기법인 Lock에 대해서 살펴보았다. 하지만 spinlock이나 disabling interrupt는 short, simple critical section에서만 효과적이다. Mutual exclusion을 제외하고는 별달리 뭘 해주지 않기..

josushell.tistory.com

 

Semaphore Implementation 은 두가지로 되어있다.

- POSIX semaphore : 주로 스레드 동기화에 사용됨

- System V Semaphore : 주로 프로세스 동기화에 사용됨

 

이 중에서 IEEE에서 정한 UNIX의 표준 (syscall 기준)인 POSIX library로 알아보자.


Producer-Consumer Problem 

producer-consumer problem에 대한 자세한 내용은 위 포스팅을 참고하는 것을 추천한다.

thread level에서 발생 가능한 producer-problem 문제는 다음과 같다.

 

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>

#define LOOP 20
#define MAX 2

typedef struct {
	int	buf[MAX];
	int in;
	int out;
	int counter;
}	BounderBufferType;
	
	
BoundedBufferType Buf;

void producer(void *dummy)
{
	int	i;
	printf("===== Producer started ======\n");

	for (int i = 0;i<LOOP;i++)
	{
		if (Buf.counter == MAX)
		{
			printf("Producer: buffer full. waiting...\n");
			while (Buf.counter == MAX)
				;
		}

		printf("Producer: producing data\n");
		Buf.buf[Buf.in] = 1;
		Buf.in = (Buf.in + 1) % MAX;
		Buf.counter++;
	}

	printf("Producer total: %d itmes\n", Buf.counter);

	pthread_exit(NULL);
}

void consumer(void *dummy)
{
	int i, data;
	printf("===== Consumer started ======\n");

	for (int i = 0;i<LOOP;i++)
	{
		if (Buf.counter == 0)
		{
			printf("Consumer: buffer empty. waiting...\n");
			while (Buf.counter == 0)
				;
		}

		printf("Consumer: consuming data\n");
		data = Buf.buf[Buf.out];
		Buf.out = (Buf.out + 1) % MAX;
		Buf.counter--;
	}

	printf("Consumer total: %d itmes\n", Buf.counter);

	pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
	pthread_t	tid1, tid2;

	if (pthread_creat(&tid1, NULL, (void *)producer, NULL) < 0)
		exit(1);

	if (pthread_creat(&tid2, NULL, (void *)consumer, NULL) < 0)
		exit(1);

	if (pthread_join(tid1, NULL) < 0)
		exit(1);

	if (pthread_join(tid2, NULL) < 0)
		exit(1);

	printf("%d items in buffer\n", Buf.counter);

	return 0;
}

 

여기서 신경써야 하는 부분은 counter 변수에 대한 동기화, 데이터에 대한 동기화 두 부분이다.


Producer - Consumer Problem solution sudo code

이 동기화 문제를 해결하는 방법은 다음과 같다. 이를 POSIX로 구현해보자.

 

// semaphore
mutex = 1
empty = N
full = 0

// semaphore를 사용한 producer
void produce(data)
{
    wait(empty);
    wait(mutex);
    buffer[in] = data;
    in = (in + 1) % N;
    signal(mutex);
    signal(full);
}

// semaphore를 사용한 consumer
void consumer(data)
{
    wait(full);
    wait(mutex);
    data = buffer[out];
    out = (out + 1) % N;
    signal(mutex);
    signal(empty);
}

POSIX Semaphore Library

# include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

 

함수 명세

return 성공 시 0, 실패 시 -1
description semaphore 를 생성하고 초기화를 진행한다.

 

파라미터

sem_t *sem semaphore로 사용할 pointer
int pshared nothing
int value semaphore 초기화에 사용될 초깃값

 

int sem_wait(sem_t *sem);

함수 명세

return 성공 시 0, 실패 시 -1
description semaphore -1 한다. 즉 자원을 쓰기 위해 semaphore를 요청 및 기다린다.

 

파라미터

sem_t *sem semaphore로 사용할 pointer

 

int sem_trywait(sem_t *sem);

함수 명세

return 성공 시 0, 실패 시 -1
description sem_wait()과 같지만, 사용 가능한 semaphore가 없는 경우 기다리지 않고바로 false로 return 한다.

 

파라미터

sem_t *sem semaphore로 사용할 pointer

 

int sem_post(sem_t *sem);

함수 명세

return 성공 시 0, 실패 시 -1
description semaphore +1 한다. 즉 자원을 반납한다.

 

파라미터

sem_t *sem semaphore로 사용할 pointer

 

int sem_getvalue(sem_t *sem, int *sval);

함수 명세

return 성공 시 0, 실패 시 -1
description 현재 semaphore의 갯수를 sval로 전달한다. (out)

 

파라미터

sem_t *sem semaphore로 사용할 pointer
int *sval semaphore의 갯수가 return 되는 변수

 

int sem_destroy(sem_t *sem);

함수 명세

return 성공 시 0, 실패 시 -1
description sempahore를 삭제한다.

 

파라미터

sem_t *sem semaphore로 사용할 pointer

Producer - Consumer Problem solution code

 

1. 먼저 사용하고자 하는 semaphore를 선언해야 한다.

 

이 semaphore들은 전역변수로 선언해서 모든 함수에서 사용할 수 있도록 했다.

producer-consumer problem 해결에는 semaphore가 총 3개 필요하기 때문이다.

읽기 쓰기 담당 1개, 버퍼가 차있는 경우와 비어있는 경우 busy waiting을 해결하기 위한 2개로 총 3개이다.

자세한 내용은 이를 참고하면 좋다.

 

synchronization 2

지난 포스트에서 동기화 기법인 Lock에 대해서 살펴보았다. 하지만 spinlock이나 disabling interrupt는 short, simple critical section에서만 효과적이다. Mutual exclusion을 제외하고는 별달리 뭘 해주지 않기..

josushell.tistory.com

 

 

sem_t emptySem, fullSem, mutexSem;

2. semaphore를 초기화 해준다.

 

이때 sem_init() 함수가 사용된다.

 

int main(int argc, char *argv[])
{
	pthread_t	tid1, tid2;

	sem_init(&emptySem, 0, MAX);
	sem_init(&fullSem, 0, 0);
	sem_init(&mutexSem, 0, 1);
    
    // 뒤에 중략

 

맨 처음의 경우에는 버퍼가 다 비어있는 상태이므로 빈 버퍼를 위한 semaphore(여기서는 emptySem)를 MAX개로 초기화해준다.

fullSem은 당연히 버퍼가 다 비어있으므로 0이다.

mutexSem은 처음에는 읽기, 쓰기가 가능하기 때문에 1로 초기화한다.


3. 동기화가 필요한 곳에서 sem_wait(), sem_post()를 사용한다.

Producer 함수에서는...

 

이때 중요한 점은 producer의 특징이다.

procuder는 데이터를 만들어낸다. 즉 빈 버퍼가 있어야만 데이터를 생산 가능하다.

 

따라서 빈 버퍼가 있을 때까지 기다리는 것이므로 sem_wait(&emptySem) 으로 비어있는 버퍼를 기다리는 것이다.

왜냐하면 emptySem이 바로 빈 버퍼가 있는지 확인하고 빌 때 까지 기다리기 때문이다.

다시 한번 말하지만 이 로직은 race condition 처리와는 관련 있는 것이 아니라, while문으로 무한정 대기하도록 로직을 작성했을 때 cpu를 낭비하게 되는 busy waiting을 해결하기 위함이다.

 

같은 논리로 읽기, 쓰기가 가능한지 확인하기 위해서 sem_wait(&mutexSem) 으로 작업이 가능할 때까지 대기한다.

이는 race condition을 해결하기 위함이다.

void producer(void *dummy)
{
	int	i;
	int sem_value;
	printf("===== Producer started ======\n");

	for (int i = 0;i<LOOP;i++)
	{
		sem_getvalue(&emptySem, &sem_value);
		if (sem_value == 0)
		{
			printf("Producer: buffer full. waiting...\n");
			
			sem_wait(&empty_sem);
			sem_wait(&mutex_sem);
		}

		printf("Producer: producing data\n");
		Buf.buf[Buf.in] = 1;
		Buf.in = (Buf.in + 1) % MAX;
		Buf.counter++;

		sem_post(&mutexSem);
		sem_post(&fullSem);
	}

	printf("Producer total: %d itmes\n", Buf.counter);

	pthread_exit(NULL);
}

critical section을 벗어나게 된 경우에는 다시 다른 프로세스에게 shared data를 사용가능하다는 것을 알리기 위해 sem_post(&mutexSem)로 자원을 해제해야 한다.

 

이때 버퍼에는 자원이 하나 생긴 것이므로 sem_post(&fullSem) 으로 차있는 버퍼가 증가했다는 것을 알려야 한다.


Consumer 함수에서는...

 

producer 함수에서 적용된 semaphore 로직과 똑같다.

다른 부분은 sem_wait(&fullSem) 을 한다는 것이다.

 

이는 consumer 함수의 특성 때문이다. consumer 함수는 데이터를 소비하는 것이기 때문에 버퍼에 1개의 데이터라도 있어야 한다. 버퍼가 빈 경우는 데이터를 읽을 수 없다.

fullSem은 차있는 버퍼를 알려주는 semaphore이므로 sem_wait(&fullSem)으로 차있는 버퍼를 한개 사용하겠다고 요청하는 것이다.

 

void consumer(void *dummy)
{
	int i, data;
	int sem_value;

	printf("===== Consumer started ======\n");

	for (int i = 0;i<LOOP;i++)
	{
		sem_getvalue(&fullSem, &sem_value);
		if (sem_value == 0)
		{
			printf("Consumer: buffer empty. waiting...\n");
			
			sem_wait(&fullSem);
			sem_wait(&mutexSem);
		}

		printf("Consumer: consuming data\n");
		data = Buf.buf[Buf.out];
		Buf.out = (Buf.out + 1) % MAX;
		Buf.counter--;

		sem_post(&mutexSem);
		sem_post(&emptySem);
	}

	printf("Consumer total: %d itmes\n", Buf.counter);

	pthread_exit(NULL);
}

 

똑같은 논리로 semaphoref를 해제할 때에도 sem_post(&emptySem) 을 통해서 비어 있는 버퍼가 증가했음을 알려준다.

왜냐하면 consumer로 버퍼의 데이터를 하나 읽었기 때문이다.


 

4. 끝나고 난 뒤에는 semaphore를 해제한다.

 

int main(int argc, char *argv[])
{
	pthread_t	tid1, tid2;

	sem_init(&emptySem, 0, MAX);
	sem_init(&fullSem, 0, 0);
	sem_init(&mutexSem, 0, 1);

	// 중략

	sem_destroy(&emptySem);
	sem_destroy(&fullSem);
	sem_destroy(&mutexSem);

	return 0;
}

 


전체 코드

semaphore를 사용하여 producer-consumer problem을 해결하는 전체 코드는 다음과 같다.

race condition과 busy waiting 모두 해결되었다.

 

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#include <semaphore.h>

#define LOOP 20
#define MAX 2

typedef struct {
	int	buf[MAX];
	int in;
	int out;
	int counter;
}	BounderBufferType;
	
	
BoundedBufferType Buf;

sem_t emptySem, fullSem, mutexSem;

void producer(void *dummy)
{
	int	i;
	int sem_value;

	printf("===== Producer started ======\n");

	for (int i = 0;i<LOOP;i++)
	{
		sem_getvalue(&emptySem, &sem_value);
		if (sem_value == 0)
		{
			printf("Producer: buffer full. waiting...\n");
			
			sem_wait(&empty_sem);
			sem_wait(&mutex_sem);
		}

		printf("Producer: producing data\n");
		Buf.buf[Buf.in] = 1;
		Buf.in = (Buf.in + 1) % MAX;
		Buf.counter++;

		sem_post(&mutexSem);
		sem_post(&fullSem);
	}

	printf("Producer total: %d itmes\n", Buf.counter);

	pthread_exit(NULL);
}

void consumer(void *dummy)
{
	int i, data;
	int sem_value;

	printf("===== Consumer started ======\n");

	for (int i = 0;i<LOOP;i++)
	{
		sem_getvalue(&fullSem, &sem_value);
		if (sem_value == 0)
		{
			printf("Consumer: buffer empty. waiting...\n");
			
			sem_wait(&fullSem);
			sem_wait(&mutexSem);
		}

		printf("Consumer: consuming data\n");
		data = Buf.buf[Buf.out];
		Buf.out = (Buf.out + 1) % MAX;
		Buf.counter--;

		sem_post(&mutexSem);
		sem_post(&emptySem);
	}

	printf("Consumer total: %d itmes\n", Buf.counter);

	pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
	pthread_t	tid1, tid2;

	sem_init(&emptySem, 0, MAX);
	sem_init(&fullSem, 0, 0);
	sem_init(&mutexSem, 0, 1);


	if (pthread_creat(&tid1, NULL, (void *)producer, NULL) < 0)
		exit(1);

	if (pthread_creat(&tid2, NULL, (void *)consumer, NULL) < 0)
		exit(1);

	if (pthread_join(tid1, NULL) < 0)
		exit(1);

	if (pthread_join(tid2, NULL) < 0)
		exit(1);

	printf("%d items in buffer\n", Buf.counter);

	sem_destroy(&emptySem);
	sem_destroy(&fullSem);
	sem_destroy(&mutexSem);

	return 0;
}

'STUDY > linux' 카테고리의 다른 글

IPC in LINUX - Shared Memory  (0) 2022.04.04
IPC in LINUX - Message Passing  (0) 2022.04.02
Files and Directories in LINUX  (0) 2022.03.31
SIGNAL in LINUX  (0) 2022.03.29
Process and Threads in LINUX  (0) 2022.03.28