Azure Databricks와 Elasticsearch의 협력

ES-Hadoop을 사용하여 Spark와 연계할 수 있지만 Databricks에서도 마찬가지로 할 수 있다는 것으로 Azure Databricks를 사용해 보았다.
덧붙여서 Azure Databricks는 Azure상에서 움직이는 Spark의 PaaS 환경으로, 여러가지 처치를 실시하고 있기 때문에 보통의 Spark보다 고속이거나, 부속의 Notebook이 사용하기 쉽거나 하는, 편리한 서비스이다.
htps : // 오즈레. mic로소 ft. 코 m / 쟈 jp / 세 r

하는 것으로는 이쪽에 써 있는 것 그대로이지만, 일단 메모로서 기록해 둔다.
htps : // / cs. 오즈레타타비 cks. 네 t/s파 rk/ぁてst/だたーそうrせ s/에ぁs 치c 세아 rch. HTML
Azure Databricks의 환경 준비에 관해서는 이 빠른 시작을 보면서 실시. 매우 간단하기 때문에 즉시 완성한다.

ES-Hadoop 설정



ES-Hadoop 패키지는 여기 여기에서는 ZIP을 해동해 나온 elasticsearch-spark-xx_x.xx-x.x.x.jar의 파일을 사용한다.

Azure Databricks의 Workspace에서 User > Create > Library를 선택합니다.


New Library라는 화면이 열리므로 여기에 조금 다운로드한 ES-Hadoop의 JAR 파일을 업로드해 주면 준비 OK.

이것으로 준비가 되었으므로, ES-Hadoop을 사용하는 처리를 써 간다.

데이터 아래 준비 및 통신 확인



Scala 버전의 Databricks Notebook을 시작하여 코드를 작성합니다.

먼저 샘플 데이터를로드하는 스토리지를 마운트, 스토리지는 Blob 스토리지 계정과 컨테이너를 미리 준비해야합니다.
dbutils.fs.mount(
  source = "wasbs://<Container Name>@<Storage Account>.blob.core.windows.net/"
 ,mount_point = "/mnt/hol"
 ,extra_configs = {"fs.azure.account.key.hthol.blob.core.windows.net": "<Storage Account Key>"})

만약을 위해 Elasticsearch의 인스턴스와의 소통 확인을 취한다. 아무것도 설정하지 않으면 Public IP에서의 통신이 필요하게 되지만, 가상 네트워크의 설정을 실시해 두면 Private IP에서의 통신도 가능하다. 여기서 %sh라고 쓰면 쉘을 실행할 수 있는 곳이 상당히 편리.
%sh 
ping -c 2 <Elasticsearch IP>

덧붙여서 다음과 같이 curl 명령으로 HTTP 통신도 가능.
%sh 
curl -XGET http://<Elasticsearch IP>:9200

이어서, 스토리지에 샘플 데이터 세트를 다운로드한다. 이것은 공개된 샘플 데이터처럼 무료로 다운로드할 수 있습니다.
%sh wget -O /tmp/akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno

Databcirks에서는 Databricks Filesystem이라는 파일 시스템을 사용한다. 파일 시스템의 명령은 %fs 매직 명령을 사용하여 호출할 수 있다.
%fs cp file:/tmp/akc_breed_info.csv dbfs:/mnt/hol/

Elasticsearch와의 협력



여기에서 Elasticsearch와의 제휴를 진행해 나간다.
마운트된 파일 시스템에서 다운로드한 CSV 파일을 읽고 Elasticsearch에 로드합니다. 이와 같이 DataFrame을 만들고 write로 작성하기 때문에 비교적 단순한 느낌.
val esURL = "<Elasticsearch IP>"
val df = spark.read.option("header","true").csv("/mnt/hol/akc_breed_info.csv")
df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","9200")
  .option("es.net.ssl","false")
  .option("es.nodes", esURL)
  .mode("Overwrite")
  .save("index/dogs")

데이터가 로드되었는지 확인하기 위해 아래의 명령으로 Elasticsearch에 쿼리를 던지는 것이 좋습니다.
%sh curl http://<Elasticsearch IP>:9200/index/dogs/_search?q=Breed:Collie

Spark를 사용하여 Elasticsearch에서 데이터를 읽는 경우 read를 사용합니다.
val reader = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes.wan.only","true")
  .option("es.port","9200")
  .option("es.net.ssl","false")
  .option("es.nodes", esURL)
val df = reader.load("index/dogs").na.drop.orderBy($"breed")
display(df)

SQL을 사용해 액세스하는 경우는 아래와 같이 테이블을 작성해, 쿼리를 실행한다. SQL을 실행할 때는 %sql 매직 명령을 사용한다. 이런 전환을 할 수 있는 것은 매우 편리하다.
%sql
drop table if exists dogs;
create temporary table dogs
using org.elasticsearch.spark.sql
options('resource'='index/dogs', 
  'nodes'= '<Elasticsearch IP>',
  'es.nodes.wan.only'='true',
  'es.port'='9200',
  'es.net.ssl'='false');

select weight_range as size, count(*) as number 
from (
  select case 
    when weight_low_lbs between 0 and 10 then 'toy'
    when weight_low_lbs between 11 and 20 then 'small'
    when weight_low_lbs between 21 and 40 then 'medium'
    when weight_low_lbs between 41 and 80 then 'large'
    else 'xlarge' end as weight_range
  from dogs) d
group by weight_range

좋은 웹페이지 즐겨찾기