목숨건데이터엔지니어
foot-data-engineering
목숨건데이터엔지니어
전체 방문자
오늘
어제
  • 분류 전체보기 (54)
    • 취업과 기본기 잡기 (11)
      • Python (2)
      • 자료구조 (1)
      • 알고리즘 (1)
      • 운영체제 (2)
      • 네크워크 (1)
      • 데이터베이스 (1)
      • SQL (1)
      • Linux (1)
      • Docker (1)
    • 더 나은 엔지니어가 되기위해 (11)
      • (AWS) api-gateway, kinesis (1)
      • (AWS) MWAA (1)
      • Apache Spark (1)
      • Apache Airflow (1)
      • Apache Kafka (1)
      • Apache Flink (1)
      • Hadoop (1)
      • 알쓸신잡 (4)
    • 데이터와 손잡기 (6)
      • 머신러닝(sklearn) (1)
      • 딥러닝 (3)
      • 오디오 처리 (1)
      • 데이터 시각화 (1)
    • 코딩테스트 잡기 (1)
      • [BOJ] 문제풀이 (0)
      • [프로그래머스] 문제풀이 (0)
      • [HackerRank] SQL (0)
      • 코딩테스트 후기 (1)
    • 토이 프로젝트 (4)
    • 각종 기술 삽질하며 익히기 - onpremise (11)
    • 학습과정 잡동사니 (7)
      • 오답노트 (7)
    • 평소 궁금증 파헤치기 (3)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

  • 아직 한참 부족하고 부끄러운 나의 github 링크

인기 글

태그

  • 멀티 프로그래밍
  • CTAS
  • spark to mysql
  • kafka
  • django
  • LIVY
  • 프로세스
  • Kinesis
  • 쓰레드와 프로세스
  • Tableau
  • flink
  • 도커파일
  • 네트워크 스위치
  • aws builders online series
  • Elk
  • spark
  • scikit-learn
  • logstash
  • 파이썬
  • EMR
  • filebeat
  • #데이터수집
  • 서비쿼리
  • jupyter
  • MWAA
  • Api-Gateway
  • #데이터 수집 절차
  • spark with elasticsearch
  • ES
  • spark on centos7

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
목숨건데이터엔지니어

foot-data-engineering

스파크랑 ELASTICSEARCH 연동하기
각종 기술 삽질하며 익히기 - onpremise

스파크랑 ELASTICSEARCH 연동하기

2022. 2. 22. 00:37

둘을 연동할 때 필요한 모듈이 있다.. commons-httpclient 모듈인데 2010년대 초에 지원 종료가 되어서 인터넷에서 따로 다운받아야 한다.

 

일단 라이브러리를 넣어 둘 디렉토리를 만들어준다

cd ~

 

mkdir library

 

이제 모듈을 다운받아오자

다운로드 페이지에서 다운 경로를 확인한다

(다운로드 페이지 : http://archive.apache.org/dist/httpcomponents/commons-httpclient/3.0/binary/ )

 

wget으로 파일을 받아온다

wget http://archive.apache.org/dist/httpcomponents/commons-httpclient/3.0/binary/commons-httpclient-3.0.1.tar.gz

 

Tar 명령어로 압축을 해제한다

tar -xvf commons-httpclient-3.0.1.tar.gz

 

압축 해제한 파일로 들어간다

cd commons-httpclient-3.0.1

 

Ls를 해보면 우리한테 필요한 jar 파일을 확인할 수 있다.

이 파일을 아까 만들었던 library 디렉토리로 복사하자

cp commons-httpclient-3.0.1.jar /root/library

 

해당 폴더로 이동해서 제대로 잘 들어갔는지 확인해보자

cd ~/library

 

Ls를 쳐보면 제대로 들어간걸 확인할 수 있다

스파크를 켤 때 이 위치를 알려주면 이제 문제없이 연동된다.

 

스파크를 켜기전에 일단 elasticsearch를 켜준다

systemctl start elasticsearch

 

(나는 혹시 몰라서 kibana도 킴)

systemctl start kibana

 

이제 스파크를 켠다

(packages 는 우리가 처음에 빌드할 때 입력한 옵션으로 스파크가 갖게 된 es연동 패키지를 사용하겠다고 알려주는 옵션이고 jars는 외부의 jar 파일을 쓸 때 위치를 알려주는 옵션이다)

spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar

예제

이제 제대로 연동되었는지 확인하기위해 아래 예제를 진행해보자

(예제 출처: http://jason-heo.github.io/elasticsearch/2016/06/28/elasticsearch-with-spark.html )

 

엘라스틱서치에서 제공하는 예제 파일을 이용해서 실행해보자

준비

루트 홈으로 이동한다

cd ~

 

예제 용 디렉토리를 생성한다(있는경우 안만들어도 됨)

mkdir elasticTest

 

디렉토리로 이동한다

cd elasticTest

 

wget 예제 파일을 다운로드 받는다

wget https://download.elastic.co/demos/kibana/gettingstarted/accounts.zip

 

Unzip 명령어로 압축을 해제한다

unzip accounts.zip

 

예제용 elasticsearch 인덱스를 생성한다

curl -XPOST \
    -H 'Content-Type: application/x-ndjson' \
    'http://localhost:9200/bank/account/_bulk?pretty' \
    --data-binary "@accounts.json"

 

잘 생성되었는지 확인한다

curl -XGET 'http://localhost:9200/bank/account/_search?pretty&size=0'

 

스파크에서 elasticsearch 데이터 읽어오기

이제 스파크를 켠다

spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar

 

Elasticsearch 연결 객체를 만든다

val esConf = Map(
    "es.nodes" -> "localhost:9200"
)

 

Elasticsearch로 data를 로딩하여 DataFrame으로 불러온다

val df = spark.
    read.
    format("org.elasticsearch.spark.sql").
    options(esConf).
    load("bank/account")

 

 

스키마를 출력한다

df.printSchema

 

쿼리 작성

데이터 프레임에 바로 sql을 쓸 수 없어서 데이터프레임을 이용해서 sql을 쓸 수 있는 객체를 따로 만든다(이경우 logs)

df.createOrReplaceTempView("logs")

 

sql(쿼리문) 함수를 사용하여 쿼리를 사용해보자

show를 써야 결과가 보여진다

spark.sql("SELECT COUNT(*) FROM logs").show()

 

 

정확하게 1000이 나오면 잘 수행된 것이다

 

이번엔 특정 열만 불러와보자

spark.sql("SELECT firstname, lastname, email FROM logs").show()

 

(중략)

위에서 볼수 있다 시피 20개만 출력된다

Show 함수 안에 인자를 넣어주면 원하는 개수만 출력할수도 있다

spark.sql("SELECT firstname, lastname, email FROM logs").show(5)

 

위의 이메일을 보면 알수 있다시피 긴 문자열은 축약해서 보여준다.

전부 보고 싶다면 아래와 같이 show에 옵션을 준다

spark.sql("SELECT firstname, lastname, email FROM logs").show(5, false)

 

Elasticsearch 문서를 parquet로 저장하기

 

(참고: Parguet(파케이)란? 하둡 생태계에서 사용하는 데이터 저장포맷으로 처리 속도가 빠르다

자세한 내용 : https://engineering.vcnc.co.kr/2018/05/parquet-and-spark/ )

 

df.write.parquet("file:///root/elasticTest/parquet/")

스칼라를 나간다

:q

 

elasticTest 경로로 간다

cd ~/elasticTest

 

Ls를 눌러 설정한 디렉토리가 생성되었는지 확인한다

Parquet 디렉토리로 이동한다

cd parquet

 

Ls를 눌러 파일이 생성된걸 확인한다

spark 기능 이용하기(1) – 계산

 

엘라스틱서치와 스파크를 연동하는 이유는 엘라스틱서치에서는 사용할 수 없는 기능을 사용할 수 있기 때문이다(정확한 계산, 데이터프레임간 Join 기능, 머신러닝 등)

이 기능을 사용해보자

 

다시 스파크에 접속한다

spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar

 

Sql 객체를 이용할 것이기 때문에 elasticsearch 연결 객체부터, 데이터프레임, sql 객체까지 만든다.

 

Elasticsearch 연결 객체를 만든다

val esConf = Map(
    "es.nodes" -> "localhost:9200"
)

 

Elasticsearch로 data를 로딩하여 DataFrame으로 불러온다

val df = spark.
    read.
    format("org.elasticsearch.spark.sql").
    options(esConf).
    load("bank/account")

 

데이터 프레임에 바로 sql을 쓸 수 없어서 데이터프레임을 이용해서 sql을 쓸 수 있는 객체를 따로 만든다(이경우 logs)

df.createOrReplaceTempView("logs")

 

City의 중복을 제외하고 count를 한다

spark.sql("SELECT COUNT(DISTINCT city) FROM logs").show()

스파크 기능 이용하기(2) – Join

join기능을 이용해보자

만약 스파크에 접속해있다면 종료해주자(:q)

 

예제 인덱스와 연결할 csv 파일을 만들어야 한다

아까 만들었던 예제디렉토리로 이동한다

cd ~/elasticTest

 

Vi를 켜고 아래 내용을 작성한다

vi name.csv

 

firstname,lastname
Effie,Gates
Kari,Skinner

:wq를 눌러 저장하고 나간다

 

스파크를 켠다

spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar

 

Elasticsearch 연결 객체를 만든다

val esConf = Map(
    "es.nodes" -> "localhost:9200"
)

 

Elasticsearch로 data를 로딩하여 DataFrame으로 불러온다

val df = spark.
    read.
    format("org.elasticsearch.spark.sql").
    options(esConf).
    load("bank/account")

 

sql 객체를 생성한다

df.createOrReplaceTempView("es_tab")

 

Csv 파일을 불러와서 DataFrame으로 만든다

val csv_df = spark.read.format("csv").
           option("header", "true").
           option("inferSchema", "true").
           load("file:///root/elasticTest/name.csv")

 

Sql 객체를 생성한다

csv_df.createOrReplaceTempView("csv_tab")

 

생성한 csv 객체를 출력해보자

spark.sql("select * from csv_tab").show()

Csv 와 elasticsearch의 DataFrame을 join해서 조회해보자

spark.sql("SELECT t1.firstname, t1.lastname FROM es_tab AS t1 JOIN csv_tab AS t2 ON t1.firstname = t2.firstname").show()

 

 

Spark의 DataFrame을 elasticsearch에 저장하기

 

일단 스파크가 꺼져있다면 다시 켠다

spark-shell --master=local[1] --packages="org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0" --jars /root/library/commons-httpclient-3.0.1.jar

 

다시 킨 경우 연결 객체와 df까지 생성한다

Elasticsearch 연결 객체를 만든다

val esConf = Map(
    "es.nodes" -> "localhost:9200"
)

 

Elasticsearch로 data를 로딩하여 DataFrame으로 불러온다

val df = spark.
    read.
    format("org.elasticsearch.spark.sql").
    options(esConf).
    load("bank/account")

 

작업에 필요한 모듈을 import 한다

import org.elasticsearch.spark.sql._

 

df(elasticsearch 데이터 프레임)의 10개만 잘라서 새로운 인덱스로 엘라스틱 서치에 넣어보자

df.limit(10).saveToEs("migrated/account", esConf)

 

넣은 것을 확인해보기 위해 새로 만든 migrated 인덱스의 연결객체를 만든다

val df2 = spark.read.format("org.elasticsearch.spark.sql").load("migrated/account")

 

이제 잘 불러와졌는지 확인하자

아까 잘라서 넣었던 10개가 확인되면 잘 된것이다

 

[참고] kibana의 index management에서도 잘 생성된것을 확인할 수 있다

[참고2]

더 자세한 내용을 알고싶다면 공식 홈페이지를 참조하자(영어)

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

 

 

 

 

 

 

 

 

 

 

'각종 기술 삽질하며 익히기 - onpremise' 카테고리의 다른 글

파일 > 카프카 > 스파크 > 엘라스틱 서치로 데이터 옮겨보기  (0) 2022.02.22
주키퍼 & 카프카 설치하고 써보기  (0) 2022.02.22
Spark 설치하고 사용해보기  (0) 2022.02.22
하둡 설치 및 예제 실행해보기  (0) 2022.02.22
OPEN-JDK 1.8 설치 / ELK 설치 on centos7  (0) 2022.02.21
    '각종 기술 삽질하며 익히기 - onpremise' 카테고리의 다른 글
    • 파일 > 카프카 > 스파크 > 엘라스틱 서치로 데이터 옮겨보기
    • 주키퍼 & 카프카 설치하고 써보기
    • Spark 설치하고 사용해보기
    • 하둡 설치 및 예제 실행해보기
    목숨건데이터엔지니어
    목숨건데이터엔지니어

    티스토리툴바