c \ # 오픈 소스 메시지 큐 미들웨어 EQueue 튜 토리 얼
11530 단어 C#
프로필
EQueue 는 RocketMQ 를 참조 하여 실 현 된 오픈 소스 메시지 큐 미들웨어 로 모 노 를 호 환 합 니 다. 구체 적 으로 는 작가 의 글 인 'c \ # 작성 한 오픈 소스 분포 식 메시지 큐 eque 공유' 를 참조 할 수 있 습 니 다.프로젝트 원본 주소:https://github.com/tangxuehua/equeue프로젝트 에는 대기 열의 모든 소스 코드 와 어떻게 사용 하 는 지 에 대한 예제 가 포함 되 어 있 습 니 다.
2. EQueue 설치
Producer, Consumer, Broker 는 분포 식 배 치 를 지원 합 니 다. EQueue 를 설치 하려 면. NET 4 가 필요 합 니 다. Visual Studio 2010 / 2012 / 2013. 현재 EQueue 는 클래스 라 이브 러 리 입 니 다. Broker 의 숙주 를 스스로 실현 해 야 합 니 다. QuickStart 를 참조 하여 QuickStart. BrokerServer 프로젝트 를 만 들 수 있 습 니 다. Visual Studio 의 Nuget 을 통 해 eque 를 찾 을 수 있 습 니 다.
using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;
namespace QuickStart.BrokerServer {
class Program {
static void Main(string[] args) {
InitializeEQueue();
var setting = new BrokerSetting();
setting.NotifyWhenMessageArrived = false;
setting.DeleteMessageInterval = 1000;
new BrokerController(setting).Initialize().Start();
Console.ReadLine();
}
static void InitializeEQueue() {
Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();
}
}
}
InitializeQueue 방법 은 EQueue 의 환경 을 초기 화하 고 Autofac 를 IOC 용기 로 사용 하 며 log 4 Net 기록 로 그 를 사용 합 니 다. RegisterEQueue Components 방법 을 살 펴 보 겠 습 니 다.
public static class ConfigurationExtensions {
public static Configuration RegisterEQueueComponents(this Configuration configuration) {
configuration.SetDefault < IAllocateMessageQueueStrategy,
AverageAllocateMessageQueueStrategy > ();
configuration.SetDefault < IQueueSelector,
QueueHashSelector > ();
configuration.SetDefault < ILocalOffsetStore,
DefaultLocalOffsetStore > ();
configuration.SetDefault < IMessageStore,
InMemoryMessageStore > ();
configuration.SetDefault < IMessageService,
MessageService > ();
configuration.SetDefault < IOffsetManager,
InMemoryOffsetManager > ();
return configuration;
}
}
코드 에 6 개의 구성 요소 가 포함 되 어 있 습 니 다.IAllocateMessageQueueStrategy
IQueueSelector
ILocalOffsetStore
IMessageStore
IMessageService
IOffsetManager
deleteMessage Interval 이 속성 은 eque 의 정시 삭제 간격 을 설정 하 는 데 사 용 됩 니 다. 단 위 는 밀리초 이 고 기본 값 은 한 시간 입 니 다.또한 Producer SocketSetting 과 Consumer SocketSetting 은 각각 Producer 연결 Broker 와 Consumer 연결 Broker 의 IP 와 포트 를 설정 하 는 데 사용 되 며 기본 포트 는 5000 과 5001 이다.
public class BrokerSetting {
public SocketSetting ProducerSocketSetting {
get;
set;
}
public SocketSetting ConsumerSocketSetting {
get;
set;
}
public bool NotifyWhenMessageArrived {
get;
set;
}
public int DeleteMessageInterval {
get;
set;
}
public BrokerSetting() {
ProducerSocketSetting = new SocketSetting {
Address = SocketUtils.GetLocalIPV4().ToString(),
Port = 5000,
Backlog = 5000
};
ConsumerSocketSetting = new SocketSetting {
Address = SocketUtils.GetLocalIPV4().ToString(),
Port = 5001,
Backlog = 5000
};
NotifyWhenMessageArrived = true;
DeleteMessageInterval = 1000 * 60 * 60;
}
}
프로젝트 를 실행 합 니 다. 다음 과 같은 내용 을 표시 하면 Broker 시작 성공 을 설명 합 니 다.2014-03-23 20:10:30,255 INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]
3. Visual Studio 에서 개발 테스트
1. VS 프로젝트 QuickStart. ProducerClient 를 만 들 고 Nuget 을 통 해 EQueue 를 참조 하여 다음 Producer 코드 를 작성 합 니 다.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;
namespace QuickStart.ProducerClient {
class Program {
static void Main(string[] args) {
InitializeEQueue();
var scheduleService = ObjectContainer.Resolve < IScheduleService > ();
var producer = new Producer().Start();
var total = 1000;
var parallelCount = 10;
var finished = 0;
var messageIndex = 0;
var watch = Stopwatch.StartNew();
var action = new Action(() = >{
for (var index = 1; index <= total; index++) {
var message = "message" + Interlocked.Increment(ref messageIndex);
producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask = >{
var finishedCount = Interlocked.Increment(ref finished);
if (finishedCount % 1000 == 0) {
Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
}
});
}
});
var actions = new List < Action > ();
for (var index = 0; index < parallelCount; index++) {
actions.Add(action);
}
Parallel.Invoke(actions.ToArray());
Console.ReadLine();
}
static void InitializeEQueue() {
Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();
}
}
}
Producer 대상 은 사용 하기 전에 Start 초기 화 를 호출해 야 합 니 다. 한 번 초기 화하 면 됩 니 다. 주의: 메 시 지 를 보 낼 때마다 Start 방법 을 호출해 서 는 안 됩 니 다.Producer 는 기본적으로 이 컴퓨터 에 연 결 된 5000 포트 를 ProducerSetting 을 통 해 설정 할 수 있 으 며 아래 코드 를 참조 할 수 있 습 니 다.
public class ProducerSetting {
public string BrokerAddress {
get;
set;
}
public int BrokerPort {
get;
set;
}
public int SendMessageTimeoutMilliseconds {
get;
set;
}
public int UpdateTopicQueueCountInterval {
get;
set;
}
public ProducerSetting() {
BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
BrokerPort = 5000;
SendMessageTimeoutMilliseconds = 1000 * 10;
UpdateTopicQueueCountInterval = 1000 * 5;
}
2. VS 프로젝트 QuickStart. ConsumerClient 를 만 들 고 Nuget 을 통 해 EQueue 를 참조 하여 다음 Consumer 코드 를 작성 합 니 다.
using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;
namespace QuickStart.ConsumerClient {
class Program {
static void Main(string[] args) {
InitializeEQueue();
var messageHandler = new MessageHandler();
var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);
Console.WriteLine("Start consumer load balance, please wait for a moment.");
var scheduleService = ObjectContainer.Resolve < IScheduleService > ();
var waitHandle = new ManualResetEvent(false);
var taskId = scheduleService.ScheduleTask(() = >{
var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x = >x.QueueId);
var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x = >x.QueueId);
var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x = >x.QueueId);
var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x = >x.QueueId);
if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1) {
Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}", string.Join(",", c1AllocatedQueueIds), string.Join(",", c2AllocatedQueueIds), string.Join(",", c3AllocatedQueueIds), string.Join(",", c4AllocatedQueueIds)));
waitHandle.Set();
}
},
1000, 1000);
waitHandle.WaitOne();
scheduleService.ShutdownTask(taskId);
Console.ReadLine();
}
static void InitializeEQueue() {
Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();
}
}
class MessageHandler: IMessageHandler {
private int _handledCount;
public void Handle(QueueMessage message, IMessageContext context) {
var count = Interlocked.Increment(ref _handledCount);
if (count % 1000 == 0) {
Console.WriteLine("Total handled {0} messages.", count);
}
context.OnMessageHandled(message);
}
}
}
사용 방식 은 사용자 에 게 메시지 가 EQueue 서버 에서 응용 클 라 이언 트 로 밀 려 온 것 처럼 느껴 집 니 다.그러나 실제 Consumer 내 부 는 긴 폴 링 풀 방식 으로 EQueue 서버 에서 메 시 지 를 끌 어 낸 다음 사용자 Listener 방법 을 되 돌려 줍 니 다.Consumer 는 기본적으로 이 컴퓨터 에 연 결 된 5001 포트 를 Consumer Setting 을 통 해 설정 할 수 있 습 니 다. 아래 코드 를 참조 할 수 있 습 니 다.
public class ConsumerSetting {
public string BrokerAddress {
get;
set;
}
public int BrokerPort {
get;
set;
}
public int RebalanceInterval {
get;
set;
}
public int UpdateTopicQueueCountInterval {
get;
set;
}
public int HeartbeatBrokerInterval {
get;
set;
}
public int PersistConsumerOffsetInterval {
get;
set;
}
public PullRequestSetting PullRequestSetting {
get;
set;
}
public MessageModel MessageModel {
get;
set;
}
public MessageHandleMode MessageHandleMode {
get;
set;
}
public ConsumerSetting() {
BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
BrokerPort = 5001;
RebalanceInterval = 1000 * 5;
HeartbeatBrokerInterval = 1000 * 5;
UpdateTopicQueueCountInterval = 1000 * 5;
PersistConsumerOffsetInterval = 1000 * 5;
PullRequestSetting = new PullRequestSetting();
MessageModel = MessageModel.Clustering;
MessageHandleMode = MessageHandleMode.Parallel;
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
WebView2를 Visual Studio 2017 Express에서 사용할 수 있을 때까지Evergreen .Net Framework SDK 4.8 VisualStudio2017에서 NuGet을 사용하기 때문에 패키지 관리 방법을 packages.config 대신 PackageReference를 사용해야...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.