Coding/Unreal, C++

I/O multiplexing: select(), poll(), kqueue()

select()

특징

  • 등록한 fd를 전부 체크해서 이벤트를 알아내야 한다.
  • 커널 공간과 유저 공간 사이에 데이터 복사가 일어난다.
  • 사용이 간편하고 지원 OS가 많아 portability이 높음

struct fd_set

  • select()는 여러 fd를 한꺼번에 관찰하기 위해 fd_set 구조체를 활용한다.
  • fd_set 구조체는 각 fd의 상태를 바트로 표현한다. fd가 겹칠 일은 없으니까 그 fd를 인덱스로 사용해서 해당 fd의 상태를 알아낼 수 있다.
    상상 예시) short state = (fd_set >> fd) & 1;
  • 이렇게 되면 각 fd를 추가하거나 확인할 때마다 귀찮은 비트 연산을 해야한다. 그래서 이를 위한 매크로가 있다.
FD_ZERO(fd_set* set);        //fdset을초기화
FD_SET(int fd, fd_set* set);  //fd를 set에 등록
FD_CLR(int fd, fd_set* set);  //fd를 set에서 삭제
FD_ISSET(int fd, fd_set* set);//fd가 준비되었는지 확인

select()

  • select()의 원형은 다음과 같다.
int select(
	int maxfdNum, // fd 관찰 범위
    fd_set *restrict readfds, // read I/O를 통지받을 fd_set의 주소
    fd_set *restrict writefds, // write I/O를 통지받을 fd_set의 주소
    fd_set *restrict errorfds, // error I/O를 통지받을 fd_set의 주소
    struct timeval *restrict timeout // null이면 무한 대기, 아니면 주어진 시간만큼 대기
);
  • select()를 활용한 예제
...
struct timeval timeout;     //타임 아웃에 사용할 timeval 변수
fd_set reads, cpy_reads;    //read용 FD_SET과 그 사본을 저장할 변수
int fd_max = 0, fd_num = 0; //관찰 범위, 변경된 fd 개수
...
FD_ZERO(&reads);             //reads초기화
FD_SET(server_sock, &reads); //server_socket 등록
max_fd = server_socket;      //server_socket부터 관찰 범위에 추가

while(TRUE){
   cpy_reads = reads;   //FD_SET보존을 위한 복사
   timeout.tv_sec = 5;  //time out 값 설정
   timeout.tv_usec = 5000;

   fd_num = select(fd_max + 1, &cpy_reads, 0, 0, &timeout);  //FD_SET사본으로 select 호출

   if(fd_num == -1)
       break; //에러
   if(fd_num == 0)
       continue; //timeout

   for(int fd = 0; fd < fd_max + 1 ; ++fd)
   {
      if(FD_ISSET(fd, &cpy_reads)) //fd가 준비 완료
      {
         if(i == server_socket) //fd가 서버인 경우  
         {
            //accpet 처리 (FD_SET으로 등록할 것)
         }
         else //fd가 클라이언트 세션인 경우
         {
            //recv및 closesocket 처리 (FD_CLR로 삭제할 것)
         }      
      }
}
close(server_socket);
return 0;

poll()

특징

  • select에 비해 더 많은 정보를 반환해준다.
  • poll()이 multiplexing을 구현한 방법은 select와 동일하다. 정해진 파일 지시자의 이벤트를 기다리다가 이벤트가 발생하면 poll에서 block이 해제된다. 이후 pollfd 구조체를 검사해서 어떤 이벤트가 있었는지 검사해서 처리하는 방식이다.
  • 좀더 low level에 근접한 코드로 시스템 콜이 select()보다 적다.
  • 대신 이식성이 select에 비해 나쁘다.
  • select의 경우, 각 fd마다 3bit의 체크마스크를 쓰는데, poll은 64bit를 써서 접속수가 많으면 성능이 떨어진다.

poll()

  • poll()의 원형은 다음과 같다.
    int poll(struct pollfd *ufds, unsigned int nfds, int timeout);
  • nfds는 pollfd의 배열의 크기를 의미한다.
  • timeout은 이벤트를 기다리는 시간을 의미한다.
    • 값을 지정하지 않으면 영원히 기다린다.
    • 0일 경우는 기다리지 않고 곧바로 다음 루틴을 진행한다.
    • 0보다 크면 해당 시간만큼 기다린다. 해당 시간 안에 어떤 이벤트가 발생하면 즉시 되돌려준다. 시간을 초과할 때까지 이벤트가 없으면 0을 반환한다.
  • 반환값은 에러일 경우 -1, 아니면 revent의 원소 개수를 반환한다.

struct pollfd

  • 결국 poll의 리턴값이라고 할 수 있는 pollfd를 파악하면 poll의 대부분을 이해한 것과 진배없다.
struct pollfd
{
	int fd; // 관심있어하는 파일지시자 (프로그래머가 채워서 넣어줘야 한다)
    short events; // 관심있어하는 이벤트 (채워서 넣어줘야 한다.)
    short revents; // 해당 이벤트를 커널이 처리한 결과 (리턴값)
}
  • 프로그래머는 살펴볼 fd와 그 fd에서 어떤 이벤트를 볼 것인지 세팅한 후에 poll()에 넣어주면 된다. 그러면 poll()은 이벤트가 일어나는지 보다가 일어나면 revents를 채워서 돌려준다.
  •  우리가 설정할 수 있는 events는 <sys/poll.h> 에 정의되어 있고 다음과 같다.
#define POLLIN 0x0001 // 읽을 데이터가 있다.
#define POLLPRI 0x0002 // 급하게 읽을 데이터가 있다.
#define POLLOUT 0x0004 // 쓰기가 봉쇄(block)가 아니다.
#define POLLERR 0x0008 // 에러가 발생했다.
#define POLLHUP 0x0010 // 연결이 끊겼다.
#define POLLNVAL 0x0020 // 잘못된 요청

특징

  • BSD 계열에서 지원하는 Event 관리 system call. linux 계열의 epoll과 비슷하게 동작한다.
  • select()와 poll()을 개선한 버전이라고 이해할 수 있다.
  • select()와 poll()은 등록한 모든 fd를 돌면서 상태 체크를 해야 한다. kqueue()는 이벤트가 발생한 fd에 대한 배열을 리턴해서 모든 fd를 검사할 필요가 없게 해준다. O(N) -> O(1)

kqueue()

#include <sys/time.h>
#include <sys/event.h>
#include <sys/types.h>

int kqueue(void);
  • kqueue()를 통해 커널에 새로운 event queue를 만든다. 그 event queue의 fd를 반환한다. 이 fd를 통해 kqueue()에서 이벤트를 등록, 관리할 수 있다. 이 kqueue()는 fork(2)로 만든 자식 프로세스에 상속되지 않는다.

kevent()

int kevent (
	int kq, // kqueue의 fd
    const struct kevent *changelist, // 등록하고자 하는 이벤트
    int nchanges, // changelist의 길이
    struct kevent *eventlist, // 발생한 이벤트들
    int nevents, // eventlist의 길이
    const struct timespec *timeout // timeout
);
  • kevent()에서 중심이 되는 데이터가 struct kevent다.
struct kevent {
	uintptr_t ident; // 이벤트에 대한 식별자, fd 번호
    int16_t filter; // 이벤트에 대한 식별자, 이벤트의 종류에 해당
    uint16_t flags; // event를 적용시키거나, event가 return 됐을 때 flag
    uint32_t fflags; // filter에 대한 flag
    intptr_t data; // filter에 대한 data
    void *udata; // user data
};
  • ident
    identifier, 이벤트를 식별할 때 사용한다. filter에 따라 결정되는데, 주로 fd다.
  • filter
    이벤트를 식별할 때 사용된다.
  • flags
    어떤 액션을 취할 지 결정한다.
    EV_ADD: 이벤트를 kqueue에 추가한다.
    EV_ENABLE: kevent()가 해당 이벤트가 발동됐을 때, 반환할 수 있도록 한다
    EV_DISABLE: kevent()가 해당 이벤트를 무시하도록 한다.
    참고
  • kevent 구조체를 만들 때는 EV_SET() 매크로를 쓰면 편하다.

kqueue를 이용한 멀티플렉싱 Echo Server

출처

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>

#include <iostream>
#include <map>
#include <vector>

using namespace std;

void exit_with_perror(const string& msg)
{
    cerr << msg << endl;
    exit(EXIT_FAILURE);
}

void change_events(vector<struct kevent>& change_list, uintptr_t ident, int16_t filter,
        uint16_t flags, uint32_t fflags, intptr_t data, void *udata)
{
    struct kevent temp_event;

    EV_SET(&temp_event, ident, filter, flags, fflags, data, udata);
    change_list.push_back(temp_event);
}

void disconnect_client(int client_fd, map<int, string>& clients)
{
    cout << "client disconnected: " << client_fd << endl;
    close(client_fd);
    clients.erase(client_fd);
}

int main()
{
    /* init server socket and listen */
    int server_socket;
    struct sockaddr_in server_addr;

    if ((server_socket = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        exit_with_perror("socket() error\n" + string(strerror(errno)));

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(8080);
    if (bind(server_socket, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1)
        exit_with_perror("bind() error\n" + string(strerror(errno)));

    if (listen(server_socket, 5) == -1)
        exit_with_perror("listen() error\n" + string(strerror(errno)));
    fcntl(server_socket, F_SETFL, O_NONBLOCK);
    
    /* init kqueue */
    int kq;
    if ((kq = kqueue()) == -1)
        exit_with_perror("kqueue() error\n" + string(strerror(errno)));


    map<int, string> clients; // map for client socket:data
    vector<struct kevent> change_list; // kevent vector for changelist
    struct kevent event_list[8]; // kevent array for eventlist

    /* add event for server socket */
    change_events(change_list, server_socket, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, NULL);
    cout << "echo server started" << endl;

    /* main loop */
    int new_events;
    struct kevent* curr_event;
    while (1)
    {
        /*  apply changes and return new events(pending events) */
        new_events = kevent(kq, &change_list[0], change_list.size(), event_list, 8, NULL);
        if (new_events == -1)
            exit_with_perror("kevent() error\n" + string(strerror(errno)));

        change_list.clear(); // clear change_list for new changes

        for (int i = 0; i < new_events; ++i)
        {
            curr_event = &event_list[i];

            /* check error event return */
            if (curr_event->flags & EV_ERROR)
            {
                if (curr_event->ident == server_socket)
                    exit_with_perror("server socket error");
                else
                {
                    cerr << "client socket error" << endl;
                    disconnect_client(curr_event->ident, clients);
                }
            }
            else if (curr_event->filter == EVFILT_READ)
            {
                if (curr_event->ident == server_socket)
                {
                    /* accept new client */
                    int client_socket;
                    if ((client_socket = accept(server_socket, NULL, NULL)) == -1)
                        exit_with_perror("accept() error\n" + string(strerror(errno)));
                    cout << "accept new client: " << client_socket << endl;
                    fcntl(client_socket, F_SETFL, O_NONBLOCK);

                    /* add event for client socket - add read && write event */
                    change_events(change_list, client_socket, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, NULL);
                    change_events(change_list, client_socket, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL);
                    clients[client_socket] = "";
                }
                else if (clients.find(curr_event->ident)!= clients.end())
                {
                    /* read data from client */
                    char buf[1024];
                    int n = read(curr_event->ident, buf, sizeof(buf));

                    if (n <= 0)
                    {
                        if (n < 0)
                            cerr << "client read error!" << endl;
                        disconnect_client(curr_event->ident, clients);
                    }
                    else
                    {
                        buf[n] = '\0';
                        clients[curr_event->ident] += buf;
                        cout << "received data from " << curr_event->ident << ": " << clients[curr_event->ident] << endl;
                    }
                }
            }
            else if (curr_event->filter == EVFILT_WRITE)
            {
                /* send data to client */
                map<int, string>::iterator it = clients.find(curr_event->ident);
                if (it != clients.end())
                {
                    if (clients[curr_event->ident] != "")
                    {
                        int n;
                        if ((n = write(curr_event->ident, clients[curr_event->ident].c_str(),
                                        clients[curr_event->ident].size()) == -1))
                        {
                            cerr << "client write error!" << endl;
                            disconnect_client(curr_event->ident, clients);  
                        }
                        else
                            clients[curr_event->ident].clear();
                    }
                }
            }
        }

    }
    return (0);
}

참고

 

입출력 다중화 : poll

#include #include #include #include #include #include #include #include #include #include #include // 받아들일수 있는 클라이언트의 크기 #define OPEN_MAX 600 int main(int argc, char **argv) { int server_sockfd, client_sockfd, sockfd; int i, m

www.joinc.co.kr

 

select 와 epoll

select I/O 통지모델의 할아버지 select select는 싱글쓰레드로 다중 I/O를 처리하는 멀티플렉싱 통지모델의 가장 대표적인 방법이다. 해당 파일 디스크립터가 I/O를 할 준비가 되었는지 알 수 있다면,

ozt88.tistory.com

 

[UNIX] I/O Multiplexing을 위한 kqueue 사용법

kqueue는 BSD 계열에서 지원하는 Event 관리 system call로, Linux 계열에서 select를 개선한 epoll과 비슷하게 사용되고 동작한다. 여러 fd를 모니터링하고, fd에 대한 동작(read, write)이 준비되었는지 알아내..

hyeonski.tistory.com