Kafka의 데이터를 Structured Streaming으로 처리하여 Elasticsearch로 전송

이번에는 Apache Kafka로부터의 스트림 데이터를 취득해 Spark의 Structured Streaming으로 처리해 Elasticsearch에 흘리는 곳을 시험해 본다.
ES-Hadoop을 Structured Streaming과 함께 사용하는 방법은 여기 문서에 쓰여있다.

Spark를 사용하려고 했을 때, 스스로 환경을 준비하는 것이 힘들고, 만든 후에 필요한 기능을 사용하기 위한 설정을 하거나 관리하는 것이 힘들기 때문에, 개인적으로는 Azure Databricks를 사용하는 것이 지금까지 제일 편하다고 생각한다.
이 그림을 보면 여러가지 할 것 같은 느낌을 알 수 있다. 게다가 하는 일이라고 하면 클러스터를 시작해 Notebook을 열어 코드나 쿼리를 쓸 정도이기 때문에 매우 편하다.

(htps : // / cs. mic로소 ft. 코 m / 자 - jp / 아즈레 / 아즈레 - 타타 브리 cks부터)

마지막 기사의 방법에 따라 ES-Hadoop 라이브러리를로드하고 Kafka 및 Elasticsearch 환경이 작성되었다고 가정합니다.

Kafka에 대한 연결 설정



kafka에 대한 연결 정의는 다음과 같이 수행됩니다.
val kafkaDF = (
  spark
    .readStream
    .option("kafka.bootstrap.servers", "YOUR.HOST:PORT1,YOUR.HOST:PORT2")
    .option("subscribe", "YOUR_TOPIC1,YOUR_TOPIC2")
    .option("startingOffsets", "latest")
    .format("kafka")
    .load()
)

이것으로 연결 준비 완료. 데이터는 다음과 같이 보통으로 확인할 수 있다.
display(kafkaDF)

스키마 정의



명시적으로 스키마를 정의한다. 아래는 소지의 샘플 데이터를 기초로 실행하고 있지만, 데이터에 맞추어 변경이 필요한 부분.
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("orderID", IntegerType)
  .add("productID", IntegerType)
  .add("orderTimestamp", TimestampType)
  .add("orderQty", IntegerType)

Kafka 데이터를 DataFrame으로 가져오기



방금 정의한 스키마를 적용하여 DataFrame으로 읽습니다.
import org.apache.spark.sql.functions._

val df = kafkaDF.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, java.sql.Timestamp)]
        .select(from_json($"value", schema).as("order"), $"timestamp")
        .select("order.*", "timestamp")

파티션 설정



shuffle 후의 파티션을 설정하려면 다음과 같이 실행한다. 다음은 8로 설정한 예.
spark.conf.set("spark.sql.shuffle.partitions", 8)

데이터 집계



집계 데이터의 스트림을 쓸 때에는 time window의 정의가 필요하게 된다. 여기서 설정하지 않으면 나중에 writeStream을 할 수 없다.
import org.apache.spark.sql.functions._

val aggregatedDF = df.withWatermark("timestamp", "1 minutes").groupBy($"productID", window($"timestamp", "1 minutes")).count()

체크포인트 디렉토리 만들기



오프셋 및 커밋 로그를 쓰는 디렉토리를 만듭니다.
%fs mkdirs /tmp/es

Elasticsearch에 데이터 쓰기



writeStream은 여기서 흐리게 하고, 다음의 코드를 실행해 간다.
df.writeStream
      .option("es.nodes.wan.only","true")
      .option("es.net.ssl","false")
      .option("es.nodes", "<Your Elasticsearch>")
      .option("checkpointLocation", "/tmp/es")
      .option("es.port", "<Port>")
      .format("es")
      .start("orders/log")

작성된 인덱스 확인



Elasticsearch에 기록된 데이터를 curl 명령으로 확인합니다. 여기 Elasticsearch의 search API를 사용하고 있습니다.
%sh curl http://<Your Elasticsearch>:<Port>/orders/log/_search?q=productID:869

Elasticsearch에 Spark에서 연결하여 데이터 검색



스트림 데이터에서도 특별히 변하지 않고 취급할 수 있다.
val reader = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port", "<Port>")
  .option("es.net.ssl","false")
  .option("es.nodes", "<Your Elasticsearch>")

val SQLdf = reader.load("orders/log")
display(SQLdf)

SQL로 액세스



Elasticsearch 문서에 대한 테이블을 만들고 쿼리를 실행합니다.
%sql
DROP TABLE IF EXISTS dcmotor;

CREATE TEMPORARY TABLE orders
USING org.elasticsearch.spark.sql
OPTIONS('resource'='orders/log', 
  'nodes'= '<Your Elasticsearch>',
  'es.nodes.wan.only'='true',
  'es.port'='<Port>',
  'es.net.ssl'='false');

쿼리 실행
%sql SELECT ProductID, SUM(orderQty) AS sum FROM orders GROUP BY ProductID ORDER BY sum DESC LIMIT 10;

이것으로 Elasticsearch에는 스트림 처리된 데이터가 속속 투입되어 간다. Timestamp 데이터를 사용하여 Kibana에서 시계열 데이터의 시각화/모니터링이 가능해진다.

좋은 웹페이지 즐겨찾기