둘을 연동할 때 필요한 모듈이 있다.. commons-httpclient 모듈인데 2010년대 초에 지원 종료가 되어서 인터넷에서 따로 다운받아야 한다.
일단 라이브러리를 넣어 둘 디렉토리를 만들어준다
cd ~ |
mkdir library |
이제 모듈을 다운받아오자
다운로드 페이지에서 다운 경로를 확인한다
(다운로드 페이지 : http://archive.apache.org/dist/httpcomponents/commons-httpclient/3.0/binary/ )
wget으로 파일을 받아온다
wget http://archive.apache.org/dist/httpcomponents/commons-httpclient/3.0/binary/commons-httpclient-3.0.1.tar.gz |
Tar 명령어로 압축을 해제한다
tar -xvf commons-httpclient-3.0.1.tar.gz |
압축 해제한 파일로 들어간다
cd commons-httpclient-3.0.1 |
Ls를 해보면 우리한테 필요한 jar 파일을 확인할 수 있다.
이 파일을 아까 만들었던 library 디렉토리로 복사하자
cp commons-httpclient-3.0.1.jar /root/library |
해당 폴더로 이동해서 제대로 잘 들어갔는지 확인해보자
cd ~/library |
Ls를 쳐보면 제대로 들어간걸 확인할 수 있다
스파크를 켤 때 이 위치를 알려주면 이제 문제없이 연동된다.
스파크를 켜기전에 일단 elasticsearch를 켜준다
systemctl start elasticsearch |
(나는 혹시 몰라서 kibana도 킴)
systemctl start kibana |
이제 스파크를 켠다
(packages 는 우리가 처음에 빌드할 때 입력한 옵션으로 스파크가 갖게 된 es연동 패키지를 사용하겠다고 알려주는 옵션이고 jars는 외부의 jar 파일을 쓸 때 위치를 알려주는 옵션이다)
spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar |
예제
이제 제대로 연동되었는지 확인하기위해 아래 예제를 진행해보자
(예제 출처: http://jason-heo.github.io/elasticsearch/2016/06/28/elasticsearch-with-spark.html )
엘라스틱서치에서 제공하는 예제 파일을 이용해서 실행해보자
준비
루트 홈으로 이동한다
cd ~ |
예제 용 디렉토리를 생성한다(있는경우 안만들어도 됨)
mkdir elasticTest |
디렉토리로 이동한다
cd elasticTest |
wget 예제 파일을 다운로드 받는다
wget https://download.elastic.co/demos/kibana/gettingstarted/accounts.zip |
Unzip 명령어로 압축을 해제한다
unzip accounts.zip |
예제용 elasticsearch 인덱스를 생성한다
curl -XPOST \ -H 'Content-Type: application/x-ndjson' \ 'http://localhost:9200/bank/account/_bulk?pretty' \ --data-binary "@accounts.json" |
잘 생성되었는지 확인한다
curl -XGET 'http://localhost:9200/bank/account/_search?pretty&size=0' |
스파크에서 elasticsearch 데이터 읽어오기
이제 스파크를 켠다
spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar |
Elasticsearch 연결 객체를 만든다
val esConf = Map( "es.nodes" -> "localhost:9200" ) |
Elasticsearch로 data를 로딩하여 DataFrame으로 불러온다
val df = spark. read. format("org.elasticsearch.spark.sql"). options(esConf). load("bank/account") |
스키마를 출력한다
df.printSchema |
쿼리 작성
데이터 프레임에 바로 sql을 쓸 수 없어서 데이터프레임을 이용해서 sql을 쓸 수 있는 객체를 따로 만든다(이경우 logs)
df.createOrReplaceTempView("logs") |
sql(쿼리문) 함수를 사용하여 쿼리를 사용해보자
show를 써야 결과가 보여진다
spark.sql("SELECT COUNT(*) FROM logs").show() |
정확하게 1000이 나오면 잘 수행된 것이다
이번엔 특정 열만 불러와보자
spark.sql("SELECT firstname, lastname, email FROM logs").show() |
(중략)
위에서 볼수 있다 시피 20개만 출력된다
Show 함수 안에 인자를 넣어주면 원하는 개수만 출력할수도 있다
spark.sql("SELECT firstname, lastname, email FROM logs").show(5) |
위의 이메일을 보면 알수 있다시피 긴 문자열은 축약해서 보여준다.
전부 보고 싶다면 아래와 같이 show에 옵션을 준다
spark.sql("SELECT firstname, lastname, email FROM logs").show(5, false) |
Elasticsearch 문서를 parquet로 저장하기
(참고: Parguet(파케이)란? 하둡 생태계에서 사용하는 데이터 저장포맷으로 처리 속도가 빠르다
자세한 내용 : https://engineering.vcnc.co.kr/2018/05/parquet-and-spark/ )
df.write.parquet("file:///root/elasticTest/parquet/") |
스칼라를 나간다
:q |
elasticTest 경로로 간다
cd ~/elasticTest |
Ls를 눌러 설정한 디렉토리가 생성되었는지 확인한다
Parquet 디렉토리로 이동한다
cd parquet |
Ls를 눌러 파일이 생성된걸 확인한다
spark 기능 이용하기(1) – 계산
엘라스틱서치와 스파크를 연동하는 이유는 엘라스틱서치에서는 사용할 수 없는 기능을 사용할 수 있기 때문이다(정확한 계산, 데이터프레임간 Join 기능, 머신러닝 등)
이 기능을 사용해보자
다시 스파크에 접속한다
spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar |
Sql 객체를 이용할 것이기 때문에 elasticsearch 연결 객체부터, 데이터프레임, sql 객체까지 만든다.
Elasticsearch 연결 객체를 만든다
val esConf = Map( "es.nodes" -> "localhost:9200" ) |
Elasticsearch로 data를 로딩하여 DataFrame으로 불러온다
val df = spark. read. format("org.elasticsearch.spark.sql"). options(esConf). load("bank/account") |
데이터 프레임에 바로 sql을 쓸 수 없어서 데이터프레임을 이용해서 sql을 쓸 수 있는 객체를 따로 만든다(이경우 logs)
df.createOrReplaceTempView("logs") |
City의 중복을 제외하고 count를 한다
spark.sql("SELECT COUNT(DISTINCT city) FROM logs").show() |
스파크 기능 이용하기(2) – Join
join기능을 이용해보자
만약 스파크에 접속해있다면 종료해주자(:q)
예제 인덱스와 연결할 csv 파일을 만들어야 한다
아까 만들었던 예제디렉토리로 이동한다
cd ~/elasticTest |
Vi를 켜고 아래 내용을 작성한다
vi name.csv |
firstname,lastname Effie,Gates Kari,Skinner |
:wq를 눌러 저장하고 나간다
스파크를 켠다
spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar |
Elasticsearch 연결 객체를 만든다
val esConf = Map( "es.nodes" -> "localhost:9200" ) |
Elasticsearch로 data를 로딩하여 DataFrame으로 불러온다
val df = spark. read. format("org.elasticsearch.spark.sql"). options(esConf). load("bank/account") |
sql 객체를 생성한다
df.createOrReplaceTempView("es_tab") |
Csv 파일을 불러와서 DataFrame으로 만든다
val csv_df = spark.read.format("csv"). option("header", "true"). option("inferSchema", "true"). load("file:///root/elasticTest/name.csv") |
Sql 객체를 생성한다
csv_df.createOrReplaceTempView("csv_tab") |
생성한 csv 객체를 출력해보자
spark.sql("select * from csv_tab").show() |
Csv 와 elasticsearch의 DataFrame을 join해서 조회해보자
spark.sql("SELECT t1.firstname, t1.lastname FROM es_tab AS t1 JOIN csv_tab AS t2 ON t1.firstname = t2.firstname").show() |
Spark의 DataFrame을 elasticsearch에 저장하기
일단 스파크가 꺼져있다면 다시 켠다
spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar |
다시 킨 경우 연결 객체와 df까지 생성한다
Elasticsearch 연결 객체를 만든다
val esConf = Map( "es.nodes" -> "localhost:9200" ) |
Elasticsearch로 data를 로딩하여 DataFrame으로 불러온다
val df = spark. read. format("org.elasticsearch.spark.sql"). options(esConf). load("bank/account") |
작업에 필요한 모듈을 import 한다
import org.elasticsearch.spark.sql._ |
df(elasticsearch 데이터 프레임)의 10개만 잘라서 새로운 인덱스로 엘라스틱 서치에 넣어보자
df.limit(10).saveToEs("migrated/account", esConf) |
넣은 것을 확인해보기 위해 새로 만든 migrated 인덱스의 연결객체를 만든다
val df2 = spark.read.format("org.elasticsearch.spark.sql").load("migrated/account") |
이제 잘 불러와졌는지 확인하자
아까 잘라서 넣었던 10개가 확인되면 잘 된것이다
[참고] kibana의 index management에서도 잘 생성된것을 확인할 수 있다
[참고2]
더 자세한 내용을 알고싶다면 공식 홈페이지를 참조하자(영어)
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
'각종 기술 삽질하며 익히기 - onpremise' 카테고리의 다른 글
파일 > 카프카 > 스파크 > 엘라스틱 서치로 데이터 옮겨보기 (0) | 2022.02.22 |
---|---|
주키퍼 & 카프카 설치하고 써보기 (0) | 2022.02.22 |
Spark 설치하고 사용해보기 (0) | 2022.02.22 |
하둡 설치 및 예제 실행해보기 (0) | 2022.02.22 |
OPEN-JDK 1.8 설치 / ELK 설치 on centos7 (0) | 2022.02.21 |