자바 다 중 스 레 드 의 Disruptor 입문

7531 단어 JavaDisruptor
1.Disruptor 소개
Disruptor 는 현재 세계 에서 가장 빠 른 단기 메시지 큐 로 영국 외환 거래 회사 LMAX 가 개발 하고 있 으 며,개발 의 취 지 는 메모리 큐 의 지연 문 제 를 해결 하 는 것 이다(성능 테스트 에서 I/O 조작 과 같은 수량 급 에 있 음 을 발견 했다).디 스 럽 터 를 기반 으로 개발 한 시스템 싱글 스 레 드 는 초당 600 만 주문 을 지탱 할 수 있 으 며 2010 년 큐 콘 강연 이후 업계 의 주목 을 받 았 다.2011 년 에 기업 응용 소프트웨어 전문가 인 Martin Fowler 는 전문 적 으로 장문 소 개 를 썼 다.같은 해 오 라 클 공식 듀 크 대상 도 받 았 다.현재 Apache Storm,Camel,Log4j 2 를 포함 한 많은 유명 프로젝트 들 이 Disruptor 를 사용 하여 고성능 을 얻 고 있다.
2.Disruptor 의 핵심 에 대해 간단히 이야기 합 니 다.
在这里插入图片描述
  
Disruptor 는 링 버 퍼 를 유지 하고 있 습 니 다.이 대기 열 은 본질 적 으로 첫 번 째 로 연 결 된 배열 입 니 다.링크 드 BlockingQueue 에 비해 RingBuffer 의 배열 구 조 는 검색 에 있어 효율 이 높다.또한,LinkedBlockingQueue 는 머리 노드 포인터 head 와 꼬리 노드 포인터 tail 을 유지 해 야 하 며,RingBuffer 는 하나의 sequence 가 다음 사용 가능 한 위 치 를 가리 키 는 것 만 유지 하면 됩 니 다.그래서 이 두 가지 점 에서 링 버 퍼 는 링크 드 블 로 킹 큐 보다 빠르다.
3.Disruptor 사용
3.1 pom.xml

<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.3</version>
        </dependency>
3.2 이벤트 이벤트
Disruptor 는 사건 을 바탕 으로 하 는 생산자 소비자 모델 이다.그 RingBuffer 에 저 장 된 것 은 사실 메 시 지 를 봉인 한 사건 이다.메시지 큐 에 log 형식의 데 이 터 를 저장 하고 있 음 을 나타 내 는 LongEvent 를 정의 합 니 다.

public class LongEvent {
	private long value;

	public void set(long value) {
		this.value = value;
	}

    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                '}';
    }
}
3.3 EventFactory
EventFactory 인 터 페 이 스 를 실현 하고 Event 공장 을 정의 하여 대기 열 을 채 우 는 데 사용 합 니 다.Event 공장 은 사실 Disruptor 의 효율 성 을 높이 기 위해 초기 화 할 때 Event 공장 을 호출 해 RingBuffer 에 메모 리 를 미리 할당 하고 GC 의 빈 도 를 낮 춘 다.

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {
	public LongEvent newInstance() {
		return new LongEvent();
	}
}
3.4 EventHandler
EventHandler 인 터 페 이 스 를 실현 하고 EventHandler(소비자)를 정의 하 며 용기 의 요 소 를 처리 합 니 다.

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
	public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
		System.out.println("Event: " + event + ", sequence: " + sequence);
	}
}
3.5 Disruptor 원시 API 를 사용 하여 메시지 발표

import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

/**
 *        , Disruptor     
 */
public class LongEventProducer {

    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        //             
        long sequence = ringBuffer.next();
        try {
            //       event
            LongEvent event = ringBuffer.get(sequence);
            //   event  
            event.set(byteBuffer.getLong(0));
        } finally {
            //   
            ringBuffer.publish(sequence);
        }
    }
}

import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
public class TestMain {
    public static void main(String[] args) throws InterruptedException {
        //   event  
        LongEventFactory factory = new LongEventFactory();
        // ringBuffer  
        int bufferSize = 1024;
        //     Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());
        //   handler
        disruptor.handleEventsWith(new LongEventHandler());

        //   Disruptor
        disruptor.start();
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (long i = 0; true; i++) {
            byteBuffer.clear();
            byteBuffer.putLong(i);
            //     
            producer.onData(byteBuffer);
            Thread.sleep(1000);
        }
    }
}
3.6 Translators 를 사용 하여 메시지 발표

import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

public class LongEventProducerUsingTranslator {
    private RingBuffer<LongEvent> ringBuffer;
    public LongEventProducerUsingTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        @Override
        public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {
            longEvent.set(byteBuffer.getLong(0));
        }
    };

    public void onData(ByteBuffer byteBuffer) {
        ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
    }
}

import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

/**
 * @author ZhangSheng
 * @date 2021-4-26 14:23
 */
public class TestMain {

    public static void main(String[] args) throws InterruptedException {
        LongEventFactory factory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
        disruptor.handleEventsWith(new LongEventHandler());

        disruptor.start();
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer);
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);

        for (long i = 0L; true; i++) {
            byteBuffer.putLong(0, i);
            //   
            producer.onData(byteBuffer);
            Thread.sleep(1000);
        }
    }
}
자바 다 중 스 레 드 의 Disruptor 입문 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.더 많은 자바 Disruptor 입문 내용 은 우리 의 이전 글 을 검색 하거나 아래 의 관련 글 을 계속 조회 하 시기 바 랍 니 다.앞으로 많은 응원 바 랍 니 다!

좋은 웹페이지 즐겨찾기