Disruptor 를 위 한 간단 하고 실 용적 인. Net 확장


disruptor   사용 자 는 자신의 소비 자 를 봉인 하고 소비 자 를 소비자 용기 에 주입 하 며 소비자 용 기 는 자동 으로 캐 시 대기 열, 생산 자 를 생 성 한다.
 글 에 쓰 이 는 것 disruptor   C \ # 이식 소스 코드
 https://github.com/bingyang001/disruptor-net-3.3.0-alpha
 저자 블 로그 http://www.cnblogs.com/liguo/p/3296166.html
 
 소비자 용기:
/// <summary>
    ///       
    /// </summary>
    /// <typeparam name="TProduct">  </typeparam>
    public class Workers<TProduct> where TProduct : Producer<TProduct>, new()
    {
        private readonly WorkerPool<TProduct> _workerPool;

        public Workers(List<IWorkHandler<TProduct>> handers, IWaitStrategy waitStrategy = null, int bufferSize = 1024*64)
        {
            if (handers == null || handers.Count == 0)
                throw new ArgumentNullException("          !");
            if (handers.Count == 1)
                _ringBuffer = RingBuffer<TProduct>.CreateSingleProducer(() => new TProduct(), bufferSize,
                    waitStrategy ?? new YieldingWaitStrategy());
            else
            {
                _ringBuffer = RingBuffer<TProduct>.CreateMultiProducer(() => new TProduct(), bufferSize,
                    waitStrategy ?? new YieldingWaitStrategy());
            }
            _workerPool = new WorkerPool<TProduct>(_ringBuffer
                , _ringBuffer.NewBarrier()
                , new FatalExceptionHandler()
                , handers.ToArray());
            _ringBuffer.AddGatingSequences(_workerPool.getWorkerSequences());
        }

        public void Start()
        {
            _workerPool.start(TaskScheduler.Default);
        }

        public Producer<TProduct> CreateOneProducer()
        {
            return new Producer<TProduct>(this._ringBuffer);
        } 
        public void DrainAndHalt()
        {
            _workerPool.drainAndHalt();
        }

        private readonly RingBuffer<TProduct> _ringBuffer;
    }

 
  생산자 (제품): 모든 제품 은 생산자 에 게 물 려 받 아야 한다.
/// <summary>
    ///      
    /// </summary>
    /// <typeparam name="TProduct">    </typeparam>
    public class Producer<TProduct> where TProduct:Producer<TProduct>
    {

        long _sequence;
        private RingBuffer<TProduct> _ringBuffer;
        public Producer()
        {
            
        }
        public Producer(RingBuffer<TProduct> ringBuffer )
        {
            _ringBuffer = ringBuffer;
        }
        /// <summary>
        ///         
        /// </summary>
        /// <returns></returns>
        public Producer<TProduct> Enqueue()
        {
            long sequence = _ringBuffer.Next();
            Producer<TProduct> producer = _ringBuffer[sequence];
            producer._sequence = sequence;
            if (producer._ringBuffer == null)
                producer._ringBuffer = _ringBuffer;
            return producer;
        }
        /// <summary>
        ///       
        /// </summary>
        public void Commit()
        {
            _ringBuffer.Publish(_sequence);
        }
    }

 
  --------------------------------------------------------
  이상 구현, 테스트 코드
  먼저 제품 대상 만 들 기:
  
/// <summary>
        ///   /     
        /// </summary>
        public class Product : Producer<Product>
        {
            //           ,   ,            
            public long Value { get; set; }
            public string Guid { get; set; }
        }

소비자 대상 만 들 기
 /// <summary>
        ///       
        /// </summary>
        public class WorkHandler : IWorkHandler<Product>
        {
         
            public void OnEvent(Product @event)
            {
                //Test         (          )
                Test.UpdateCacheByOut(@event.Guid);
                //    ,        

            }

        }

  테스트 코드:
  1 개 이상 의 생산자 대상, 소비자 처리 대상 을 만 들 수 있 습 니 다.많 을 지도 모 르 고 많 을 지도 모 르 고 빠 를 지도 모른다.생산자 에 게 하 나 를 만 들 면 됩 니 다. 다 중 스 레 드 로 생산자 대상 을 조작 하 는 것 을 권장 합 니 다.소비자 대상 은 실제 상황 에 따라 몇 개 를 만 들 수 있 습 니까?
  
           //  2    ,2    , 2      ,    2           
Workers<Product> workers = new Workers<Product>( new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()}); Producer<Product> producerWorkers = workers.CreateOneProducer(); Producer<Product> producerWorkers1 = workers.CreateOneProducer();
//
  workers.Start();

 제품 생산:
 생산 자 를 인용 하 는 모든 곳 에서 제품 을 대열 에 넣 을 수 있 습 니 다. 여기에 대열 에 넣 는 방법 은 평소 와 다 릅 니 다. 여기 서 사용 하 는 것 은 대열 에서 한 자 리 를 가 져 간 후에 제품 을 넣 는 것 이다.구체 적 인 방법 은 생산 자 를 찾 아 제품 대상 을 얻 은 다음 에 제품 속성 을 수정 하고 마지막 으로 수정 을 제출 합 니 다.
  var obj = producer.Enqueue();
           //      
                obj.Commit();


 
  이상 이 핵심 코드 입 니 다:
완전한 테스트 클래스: 테스트 데이터 의 정확성 을 포함 합 니 다. 성능, 정확성 을 검증 하지 않 을 때 초당 ops 1 천만 정도. 
 class Test
    {
        public static long PrePkgInCount = 0;
        public static long PrePkgOutCount = 0;
        public static long PkgInCount = 0;
        public static long PkgOutCount = 0;
        static ConcurrentDictionary<string, string> InCache = new ConcurrentDictionary<string, string>();
        static ConcurrentDictionary<string, string> OutCache = new ConcurrentDictionary<string, string>();
        private static long Seconds;

        static void Main(string[] args)
        {
            Workers<Product> workers = new Workers<Product>(
            new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()});

            Producer<Product> producerWorkers = workers.CreateOneProducer();
            Producer<Product> producerWorkers1 = workers.CreateOneProducer();

            workers.Start();
            Task.Run(delegate
            {
                while (true)
                {
                    Thread.Sleep(1000);
                    Seconds++;
                    long intemp = PkgInCount;
                    long outemp = PkgOutCount;
                    Console.WriteLine(
                        $"In ops={intemp - PrePkgInCount},out ops={outemp - PrePkgOutCount},inCacheCount={InCache.Count},OutCacheCount={OutCache.Count},RunningTime={Seconds}");
                    PrePkgInCount = intemp;
                    PrePkgOutCount = outemp;
                }

            });
            Task.Run(delegate { Run(producerWorkers); });
            Task.Run(delegate { Run(producerWorkers); });
            Task.Run(delegate { Run(producerWorkers1); });
            Console.Read();

        }

        public static void Run(Producer<Product> producer)
        {
            for (int i = 0; i < int.MaxValue; i++)
            {

                var obj = producer.Enqueue();
                CheckRelease(obj as Product);
                obj.Commit();
            }
        }

        public static  void CheckRelease(Product publisher)
        {
            Interlocked.Increment(ref PkgInCount);
            return; //      
            publisher.Guid = Guid.NewGuid().ToString();
            InCache.TryAdd(publisher.Guid, string.Empty);
          
        }

        public static void UpdateCacheByOut(string guid)
        {
            Interlocked.Increment(ref Test.PkgOutCount);
            if (guid != null)
                if (InCache.ContainsKey(guid))
                {
                    string str;
                    InCache.TryRemove(guid, out str);
                }
                else
                {
                    OutCache.TryAdd(guid, string.Empty);
                }

        }
        /// <summary>
        ///   /     
        /// </summary>
        public class Product : Producer<Product>
        {
            //           ,   ,            
            public long Value { get; set; }
            public string Guid { get; set; }
        }

        /// <summary>
        ///       
        /// </summary>
        public class WorkHandler : IWorkHandler<Product>
        {
         
            public void OnEvent(Product @event)
            {

                Test.UpdateCacheByOut(@event.Guid);
                //    ,        

            }

        }
    }

좋은 웹페이지 즐겨찾기