Disruptor를 이용하여 DB에 대량 저장
4611 단어 자바
package com.cloudeye.core.disruptor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import com.cloudeye.common.bean.monitor.MonitorDataBean;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class CoreMonitorDataRing {
private final static Logger logger = LoggerFactory.getLogger(CoreMonitorDataRing.class);
private static CoreMonitorDataRing instance;
private static int RING_SIZE = 1024 * 8; // , 8192
private RingBuffer ringBufferDB;
private CoreSaveMonitorDataHandler saveMetricDataHandler; //
public CoreMonitorDataRing() {
initRing();
}
public static CoreMonitorDataRing getInstance() {
if (instance == null) {
instance = new CoreMonitorDataRing();
}
return instance;
}
@SuppressWarnings("unchecked")
private void initRing() {
try {
saveMetricDataHandler = new CoreSaveMonitorDataHandler();
//BlockingWaitStrategy,SleepingWaitStrategy,YieldingWaitStrategy
Disruptor disruptorDB = new Disruptor<>(EVENT_FACTORY, RING_SIZE,
new LimitedThreadFactory(), ProducerType.MULTI, new SleepingWaitStrategy());
disruptorDB.handleEventsWith(saveMetricDataHandler);
ringBufferDB = disruptorDB.start();
} catch (Exception e) {
logger.error("init handler fail!", e);
System.exit(1);
}
}
public void publish(MonitorDataBean metricData) {
long sequence = ringBufferDB.next();
try {
MonitorDataBean ringValue = ringBufferDB.get(sequence);
BeanUtils.copyProperties(metricData, ringValue);
} finally {
ringBufferDB.publish(sequence);
}
}
public final EventFactory EVENT_FACTORY =
new EventFactory() {
public MonitorDataBean newInstance() {
return new MonitorDataBean();
}
};
public class LimitedThreadFactory implements ThreadFactory {
private final AtomicInteger count = new AtomicInteger(0);
public Thread newThread(Runnable r) {
if (count.compareAndSet(0, 2)) {
return new Thread(r);
} else {
throw new IllegalStateException("Created more that one thread");
}
}
}
}
CoreSaveMonitorDataHandler:
package com.cloudeye.core.disruptor;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudeye.common.bean.monitor.MonitorDataBean;
import com.cloudeye.common.util.COMNSpringUtil;
import com.cloudeye.core.service.CoreMonitorDataService;
import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
/**
 * Disruptor consumer that collects {@link MonitorDataBean} events and writes them
 * through {@link CoreMonitorDataService#batchSave} in groups.
 *
 * <p>The cached objects are the event instances passed to {@link #onEvent}, not
 * copies, so the cache is flushed whenever the Disruptor signals the end of a batch.
 */
public class CoreSaveMonitorDataHandler implements EventHandler<MonitorDataBean> {

    private static final Logger logger = LoggerFactory.getLogger(CoreSaveMonitorDataHandler.class);

    /** A flush is triggered whenever {@code sequence + 1} is a multiple of this value. */
    private static final int DB_BATCH_SIZE = 50;

    /** Events received since the last successful flush. */
    private final List<MonitorDataBean> cache = Lists.newArrayList();

    /** Service used to write the batches; looked up once when the handler is created. */
    private final CoreMonitorDataService monitorDataService =
            COMNSpringUtil.getBean(CoreMonitorDataService.class);

    /**
     * Receives one event from the ring. Any exception raised while caching or saving
     * is logged and swallowed, so it does not propagate to the Disruptor.
     *
     * @param value      the event taken from the ring
     * @param sequence   the ring sequence number of {@code value}
     * @param endOfBatch {@code true} when this is the last event of the current batch
     */
    @Override
    public void onEvent(MonitorDataBean value, long sequence, boolean endOfBatch)
            throws Exception {
        try {
            saveMetricData(value, sequence, endOfBatch);
        } catch (Exception e) {
            logger.error("Exception ", e);
        }
    }

    /**
     * Adds the event to the cache and flushes when a batch boundary is reached.
     *
     * <p>The end-of-batch flush is unconditional. Previously it was skipped when
     * {@code sequence + 1} was a multiple of 1024, which left events cached after the
     * batch had ended, because 1024 is not a multiple of {@link #DB_BATCH_SIZE}.
     */
    private void saveMetricData(MonitorDataBean value, long sequence, boolean endOfBatch) {
        cache.add(value);
        if ((sequence + 1) % DB_BATCH_SIZE == 0) {
            flush();
        }
        if (endOfBatch) {
            flush();
        }
    }

    /**
     * Saves the cached events and empties the cache; does nothing when the cache is
     * empty. The cache is cleared only after {@code batchSave} returns normally, so
     * if it throws, the entries stay cached and are included in the next flush.
     */
    private void flush() {
        if (cache.isEmpty()) {
            return;
        }
        monitorDataService.batchSave(cache);
        cache.clear();
    }
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Is Eclipse IDE dying?In 2014 the Eclipse IDE is the leading development environment for Java with a market share of approximately 65%. but ac...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다. 하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.