Idea 원격 ElasticSearch에 데이터 쓰기

7658 단어

의존성 추가


<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest</artifactId>
    <version>6.3.1</version>
</dependency>

<dependency>
    <groupId>net.java.dev.jna</groupId>
    <artifactId>jna</artifactId>
    <version>4.5.2</version>
</dependency>

<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>2.7.8</version>
</dependency>

코드 예제

import com.atguigu.gmall.realtime.bean.AlertInfo
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, Index}
import org.apache.spark.rdd.RDD

/**
 * Helper for writing documents to Elasticsearch through the Jest HTTP client.
 *
 * A single `JestClientFactory` is configured once when this object is initialised.
 * Each insert method obtains a client from the factory, issues one request and
 * releases the client again, even when the request fails with an exception.
 *
 * Author atguigu
 * Date 2020/6/3 13:58
 */
object ESUtil {
    
    val factory = new JestClientFactory
    // Address of the Elasticsearch HTTP endpoint.
    // NOTE(review): the original comment mentions 9200 while the URL uses 8300;
    // presumably a custom or mapped port -- confirm against the cluster setup.
    val esUrl = "http://hadoop102:8300" // (9200)
    val config = new HttpClientConfig.Builder(esUrl)
        .maxTotalConnection(100) // upper bound on concurrently open connections
        .connTimeout(10000) // connect timeout, in milliseconds
        .readTimeout(10000) // read timeout, in milliseconds
        .multiThreaded(true)
        .build()
    factory.setHttpClientConfig(config)
    
    /**
     * Writes a single document to Elasticsearch.
     *
     * @param index  name of the target index
     * @param source the document to store (handed to `Index.Builder` unchanged)
     * @param id     document id; when `null` no explicit id is set on the request
     */
    def insertSingle(index: String, source: Object, id: String = null): Unit = {
        val client: JestClient = factory.getObject
        try {
            val action = new Index.Builder(source)
                .index(index)
                .`type`("_doc")
                .id(id) // may be null, in which case the request carries no explicit id
                .build()
            client.execute(action)
        } finally {
            // Always release the client, otherwise a failed request leaks its connections.
            client.shutdownClient()
        }
    }
    
    /**
     * Writes a batch of documents to Elasticsearch with one bulk request.
     *
     * Each element of `sources` is either a plain document, or a tuple
     * `(id: String, document)` when an explicit document id should be used.
     * Nothing is sent when `sources` is empty.
     *
     * @param index   name of the target index
     * @param sources the documents (or `(id, document)` pairs) to store
     */
    def insertBulk(index: String, sources: Iterator[Object]): Unit = {
        // An empty iterator would produce a bulk request without any action,
        // so skip the client creation and the round trip entirely.
        if (sources.hasNext) {
            val client: JestClient = factory.getObject
            try {
                val builder = new Bulk.Builder()
                    .defaultIndex(index)
                    .defaultType("_doc")
                // Turn every element into an Index action and collect them in the bulk builder.
                sources.foreach {
                    case (id: String, data) =>
                        builder.addAction(new Index.Builder(data).id(id).build())
                    case data =>
                        builder.addAction(new Index.Builder(data).build())
                }
                client.execute(builder.build())
            } finally {
                // Always release the client, otherwise a failed request leaks its connections.
                client.shutdownClient()
            }
        }
    }
}

좋은 웹페이지 즐겨찾기