ZMQ 와 Message Pack 의 간단 한 사용

55420 단어 message
최근 에 비교적 복잡 한 프로젝트 를 만 들 었 는데 그 중에서 오픈 소스 소프트웨어 인 ZMQ 와 Message Pack 을 사용 했다.ZMQ 는 바 텀 네트워크 통신 을 패키지 하여 메시지 처리 큐 라 이브 러 리 로 사용 하기에 매우 편리 하 다.Message Pack 은 바 이 너 리 를 기반 으로 하 는 대상 직렬 화 라 이브 러 리 로 언어 간 특성 을 가지 고 있어 사용 하기 쉽다.내 가 하 는 프로젝트 에서 메시지 류 는 Message Pack 을 통 해 압축 을 한 다음 에 ZMQ 의 메시지 구조 체 를 기록 하고 ZMQ 를 통 해 전달 하 며 마지막 에 수신 자 는 Message Pack 을 이용 하여 압축 을 풀 어 명령 을 분석한다.제 가 영어 수준 이 정말 높 지 않 기 때문에 저 는 그들의 설명 문 서 를 읽 어서 그들 을 이해 하지 못 했 고 그들의 예시 코드 를 통 해 탐색 만 했 습 니 다.이 로 인해 풀 리 지 않 는 문제 에 부 딪 혔 지만 이런 방식 은 나 에 게 많은 시간 을 절약 해 주 었 다.하지만 영 어 를 잘 하 는 사람 에 게 는 설명 문 서 를 읽 고 알 아야 한다.
이 를 어떻게 사용 하 는 지 설명 하기 위해 사용 장면 을 만 듭 니 다.N 개의 Client,하나의 Server,M 개의 Agent,Client 는 ZMQ 의 요청-응답 모드 와 Server 통신 을 사용 합 니 다.Server 는 Client 의 명령 을 받 은 후 ZMQ 의 게시-구독 모드 를 통 해 각 Agent 와 통신 합 니 다.아래 코드 는 ZMQ 와 Message Pack 을 패키지 하고 사 용 했 습 니 다.간편 하기 위해 저 는 클래스 의 정의 와 실현 을 모두 헤더 파일 에 썼 습 니 다.
1.ZMQ 에 대한 간단 한 패키지:
  1 #include"Msgpack.h"

  2 #include<zmq.h>

  3 #include<string>

  4 #include<cassert>

  5 #include<iostream>

  6 

  7 namespace Tool

  8 {

  9     //     

 10     class Network

 11     {

 12     public:

 13 

 14         //    :    。

 15         //    : 。

 16         //    : 。

 17         Network() : m_socket(NULL) { }

 18 

 19         //    :   socket。

 20         //    :zmqType  ZMQ   ,address  socket       。

 21         //    :true       ,false    。

 22         bool Init(int zmqType,const std::string& address)

 23         {

 24             try

 25             {

 26                 m_socket = zmq_socket(Context,zmqType);

 27                 return SetSocket(zmqType,address);

 28             }

 29             catch(...)

 30             {

 31                 std::cout << "Network     。" << std::endl;

 32                 return false;

 33             }

 34         }

 35 

 36         //    :    。

 37         //    :  Msgpack   ,isRelease   true             。

 38         //    :true      ,false      。

 39         bool SendMessage(Msgpack *msgpack,bool isRelease = true) const

 40         {

 41             try

 42             {

 43                 zmq_msg_t msg;

 44                 zmq_msg_init(&msg);

 45                 if(isRelease)

 46                 {

 47                     zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),Tool::Network::Release,msgpack);

 48                 }

 49                 else

 50                 {

 51                     zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),0,0);

 52                 }

 53                 zmq_msg_send(&msg,m_socket,0);

 54                 return true;

 55             }

 56             catch(...)

 57             {

 58                 std::cout << "Network    。" << std::endl;

 59                 return false;

 60             }

 61         }

 62 

 63         //    :    。

 64         //    : 。

 65         //    :       。

 66         zmq_msg_t* ReceiveMessage() const

 67         {

 68             zmq_msg_t *reply = NULL;

 69             try

 70             {

 71                 reply = new zmq_msg_t();

 72                 zmq_msg_init(reply);

 73                 zmq_msg_recv(reply,m_socket,0);

 74                 return reply;

 75             }

 76             catch(...)

 77             {

 78                 if( reply != NULL )

 79                 {

 80                     delete reply;

 81                 }

 82                 return NULL;

 83             }

 84         }

 85 

 86         //    :    。

 87         //    :       。

 88         //    : 。

 89         void CloseMsg(zmq_msg_t* msg)

 90         {

 91             try

 92             {

 93                 zmq_msg_close(msg);

 94                 msg = NULL;

 95             }

 96             catch(...)

 97             {

 98                 msg = NULL;

 99             }

100         }

101 

102         //    :    。

103         //    : 。

104         //    : 。

105         ~Network()

106         {

107             if( m_socket != NULL )

108             {

109                 zmq_close(m_socket);

110                 m_socket = NULL;

111             }

112         }

113 

114     private:

115 

116         //  socket

117         void *m_socket;

118 

119         //    

120         static void *Context;

121 

122     private:

123 

124         //    :  socket。

125         //    :zmqType  ZMQ   ,address  socket       。

126         //    :true      ,false      。

127         bool SetSocket(int zmqType,const std::string& address)

128         {

129             int result = -1;

130             switch(zmqType)

131             {

132             case ZMQ_REP:

133             case ZMQ_PUB:

134                 result = zmq_bind(m_socket,address.c_str());

135                 break;

136             case ZMQ_REQ:

137                 result = zmq_connect(m_socket,address.c_str());

138                 break;

139             case ZMQ_SUB:

140                 result = zmq_connect(m_socket,address.c_str());

141                 assert(result == 0);

142                 result = zmq_setsockopt(m_socket,ZMQ_SUBSCRIBE,"",0);

143                 break;

144             default:

145                 return false;

146             }

147             assert( result == 0 );

148             return true;

149         }

150 

151         //    :      ,      。

152         //    :function     ,hint          。

153         //    : 。

154         static void Release(void *function, void *hint)

155         {

156             Msgpack *msgpack = (Msgpack*)hint;

157             if( msgpack != NULL )

158             {

159                 delete msgpack;

160                 msgpack = NULL;

161             }

162         }

163     };

164 

165     //        context

166     void *Tool::Network::Context = zmq_ctx_new();

167 };

설명:
(1)zmqctx_new 가 만 든 Context 는 전체 응용 프로그램 이 하 나 를 공유 하면 됩 니 다.구체 적 인 통신 은 zmq 입 니 다.socket 에서 만 든 socket 으로 완 성 됩 니 다.상기 코드 에 Context 가 가리 키 는 자원 을 방출 하지 않 았 습 니 다.
(2)zmqmsg_init_data 함수 의 매개 변 수 는 자원 을 방출 하 는 함수 주 소 를 입력 하고 ZMQ 에서 메 시 지 를 보 낸 후에 이 함수 로 자원 을 방출 해 야 합 니 다.이 인자 가 들 어 오지 않 고 들 어 오 는 정보 가 임시 변수 라면 수신 자가 정 보 를 받 지 못 하고 이상 을 던 질 가능성 이 높다.이 인자 가 들 어 오지 않 으 면 자원 을 스스로 방출 하 는 것 을 기억 해 야 한다.
2.Message Pack 에 대한 간단 한 패키지:
  1 #include"BaseMessage.h"

  2 #include"ClientMessage.h"

  3 #include"ServerMessage.h"

  4 #include<zmq.h>

  5 #include<msgpack.hpp>

  6 

  7 namespace Tool

  8 {

  9     using namespace Message;

 10 

 11     //  /     

 12     class Msgpack

 13     {

 14     public:

 15 

 16         //    :    。

 17         //    : 。

 18         //    : 。

 19         Msgpack(void) { }

 20 

 21         //    :    。

 22         //    : 。

 23         //    : 。

 24         ~Msgpack(void) { }

 25 

 26         //    :    。

 27         //    :      。

 28         //    :true      。

 29         template<typename T>

 30         bool Pack(const T& t)

 31         {

 32             try

 33             {

 34                 Release();

 35                 msgpack::pack(m_sbuf,t);

 36                 return true;

 37             }

 38             catch(...)

 39             {

 40                 std::cout << "Msgpack      。" << std::endl;

 41                 return false;

 42             }

 43         }

 44 

 45         //    :    。

 46         //    :zmq   。

 47         //    :           。

 48         BaseMessage* Unpack(zmq_msg_t& msg)

 49         {

 50             try

 51             {

 52                 int size = zmq_msg_size(&msg);

 53                 if( size > 0 )

 54                 {

 55                     Release();

 56                     m_sbuf.write((char*)zmq_msg_data(&msg),size);

 57                     size_t offset = 0;

 58                     msgpack::zone z;

 59                     msgpack::object obj;

 60                     msgpack::unpack(m_sbuf.data(),m_sbuf.size(),&offset,&z,&obj);

 61                     return GetMessage(obj);

 62                 }

 63             }

 64             catch(...)

 65             {

 66                 //    

 67             }

 68             return NULL;

 69         }

 70 

 71         //    :    /    。

 72         //    : 。

 73         //    :  /    。

 74         inline msgpack::sbuffer& GetSbuf()

 75         {

 76             return m_sbuf;

 77         }

 78 

 79     private:

 80 

 81         //  /    

 82         msgpack::sbuffer m_sbuf;

 83 

 84     private:

 85 

 86         //    :          。

 87         //    : 。

 88         //    : 。

 89         void Release()

 90         {

 91             m_sbuf.clear();

 92             m_sbuf.release();

 93         }

 94         

 95         //    :    。

 96         //    :     msgpack::object。

 97         //    :         。

 98         BaseMessage* GetMessage(const msgpack::object& obj)

 99         {

100             BaseMessage bmessage;

101             obj.convert(&bmessage);

102             switch(bmessage.Type)

103             {

104             case 1024:

105                 return Convert<ClientMessage>(obj);

106             case 2048:

107                 return Convert<ServerMessage>(obj);

108             default:

109                 return NULL;

110             }

111         }

112 

113         //    :              。

114         //    :     msgpack::object。

115         //    :  T   。

116         template<typename T>

117         T* Convert(const msgpack::object& obj)

118         {

119             T *t = new T();

120             obj.convert(t);

121             return t;

122         }

123     };

124 };

설명:
압축 시 zmqmsg_t 메시지 체 는 msgpack:sbuffer 에 압축 된 다음 에 이 메시지 체 를 닫 을 수 있 습 니 다.패 킷 을 해제 한 데 이 터 를 구체 적 인 유형 으로 바 꾸 려 면 이 유형 이 어떤 유형 인지 알 아야 합 니 다.여기에 세 가지 방법 이 있 습 니 다.
(1)수신 자 에 게 어떤 메 시 지 를 받 을 지 먼저 메 시 지 를 보 낸 다음 에 수신 자 는 메 시 지 를 해제 한 후에 해당 하 는 클래스 로 전환 할 수 있다.이런 방식 은 추가 적 인 통신 이 필요 하 며 사용 을 권장 하지 않 는 다.
(2)모든 정 보 는 하나의 기본 클래스 에서 계승 되 며,이 기본 클래스 는 메시지 형식의 필드 를 저장 합 니 다.패 킷 을 풀 고 데 이 터 를 기본 클래스 로 변환 한 다음 유형 에 따라 구체 적 인 파생 클래스 로 변환 합 니 다.이런 방식 은 한 번 더 바 꿔 야 하고 위의 코드 도 바로 이런 방식 을 채택 해 야 한다.
(3)가방 을 누 를 때 먼저 메시지 류 를 누 른 다음 에 표지 하 나 를 누 르 면 이 소식 이 어떤 유형의 표지 류 인지,즉 두 번 누 르 는 것 이다.패 킷 을 풀 때 먼저 패 킷 표지 류 를 풀 고 메시지 류 의 구체 적 인 유형 을 알 게 된 다음 에 패 킷 정보 류,즉 패 킷 을 두 번 풀 고 두 번 전환 합 니 다.(2)에 비해 더 많은 압축 해제,하 도 급 작업 을 해 야 하 는 것 외 에 도 하 도 급 의 오프셋 을 계산 해 야 한다.그렇지 않 으 면 실수 하기 쉽다.
3.사 용 된 메시지 종류:
namespace Message

{

    //    

    class  BaseMessage

    {

    public:



        MSGPACK_DEFINE(Type);



        //    

        int Type;



        //      

        BaseMessage()

        {

            Type = 0;

        }

    };



    //        

    class ClientMessage : public BaseMessage

    {

    public:



        MSGPACK_DEFINE(Type,Information);



        //  

        std::string Information;



        //      

        ClientMessage()

        {

            Type = 1024;

        }

    };



    //        

    class ServerMessage : public BaseMessage

    {

    public:



        MSGPACK_DEFINE(Type,Information);



        //  

        std::vector<std::string> Information;



        //      

        ServerMessage()

        {

            Type = 2048;

        }

    };

}; 

설명:
  (1)MSPACK_DEFINE 는 어떤 멤버 들 이 압축/해 지 를 할 수 있 는 지 를 표시 했다.파생 류 중의 MSGPACKDEFINE 는 기본 클래스 의 구성원 을 적어 야 합 니 다.그렇지 않 으 면 Message Pack 패키지 에 대한 두 번 째 방법 을 사용 할 수 없습니다.
(2)C++버 전의 Message Pack 압축/해 지 데이터 구성원 은 하나의 클래스,구조 또는 연합 체 일 뿐 포인터(boost 라 이브 러 리 의 스마트 포인터 포함),배열 을 사용 할 수 없고 매 거 진 값 도 적용 되 지 않 습 니 다.따라서 BaseMessage 는 int 값 을 사용 하여 파생 류 가 어떤 유형 에 속 하 는 지 표시 합 니 다.C\#버 전의 Message Pack 은 매 거 진 값 을 압축 할 수 있 습 니 다.
4.Client 의 예제 코드:
 1 int _tmain(int argc, _TCHAR* argv[])

 2 {

 3     Network network;

 4     bool result = network.Init(ZMQ_REQ,"tcp://192.168.10.179:8888");

 5     if(result)

 6     {

 7         ClientMessage cmessage;

 8         cmessage.Information = "I come form Client.";

 9 

10         Msgpack msgpack;

11         result = msgpack.Pack<ClientMessage>(cmessage);

12         if(result)

13         {

14             result = network.SendMessageW(&msgpack,false);

15             if(result)

16             {

17                 zmq_msg_t *msg = network.ReceiveMessage();

18                 if( msg != NULL )

19                 {

20                     BaseMessage *bmessage = msgpack.Unpack(*msg);

21                     network.CloseMsg(msg);

22                     if( bmessage != NULL && bmessage->Type == 2048 )

23                     {

24                         ServerMessage *smessage = static_cast<ServerMessage*>(bmessage);

25                         if( smessage != NULL && smessage->Information.size() > 0 )

26                         {

27                             std::cout << smessage->Information[0] << std::endl;

28                         }

29                         delete smessage;

30                         smessage = NULL;

31                         bmessage = NULL;

32                     }

33                 }

34             }

35         }

36     }

37 

38     system("pause");

39     return 0;

40 }

5.Server 의 예제 코드:
 1 int _tmain(int argc, _TCHAR* argv[])

 2 {

 3     Network responder;

 4     bool result = responder.Init(ZMQ_REP,"tcp://192.168.10.179:8888");

 5     if(result)

 6     {

 7         Network publisher;

 8         result = publisher.Init(ZMQ_PUB,"tcp://192.168.10.179:9999");

 9         if(result)

10         {

11             Msgpack msgpack;

12             while(true)

13             {

14                 zmq_msg_t *msg = responder.ReceiveMessage();

15                 BaseMessage *bmessage = msgpack.Unpack(*msg);

16                 responder.CloseMsg(msg);

17 

18                 ServerMessage smessage;

19                 smessage.Information.push_back("I come from Server.");

20                 msgpack.Pack<ServerMessage>(smessage);

21                 result = responder.SendMessageW(&msgpack,false);

22 

23                 if( result )

24                 {

25                     if( bmessage != NULL && bmessage->Type == 1024 )

26                     {

27                         ClientMessage *cmessage = static_cast<ClientMessage*>(bmessage);

28                         if( cmessage != NULL )

29                         {

30                             std::cout << cmessage->Information << std::endl;

31                             for( int counter = 0 ; counter < 100 ; counter++ )

32                             {

33                                 publisher.SendMessageW(&msgpack,false);

34                             }

35                         }

36                         delete cmessage;

37                         cmessage = NULL;

38                         bmessage = NULL;

39                     }

40                 }

41             }

42         }

43     }

44 

45     return 0;

46 }

6.Agent 의 예제 코드:
int _tmain(int argc, _TCHAR* argv[])

{

    Network network;

    bool result = network.Init(ZMQ_SUB,"tcp://192.168.10.179:9999");

    if(result)

    {

        zmq_msg_t *msg = network.ReceiveMessage();

        if( msg != NULL )

        {

            Msgpack msgpack;

            BaseMessage *bmessage = msgpack.Unpack(*msg);

            network.CloseMsg(msg);

            if( bmessage->Type == 2048 )

            {

                ServerMessage *smessage = static_cast<ServerMessage*>(bmessage);

                if( smessage->Information.size() > 0 )

                {

                    std::cout << smessage->Information[0] << std::endl;

                }

                delete smessage;

                smessage = NULL;

                bmessage = NULL;

            }

        }

    }



    system("pause");

    return 0;

}

7.이 세 프로그램 을 시작 하면 클 라 이언 트 는 보 낼 메 시 지 를 압축 하여 서버 에 보 냅 니 다.서버 는 메 시 지 를 받 은 후에 클 라 이언 트 에 게 메 시 지 를 피드백 한 다음 에 순환 적 으로 메 시 지 를 에이전트 에 보 냅 니 다.에이전트 는 서버 에 답장 할 필요 가 없습니다.마지막 으로 두 가 지 를 중점적으로 설명 한다.
(1)ZMQ 가 만 든 socket 전송 데이터 와 수신 데 이 터 는 같은 라인 에 있어 야 합 니 다.서버 가 클 라 이언 트 의 데 이 터 를 받 은 후,클 라 이언 트 에 게 정 보 를 피드백 할 수 없 으 며,데 이 터 를 받 은 스 레 드 에서 정 보 를 피드백 해 야 합 니 다.
(2)ZMQ 는 발송 자 와 수신 자 에 게 일정한 시작 순 서 를 요구 하지 않 지만 서버 에서 한 번 의 메시지 만 발표 하면 에이전트 가 정 보 를 받 지 못 할 가능성 이 높다.에이전트 가 먼저 시작 하 든 서버 가 먼저 시작 하 든 에이전트 가 정 보 를 받 지 못 할 수 있 습 니 다.서버 코드 에 서 는 에이전트 가 정 보 를 받 을 수 있 도록 백 번 반복 적 으로 발표 합 니 다.실제 응용 에 서 는 요청-응답 모드 를 결합 하여 구독 자 들 이 게시 자의 메 시 지 를 받 았 음 을 보증 할 수 있 습 니 다.
 
 참고 자료:
ZMQ: http://zguide.zeromq.org/page:all
MessagePack: http://wiki.msgpack.org/pages/viewpage.action?pageId=1081387#QuickStartforC%2B%2B-ImplementationStatus

좋은 웹페이지 즐겨찾기