ZeroMQ 는 네트워크 소켓 관리를 위한 라이브러리이다.

이번 프로젝트를 진행하면서 Libevent + ZeroMQ 를 사용하게 되었는데 너무 효과가 좋아서 그 내용을 정리한다.

이곳에 올려진 모든 ZeroMQ 관련 내용은 모두 zguide.zeromq.org 에 있는 내용을 바탕으로 정리해 놓은 것이며, 모든 원문 내용은 해당 사이트에서 무료로 볼 수 있다.

모든 내용은 C 를 기반으로 작성되며, 다른 언어의 경우, 마찬가지로 해당 사이트에서 확인할 수 있다.

먼저 ZeroMQ를 설치하자.

ZeroMQ 설치는 간단하다. Ubuntu 의 경우, 다음의 명령어를 입력하면 된다.

$ sudo apt-get install libzmq

라이브러리 설치가 완료되었으면 곧바로 예제 프로그램을 작성해보자.

예제 프로그램은 당연히 Hello world 프로그램이다.

두 개의 프로그램을 작성한다.  Server/Client.

server 와 client는 서로 tcp/ip 로 통신하며, server 는 프로그램을 시작하면 client 의 접속을 기다린다. 이후, client 가 동작하면 곧바로 server 로 접속을 시도한다.

client 가 server 접속을 하면, “Hello” 메시지를 송신하며, server 는 메시지 수신 이후, “World”라는 응답을 보낸다.

main.c
#include
#include
#include
#include
#include

int main(int argc, char** argv)
{
// Socket to talk to clients
void* context = zmq_ctx_new();
void* responder = zmq_socket(context, ZMQ_REP);
int rc = zmq_bind(responder, "tcp://*:5555");
assert(rc == 0);

while(1)
{
char buffer[10];
zmq_recv(responder, buffer, 10, 0);
printf("Received Hellon");
sleep(1); // Do some 'work'
zmq_send(responder, "World", 5, 0);
}

return 0;
}

Client 도 작성해 주자.

main.c


// Hello World client
#include
#include
#include
#include

int main(int argc, char **argv)
{
printf("Connecting to hello world server...n");
void* context = zmq_ctx_new();
void* requester = zmq_socket(context, ZMQ_REQ);
zmq_connect(requester, "tcp://localhost:5555");

int request_nbr;
for(request_nbr = 0; request_nbr != 10; request_nbr++)
{
char buffer[10];
printf("Sending Hello %d...n", request_nbr);
zmq_send(requester, "Hello", 5, 0);
zmq_recv(requester, buffer, 10, 0);
printf("Received World %dn", request_nbr);
}

zmq_close(requester);
zmq_ctx_destroy(context);

return 0;
}

자, 이제 서버를 먼저 실행하고, Client 를 실행해보자.

Server 실행 결과

Received Hello

Received Hello

Received Hello

Received Hello

Received Hello

Received Hello

Received Hello

Received Hello

Received Hello

Received Hello

Client 실행 결과

Connecting to hello world server…

Sending Hello 0…

Received World 0

Sending Hello 1…

Received World 1

Sending Hello 2…

Received World 2

Sending Hello 3…

Received World 3

Sending Hello 4…

Received World 4

Sending Hello 5…

Received World 5

Sending Hello 6…

Received World 6

Sending Hello 7…

Received World 7

Sending Hello 8…

Received World 8

Sending Hello 9…

Received World 9

자, 이제 내용을 살펴보자. 무슨일이 일어났던 것일까?

ZeroMQ 에서 사용되는 모든 소켓 연결은 패턴으로 결합된다.

패턴이란, 우리가 흔히 사용하는 소켓을 이용한 데이터 전송 방식을 몇가지의 그룹으로 나뉘어 놓은 것을 이야기한다.

한가지 예를 들면, 흔히, 우리가 네트워크 프로그래밍을 한다면 client 가 요청하고 server 응답하는 방식을 생각한다.

즉, client가 먼저 질의(Request)하고, server가 응답(reply)하는 방식인데, 이를 Request-Reply 패턴이라 한다.

REQ-REP

fig2

REQ-REP 소켓 연결은 굉장히 엄격한 통신 방식이다. Client while 루프 안에서 zmq_send() 를 호출하고 zmq_recv 를 호출한다.

만약, zmq_send() 와 zmq_recv 사이에 다른 어떤 Sequence(예를 들어 zmq_send() 를 한번 더 호출한다던지..)하게 되면 해당 호출 결과는 에러처리될 것이다.

Server 역시 마찬가지로, 만약 zmq_recv 와 zmq_send 사이에 다른 Sequence 를 진행하게 되면, 에러로 처리된다.

보통의 소켓 프로그램에서 Server 보다 Client 가 먼저 실행될 경우, 어떤 현상이 나타나게 될지 생각해보자. 그리고 한번 위의 예제를 client 부터 실행을 해보자.

어떻게 되는가? 🙂

ZeroMQ String

ZeroMQ는 오직 전송되는 데이터의 사이즈만 확인한다. 이 말은 어떤 데이터 형식이라도 OK 이지만, 데이터 관리는 개발자 스스로가 해야 한다는 것이다.

가장 대표적인 예로 String 과 관련된 특징이 있다.

다음을 생각해보자.

만약 C 에서 NULL 바이트로 종료되는 문자열을 전송한다고 한다면, 아마 다음과 같이 입력할 수 있을 것이다.

zmq_send(requester, “Hello”, 6, 0);

하지만 만약 다른 언어에서라면 다르게 입력된 것이다. Python 을 예로 들어보자.

socket.send(“Hello”)

이런 경우(Python의 경우), 실제로 전송되는 데이터의 내용은 다음과 같다.

그리고, 만약 C 프로그램에서 이를 수신하게 된다면, 아마도 String 으로 보여지게 될 것이다. 하지만, 위의 그림에서 보듯이 문자열 종료 구분자(NULL)가 없다.

이는 사용자가 ZeroMQ 를 통하여 데이터를 송/수신할 때는 Server/Client 가 데이터 형식에 주의를 기울여야 함을 뜻한다.

fig3그렇다면, ZeroMQ 에서 String 문자열을 받기 위해서는 어떻게 해야 할까?

아래 예제 코드처럼 작성하면 된다.


// Receive 0MQ string from socket and convert into C string
// Chops string at 255 chars, if it's longer
static char* s_recv(
void* socket
)
{
char buffer[256];
int size = zmq_recv(socket, buffer, 255, 0);
if(size == −1)
return NULL;
if(size > 255)
size = 255;
buffer[size] = 0;
return strdup(buffer);
}

앞으로 등장하는 예제들은 대부분 위와 같은 개량된(?) 함수를 사용할 것이다. 위와 같은 함수들을 모아서 별도의 파일로 만든것이 zhelpers.h 파일이다. zhelpers.h 파일을 다운받기를 원한다면 http://zguide.zeromq.org/ 를 방문하면 된다.

Version Reporting

ZeroMQ는 많은 수의 버전을 가지고 있다. 만약, ZeroMQ를 사용하던 도중 문제가 발생했다면, 대부분은 이후의 버전에서 수정되어 있을 것이다. 현재 사용중인(Link 되어 있는) 정확한 ZeroMQ의 버전 정보는 다음의 코드로 확인이 가능하다.


// Report 0MQ version

#include

int main(void)
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
printf("Current 0MQ version is %d.%d.%dn", major, minor, patch);
return 0;
}

프로그램을 실행해보면 다음과 같이 나온다.

Current 0MQ version is 3.2.3

Getting the Message Out

두번째로 다룰 패턴은 하나의 Server와 다수개의 Client로 이루어지는 One-way(일방향) 데이터 배포 패턴이다. 예를 들어 우편 번호, 온도, 습도 를 포함하는 날씨 정보를 배포하는 시스템을 만들어보자. 진짜 기상 센터가 하는 것처럼 날씨 정보도 랜덤하게 만들어 보자.

Server쪽 코드이다. 5556 포트를 사용한다.


// Weather update server
// Bind PUB socket to tcp://*:5556
// Publishes random weather updates

#include "zhelpers.h"

int main(int argc, char **argv)
{
// Prepare our context and publisher
void* context = zmq_ctx_new();
void* publisher = zmq_socket(context, ZMQ_PUB);
int rc;

rc = zmq_bind(publisher, "tcp://*:5556");
assert(rc == 0);

rc = zmq_bind(publisher, "ipc://weather.ipc");
assert(rc == 0);

// Initialize random number generator
srandom((unsigned)time(NULL));
while(1)
{
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = randof(100000);
temperature = randof(215) − 80;
relhumidity = randof(50) + 10;

// Send message to all subscribers
char update[20];
sprintf(update, "%05d %d %d", zipcode, temperature, relhumidity);
s_send(publisher, update);
}

zmq_close(publisher);
zmq_ctx_destroy(context);

return 0;

}

시작도 끝도 없는 Broadcasting 이다. 그냥 무조건 데이터를 전송한다.

이제 Client 를 작성해보자. Client의 요지는 모든 데이터에 주의를 기울이되, 특정 zip code(우편번호) 데이터만 수신하는 것이다. 기본으로 뉴욕시의 우편번호를 사용하도록 하겠다. 뉴욕은 뭔가 모험을 시작하기에 좋은 장소이므로.. ㅎㅎ


// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode

#include "zhelpers.h"

int main(int argc, char** argv)
{
// Socket to talk to server
printf("Collecting updates from weather server...n");
void* context = zmq_ctx_new();
void* subscriber = zmq_socket(context, ZMQ_SUB);
int rc;

rc = zmq_connect(subscriber, "tcp://localhost:5556");
assert(rc == 0);

// Subscriber to zipcode, default is NYC, 10001
char* filter = (argc > 1) ? argv[1]:"10001";
rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
assert(rc == 0);

// Process 100 updates
int update_nbr;
long total_temp = 0;
for(update_nbr = 0; update_nbr < 100; update_nbr++) { char* string = s_recv(subscriber); int zipcode, temperature, relhumidity; sscanf(string, "%d %d %d", &zipcode, &temperature, &relhumidity); printf("[%d]: %sn", update_nbr, string); total_temp += temperature; free(string); } printf("Average temperature for zipcode '%s' was %dFn", filter, (int)(total_temp / update_nbr)); zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }

실행하면 아래와 같이 나온다.

Collecting updates from weather server...

[0]: 10001 -20 32

[1]: 10001 -51 11

[2]: 10001 9 13

...

[96]: 10001 -35 21

[97]: 10001 82 52

[98]: 10001 43 53

[99]: 10001 -39 56

Average temperature for zipcode '10001' was 37F

fig4한가지 염두해두어야 하는 점은 SUB 소켓을 사용할 때는, 반드시 zmq_setsockopt() 와 SUBSCRIBE 지시자를 코드안에 입력하여 이용하여 수신할 데이터를 설정해야 한다는 것이다. 만약 수신할 데이터(subscription)을 지정하지 않는다면, 그 어떤 메시지도 수신할 수 없다. 이는 이제 막 ZeroMQ 를 사용하기 시작하는 사람들이 하는 흔한 실수이다. 하나의 구독 지시자(Subscriber)는 한번에 여러개의 수신할 데이터(Subscription) 지정이 가능하다. 물론, Subscription 지정 취소도 가능하다. 그리고 Subscription 은 반드시 String 이 아니어도 상관없다. 자세한 설정은 zmq_setsockopt() 설명을 참고 바란다.

PUB-SUB 소켓은 비동기로 작동한다. Client 는 그저 zmq_recv() 만을 수행할 뿐이다. 만약 SUB 소켓을 통해 send 를 하고자 한다면, 에러가 발생한다. 그리고 Server 의 경우, zmq_send() 를 통해 데이터 전송만을 할 뿐이다. 마찬가지로, PUB 소켓으로 zmq_recv() 를 한다면 에러가 발생한다.

ZeroMQ에서는 누가 connect 를 수행하고, 누가 bind 를 수행하는지는 중요하지 않다. 하지만 실제적으로는 다른점이 있다. 그건 나중에 다루어보도록 하고... 지금은 bind 는 PUB 가 수행하고, connect 는 SUB가 수행한다고만 알아두자.

PUB-SUB 소켓에 대해서 한가지 더 알아야 할 중요한 사실이 있다. Publisher 에서는 누가 언제부터 구독을 시작했는지 모른다는 점이다. 그리고 만약  Subscriber 를 먼저 시작하고, Publisher 를 나중에 시작했다 하더라도 Publisher 가 전송하는 첫번째 메시지는 항상 수신하지 못할 것이다. Subscriber 가 Publisher 에게 접속을 하고 구독을 시작하는 시간이 매우 짧다고는 하나 0초가 아니기 때문이다. 그리고, 그 짧은 시간동안 Publisher는 소켓으로 이미 데이터를 전송했기 때문이다.

이를 "slow joiner"라고 표한다. ZeroMQ 는 Background 에서 동작하는 비동기 I/O라는 점을 기억하라.

3 Comments on ZeroMQ – 1

  1. 부탁드립니다ㅠㅠ says:

    안녕하세요. 올려주신 예제를 통해 실습을 해보고싶어서 글 남깁니다. 제 화면에선 어떤 헤더파일들이 포함되는지 볼 수 없는데 혹시 알려주실수있으신가요? #include “x” x부분이 아무것도 안나옵니다.
    그리고 궁금한게 있는데 이 예제를 사용하고자 만드는 파일들의 위치는 상관없나요? 아니면 zeromq를 설치한 폴더 내에서 실행해야하나요? ㅠㅠ
    이메일로 답장 주시면 감사하겠습니다. ㅠㅠ

  2. pchero says:

    zhelpers.h 파일을 말씀하시는 것 같습니다. 아래 위치에서 다운로드 가능하세요. 🙂

    https://github.com/imatix/zguide/blob/master/examples/C/zhelpers.h

    그리고, 위 문서는 libzmq-2.x 버전 가이드 문서를 번역했던 것으로 기억합니다. 현재는 libzmq-3.x 버전을 많이 사용하고 있습니다.

    덧붙여, 완벽하지는 않지만 zguide(libzmq-3.x upper) 문서를 아래 링크에서 번역하고 있습니다. 참고하시면 도움이 될것이라 생각합니다. 🙂

    http://wiki.pchero21.com/wiki/Category:Libzmq

  3. 나그네 says:

    Libevent + ZeroMQ 를 같이 사용해서 효과가 좋았다고 하셨는데, 이둘에 대한 경험을 공유해주실수 있을까요? 🙂

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.