3 분 소요

[Spark The Definitive Guide - BIG DATA PROCESSING MADE SIMPLE] 책을 중심으로 스파크를 개인 공부를 위해 요약 및 정리해보았습니다.
다소 복잡한 설치 과정은 도커에 미리 이미지를 업로해 놓았습니다. 즉, 도커 이미지를 pull하면 바로 스파크를 사용할 수 있습니다.

도커 설치 및 활용하는 방법 : [Spark] 빅데이터와 아파치 스파크란 - 1.2 스파크 실행하기
도커 이미지 링크 : https://hub.docker.com/r/ingu627/hadoop
예제를 위한 데이터 링크 : FVBros/Spark-The-Definitive-Guide
예제에 대한 실행 언어는 SQL과 스칼라(scala)로 했습니다.

기본 실행 방법
1. 예제에 사용 될 데이터들은 도커 이미지 생성 후 spark-3.3.0 안의 하위 폴더 data를 생성 후, 이 폴더에 추가합니다.
1.1 데이터와 도커 설치 및 활용하는 방법은 위에 링크를 남겼습니다.
2. 프로그램 시작은 cd spark-3.3.0 후, ./bin/spark-sql 명령어를 실행하시면 됩니다.

이전 글 링크 : [Spark] 스파크 SQL - 실행방법, 테이블, 뷰



1. 데이터베이스

  • 데이터베이스(database)는 여러 테이블을 조직화하기 위한 도구이다.
  • 스파크에서 실행하는 모든 SQL 명령문은 사용 중인 데이터베이스 범위에서 실행된다.

그전에, 관련 데이터를 spark-3.3.0/spark-warehouse/data에 있는지 확인해본다.


  • 다음은 전체 데이터베이스 목록을 확인할 수 있다.
SHOW DATABASES;
  • 실행결과

    image


1.1 데이터베이스 설정하기

  • CREATE DATABASE 키워드를 사용해 데이터베이스를 설정한다.
CREATE DATABASE some_db;


1.2 데이터베이스 설정하기

  • USE 키워드를 사용해 쿼리 수행에 필요한 데이터베이스를 설정한다.
    • 모든 쿼리는 테이블 이름을 찾을 때 앞서 지정한 데이터베이스를 참조한다.
USE some_db;


  • 첫 번째 코드를 사용해 다른 데이터베이스의 테이블에 쿼리를 수행할 수 있다.
  • 두 번째 코드를 사용해 현재 어떤 데이터베이스를 사용 중인지 확인할 수 있다.
  • 세 번째 코드를 사용해 기본 데이터베이스로 돌아갈 수 있다.
SELECT * FROM default.flights;

SELECT current_database();

USE default;


1.3 데이터베이스 제거하기

  • DROP DATABASE 키워드를 사용해 데이터베이스를 제거한다.
DROP DATABASE IF EXISTS some_db;



2. select 구문

  • 스파크 쿼리는 다음과 같이 ANSI-SQL 요건을 충족한다.
SELECT [ALL|DISTINCT] name_expression[, named_expression, ...]
    FROM relation [, relation, ...]
    [lateral_view[, lateral_view, ...]]
    [WHERE boolean_expression]
    [aggregation [HAVING boolean_expression]]
    [ORDER BY sort_expressions]
    [CLUSTER BY expressions]
    [DISTRIBUTE BY expressions]
    [SORT BY sort_expressions]
    [WINDOW named_window[, WINDOW named_window, ...]]

named_expressions:
  : expression [AS alias]

relation:
  | join_relation
  | (table_name|query|relation) [sample] [AS alias]
  : VALUES (expressions)[, (expressions), ...]
    [AS (column_name[, colmn_name, ...])]

expressions:
  : expression[, expression, ...]

sort_expressions:
  : expression [ASC|DESC][, expression [ASC|DESC], ...]


2.1 case…when…then 구문

  • case...when...then...end 구문을 사용해 조건에 맞는 처리를 할 수 있다.
    • 다음 구문은 프로그래밍의 if 구문과 동일한 처리를 한다.
SELECT
  CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
       WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
       ELSE -1 END
FROM partitioned_flights;



3. 고급 주제

  • SQL 쿼리는 특정 명령 집합을 실행하도록 요청하는 SQL 구문이다.
    • SQL 구문은 조작(manipulation), 정의(definition), 제어(control)와 관련된 명령을 정의할 수 있다.


3.1 복합 데이터 타입

  • 복합 데이터 타입은 표준 SQL과는 거리가 있으며 표준 SQL에는 존재하지 않는 매우 강력한 기능이다.
  • 스파크 SQL에는 구조체(structs), 리스트(lists), 맵(maps) 3가지 핵심 복합 데이터 타입이 존재한다.

구조체

  • 구조체는 맵에 더 가까우며 스파크에서 중첩 데이터를 생성하거나 쿼리하는 방법을 제공한다.
  • 구조체를 만들기 위해서는 여러 컬럼이나 표현식을 괄호로 묶기만 하면 된다.
CREATE VIEW IF NOT EXISTS nested_data AS
  SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights;


  • 점을 사용해 구조체의 개별 컬럼을 조회할 수 있다.
SELECT country.DEST_COUNTRY_NAME, count FROM nested_data;
  • 구조체의 이름과 모든 하위 컬럼을 지정해 모든 값을 조회할 수 있다.
SELECT country.*, count FROM nested_data;


리스트

  • 값의 리스트를 만드는 collect_list 함수나 중복 값이 없는 배열을 만드는 collect_set 함수를 사용할 수 있다.
    • 두 함수는 모두 집계(aggregation) 함수이므로 집계 연산 시에만 사용할 수 있다.
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts, collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME;


  • 컬럼에 직접 배열을 생성할 수도 있다.
SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights;


  • 배열 쿼리 구문을 사용해 리스트의 특정 위치의 데이터를 쿼리할 수 있다.
SELECT DEST_COUNTRY_NAME as new_name,  collect_list(count)[0]
FROM flights GROUP BY DEST_COUNTRY_NAME;


  • explode 함수를 사용해 배열을 다시 여러 로우로 변환할 수 있다.
    • 이 함수를 사용해 저장된 배열의 모든 값을 단일 로우 형태로 분해한다.
CREATE OR REPLACE TEMP VIEW flights_agg AS
  SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
  FROM flights GROUP BY DEST_COUNTRY_NAME;

SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg;


3.2 함수

  • 스파크 SQL은 복합 데이터 타입 외에도 다양한 고급 함수를 제공한다.
  • 스파크 SQL이 제공하는 전체 함수 목록을 확인하려면 SHOW FUNCTIONS 구문을 사용한다.
SHOW FUNCTIONS;


사용자 정의 함수

  • 스파크는 사용자 정의 함수를 정의하고 분산 환경에서 사용할 수 있는 기능을 제공한다.
  • 특정 언어를 사용해 함수를 개발한 다음, 등록하여 함수를 정의할 수 있다.
    • 스칼라 사용자 함수 정의 : def [func id]([param id]: [type], …): [type] = { [function] } 1
def power3(number:Double):Double = number * number * number
spark.udf.register("power3", power3(_:Double):Double)
SELECT count, power3(count) FROM flights;


3.3 서브쿼리

  • 서브쿼리(subquery)를 사용하면 쿼리 안에 쿼리를 지정할 수 있다.
  • 스파크는 2가지 기본 서브쿼리가 있다.
    • 상호 연관 서브쿼리(correlated subquery)는 서브쿼리의 정보를 보완하기 위해 쿼리의 외부 범위에 있는 외부 정보를 사용할 수 있다.
    • 조건절 서브쿼리(predicate subquery)는 값에 따라 필터링할 수 있게 해준다.


비상호연관 조건절 서브쿼리

  • 첫 번째 비상호연관 쿼리는 데이터 중 상위 5개의 목적지 국가 정보를 조회한다.
SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5;
  • 실행결과

    image


  • 이제 서브쿼리를 필터 내부에 배치하고 이전 예제의 결과에 출발지 국가 정보가 존재하는지 확인할 수 있다.
    • 이 쿼리는 쿼리의 외부 범위에 있는 어떤 관련 정보도 사용하고 있지 않으므로 비상호연관 관계이다.
SELECT * FROM flights
WHERE origin_country_name IN (SELECT dest_country_name FROM flights
      GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5);


상호연관 조건절 서브쿼리

  • 상호연관 조건절 서브쿼리는 내부 쿼리에서 외부 범위에 있는 정보를 사용할 수 있다.
    • ex. 목적지 국가에서 되돌아올 수 있는 항공편이 있는지 알고 싶다면 목적지 국가를 출발지 국가로, 출발지 국가를 목적지 국가로 하여 항공편이 있는지 확인한다.
SELECT * FROM flights f1
WHERE EXISTS (SELECT 1 FROM flights f2
            WHERE f1.dest_country_name = f2.origin_country_name)
AND EXISTS (SELECT 1 FROM flights f2
            WHERE f2.dest_country_name = f1.origin_country_name);


비상호연관 스칼라 쿼리

  • 비상호연관 스칼라 쿼리(uncorrelated scalar query)를 사용하면 기존에 없던 일부가 정보를 가져올 수 있다.
SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights;





References

댓글남기기