Esper 입문 안내: 3. Esper 바 텀 데이터 구조 특징 에 대한 분석, 데이터 의 입, 출
22111 단어 Esper-cep
package com.doctor.esper.reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.doctor.esper.common.EsperUtil;
import com.doctor.esper.tutorial.OrderEvent;
import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.soda.StreamSelector;
/**
* Chapter 3. Processing Model Filters and Where-clauses
*
*
* 1.esper , jstorm 。esper , Statement
* ( Statement 、 、 ).
*
* @author doctor
*
* @time 2015 6 1 2:11:12
*/
public class Chapter3ProcessingModelFiltersAndWhereClauses {
private static final Logger log = LoggerFactory.getLogger(Chapter3ProcessingModelFiltersAndWhereClauses.class);
public static void main(String[] args) {
Configuration configuration = new Configuration();
configuration.addEventTypeAutoName("com.doctor.esper.tutorial");
// 。
configuration.getEngineDefaults().getStreamSelection().setDefaultStreamSelector(StreamSelector.RSTREAM_ISTREAM_BOTH);
EPServiceProvider epServiceProvider = EPServiceProviderManager.getDefaultProvider(configuration);
// 3.2. Insert Stream
// EPStatement OrderEvent。 esper OrderEvent OrderEvent ,
// epser EPStatement 。
log.info("{msg:'3.2. Insert Stream'}");
String expression = "select * from OrderEvent";
EPStatement epStatement = epServiceProvider.getEPAdministrator().createEPL(expression);
epStatement.addListener(Chapter3ProcessingModelFiltersAndWhereClauses::update);
OrderEvent orderEvent = new OrderEvent("shirt", 75.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("aaa", 35.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("bbb", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
log.info("{list:{}}", EsperUtil.get(epStatement));
// : EPStatement , 。newEvents
// list , ? , 。
// 06-01 14:43:01.648 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"shirt","price":75.5}}
// 06-01 14:43:01.744 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"aaa","price":35.5}}
// 06-01 14:43:01.745 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"bbb","price":85.5}}
// {list:[]}
// 3.3. Insert and Remove Stream , FIFO , N
log.info("{msg:'3.3. Insert and Remove Stream'}");
epServiceProvider.getEPAdministrator().destroyAllStatements();
epServiceProvider.removeAllStatementStateListeners();
expression = "select * from OrderEvent.win:length(1)";
epStatement = epServiceProvider.getEPAdministrator().createEPL(expression);
epStatement.addListener(Chapter3ProcessingModelFiltersAndWhereClauses::update);
orderEvent = new OrderEvent("1", 75.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("2", 35.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("3", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("4", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
log.info("{list:{}}", EsperUtil.get(epStatement));
// :EPStatement 2. 、 。newEvents ,oldEvents EPStatement
// 。
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"1","price":75.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"2","price":35.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {oldEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"1","price":75.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"3","price":85.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {oldEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"2","price":35.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"4","price":85.5}}
// 06-01 15:22:20.552 main INFO c.d.e.r.Chapter3ProcessingModel - {oldEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"3","price":85.5}}
// {list:[BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent
// bean={"itemName":"4","price":85.5}]}
// 3.4. Filters and Where-clauses 。
// Filters=》
// Where-clauses have ==》 , 。
log.info("{msg:'3.4. Filters and Where-clauses'}");
epServiceProvider.getEPAdministrator().destroyAllStatements();
epServiceProvider.removeAllStatementStateListeners();
expression = "select * from OrderEvent(price > 50 ).win:length(3)";
epStatement = epServiceProvider.getEPAdministrator().createEPL(expression);
epStatement.addListener(Chapter3ProcessingModelFiltersAndWhereClauses::update);
orderEvent = new OrderEvent("1", 75.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("2", 35.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("3", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("4", 5.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
log.info("{list:{}}", EsperUtil.get(epStatement));
// 06-01 15:32:24.301 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"4","price":185.5}}
log.info("{msg:'3.4. Where-clauses'}");
epServiceProvider.getEPAdministrator().destroyAllStatements();
epServiceProvider.removeAllStatementStateListeners();
expression = "select * from OrderEvent.win:length(3) where price > 50 ";
epStatement = epServiceProvider.getEPAdministrator().createEPL(expression);
epStatement.addListener(Chapter3ProcessingModelFiltersAndWhereClauses::update);
orderEvent = new OrderEvent("1", 75.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("2", 35.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("3", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("4", 5.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
log.info("{list:{}}", EsperUtil.get(epStatement));
}
public static void update(EventBean[] newEvents, EventBean[] oldEvents) {
if (newEvents != null && newEvents[0] != null) {
log.info("{newEvents:{}}", newEvents[0]);
}
if (oldEvents != null && oldEvents[0] != null) {
log.info("{oldEvents:{}}", oldEvents[0]);
}
}
}
해석:
1、
configuration.addEventTypeAutoName("com.doctor.esper.tutorial");
위의 설정 은 EPL 표현 식 에서 이벤트 이름 의 약 자 를 위 한 것 입 니 다.
예 를 들 어 select * from OrderEvent, OrderEvent 는 전체 경 로 를 쓰 지 않 아 도 됩 니 다. com. doctor. esper. tutorial. OrderEvent, Mybatis 에서 xml 설정 중의 sql 에 도 이러한 설정 이 있 는 것 이 생각 나 지 않 습 니까?
2、
configuration.getEngineDefaults().getStreamSelection().setDefaultStreamSelector(StreamSelector.RSTREAM_ISTREAM_BOTH);
이 설정 은 주로 데이터 가 Esper 메모리 에서 삭제 되 는 것 을 관찰 하기 위해 서 이 며, 모니터, 구독 자 를 촉발 할 수도 있 습 니 다.기본 설정 은 데이터 가 메모리 구조 에 들 어 갈 때 만 실 행 됩 니 다.
3、
// 3.2. Insert Stream
// EPStatement OrderEvent。 esper OrderEvent OrderEvent ,
// epser EPStatement 。
log.info("{msg:'3.2. Insert Stream'}");
String expression = "select * from OrderEvent";
EPStatement epStatement = epServiceProvider.getEPAdministrator().createEPL(expression);
epStatement.addListener(Chapter3ProcessingModelFiltersAndWhereClauses::update);
OrderEvent orderEvent = new OrderEvent("shirt", 75.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("aaa", 35.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("bbb", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
log.info("{list:{}}", EsperUtil.get(epStatement));
// : EPStatement , 。newEvents
// list , ? , 。
// 06-01 14:43:01.648 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"shirt","price":75.5}}
// 06-01 14:43:01.744 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"aaa","price":35.5}}
// 06-01 14:43:01.745 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"bbb","price":85.5}}
// {list:[]}
EPL 에서 select * from OrderEvent, 이 이벤트 (메모리 데이터 구조) 는 이벤트 가 유지 되 지 않 고 이벤트 가 들 어 갈 때 감청 기 를 터치 하지만 데 이 터 는 저장 되 지 않 습 니 다.이 메모리 구조 에 들 어가 지 않 으 면 데이터 삭제 도 상 관 없 이 이벤트 삭제 도 없다.위의 출력 내용 을 보면 세 개의 주문 사건 을 보 냈 고 모니터 도 새로운 사건 의 도착 을 출력 했다.
4、
// 3.3. Insert and Remove Stream , FIFO , N
log.info("{msg:'3.3. Insert and Remove Stream'}");
epServiceProvider.getEPAdministrator().destroyAllStatements();
epServiceProvider.removeAllStatementStateListeners();
expression = "select * from OrderEvent.win:length(1)";
epStatement = epServiceProvider.getEPAdministrator().createEPL(expression);
epStatement.addListener(Chapter3ProcessingModelFiltersAndWhereClauses::update);
orderEvent = new OrderEvent("1", 75.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("2", 35.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("3", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("4", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
log.info("{list:{}}", EsperUtil.get(epStatement));
// :EPStatement 2. 、 。newEvents ,oldEvents EPStatement
// 。
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"1","price":75.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"2","price":35.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {oldEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"1","price":75.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"3","price":85.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {oldEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"2","price":35.5}}
// 06-01 15:22:20.551 main INFO c.d.e.r.Chapter3ProcessingModel - {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"4","price":85.5}}
// 06-01 15:22:20.552 main INFO c.d.e.r.Chapter3ProcessingModel - {oldEvents:BeanEventBean eventType=BeanEventType name=OrderEvent
// clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"3","price":85.5}}
// {list:[BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent
// bean={"itemName":"4","price":85.5}]}
이제 우 리 는 select 로 * OrderEvent. win: length (1) 에서 최신 이벤트 만 저장 하 는 것 을 정의 합 니 다 (창 보기 길 이 는 1).위의 출력 결 과 를 보면 FIFO 캐 시 처럼 새로운 이벤트 가 도착 하면 오래된 이벤트 가 버 려 진 다 는 것 을 알 수 있다.
5、
// 3.4. Filters and Where-clauses 。
// Filters=》
// Where-clauses have ==》 , 。
log.info("{msg:'3.4. Filters and Where-clauses'}");
epServiceProvider.getEPAdministrator().destroyAllStatements();
epServiceProvider.removeAllStatementStateListeners();
expression = "select * from OrderEvent(price > 50 ).win:length(3)";
epStatement = epServiceProvider.getEPAdministrator().createEPL(expression);
epStatement.addListener(Chapter3ProcessingModelFiltersAndWhereClauses::update);
orderEvent = new OrderEvent("1", 75.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("2", 35.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("3", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("4", 5.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
log.info("{list:{}}", EsperUtil.get(epStatement));
// {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"1","price":75.5}}
// {newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"3","price":85.5}}
// {list:[BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"1","price":75.5}, BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"3","price":85.5}]}
창 Filters 로 정의 할 수 있 는 이벤트 흐름 (메모리 구조) 을 정의 할 수 있 습 니 다.
예 를 들 어 OrderEvent (price > 50). win: length (3).주문 서 는 50 개 단위 이상 만 이 사건 흐름 에 들 어 갈 수 있다.1, 3 만 들 어 갔 고 마지막 으로 에 스 퍼 가 준 조회 로 두 사건 만 밝 혀 냈 다.이벤트 가 정 의 된 이벤트 흐름 (메모리 구조) 에 들 어 갔 습 니 다. 당연히 모니터 를 터치 하고 새로운 이벤트 가 도 착 했 습 니 다.
EsperUtil :
package com.doctor.esper.common;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPOnDemandPreparedQuery;
import com.espertech.esper.client.EPOnDemandPreparedQueryParameterized;
import com.espertech.esper.client.EPOnDemandQueryResult;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.SafeIterator;
/**
* @author docotr
*
* @time 2015 6 1 4:28:33
*/
public enum EsperUtil {
;
public static List get(EPStatement epStatement) {
List list = new ArrayList<>();
SafeIterator safeIterator = epStatement.safeIterator();
try {
while (safeIterator.hasNext()) {
list.add(safeIterator.next());
}
} catch (Throwable e) {
safeIterator.close();
e.printStackTrace();
} finally {
safeIterator.close();
}
return list;
}
/**
* @see 15.2. The Service Provider Interface
*
* EPServiceProvider esper 。 esper ( )。
* administrative and runtime interface.
* , EPServiceProviderManager getDefaultProvider getProvider(String providerURI)。
* providerURI, 。EPServiceProviderManager providerURI ,
* 。
*
*
* @param config
* @return
*/
public static EPServiceProvider esperConfig(String config) {
Configuration configuration = new Configuration();
configuration.configure(EsperUtil.class.getClassLoader().getResource(config));
return EPServiceProviderManager.getDefaultProvider(configuration);
}
public static List executeQuery(EPServiceProvider epServiceProvider, String epl) {
EPOnDemandQueryResult result = epServiceProvider.getEPRuntime().executeQuery(epl);
return Stream.of(result.getArray()).collect(Collectors.toList());
}
public static List prepareQuery(EPServiceProvider epServiceProvider, String epl) {
EPOnDemandPreparedQuery preparedQuery = epServiceProvider.getEPRuntime().prepareQuery(epl);
EPOnDemandQueryResult result = preparedQuery.execute();
return Stream.of(result.getArray()).collect(Collectors.toList());
}
public static List prepareQueryWithParameters(EPServiceProvider epServiceProvider, String epl, Object... parameter) {
EPOnDemandPreparedQueryParameterized queryParameterized = epServiceProvider.getEPRuntime().prepareQueryWithParameters(epl);
if (parameter != null) {
for (int i = 0, length = parameter.length; i < length; i++) {
queryParameterized.setObject(i + 1, parameter[i]);
}
}
EPOnDemandQueryResult result = epServiceProvider.getEPRuntime().executeQuery(queryParameterized);
return Stream.of(result.getArray()).collect(Collectors.toList());
}
}
6、
log.info("{msg:'3.4. Where-clauses'}");
epServiceProvider.getEPAdministrator().destroyAllStatements();
epServiceProvider.removeAllStatementStateListeners();
expression = "select * from OrderEvent.win:length(3) where price > 50 ";
epStatement = epServiceProvider.getEPAdministrator().createEPL(expression);
epStatement.addListener(Chapter3ProcessingModelFiltersAndWhereClauses::update);
orderEvent = new OrderEvent("1", 75.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("2", 35.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("3", 85.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
orderEvent = new OrderEvent("4", 5.50D);
epServiceProvider.getEPRuntime().sendEvent(orderEvent);
log.info("{list:{}}", EsperUtil.get(epStatement));
where - clauses 모니터 에 미 치 는 영향 을 보십시오. 출력 결과:
{newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"1","price":75.5}}
{newEvents:BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"3","price":85.5}}
{oldEvents:BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"1","price":75.5}}
{list:[BeanEventBean eventType=BeanEventType name=OrderEvent clazz=com.doctor.esper.tutorial.OrderEvent bean={"itemName":"3","price":85.5}]}
위의 출력 에 대해 문서 와 결 과 를 비교 해 보면 where - clauses 는 사건 의 유입 에 영향 을 주지 않 습 니 다. new Events 1, 2, 3 이 들 어간 후에 창 크기 가 3 제한 에 달 하기 때문에 oldEvents 는 1 입 니 다. 그래서 오래된 사건 을 촉발 하여 모니터 를 버 리 고 오래된 사건 을 삭 제 했 습 니 다. 그러나 new Events 사건 의 촉발 은 1, 3 입 니 다. 그들의 주문 가격 이 50 보다 많 기 때문에 새로운 일 을 촉발 할 수 있 습 니 다.도착 하 다.
그래서 구분 되 었 습 니까? 마지막 조회 도 where 조건 에 맞 는 것 만 찾 을 수 있 습 니 다 (sql 도 마찬가지 입 니 다)
/** * 3.4. Filters and Where-clauses * 시간 / 길이 창의 Filters 는 이벤트 가 이 데이터 구조 에 들 어 갈 수 있 는 지 에 영향 을 줍 니 다. * 그리고 모니터 에 간접 적 으로 영향 을 주 었 습 니 다. * where 조건 은 모니터 와 관계 가 있 고 where 조건 에 맞 아야 모니터 를 촉발 할 수 있 습 니 다. * 또한 select 결과 에 도 영향 을 미친다. * * 즉, Filters and Where - clauses 는 모두 모니터 와 select 결과 에 영향 을 미친다. * 데이터 창 에 들 어 갈 수 있 는 지 여부 가 다 릅 니 다.