elasticsearch 집합 함수로distinct 조회 실현
1. 등가의 sql
SELECT DISTINCT field1,field2 FROM test_index.test_type
등가SELECT field1,field2 FROM test_index.test_type GROUP BY field1,field2
2. Group by 쿼리는 es에서 Aggregation (집합) 으로 구현할 수 있습니다. 등가의 DSL 쿼리 문장은 다음과 같습니다.
POST /test_index/test_type/_search
{
"from": 0,
"size": 0,
"aggregations": {
"field1": {
"terms": {
"field": "field1",
"size": 2147483647
},
"aggregations": {
"field2": {
"terms": {
"field": "field2",
"size": 2147483647
}
}
}
}
}
}
3.java의 실현:
import com.google.common.collect.ArrayListMultimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Stack;
import java.util.stream.Collectors;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
/**
*
* @author zhongchenghui
*/
public class EsSearch {
private static final String CLUSTER_NAME = "test_cluster";
private static final String ES_ADDRESSES = "192.168.12.1,192.168.12.2,192.168.12.3";
private static final String INDEX_NAME = "test_index";
private static final Client ES_CLIENT = ESClientFactory.newInstance(CLUSTER_NAME, ES_ADDRESSES);
/**
* group by
*
* @param type
* @param groupColumnsNames
* @return
* @throws Exception
*/
public List
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author zhongchenghui
*/
public class ESClientFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(ESClientFactory.class);
private static final ConcurrentHashMap CLIENT_CACHE = new ConcurrentHashMap<>();
public static Client newInstance(String clusterName, String hostStr) {
Client client = CLIENT_CACHE.get(clusterName);
if (client == null) {
Map addressMap = ESClientFactory.getESAddressMap(hostStr);
Settings settings = Settings.settingsBuilder().put("cluster.name", clusterName).build();
TransportClient newClient = TransportClient.builder().settings(settings).build();
addressMap.keySet().forEach((host) -> {
try {
newClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), addressMap.get(host)));
} catch (UnknownHostException ex) {
LOGGER.error("Init ES client failure,cluster name is:{},Error:{}", clusterName, ex);
}
});
client = newClient;
CLIENT_CACHE.put(clusterName, newClient);
}
return client;
}
private static Map getESAddressMap(String hostStr) {
Map hostMap = new HashMap<>();
String[] hosts = hostStr.split(",");
for (String host : hosts) {
String[] hostPort = host.trim().split(":");
Integer port = hostPort.length < 2 ? 9300 : Integer.valueOf(hostPort[1]);
hostMap.put(hostPort[0], port);
}
return hostMap;
}
}
4.문제점:
a. 구현된 방식은 층층이 아래로 집합되기 때문에es의document에field1의 필드가null일 때 이 조건은 아래로 집합되지 않습니다. 설령 이document의field2 필드가 값이 있더라도.(null 대신 지정한 문자를 사용해서 이 문제를 해결할 수 있습니다)
b. 데이터 양이 너무 많은 테이블에 적합하지 않습니다. 3의 코드는 최대 각 필드의 그룹 수가 Integer보다 크면 안 됩니다.MAX_VALUE
c. 되돌아오는 필드는 그룹by에 참여하는 필드만 가능합니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
kafka connect e elasticsearch를 관찰할 수 있습니다.No menu lateral do dashboard tem a opção de connectors onde ele mostra todos os clusters do kafka connect conectados atu...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.