[Spark] 스파크 SQL - 데이터베이스, 함수, 서브쿼리
[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.
도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 SQL과 스칼라(scala)로 했습니다.
기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0
안의 하위 폴더 data
를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0
후, ./bin/spark-sql
명령어를 실행하시면 됩니다.
이전 글 링크 : [Spark] 스파크 SQL - 실행방법, 테이블, 뷰
1. 데이터베이스
- 데이터베이스(database)는 여러 테이블을 조직화하기 위한 도구이다.
- 스파크에서 실행하는 모든 SQL 명령문은 사용 중인 데이터베이스 범위에서 실행된다.
그전에, 관련 데이터를
spark-3.3.0/spark-warehouse/data
에 있는지 확인해본다.
- 다음은 전체 데이터베이스 목록을 확인할 수 있다.
SHOW DATABASES;
-
실행결과
1.1 데이터베이스 설정하기
- CREATE DATABASE 키워드를 사용해 데이터베이스를 설정한다.
CREATE DATABASE some_db;
1.2 데이터베이스 설정하기
- USE 키워드를 사용해 쿼리 수행에 필요한 데이터베이스를 설정한다.
- 모든 쿼리는 테이블 이름을 찾을 때 앞서 지정한 데이터베이스를 참조한다.
USE some_db;
- 첫 번째 코드를 사용해 다른 데이터베이스의 테이블에 쿼리를 수행할 수 있다.
- 두 번째 코드를 사용해 현재 어떤 데이터베이스를 사용 중인지 확인할 수 있다.
- 세 번째 코드를 사용해 기본 데이터베이스로 돌아갈 수 있다.
SELECT * FROM default.flights;
SELECT current_database();
USE default;
1.3 데이터베이스 제거하기
- DROP DATABASE 키워드를 사용해 데이터베이스를 제거한다.
DROP DATABASE IF EXISTS some_db;
2. select 구문
- 스파크 쿼리는 다음과 같이 ANSI-SQL 요건을 충족한다.
SELECT [ALL|DISTINCT] name_expression[, named_expression, ...]
FROM relation [, relation, ...]
[lateral_view[, lateral_view, ...]]
[WHERE boolean_expression]
[aggregation [HAVING boolean_expression]]
[ORDER BY sort_expressions]
[CLUSTER BY expressions]
[DISTRIBUTE BY expressions]
[SORT BY sort_expressions]
[WINDOW named_window[, WINDOW named_window, ...]]
named_expressions:
: expression [AS alias]
relation:
| join_relation
| (table_name|query|relation) [sample] [AS alias]
: VALUES (expressions)[, (expressions), ...]
[AS (column_name[, colmn_name, ...])]
expressions:
: expression[, expression, ...]
sort_expressions:
: expression [ASC|DESC][, expression [ASC|DESC], ...]
2.1 case…when…then 구문
case...when...then...end
구문을 사용해 조건에 맞는 처리를 할 수 있다.- 다음 구문은 프로그래밍의 if 구문과 동일한 처리를 한다.
SELECT
CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
ELSE -1 END
FROM partitioned_flights;
3. 고급 주제
- SQL 쿼리는 특정 명령 집합을 실행하도록 요청하는 SQL 구문이다.
- SQL 구문은 조작(manipulation), 정의(definition), 제어(control)와 관련된 명령을 정의할 수 있다.
3.1 복합 데이터 타입
- 복합 데이터 타입은 표준 SQL과는 거리가 있으며 표준 SQL에는 존재하지 않는 매우 강력한 기능이다.
- 스파크 SQL에는 구조체(structs), 리스트(lists), 맵(maps) 3가지 핵심 복합 데이터 타입이 존재한다.
구조체
- 구조체는 맵에 더 가까우며 스파크에서 중첩 데이터를 생성하거나 쿼리하는 방법을 제공한다.
- 구조체를 만들기 위해서는 여러 컬럼이나 표현식을 괄호로 묶기만 하면 된다.
CREATE VIEW IF NOT EXISTS nested_data AS
SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights;
- 점을 사용해 구조체의 개별 컬럼을 조회할 수 있다.
SELECT country.DEST_COUNTRY_NAME, count FROM nested_data;
- 구조체의 이름과 모든 하위 컬럼을 지정해 모든 값을 조회할 수 있다.
SELECT country.*, count FROM nested_data;
리스트
- 값의 리스트를 만드는 collect_list 함수나 중복 값이 없는 배열을 만드는 collect_set 함수를 사용할 수 있다.
- 두 함수는 모두 집계(aggregation) 함수이므로 집계 연산 시에만 사용할 수 있다.
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts, collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME;
- 컬럼에 직접 배열을 생성할 수도 있다.
SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights;
- 배열 쿼리 구문을 사용해 리스트의 특정 위치의 데이터를 쿼리할 수 있다.
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0]
FROM flights GROUP BY DEST_COUNTRY_NAME;
- explode 함수를 사용해 배열을 다시 여러 로우로 변환할 수 있다.
- 이 함수를 사용해 저장된 배열의 모든 값을 단일 로우 형태로 분해한다.
CREATE OR REPLACE TEMP VIEW flights_agg AS
SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
FROM flights GROUP BY DEST_COUNTRY_NAME;
SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg;
3.2 함수
- 스파크 SQL은 복합 데이터 타입 외에도 다양한 고급 함수를 제공한다.
- 스파크 SQL이 제공하는 전체 함수 목록을 확인하려면
SHOW FUNCTIONS
구문을 사용한다.
SHOW FUNCTIONS;
사용자 정의 함수
- 스파크는 사용자 정의 함수를 정의하고 분산 환경에서 사용할 수 있는 기능을 제공한다.
- 특정 언어를 사용해 함수를 개발한 다음, 등록하여 함수를 정의할 수 있다.
- 스칼라 사용자 함수 정의 :
def [func id]([param id]: [type], …): [type] = { [function] }
1
- 스칼라 사용자 함수 정의 :
def power3(number:Double):Double = number * number * number
spark.udf.register("power3", power3(_:Double):Double)
SELECT count, power3(count) FROM flights;
3.3 서브쿼리
- 서브쿼리(subquery)를 사용하면 쿼리 안에 쿼리를 지정할 수 있다.
- 스파크는 2가지 기본 서브쿼리가 있다.
- 상호 연관 서브쿼리(correlated subquery)는 서브쿼리의 정보를 보완하기 위해 쿼리의 외부 범위에 있는 외부 정보를 사용할 수 있다.
- 조건절 서브쿼리(predicate subquery)는 값에 따라 필터링할 수 있게 해준다.
비상호연관 조건절 서브쿼리
- 첫 번째 비상호연관 쿼리는 데이터 중 상위 5개의 목적지 국가 정보를 조회한다.
SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5;
-
실행결과
- 이제 서브쿼리를 필터 내부에 배치하고 이전 예제의 결과에 출발지 국가 정보가 존재하는지 확인할 수 있다.
- 이 쿼리는 쿼리의 외부 범위에 있는 어떤 관련 정보도 사용하고 있지 않으므로 비상호연관 관계이다.
SELECT * FROM flights
WHERE origin_country_name IN (SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5);
상호연관 조건절 서브쿼리
- 상호연관 조건절 서브쿼리는 내부 쿼리에서 외부 범위에 있는 정보를 사용할 수 있다.
- ex. 목적지 국가에서 되돌아올 수 있는 항공편이 있는지 알고 싶다면 목적지 국가를 출발지 국가로, 출발지 국가를 목적지 국가로 하여 항공편이 있는지 확인한다.
SELECT * FROM flights f1
WHERE EXISTS (SELECT 1 FROM flights f2
WHERE f1.dest_country_name = f2.origin_country_name)
AND EXISTS (SELECT 1 FROM flights f2
WHERE f2.dest_country_name = f1.origin_country_name);
비상호연관 스칼라 쿼리
- 비상호연관 스칼라 쿼리(uncorrelated scalar query)를 사용하면 기존에 없던 일부가 정보를 가져올 수 있다.
SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights;
댓글남기기