ZMQ 와 Message Pack 의 간단 한 사용
55420 단어 message
이 를 어떻게 사용 하 는 지 설명 하기 위해 사용 장면 을 만 듭 니 다.N 개의 Client,하나의 Server,M 개의 Agent,Client 는 ZMQ 의 요청-응답 모드 와 Server 통신 을 사용 합 니 다.Server 는 Client 의 명령 을 받 은 후 ZMQ 의 게시-구독 모드 를 통 해 각 Agent 와 통신 합 니 다.아래 코드 는 ZMQ 와 Message Pack 을 패키지 하고 사 용 했 습 니 다.간편 하기 위해 저 는 클래스 의 정의 와 실현 을 모두 헤더 파일 에 썼 습 니 다.
1.ZMQ 에 대한 간단 한 패키지:
1 #include"Msgpack.h"
2 #include<zmq.h>
3 #include<string>
4 #include<cassert>
5 #include<iostream>
6
7 namespace Tool
8 {
9 //
10 class Network
11 {
12 public:
13
14 // : 。
15 // : 。
16 // : 。
17 Network() : m_socket(NULL) { }
18
19 // : socket。
20 // :zmqType ZMQ ,address socket 。
21 // :true ,false 。
22 bool Init(int zmqType,const std::string& address)
23 {
24 try
25 {
26 m_socket = zmq_socket(Context,zmqType);
27 return SetSocket(zmqType,address);
28 }
29 catch(...)
30 {
31 std::cout << "Network 。" << std::endl;
32 return false;
33 }
34 }
35
36 // : 。
37 // : Msgpack ,isRelease true 。
38 // :true ,false 。
39 bool SendMessage(Msgpack *msgpack,bool isRelease = true) const
40 {
41 try
42 {
43 zmq_msg_t msg;
44 zmq_msg_init(&msg);
45 if(isRelease)
46 {
47 zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),Tool::Network::Release,msgpack);
48 }
49 else
50 {
51 zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),0,0);
52 }
53 zmq_msg_send(&msg,m_socket,0);
54 return true;
55 }
56 catch(...)
57 {
58 std::cout << "Network 。" << std::endl;
59 return false;
60 }
61 }
62
63 // : 。
64 // : 。
65 // : 。
66 zmq_msg_t* ReceiveMessage() const
67 {
68 zmq_msg_t *reply = NULL;
69 try
70 {
71 reply = new zmq_msg_t();
72 zmq_msg_init(reply);
73 zmq_msg_recv(reply,m_socket,0);
74 return reply;
75 }
76 catch(...)
77 {
78 if( reply != NULL )
79 {
80 delete reply;
81 }
82 return NULL;
83 }
84 }
85
86 // : 。
87 // : 。
88 // : 。
89 void CloseMsg(zmq_msg_t* msg)
90 {
91 try
92 {
93 zmq_msg_close(msg);
94 msg = NULL;
95 }
96 catch(...)
97 {
98 msg = NULL;
99 }
100 }
101
102 // : 。
103 // : 。
104 // : 。
105 ~Network()
106 {
107 if( m_socket != NULL )
108 {
109 zmq_close(m_socket);
110 m_socket = NULL;
111 }
112 }
113
114 private:
115
116 // socket
117 void *m_socket;
118
119 //
120 static void *Context;
121
122 private:
123
124 // : socket。
125 // :zmqType ZMQ ,address socket 。
126 // :true ,false 。
127 bool SetSocket(int zmqType,const std::string& address)
128 {
129 int result = -1;
130 switch(zmqType)
131 {
132 case ZMQ_REP:
133 case ZMQ_PUB:
134 result = zmq_bind(m_socket,address.c_str());
135 break;
136 case ZMQ_REQ:
137 result = zmq_connect(m_socket,address.c_str());
138 break;
139 case ZMQ_SUB:
140 result = zmq_connect(m_socket,address.c_str());
141 assert(result == 0);
142 result = zmq_setsockopt(m_socket,ZMQ_SUBSCRIBE,"",0);
143 break;
144 default:
145 return false;
146 }
147 assert( result == 0 );
148 return true;
149 }
150
151 // : , 。
152 // :function ,hint 。
153 // : 。
154 static void Release(void *function, void *hint)
155 {
156 Msgpack *msgpack = (Msgpack*)hint;
157 if( msgpack != NULL )
158 {
159 delete msgpack;
160 msgpack = NULL;
161 }
162 }
163 };
164
165 // context
166 void *Tool::Network::Context = zmq_ctx_new();
167 };
설명:
(1)zmqctx_new 가 만 든 Context 는 전체 응용 프로그램 이 하 나 를 공유 하면 됩 니 다.구체 적 인 통신 은 zmq 입 니 다.socket 에서 만 든 socket 으로 완 성 됩 니 다.상기 코드 에 Context 가 가리 키 는 자원 을 방출 하지 않 았 습 니 다.
(2)zmqmsg_init_data 함수 의 매개 변 수 는 자원 을 방출 하 는 함수 주 소 를 입력 하고 ZMQ 에서 메 시 지 를 보 낸 후에 이 함수 로 자원 을 방출 해 야 합 니 다.이 인자 가 들 어 오지 않 고 들 어 오 는 정보 가 임시 변수 라면 수신 자가 정 보 를 받 지 못 하고 이상 을 던 질 가능성 이 높다.이 인자 가 들 어 오지 않 으 면 자원 을 스스로 방출 하 는 것 을 기억 해 야 한다.
2.Message Pack 에 대한 간단 한 패키지:
1 #include"BaseMessage.h"
2 #include"ClientMessage.h"
3 #include"ServerMessage.h"
4 #include<zmq.h>
5 #include<msgpack.hpp>
6
7 namespace Tool
8 {
9 using namespace Message;
10
11 // /
12 class Msgpack
13 {
14 public:
15
16 // : 。
17 // : 。
18 // : 。
19 Msgpack(void) { }
20
21 // : 。
22 // : 。
23 // : 。
24 ~Msgpack(void) { }
25
26 // : 。
27 // : 。
28 // :true 。
29 template<typename T>
30 bool Pack(const T& t)
31 {
32 try
33 {
34 Release();
35 msgpack::pack(m_sbuf,t);
36 return true;
37 }
38 catch(...)
39 {
40 std::cout << "Msgpack 。" << std::endl;
41 return false;
42 }
43 }
44
45 // : 。
46 // :zmq 。
47 // : 。
48 BaseMessage* Unpack(zmq_msg_t& msg)
49 {
50 try
51 {
52 int size = zmq_msg_size(&msg);
53 if( size > 0 )
54 {
55 Release();
56 m_sbuf.write((char*)zmq_msg_data(&msg),size);
57 size_t offset = 0;
58 msgpack::zone z;
59 msgpack::object obj;
60 msgpack::unpack(m_sbuf.data(),m_sbuf.size(),&offset,&z,&obj);
61 return GetMessage(obj);
62 }
63 }
64 catch(...)
65 {
66 //
67 }
68 return NULL;
69 }
70
71 // : / 。
72 // : 。
73 // : / 。
74 inline msgpack::sbuffer& GetSbuf()
75 {
76 return m_sbuf;
77 }
78
79 private:
80
81 // /
82 msgpack::sbuffer m_sbuf;
83
84 private:
85
86 // : 。
87 // : 。
88 // : 。
89 void Release()
90 {
91 m_sbuf.clear();
92 m_sbuf.release();
93 }
94
95 // : 。
96 // : msgpack::object。
97 // : 。
98 BaseMessage* GetMessage(const msgpack::object& obj)
99 {
100 BaseMessage bmessage;
101 obj.convert(&bmessage);
102 switch(bmessage.Type)
103 {
104 case 1024:
105 return Convert<ClientMessage>(obj);
106 case 2048:
107 return Convert<ServerMessage>(obj);
108 default:
109 return NULL;
110 }
111 }
112
113 // : 。
114 // : msgpack::object。
115 // : T 。
116 template<typename T>
117 T* Convert(const msgpack::object& obj)
118 {
119 T *t = new T();
120 obj.convert(t);
121 return t;
122 }
123 };
124 };
설명:
압축 시 zmqmsg_t 메시지 체 는 msgpack:sbuffer 에 압축 된 다음 에 이 메시지 체 를 닫 을 수 있 습 니 다.패 킷 을 해제 한 데 이 터 를 구체 적 인 유형 으로 바 꾸 려 면 이 유형 이 어떤 유형 인지 알 아야 합 니 다.여기에 세 가지 방법 이 있 습 니 다.
(1)수신 자 에 게 어떤 메 시 지 를 받 을 지 먼저 메 시 지 를 보 낸 다음 에 수신 자 는 메 시 지 를 해제 한 후에 해당 하 는 클래스 로 전환 할 수 있다.이런 방식 은 추가 적 인 통신 이 필요 하 며 사용 을 권장 하지 않 는 다.
(2)모든 정 보 는 하나의 기본 클래스 에서 계승 되 며,이 기본 클래스 는 메시지 형식의 필드 를 저장 합 니 다.패 킷 을 풀 고 데 이 터 를 기본 클래스 로 변환 한 다음 유형 에 따라 구체 적 인 파생 클래스 로 변환 합 니 다.이런 방식 은 한 번 더 바 꿔 야 하고 위의 코드 도 바로 이런 방식 을 채택 해 야 한다.
(3)가방 을 누 를 때 먼저 메시지 류 를 누 른 다음 에 표지 하 나 를 누 르 면 이 소식 이 어떤 유형의 표지 류 인지,즉 두 번 누 르 는 것 이다.패 킷 을 풀 때 먼저 패 킷 표지 류 를 풀 고 메시지 류 의 구체 적 인 유형 을 알 게 된 다음 에 패 킷 정보 류,즉 패 킷 을 두 번 풀 고 두 번 전환 합 니 다.(2)에 비해 더 많은 압축 해제,하 도 급 작업 을 해 야 하 는 것 외 에 도 하 도 급 의 오프셋 을 계산 해 야 한다.그렇지 않 으 면 실수 하기 쉽다.
3.사 용 된 메시지 종류:
namespace Message
{
//
class BaseMessage
{
public:
MSGPACK_DEFINE(Type);
//
int Type;
//
BaseMessage()
{
Type = 0;
}
};
//
class ClientMessage : public BaseMessage
{
public:
MSGPACK_DEFINE(Type,Information);
//
std::string Information;
//
ClientMessage()
{
Type = 1024;
}
};
//
class ServerMessage : public BaseMessage
{
public:
MSGPACK_DEFINE(Type,Information);
//
std::vector<std::string> Information;
//
ServerMessage()
{
Type = 2048;
}
};
};
설명:
(1)MSPACK_DEFINE 는 어떤 멤버 들 이 압축/해 지 를 할 수 있 는 지 를 표시 했다.파생 류 중의 MSGPACKDEFINE 는 기본 클래스 의 구성원 을 적어 야 합 니 다.그렇지 않 으 면 Message Pack 패키지 에 대한 두 번 째 방법 을 사용 할 수 없습니다.
(2)C++버 전의 Message Pack 압축/해 지 데이터 구성원 은 하나의 클래스,구조 또는 연합 체 일 뿐 포인터(boost 라 이브 러 리 의 스마트 포인터 포함),배열 을 사용 할 수 없고 매 거 진 값 도 적용 되 지 않 습 니 다.따라서 BaseMessage 는 int 값 을 사용 하여 파생 류 가 어떤 유형 에 속 하 는 지 표시 합 니 다.C\#버 전의 Message Pack 은 매 거 진 값 을 압축 할 수 있 습 니 다.
4.Client 의 예제 코드:
1 int _tmain(int argc, _TCHAR* argv[])
2 {
3 Network network;
4 bool result = network.Init(ZMQ_REQ,"tcp://192.168.10.179:8888");
5 if(result)
6 {
7 ClientMessage cmessage;
8 cmessage.Information = "I come form Client.";
9
10 Msgpack msgpack;
11 result = msgpack.Pack<ClientMessage>(cmessage);
12 if(result)
13 {
14 result = network.SendMessageW(&msgpack,false);
15 if(result)
16 {
17 zmq_msg_t *msg = network.ReceiveMessage();
18 if( msg != NULL )
19 {
20 BaseMessage *bmessage = msgpack.Unpack(*msg);
21 network.CloseMsg(msg);
22 if( bmessage != NULL && bmessage->Type == 2048 )
23 {
24 ServerMessage *smessage = static_cast<ServerMessage*>(bmessage);
25 if( smessage != NULL && smessage->Information.size() > 0 )
26 {
27 std::cout << smessage->Information[0] << std::endl;
28 }
29 delete smessage;
30 smessage = NULL;
31 bmessage = NULL;
32 }
33 }
34 }
35 }
36 }
37
38 system("pause");
39 return 0;
40 }
5.Server 의 예제 코드:
1 int _tmain(int argc, _TCHAR* argv[])
2 {
3 Network responder;
4 bool result = responder.Init(ZMQ_REP,"tcp://192.168.10.179:8888");
5 if(result)
6 {
7 Network publisher;
8 result = publisher.Init(ZMQ_PUB,"tcp://192.168.10.179:9999");
9 if(result)
10 {
11 Msgpack msgpack;
12 while(true)
13 {
14 zmq_msg_t *msg = responder.ReceiveMessage();
15 BaseMessage *bmessage = msgpack.Unpack(*msg);
16 responder.CloseMsg(msg);
17
18 ServerMessage smessage;
19 smessage.Information.push_back("I come from Server.");
20 msgpack.Pack<ServerMessage>(smessage);
21 result = responder.SendMessageW(&msgpack,false);
22
23 if( result )
24 {
25 if( bmessage != NULL && bmessage->Type == 1024 )
26 {
27 ClientMessage *cmessage = static_cast<ClientMessage*>(bmessage);
28 if( cmessage != NULL )
29 {
30 std::cout << cmessage->Information << std::endl;
31 for( int counter = 0 ; counter < 100 ; counter++ )
32 {
33 publisher.SendMessageW(&msgpack,false);
34 }
35 }
36 delete cmessage;
37 cmessage = NULL;
38 bmessage = NULL;
39 }
40 }
41 }
42 }
43 }
44
45 return 0;
46 }
6.Agent 의 예제 코드:
int _tmain(int argc, _TCHAR* argv[])
{
Network network;
bool result = network.Init(ZMQ_SUB,"tcp://192.168.10.179:9999");
if(result)
{
zmq_msg_t *msg = network.ReceiveMessage();
if( msg != NULL )
{
Msgpack msgpack;
BaseMessage *bmessage = msgpack.Unpack(*msg);
network.CloseMsg(msg);
if( bmessage->Type == 2048 )
{
ServerMessage *smessage = static_cast<ServerMessage*>(bmessage);
if( smessage->Information.size() > 0 )
{
std::cout << smessage->Information[0] << std::endl;
}
delete smessage;
smessage = NULL;
bmessage = NULL;
}
}
}
system("pause");
return 0;
}
7.이 세 프로그램 을 시작 하면 클 라 이언 트 는 보 낼 메 시 지 를 압축 하여 서버 에 보 냅 니 다.서버 는 메 시 지 를 받 은 후에 클 라 이언 트 에 게 메 시 지 를 피드백 한 다음 에 순환 적 으로 메 시 지 를 에이전트 에 보 냅 니 다.에이전트 는 서버 에 답장 할 필요 가 없습니다.마지막 으로 두 가 지 를 중점적으로 설명 한다.
(1)ZMQ 가 만 든 socket 전송 데이터 와 수신 데 이 터 는 같은 라인 에 있어 야 합 니 다.서버 가 클 라 이언 트 의 데 이 터 를 받 은 후,클 라 이언 트 에 게 정 보 를 피드백 할 수 없 으 며,데 이 터 를 받 은 스 레 드 에서 정 보 를 피드백 해 야 합 니 다.
(2)ZMQ 는 발송 자 와 수신 자 에 게 일정한 시작 순 서 를 요구 하지 않 지만 서버 에서 한 번 의 메시지 만 발표 하면 에이전트 가 정 보 를 받 지 못 할 가능성 이 높다.에이전트 가 먼저 시작 하 든 서버 가 먼저 시작 하 든 에이전트 가 정 보 를 받 지 못 할 수 있 습 니 다.서버 코드 에 서 는 에이전트 가 정 보 를 받 을 수 있 도록 백 번 반복 적 으로 발표 합 니 다.실제 응용 에 서 는 요청-응답 모드 를 결합 하여 구독 자 들 이 게시 자의 메 시 지 를 받 았 음 을 보증 할 수 있 습 니 다.
참고 자료:
ZMQ: http://zguide.zeromq.org/page:all
MessagePack: http://wiki.msgpack.org/pages/viewpage.action?pageId=1081387#QuickStartforC%2B%2B-ImplementationStatus
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Xcode8.x의 메시지 창에 표시되는 시스템 메시지를 숨기기 【Xcode8.x】이런 녀석↓ 메뉴 : Product → Schema → Edit Schema로 추적하고 Argument를 선택. Enviroment Variables 의 + 를 클릭하고 OS_ACTIVITY_MODE 를 추가하여 d...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.