[Spark] SQLite, PostgreSQL, 텍스트 파일 데이터소스 읽고 쓰는 방법
[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.
도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 스칼라(scala)로 했습니다.
앞으로 스파크와 관련된 내용은 딥러닝 부분을 제외하고 스칼라로 진행될 예정입니다.
기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0
안의 하위 폴더 data
를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0
후, ./bin/spark-shell
명령어를 실행하시면 됩니다.
이전 글 링크 : [Spark] CSV, JSON, Parquet, ORC 데이터소스 읽고 쓰는 방법
7. SQL 데이터베이스
- 사용자는 SQL을 지원하는 다양한 시스템에 SQL 데이터소스를 연결할 수 있다.
- 예를 들어, MySQL, PostgreSQL, Oracle, SQLite 데이터베이스에 접속할 수 있다.
- 데이터베이스는 원시 파일 형태가 아니므로 고려해야 할 옵션이 더 많다.
- 예를 들어 데이터베이스의 인증 정보나 접속과 관련된 옵션이 필요하다.
- 그리고 스파크 클러스터에서 데이터베이스 시스템에 접속 가능한지 네트워크 상태를 확인해야 한다.
예제에서는 SQLite를 활용하는데 이는 데이터베이스 접속 시 사용자 인증이 필요 없기 때문에 이용하기 쉽다.
하지만 SQLite는 쓰기 연산 시 전체 데이터베이스에 락을 설정하므로 분산 환경에 적합하지 않다.
- 데이터베이스의 데이터를 읽고 쓰기 위해서는 스파크 클래스패스(classpath)에 데이터베이스의 JDBC(Java DataBase Connectivity) 드라이버를 추가하고 적절한 JDBC 드라이버 jar 파일을 제공해야 한다.
- 다음은 sqlite-jdbc 데이터베이스에 데이터를 읽거나 쓰기 위한 예제이다.
$ ./bin/spark-shell \
--driver-class-path sqlite-jdbc-3.39.2.1.jar \
--jars sqlite-jdbc-3.39.2.1.jar
우분투 내 SQLite 설치
$ apt update
$ apt install sqlite3
jars sqlite-jdbc-3.39.2.1.jar
파일 링크 : sqlite-jdbc-3.39.2.1.jar - poeun 1
JDBC 옵션
- JDBC 옵션 정리한 링크 : [Spark] CSV, JSON, Parquet, SQL DB 등 데이터소스에 대한 옵션 정리
7.1 SQL 데이터베이스 일기
- SQL 데이터베이스도 다른 데이터소스처럼 포맷과 옵션을 지정한 후 데이터를 읽어 들인다.
val driver = "org.sqlite.JDBC"
val path = "./data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:${path}"
val tablename = "flight_info"
- 접속 관련 속성을 정의한 다음, 정삭적으로 데이터베이스에 접속되는지 테스트해 해당 연결이 유효한지 확인할 수 있다.
- SQLite는 로컬 머신에 존재하는 파일 형태이므로 접속 테스트가 무의미해질 수 있긴 하다.
import java.sql.DriverManager
val connection = DriverManager.getConnection(url)
connection.isClosed()
connection.close()
-
실행결과
- 접속에 성공하면 다음 예제를 진행할 수 있다.
- SQL 테이블을 읽어 DataFrame을 만들어 본다.
val dbDataFrame = spark.read.format("jdbc"
).option("url", url
).option("dbtable", tablename
).option("driver", driver
).load()
- 생성한 DataFrmae은 기존 예제에서 생성한 DataFrame과 전혀 다르지 않다.
dbDataFrame.select("DEST_COUNTRY_NAME"
).distinct().show(5)
-
실행결과
7.2 Postgresql로 실행하기
- 다음은 PostgreSql 데이터베이스를 이용해 위와 같은 과정을 똑같이 따라해보겠다.
- 먼저 다음과 같이 실행 해준다.
$ ./bin/spark-shell \
--driver-class-path postgresql-32.3.7.jar
--jars postgresql-32.3.7.jar
postgresql-32.3.7.jar
파일 링크 : postgresql-42.3.7.jar - poeun 2
- PostgreSQL 데이터베이스는 SQLite과는 달리 사용자 인증 정보 등 더 많은 설정이 필요하다.
val pgDF = spark.read.format("jdbc"
).option("driver", "org.postgresql.Driver"
).option("url", "jdbc:postgresql://database_server"
).option("dbtable", "schema.tablename"
).option("user", "username"
).option("password", "my-secret-password"
).load()
7.3 쿼리 푸시다운
- 스파크는 DataFrame을 만들기 전에 데이터베이스 자체에서 데이터를 필터링하도록 만들 수 있다.
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().explain()
-
실행결과
- 스파크는 특정 유형의 쿼리를 더 나은 방식으로 처리할 수 있다.
- 예를 들어 DataFrame에 필터를 명시하면 스파크는 해당 필터에 대한 처리를 데이터베이스로 위임(push down)한다.
- 실행 계획의
PsuhedFilters
부분에서 관련 내용을 확인할 수 있다.
- 실행 계획의
dbDataFrame.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain
-
실행결과
- 스파크는 모든 스파크 함수를 사용하는 SQL 데이터베이스에 맞게 변환하지는 못한다.
- 따라서 때로는 전체 쿼리를 데이터베이스에 전달해 결과를 DataFrame으로 받아야 하는 경우도 있다.
- 이때 처리방식은 테이블명 대신 SQL 쿼리를 명시하면 된다.
val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME)
FROM flight_info)
AS flight_info"""
val dbDataFrame = spark.read.format("jdbc"
).option("url", url
).option("dbtable", pushdownQuery
).option("driver", driver
).load()
- 이제 이 테이블에 쿼리할 때 실제로는
pushdownQuery
변수에 명시한 쿼리를 사용해 수행한다. - 스파크는 테이블의 실제 스키마와 관련된 정보를 알지 못하며 단지 쿼리의 결과에 대한 스키마만 알 수 있다.
dbDataFrame.explain()
-
실행결과
데이터베이스 병렬로 읽기
- 스파크는 파일 크기, 파일 유형, 압축 방식에 따른 ‘분할 가능성’에 따라 여러 파일을 읽어 하나의 파티션으로 만들거나 여러 파티션을 하나의 파일로 만드는 기본 알고리즘을 가지고 있다.
- 파일이 가진 이런 유연성은 SQL 데이터베이스에도 존재하지만 몇 가지 수동 설정이 필요하다.
numPartitions
옵션을 사용해 읽기 및 쓰기용 동시 작업 수를 제한할 수 있는 최대 파티션 수를 설정할 수 있다.
val dbDataFrame = spark.read.format("jdbc"
).option("url", url
).option("dbtable", tablename
).option("driver", driver
).option("numPartitions", 10
).load()
- 데이터베이스 연결을 통해 명시적으로 조건절을 SQL 데이터베이스에 위임할 수 있다.
- 이 최적화 방법은 조건절을 명시함으로써 특정 파티션에 특정 데이터의 물리적 위치를 제어할 수 있다.
- 예시로, 데이터소스 생성 시 조건절 목록을 정의해 스파크 자체 파티션에 결과 데이터를 저장할 수 있다.
spark.read.jdbc
메서드를 사용하면 전체 데이터를 가지고 오는 경우가 된다. 3
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val predicates = Array(
"DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
"DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"
)
spark.read.jdbc(url, tablename, predicates, props).show()
spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions
-
실행결과
슬라이딩 윈도우 기반의 파티셔닝
- 스파크는 데이터베이스에 병렬로 쿼리를 요청하며 numPartitions에 설정된 값만큼 파티션을 반환한다.
- 그리고 파티션에 값을 할당하기 위해 상한값과 하한값을 수정한다.
val colName = "count"
val lowerBound = 0L
val upperBound = 348113L
val numPartitions = 10
- 최젓값에서 최곳값까지 동일하게 분배한다.
spark.read.jdbc(url, tablename, colName, lowerBound, upperBound, numPartitions, props).count()
// 결과
// res8: Long = 255
7.4 SQL 데이터베이스 쓰기
- SQL 데이터베이스에 데이터를 쓰는 방법은 URI를 지정하고 지정한 쓰기 모드에 따라 데이터를 쓰면 된다.
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
new StructField("DEST_COUNTRY_NAME", StringType, true),
new StructField("ORIGIN_COUNTRY_NAME", StringType, true),
new StructField("count", LongType, false)
))
val csvFile = spark.read.format("csv"
).option("header", "true"
).option("mode", "FAILFAST"
).schema(myManualSchema
).load("./data/flight-data/csv/2010-summary.csv")
val newPath = "jdbc:sqlite:/tmp/my-sqlite.db"
csvFile.write.mode("overwrite").jdbc(newPath, tablename, props)
-
실행결과
- 새로운 테이블에 데이터를 쉽게 추가할 수 있다.
csvFile.write.mode("append").jdbc(newPath, tablename, props)
8. 텍스트 파일
- 스파크는 일반 텍스트 파일도 읽을 수 있다.
- 파일의 각 줄은 DataFrame의 레코드가 된다.
- 텍스트 파일은 기본 데이터 타입의 유연성을 활용할 수 있으므로 Dataset API에서 사용하기 매우 좋은 포맷이다.
8.1 텍스트 파일 읽기
- 텍스트 파일을 읽는 것은
textFile
메서드에 텍스트 파일을 지정하기만 하면 된다.
spark.read.textFile("./data/flight-data/csv/2010-summary.csv"
).selectExpr("split(value, ',') as rows").show(5)
-
실행결과
8.2 텍스트 파일 쓰기
- 텍스트 파일을 쓸 때는 문자열 컬럼이 하나만 존재해야 한다.
csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")
9. 고급 I/O 개념
- 쓰기 작업 전에 파티션 수를 조절함으로써 병렬로 처리할 파일 수를 제어할 수 있다.
- 또한 버켓팅(bucketing)과 파티셔닝(partitioning)을 조절함으로써 데이터의 저장 구조를 제어할 수 있다.
9.1 분할 가능한 파일 타입과 압축 방식
- 특정 파일 포맷은 기본적으로 분할을 지원한다.
- 따라서 스파크에서 전체 파일이 아닌 쿼리에 필요한 부분만 읽을 수 있으므로 성능 향상에 도움이 된다.
- 또한 HDFS 같은 시스템을 사용한다면 분할된 파일을 여러 블록으로 나누어 분산 저장하기 때문에 훨씬 더 최적화할 수 있다.
9.2 병렬로 데이터 읽기
- 여러 익스큐터가 같은 파일을 동시에 읽을 수 있는 없지만 여러 파일을 동시에 읽을 수는 있다.
- 다수의 파일이 존재하는 폴더를 읽을 때 폴더의 개별 파일은 DataFrame의 파티션이 된다.
- 따라서 사용 가능한 익스큐터를 이용해 병렬로 파일을 읽는다.
9.3 병렬로 데이터 쓰기
- 기본적으로 데이터 파티션당 하나의 파일이 작성된다.
- 따라서 옵션에 지정된 파일명은 실제로는 다수의 파일을 가진 디렉터리이다.
- 디렉터리 안에 파티션당 하나의 파일로 데이터를 저장한다.
- 다음 코드는 폴더 안에 5개의 파일을 생성한다.
csvFile.repartition(5).write.format("csv").save("/tmp/multiple.csv")
-
실행결과
파티셔닝
- 파티셔닝은 어떤 데이터를 어디에 저장할 것인지 제어할 수 있는 기능이다.
- 파티셔닝된 디렉터리 또는 테이블에 파일을 쓸 때 디렉터리별로 컬럼 데이터를 인코딩해 저장한다.
- 그러므로 데이터를 읽을 때 전체 데이터셋을 스캔하지 않고 필요한 컬럼의 데이터만 읽을 수 있다.
- 이 방식은 모든 파일 기반의 데이터소스에서 지원한다.
csvFile.limit(10).write.mode("overwrite"
).partitionBy("DEST_COUNTRY_NAME"
).save("/tmp/partitioned-files.parquet")
-
실행결과
버켓팅
- 버켓팅(bucketing)은 각 파일에 저장된 데이터를 제어할 수 있는 또 다른 파일 조직화 기법이다.
- 이 기법을 사용하면 동일한 버킷 ID를 가진 데이터가 하나의 물리적 파티션에 모두 모여 있기 때문에 데이터를 읽을 때 셔플을 피할 수 있다.
- 즉, 데이터가 이후의 사용 방식에 맞춰 사전에 파티셔닝되므로 조인이나 집계 시 발생하는 고비용의 셔플을 피할 수 있다.
- 다음은 버켓 단위로 데이터를 모아 일정 수의 파일로 저장하는 예제이다.
- 기본적으로
/user/hive/warehouse
디렉터리 하위에 버켓팅 파일을 기록한다.
- 기본적으로
val numberBuckets = 10
val columnToBucketBy = "count"
csvFile.write.format("parquet"
).mode("overwrite"
).bucketBy(numberBuckets, columnToBucketBy
).saveAsTable("bucketedFiles")
-
실행결과
댓글남기기