c \ # 오픈 소스 메시지 큐 미들웨어 EQueue 튜 토리 얼

11530 단어 C#
다음으로 이동:http://www.2cto.com/kf/201403/287817.html
프로필
 
EQueue 는 RocketMQ 를 참조 하여 실 현 된 오픈 소스 메시지 큐 미들웨어 로 모 노 를 호 환 합 니 다. 구체 적 으로 는 작가 의 글 인 'c \ # 작성 한 오픈 소스 분포 식 메시지 큐 eque 공유' 를 참조 할 수 있 습 니 다.프로젝트 원본 주소:https://github.com/tangxuehua/equeue프로젝트 에는 대기 열의 모든 소스 코드 와 어떻게 사용 하 는 지 에 대한 예제 가 포함 되 어 있 습 니 다.
 
2. EQueue 설치
 
Producer, Consumer, Broker 는 분포 식 배 치 를 지원 합 니 다. EQueue 를 설치 하려 면. NET 4 가 필요 합 니 다. Visual Studio 2010 / 2012 / 2013. 현재 EQueue 는 클래스 라 이브 러 리 입 니 다. Broker 의 숙주 를 스스로 실현 해 야 합 니 다. QuickStart 를 참조 하여 QuickStart. BrokerServer 프로젝트 를 만 들 수 있 습 니 다. Visual Studio 의 Nuget 을 통 해 eque 를 찾 을 수 있 습 니 다.
using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.BrokerServer {
    class Program {
        static void Main(string[] args) {
            InitializeEQueue();
            var setting = new BrokerSetting();
            setting.NotifyWhenMessageArrived = false;
            setting.DeleteMessageInterval = 1000;
            new BrokerController(setting).Initialize().Start();
            Console.ReadLine();
        }

        static void InitializeEQueue() {
            Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();
        }
    }
}

InitializeQueue 방법 은 EQueue 의 환경 을 초기 화하 고 Autofac 를 IOC 용기 로 사용 하 며 log 4 Net 기록 로 그 를 사용 합 니 다. RegisterEQueue Components 방법 을 살 펴 보 겠 습 니 다.
public static class ConfigurationExtensions {
    public static Configuration RegisterEQueueComponents(this Configuration configuration) {
        configuration.SetDefault < IAllocateMessageQueueStrategy,
        AverageAllocateMessageQueueStrategy > ();
        configuration.SetDefault < IQueueSelector,
        QueueHashSelector > ();
        configuration.SetDefault < ILocalOffsetStore,
        DefaultLocalOffsetStore > ();
        configuration.SetDefault < IMessageStore,
        InMemoryMessageStore > ();
        configuration.SetDefault < IMessageService,
        MessageService > ();
        configuration.SetDefault < IOffsetManager,
        InMemoryOffsetManager > ();
        return configuration;
    }
}
코드 에 6 개의 구성 요소 가 포함 되 어 있 습 니 다.
IAllocateMessageQueueStrategy
IQueueSelector
ILocalOffsetStore
IMessageStore
IMessageService
IOffsetManager
deleteMessage Interval 이 속성 은 eque 의 정시 삭제 간격 을 설정 하 는 데 사 용 됩 니 다. 단 위 는 밀리초 이 고 기본 값 은 한 시간 입 니 다.또한 Producer SocketSetting 과 Consumer SocketSetting 은 각각 Producer 연결 Broker 와 Consumer 연결 Broker 의 IP 와 포트 를 설정 하 는 데 사용 되 며 기본 포트 는 5000 과 5001 이다.
public class BrokerSetting {
    public SocketSetting ProducerSocketSetting {
        get;
        set;
    }
    public SocketSetting ConsumerSocketSetting {
        get;
        set;
    }
    public bool NotifyWhenMessageArrived {
        get;
        set;
    }
    public int DeleteMessageInterval {
        get;
        set;
    }

    public BrokerSetting() {
        ProducerSocketSetting = new SocketSetting {
            Address = SocketUtils.GetLocalIPV4().ToString(),
            Port = 5000,
            Backlog = 5000
        };
        ConsumerSocketSetting = new SocketSetting {
            Address = SocketUtils.GetLocalIPV4().ToString(),
            Port = 5001,
            Backlog = 5000
        };
        NotifyWhenMessageArrived = true;
        DeleteMessageInterval = 1000 * 60 * 60;
    }
}
프로젝트 를 실행 합 니 다. 다음 과 같은 내용 을 표시 하면 Broker 시작 성공 을 설명 합 니 다.
2014-03-23 20:10:30,255  INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]
3. Visual Studio 에서 개발 테스트
1. VS 프로젝트 QuickStart. ProducerClient 를 만 들 고 Nuget 을 통 해 EQueue 를 참조 하여 다음 Producer 코드 를 작성 합 니 다.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ProducerClient {
    class Program {
        static void Main(string[] args) {
            InitializeEQueue();

            var scheduleService = ObjectContainer.Resolve < IScheduleService > ();
            var producer = new Producer().Start();
            var total = 1000;
            var parallelCount = 10;
            var finished = 0;
            var messageIndex = 0;
            var watch = Stopwatch.StartNew();

            var action = new Action(() = >{
                for (var index = 1; index <= total; index++) {
                    var message = "message" + Interlocked.Increment(ref messageIndex);
                    producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask = >{
                        var finishedCount = Interlocked.Increment(ref finished);
                        if (finishedCount % 1000 == 0) {
                            Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
                        }
                    });
                }
            });

            var actions = new List < Action > ();
            for (var index = 0; index < parallelCount; index++) {
                actions.Add(action);
            }

            Parallel.Invoke(actions.ToArray());

            Console.ReadLine();
        }

        static void InitializeEQueue() {
            Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();
        }
    }
}

Producer 대상 은 사용 하기 전에 Start 초기 화 를 호출해 야 합 니 다. 한 번 초기 화하 면 됩 니 다. 주의: 메 시 지 를 보 낼 때마다 Start 방법 을 호출해 서 는 안 됩 니 다.Producer 는 기본적으로 이 컴퓨터 에 연 결 된 5000 포트 를 ProducerSetting 을 통 해 설정 할 수 있 으 며 아래 코드 를 참조 할 수 있 습 니 다.
public class ProducerSetting {
    public string BrokerAddress {
        get;
        set;
    }
    public int BrokerPort {
        get;
        set;
    }
    public int SendMessageTimeoutMilliseconds {
        get;
        set;
    }
    public int UpdateTopicQueueCountInterval {
        get;
        set;
    }

    public ProducerSetting() {
        BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
        BrokerPort = 5000;
        SendMessageTimeoutMilliseconds = 1000 * 10;
        UpdateTopicQueueCountInterval = 1000 * 5;
    }

2. VS 프로젝트 QuickStart. ConsumerClient 를 만 들 고 Nuget 을 통 해 EQueue 를 참조 하여 다음 Consumer 코드 를 작성 합 니 다.
using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ConsumerClient {
    class Program {
        static void Main(string[] args) {
            InitializeEQueue();

            var messageHandler = new MessageHandler();
            var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);

            Console.WriteLine("Start consumer load balance, please wait for a moment.");
            var scheduleService = ObjectContainer.Resolve < IScheduleService > ();
            var waitHandle = new ManualResetEvent(false);
            var taskId = scheduleService.ScheduleTask(() = >{
                var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x = >x.QueueId);
                var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x = >x.QueueId);
                var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x = >x.QueueId);
                var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x = >x.QueueId);
                if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1) {
                    Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}", string.Join(",", c1AllocatedQueueIds), string.Join(",", c2AllocatedQueueIds), string.Join(",", c3AllocatedQueueIds), string.Join(",", c4AllocatedQueueIds)));
                    waitHandle.Set();
                }
            },
            1000, 1000);

            waitHandle.WaitOne();
            scheduleService.ShutdownTask(taskId);

            Console.ReadLine();
        }

        static void InitializeEQueue() {
            Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();
        }
    }

    class MessageHandler: IMessageHandler {
        private int _handledCount;

        public void Handle(QueueMessage message, IMessageContext context) {
            var count = Interlocked.Increment(ref _handledCount);
            if (count % 1000 == 0) {
                Console.WriteLine("Total handled {0} messages.", count);
            }
            context.OnMessageHandled(message);
        }
    }
}

사용 방식 은 사용자 에 게 메시지 가 EQueue 서버 에서 응용 클 라 이언 트 로 밀 려 온 것 처럼 느껴 집 니 다.그러나 실제 Consumer 내 부 는 긴 폴 링 풀 방식 으로 EQueue 서버 에서 메 시 지 를 끌 어 낸 다음 사용자 Listener 방법 을 되 돌려 줍 니 다.Consumer 는 기본적으로 이 컴퓨터 에 연 결 된 5001 포트 를 Consumer Setting 을 통 해 설정 할 수 있 습 니 다. 아래 코드 를 참조 할 수 있 습 니 다.
public class ConsumerSetting {
    public string BrokerAddress {
        get;
        set;
    }
    public int BrokerPort {
        get;
        set;
    }
    public int RebalanceInterval {
        get;
        set;
    }
    public int UpdateTopicQueueCountInterval {
        get;
        set;
    }
    public int HeartbeatBrokerInterval {
        get;
        set;
    }
    public int PersistConsumerOffsetInterval {
        get;
        set;
    }
    public PullRequestSetting PullRequestSetting {
        get;
        set;
    }
    public MessageModel MessageModel {
        get;
        set;
    }
    public MessageHandleMode MessageHandleMode {
        get;
        set;
    }

    public ConsumerSetting() {
        BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
        BrokerPort = 5001;
        RebalanceInterval = 1000 * 5;
        HeartbeatBrokerInterval = 1000 * 5;
        UpdateTopicQueueCountInterval = 1000 * 5;
        PersistConsumerOffsetInterval = 1000 * 5;
        PullRequestSetting = new PullRequestSetting();
        MessageModel = MessageModel.Clustering;
        MessageHandleMode = MessageHandleMode.Parallel;
    }

좋은 웹페이지 즐겨찾기