Disruptor 를 위 한 간단 하고 실 용적 인. Net 확장
disruptor 사용 자 는 자신의 소비 자 를 봉인 하고 소비 자 를 소비자 용기 에 주입 하 며 소비자 용 기 는 자동 으로 캐 시 대기 열, 생산 자 를 생 성 한다.
글 에 쓰 이 는 것 disruptor C \ # 이식 소스 코드
https://github.com/bingyang001/disruptor-net-3.3.0-alpha
저자 블 로그 http://www.cnblogs.com/liguo/p/3296166.html
소비자 용기:
/// <summary>
///
/// </summary>
/// <typeparam name="TProduct"> </typeparam>
public class Workers<TProduct> where TProduct : Producer<TProduct>, new()
{
private readonly WorkerPool<TProduct> _workerPool;
public Workers(List<IWorkHandler<TProduct>> handers, IWaitStrategy waitStrategy = null, int bufferSize = 1024*64)
{
if (handers == null || handers.Count == 0)
throw new ArgumentNullException(" !");
if (handers.Count == 1)
_ringBuffer = RingBuffer<TProduct>.CreateSingleProducer(() => new TProduct(), bufferSize,
waitStrategy ?? new YieldingWaitStrategy());
else
{
_ringBuffer = RingBuffer<TProduct>.CreateMultiProducer(() => new TProduct(), bufferSize,
waitStrategy ?? new YieldingWaitStrategy());
}
_workerPool = new WorkerPool<TProduct>(_ringBuffer
, _ringBuffer.NewBarrier()
, new FatalExceptionHandler()
, handers.ToArray());
_ringBuffer.AddGatingSequences(_workerPool.getWorkerSequences());
}
public void Start()
{
_workerPool.start(TaskScheduler.Default);
}
public Producer<TProduct> CreateOneProducer()
{
return new Producer<TProduct>(this._ringBuffer);
}
public void DrainAndHalt()
{
_workerPool.drainAndHalt();
}
private readonly RingBuffer<TProduct> _ringBuffer;
}
생산자 (제품): 모든 제품 은 생산자 에 게 물 려 받 아야 한다.
/// <summary>
///
/// </summary>
/// <typeparam name="TProduct"> </typeparam>
public class Producer<TProduct> where TProduct:Producer<TProduct>
{
long _sequence;
private RingBuffer<TProduct> _ringBuffer;
public Producer()
{
}
public Producer(RingBuffer<TProduct> ringBuffer )
{
_ringBuffer = ringBuffer;
}
/// <summary>
///
/// </summary>
/// <returns></returns>
public Producer<TProduct> Enqueue()
{
long sequence = _ringBuffer.Next();
Producer<TProduct> producer = _ringBuffer[sequence];
producer._sequence = sequence;
if (producer._ringBuffer == null)
producer._ringBuffer = _ringBuffer;
return producer;
}
/// <summary>
///
/// </summary>
public void Commit()
{
_ringBuffer.Publish(_sequence);
}
}
--------------------------------------------------------
이상 구현, 테스트 코드
먼저 제품 대상 만 들 기:
/// <summary>
/// /
/// </summary>
public class Product : Producer<Product>
{
// , ,
public long Value { get; set; }
public string Guid { get; set; }
}
소비자 대상 만 들 기
/// <summary>
///
/// </summary>
public class WorkHandler : IWorkHandler<Product>
{
public void OnEvent(Product @event)
{
//Test ( )
Test.UpdateCacheByOut(@event.Guid);
// ,
}
}
테스트 코드:
1 개 이상 의 생산자 대상, 소비자 처리 대상 을 만 들 수 있 습 니 다.많 을 지도 모 르 고 많 을 지도 모 르 고 빠 를 지도 모른다.생산자 에 게 하 나 를 만 들 면 됩 니 다. 다 중 스 레 드 로 생산자 대상 을 조작 하 는 것 을 권장 합 니 다.소비자 대상 은 실제 상황 에 따라 몇 개 를 만 들 수 있 습 니까?
// 2 ,2 , 2 , 2
Workers<Product> workers = new Workers<Product>(
new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()});
Producer<Product> producerWorkers = workers.CreateOneProducer();
Producer<Product> producerWorkers1 = workers.CreateOneProducer();
//
workers.Start();
제품 생산:
생산 자 를 인용 하 는 모든 곳 에서 제품 을 대열 에 넣 을 수 있 습 니 다. 여기에 대열 에 넣 는 방법 은 평소 와 다 릅 니 다. 여기 서 사용 하 는 것 은 대열 에서 한 자 리 를 가 져 간 후에 제품 을 넣 는 것 이다.구체 적 인 방법 은 생산 자 를 찾 아 제품 대상 을 얻 은 다음 에 제품 속성 을 수정 하고 마지막 으로 수정 을 제출 합 니 다.
var obj = producer.Enqueue();
//
obj.Commit();
이상 이 핵심 코드 입 니 다:
완전한 테스트 클래스: 테스트 데이터 의 정확성 을 포함 합 니 다. 성능, 정확성 을 검증 하지 않 을 때 초당 ops 1 천만 정도.
class Test
{
public static long PrePkgInCount = 0;
public static long PrePkgOutCount = 0;
public static long PkgInCount = 0;
public static long PkgOutCount = 0;
static ConcurrentDictionary<string, string> InCache = new ConcurrentDictionary<string, string>();
static ConcurrentDictionary<string, string> OutCache = new ConcurrentDictionary<string, string>();
private static long Seconds;
static void Main(string[] args)
{
Workers<Product> workers = new Workers<Product>(
new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()});
Producer<Product> producerWorkers = workers.CreateOneProducer();
Producer<Product> producerWorkers1 = workers.CreateOneProducer();
workers.Start();
Task.Run(delegate
{
while (true)
{
Thread.Sleep(1000);
Seconds++;
long intemp = PkgInCount;
long outemp = PkgOutCount;
Console.WriteLine(
$"In ops={intemp - PrePkgInCount},out ops={outemp - PrePkgOutCount},inCacheCount={InCache.Count},OutCacheCount={OutCache.Count},RunningTime={Seconds}");
PrePkgInCount = intemp;
PrePkgOutCount = outemp;
}
});
Task.Run(delegate { Run(producerWorkers); });
Task.Run(delegate { Run(producerWorkers); });
Task.Run(delegate { Run(producerWorkers1); });
Console.Read();
}
public static void Run(Producer<Product> producer)
{
for (int i = 0; i < int.MaxValue; i++)
{
var obj = producer.Enqueue();
CheckRelease(obj as Product);
obj.Commit();
}
}
public static void CheckRelease(Product publisher)
{
Interlocked.Increment(ref PkgInCount);
return; //
publisher.Guid = Guid.NewGuid().ToString();
InCache.TryAdd(publisher.Guid, string.Empty);
}
public static void UpdateCacheByOut(string guid)
{
Interlocked.Increment(ref Test.PkgOutCount);
if (guid != null)
if (InCache.ContainsKey(guid))
{
string str;
InCache.TryRemove(guid, out str);
}
else
{
OutCache.TryAdd(guid, string.Empty);
}
}
/// <summary>
/// /
/// </summary>
public class Product : Producer<Product>
{
// , ,
public long Value { get; set; }
public string Guid { get; set; }
}
/// <summary>
///
/// </summary>
public class WorkHandler : IWorkHandler<Product>
{
public void OnEvent(Product @event)
{
Test.UpdateCacheByOut(@event.Guid);
// ,
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.