Azure Databricks와 Elasticsearch의 협력
10672 단어 Azure스파크DatabricksElasticsearch
덧붙여서 Azure Databricks는 Azure상에서 움직이는 Spark의 PaaS 환경으로, 여러가지 처치를 실시하고 있기 때문에 보통의 Spark보다 고속이거나, 부속의 Notebook이 사용하기 쉽거나 하는, 편리한 서비스이다.
htps : // 오즈레. mic로소 ft. 코 m / 쟈 jp / 세 r
하는 것으로는 이쪽에 써 있는 것 그대로이지만, 일단 메모로서 기록해 둔다.
htps : // / cs. 오즈레타타비 cks. 네 t/s파 rk/ぁてst/だたーそうrせ s/에ぁs 치c 세아 rch. HTML
Azure Databricks의 환경 준비에 관해서는 이 빠른 시작을 보면서 실시. 매우 간단하기 때문에 즉시 완성한다.
ES-Hadoop 설정
ES-Hadoop 패키지는 여기 여기에서는 ZIP을 해동해 나온 elasticsearch-spark-xx_x.xx-x.x.x.jar의 파일을 사용한다.
Azure Databricks의 Workspace에서 User > Create > Library를 선택합니다.
New Library라는 화면이 열리므로 여기에 조금 다운로드한 ES-Hadoop의 JAR 파일을 업로드해 주면 준비 OK.
이것으로 준비가 되었으므로, ES-Hadoop을 사용하는 처리를 써 간다.
데이터 아래 준비 및 통신 확인
Scala 버전의 Databricks Notebook을 시작하여 코드를 작성합니다.
먼저 샘플 데이터를로드하는 스토리지를 마운트, 스토리지는 Blob 스토리지 계정과 컨테이너를 미리 준비해야합니다.
dbutils.fs.mount(
source = "wasbs://<Container Name>@<Storage Account>.blob.core.windows.net/"
,mount_point = "/mnt/hol"
,extra_configs = {"fs.azure.account.key.hthol.blob.core.windows.net": "<Storage Account Key>"})
만약을 위해 Elasticsearch의 인스턴스와의 소통 확인을 취한다. 아무것도 설정하지 않으면 Public IP에서의 통신이 필요하게 되지만, 가상 네트워크의 설정을 실시해 두면 Private IP에서의 통신도 가능하다. 여기서 %sh라고 쓰면 쉘을 실행할 수 있는 곳이 상당히 편리.
%sh
ping -c 2 <Elasticsearch IP>
덧붙여서 다음과 같이 curl 명령으로 HTTP 통신도 가능.
%sh
curl -XGET http://<Elasticsearch IP>:9200
이어서, 스토리지에 샘플 데이터 세트를 다운로드한다. 이것은 공개된 샘플 데이터처럼 무료로 다운로드할 수 있습니다.
%sh wget -O /tmp/akc_breed_info.csv https://query.data.world/s/msmjhcmdjslsvjzcaqmtreu52gkuno
Databcirks에서는 Databricks Filesystem이라는 파일 시스템을 사용한다. 파일 시스템의 명령은 %fs 매직 명령을 사용하여 호출할 수 있다.
%fs cp file:/tmp/akc_breed_info.csv dbfs:/mnt/hol/
Elasticsearch와의 협력
여기에서 Elasticsearch와의 제휴를 진행해 나간다.
마운트된 파일 시스템에서 다운로드한 CSV 파일을 읽고 Elasticsearch에 로드합니다. 이와 같이 DataFrame을 만들고 write로 작성하기 때문에 비교적 단순한 느낌.
val esURL = "<Elasticsearch IP>"
val df = spark.read.option("header","true").csv("/mnt/hol/akc_breed_info.csv")
df.write
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","9200")
.option("es.net.ssl","false")
.option("es.nodes", esURL)
.mode("Overwrite")
.save("index/dogs")
데이터가 로드되었는지 확인하기 위해 아래의 명령으로 Elasticsearch에 쿼리를 던지는 것이 좋습니다.
%sh curl http://<Elasticsearch IP>:9200/index/dogs/_search?q=Breed:Collie
Spark를 사용하여 Elasticsearch에서 데이터를 읽는 경우 read를 사용합니다.
val reader = spark.read
.format("org.elasticsearch.spark.sql")
.option("es.nodes.wan.only","true")
.option("es.port","9200")
.option("es.net.ssl","false")
.option("es.nodes", esURL)
val df = reader.load("index/dogs").na.drop.orderBy($"breed")
display(df)
SQL을 사용해 액세스하는 경우는 아래와 같이 테이블을 작성해, 쿼리를 실행한다. SQL을 실행할 때는 %sql 매직 명령을 사용한다. 이런 전환을 할 수 있는 것은 매우 편리하다.
%sql
drop table if exists dogs;
create temporary table dogs
using org.elasticsearch.spark.sql
options('resource'='index/dogs',
'nodes'= '<Elasticsearch IP>',
'es.nodes.wan.only'='true',
'es.port'='9200',
'es.net.ssl'='false');
select weight_range as size, count(*) as number
from (
select case
when weight_low_lbs between 0 and 10 then 'toy'
when weight_low_lbs between 11 and 20 then 'small'
when weight_low_lbs between 21 and 40 then 'medium'
when weight_low_lbs between 41 and 80 then 'large'
else 'xlarge' end as weight_range
from dogs) d
group by weight_range
Reference
이 문제에 관하여(Azure Databricks와 Elasticsearch의 협력), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/whata/items/d40f644c2f38660cd51d텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)