6 분 소요

image

“Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing” 논문을 개인 공부 및 리뷰를 위해 쓴 글입니다.

Contents
1. Introduction
2. Resilient Distributed Datasets
3. Spark Programming Interface
4. Representing RDDs
5. Implementation
6. Evaluation
7. Discussion

논문 출처 : NSDI 12 paper - RDDs




1. Introduction

  • 맵리듀스(MapReduce)같은 클러스터 컴퓨팅 프레임워크는 대규모 데이터 분석에 널리 채택되었다.
  • 이런 시스템을 통해 사용자는 작업 분산(work distribution)과 장애 허용(fault tolerance)에 대해 걱정할 필요 없이 높은 수준의 연산자 집합을 사용하여 병렬(parallel) 연산을 작성할 수 있다.
  • 하지만, 현재 프레임워크는 클러스터의 계산 리소스에 액세스하기 위한 수많은 추상화(abstraction)을 제공하지만 분산 메모리(distributed memory)를 활용하기 위한 추상화는 부족하다.
    • 추상화(abstraction) : 복잡한 자료, 모듈, 시스템 등으로부터 핵심적인 개념 또는 기능을 간추려 내는 것을 말한다. 1
  • 이로 인해 여러 계산에서 중간 결과(intermediate result)를 재사용(reuse)하는 중요한 클래스의 애플리케이션에는 비효율적이다.
    • 데이터 재사용은 PageRank, K-means 클러스터링, 로지스틱 회귀 분석을 포함한 많은 반복 기계 학습 및 그래픔 알고리즘에서 일반적이다.
    • 사용자가 데이터의 동일한 하위 집합에 대해 여러 Adhoc 쿼리(query)를 실행하는 대화형 데이터 마이닝에도 그렇다.
      • Adhoc query : 그때 그때 만들어 쓰는 쿼리. 즉, 서버에 캐시되지 않고 즉석해서 만들어진 쿼리 2


  • 하지만, 대부분의 프레임워크에서 계산 간(ex. 맵리듀스 작업 간) 데이터를 재사용할 수 있는 유일한 방법은 외부 안정적인 스토리지 시스템(ex. 분산 파일 시스템)에 데이터를 쓰는 것이다.
  • 이로 인해 데이터 복제, 디스크 I/O 및 직렬화(serialization)로 인해 상당한 오버헤드가 발생하므로 애플리케이션 실행 시간이 많이 소요될 수 있다.
    • 직렬화(serialization) : 데이터 구조나 오브젝트 상태를 동일하거나 다른 컴퓨터 환경에 저장(ex. 파일이나 메모리 버퍼에서 또는 네트워크 연결 링크 간 전송)하고 나중에 재구성할 수 있는 포맷으로 변환하는 과정 3
      • 오브젝트를 직렬화하는 과정은 오브젝트를 마샬링(marshalling)이라 한다.
  • 따라서 저자들은 데이터 재사용이 필요한 애플리케이션을 위한 전문 프레임워크를 개발했다.
  • 본 논문에서는 광범위한 애플리케이션에서 효율적인 데이터 재사용을 가능하게 하는 탄력적 분산 데이터 세트(Resilient Distributed Datasets, RDDs)라는 새로운 추상화를 제안한다.
    • Resilient : 메모리 내부에서 데이터가 손실 시 유실된 파티션을 재연산해 복구할 수 있다.
    • Distributed : 스파크 클러스터를 통하여 메모리에 분산되어 저장된다.
    • Data : 파일, 정보 등등
  • RDDs는 사용자가 메모리에 중간 결과(intermediate result)를 명시적(explicitly) 유지하고(persist), 데이터 배치(data placement)를 최적화하기 위해 파티셔닝(partitioning)을 제어하고 풍부한 연산자 집합을 사용하여 조작할 수 있도록 하는 장애 허용(fault tolerance) 병렬(parallel) 데이터 구조이다.
    • 파티셔닝(partitioning) : 데이터베이스를 여러 부분으로 분할하는 것이다. 데이터가 너무 커졌을 때, 조회하는 시간이 길어졌을 때 또는 관리 용이성, 성능, 가용성 등의 향상을 이유로 행해진다. 4


이미지 출처 5

  • RDDs를 설계할 때의 주요 과제는 fault tolerance를 효율적으로 제공할 수 있는 프로그래밍 인터페이스를 정의하는 것이다.
  • 분산 공유 메모리(distributed shared memory), 키/값 저장(key-value store), 데이터베이스(database) 및 Piccolo와 같은 클러스터의 인메모리(in-memory) 스토리지를 위한 기존 추상화는 가변(mutable) 상태에 대한 세분화된 업데이트를 기반으로 인터페이스를 제공한다.


이미지 출처 6

  • 이 인터페이스를 사용할 경우 fault tolerance를 제공하는 유일한 방법은 시스템 간에 데이터를 복제하거나 시스템 간에 업데이트를 기록(log)하는 것이다.
    • 비용이 너무 많이 든다.


      이미지출처 7

  • RDDs는 많은 데이터 항목에 동일한 작업을 적용하는 coarse-grained 변환(transformation) (ex. map, filter, join)을 기반으로 인터페이스를 제공한다.
  • 이를 통해 실제 데이터가 아닌 데이터 세트(lineage)를 구축하는 데 사용되는 변환(transformation)을 로깅(logging)하여 장애 허용을 효율적으로 제공할 수 있다.
    • 만약 RDDs의 파티션이 손실된다면, RDDs는 그것이 어떻게 다른 RDDs로부터 파생되었는지에 대한 충분한 정보를 가지고 그 파티션만을 재계산(recompute)한다.
    • 따라서 손실된 데이터를 복제 비용이 많이 들지 않고 신속하게 복구할 수 있다.
  • RDDs는 많은 병렬 애플리케이션에 적합한데, 이러한 애플리케이션들은 자연스럽게 여러 데이터 항목에 동일한 작업을 적용하기 때문이다.



2. Resilient Distributed Datasets (RDDs)

2.1 RDD Abstraction


이미지출처 8

  • RDD는 읽기 전용(read-only), 분할된 레코드(record)의 모음이다.
    • 레코드(record) : 데이터베이스에서 하나의 단위로 취급되는 자료의 집합 (행과 같음) 9
  • RDD는 1) 안정적인 스토리지 안의 데이터 또는 2) 다른 RDDs에 대한 결정적 연산(deterministic operation)을 통해서만 생성될 수 있다.
    • 결정적(deterministic) : 예측한 그대로 동작. 어떤 특정한 입력이 들어오면 언제나 똑같은 과정을 거쳐서 언제나 똑같은 결과를 내놓는다. 10
  • 이러한 작업을 RDD의 다른 작업과 구별하기 위해 변환(transformation)이라 부른다.
    • 변환의 예로는 map, filter, join 등이 있다.
  • RDD는 다른 데이터 세트(ex. 계보(lineage))에서 어떻게 파생되었는지에 대한 충분한 정보를 가지고 있어 안정적인 저장소의 데이터로부터 파티션을 계산한다.
  • 사용자는 재사용할 RDD를 지정하고 이를 위한 스토리지 전략(ex. in-memory 스토리지)을 선택할 수 있다.
  • 또한 RDD의 요소들이 각 레코드의 키(key)에 기초하여 기계들 사이에 분할되도록 요청할 수 있다.


2.2 Spark Programming Interface

  • 프로그래머는 안정적인 스토리지의 데이터에 대한 변환(ex. map, filter)을 통해 하나 이상의 RDD를 정의하는 것으로 시작한다.
  • 그런 다음 이러한 RDD를 애플리케이션에 값을 반환하거나 데이터를 스토리지 시스템으로 내보내는 연산(operation)에 사용할 수 있다.
    • 연산(operation)의 예로는
    • count : 데이터셋의 요소 수를 반환
    • collect : 요소 자체를 반환
    • save : 데이터셋을 스토리지 시스템으로 출력
  • spark는 기본적으로 메모리에 지속적인(persistent) RDD를 유지하지만 RAM이 충분하지 않으면 디스크에 지속적인 RDD를 보낼 수 있다.


2.2.1 Example: Console Log Mining

  • 웹 서비스에서 오류가 발생하고 있으며 운영자가 원인을 찾기 위해 HDFS에서 테라바이트의 로그를 검색하려고 한다고 가정한다.
  • 스파크를 사용하면 운영자는 로그의 오류 메시지만 노드(node) 집합에 걸쳐 RAM으로 로드하고 대화식으로(interactively) 쿼리(query)할 수 있다.

    lines = spark.textFile("hdfs:/...") // HDFS 파일로 백업된 RDD를 정의
    errors = lines.filter(_.startsWith("ERROR")) // 필터링된 RDD를 파생
    errors.persist() // 오류가 메모리에 남아 쿼리 간에 공유될 수 있도록 요청
    


  • 현재 클러스터에 대해 수행될 작업이 없다면 RDD를 사용하여 메시지 수를 셀 수 있다.

    errors.count()
    


  • 사용자는 다음과 같이 RDD에서 추가 변환을 수행하고 결과를 사용할 수도 있다.

    errors.filter(_.contains("MySQL")).count()  // MySQL에서 언급한 에러 세기
    
    // HDFS를 배열(array)로 언급하는 오류의 time field를 반환한다.
    // 시간이 탭으로 구분된 형식의 필드 번호를 3이라 가정
    errors.filter(_.contains("HDFS"))
          .map(_.split('\t')(3))
          .collect()
    
  • 오류와 관련된 첫번째 액션(action)이 실행된 후 스파크는 오류의 파티션을 메모리에 저장하여 후속 연산을 크게 가속화한다.

    • 기본 RDD는 RAM에 로드되지 않는다.
    • 이는 오류 메시지가 데이터의 작은 부분에 불과하기 때문이다.

image

  • 위 그림은 모델이 fault tolerance를 달성하는 방법을 설명하기 위한 3개의 쿼리에 대한 RDD의 계보(lineage) 그래프를 보여준다.
  • 이 쿼리에서는 라인에서 필터의 결과인 오류로 시작하여 collect를 실행하기 전에 추가 필터와 맵을 적용했다.
  • 스파크 스케줄러는 나머지 두 개의 변환을 파이프라인화하고 캐시된 오류 파티션을 보유한 노드에 태스크 집합을 전송한다.
  • 또한 오류 파티션이 손실되면 spark는 해당 라인 파티션에만 필터를 적용하여 재구성한다.


2.3 Advantages of the RDD Model

  • coarse-grained : fine-grained 보다 큰 구성 요소이다. 하나 이상의 fine-gained 서비스를 더 coarse-grained로 간단히 정리할 수 있다. 11
  • fine-grained : 더 큰 구성 요소를 구성하는 더 작은 구성 요소. 낮은 수준의 서비스 11


image

  • DSM(distributed shared memory) 시스템에서 애플리케이션은 전역 주소 공간의 임의의 위치에 읽고 쓴다.
  • DSM은 매우 일반적인 추상화이지만, 이러한 일반성은 상용 클러스터에서 효율적이고 fault tolerance한 방식으로 구현하기 어렵다.
  • RDD는 coarse-grained 변환을 통해서만 생성이 되며, 보다 효율적인 fault tolerance를 허용한다.
    • 특히, RDD는 계보(lineage)를 사용하여 복구할 수 있기 때문에 체크포인트의 오버헤드를 부담할 필요가 없다.
    • 또한, RDD의 손실된 파티션만 장애시 재계산하면 되며, 전체 프로그램을 롤백(rollback)할 필요 없이 서로 다른 노드에서 병렬로 재계산할 수 있다.


  • RDD는 시스템이 맵리듀스와 같이 느린 태스크의 백업 복사본을 실행함으로써 느린 노드(straggler)를 완화할 수 있는 이점이 있다.
    • 백업 작업은 DSM에서는 어려울 수 있는데, 백업 작업의 두 복사본이 동일한 메모리 위치에 액세스하여 서로의 업데이트를 방해하기 때문이다.
    • 스트래글러(straggler) : 여러 작업들 간에 작업량이 균일하지 않게 배포되거나, 데이터 스큐(skewed)로 인해 한 작업에서 더 많은 데이터를 처리하는 경우에 발생할 수 있다. 12
  • 또한 RDD의 대량(bulk) 연산에서 런타임은 성능을 향상시키기 위해 데이터 인접성(locality)을 기반으로 태스크를 스케줄할 수 있다.
  • RDD는 스캔 기반 작업에만 사용되는 한 저장할 메모리가 충분하지 않을 때 성능이 저하된다.
    • RAM에 맞지 않는 파티션은 디스크에 저장할 수 있으며 현재 데이터 병렬 시스템과 유사한 성능을 제공한다.


2.4 Applications Not Suitable for RDDs

  • RDD는 웹 응용 프로그램이나 증분 웹 크롤러를 위한 스토리지 시스템과 같이 공유(shared) 상태로 비동기적으로(asynchronous) fine-grained하게 업데이트되는 응용 프로그램에는 적합하지 않다.
  • 이러한 애플리케이션의 경우 기존 업데이트 로깅 및 데이터 체크포인트를 수행하는 시스템을 사용하는 것이 더 효율적이다.
  • spark의 목표는 배치 분석을 위한 효율적인 프로그래밍 모델을 제공하고 이러한 비동기 애플리케이션을 특수 시스템에 맡기는 것이다.



3. Spark Programming Interface

  • 스칼라(scala)는 간결성(conciseness)과 효율성(efficiency)의 조합으로 이 언어를 선택했다.
    • 간결성은 대화적(interactive) 사용에 편리함
    • 효율성은 정적(static) 타이핑 때문
      • 정적 타이핑(static typing) : 코드를 작성할 때 컴퓨터적 구조를 명시해주기 때문에 컴퓨터가 해야할 일을 덜어 준다. 13


image

  • 스파크를 사용하기 위해 개발자는 워커 클러스터에 연결하는 드라이버(driver) 프로그램을 작성한다.
    • 워커(worker) : 여러 연산에 걸쳐 RDD 파티션을 RAM에 저장할 수 있는 long-lived 프로세스이다.
  • 드라이버는 하나 이상의 RDD를 정의하고 RDD에 대한 액션(action)을 호출한다.
    • RDD의 계보(lineage)도 추적한다.


3.1 RDD Operations in Spark

image

  • Table 2는 스파크에서 사용할 수 있는 주요 RDD 변환 및 액션을 보여준다.
    • 변환(transformation)은 새로운 RDD를 정의하는 게으른(lazy) 연산이며, 액션(action)은 프로그램에 값을 반환하거나 외부 스토리지에 데이트를 쓰는(write) 계산을 시작한다.
      • Lazy evaluation : 계산의 결과값이 필요할 때까지 계산을 늦추는 기법. 즉시, 평가하지 않고 필요한 것만 평가하는 방법 14


3.2 Example Applications

3.2.1 Logistic Regression

  • 기계 학습 알고리즘은 본질적으로 반복적이다. 따라서 데이터를 메모리에 저장함으로써 훨씬 더 빠르게 실행할 수 있다.
val points = spark.textFile(...)
                  .map(parsePoint).persist()
val w = // random initial vector
for (i <- 1 to ITERATIONS) {
  val gradient = points.map{ p =>
   p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
  }.reduce((a,b) => a+b)
  w -= gradient
}


3.2 PageRank

  • PageRank 알고리즘은 각 문서에 연결된 문서의 기여도를 합산하여 각 문서에 대한 순위를 반복적으로(iteratively) 업데이트한다.
  • 각각의 반복에서, 각 문서는 $\frac{r}{n}$의 기여도를 이웃들에게 보낸다.
    • $r$ : 순위(rank)
    • $n$ : 이웃의 수
  • 그런 다음 순위를 $\alpha/N+(1-\alpha)\sum c_i$으로 업데이트한다.
    • 합계(sum) : 자신이 받은 기여도를 초과를 나타낸다.
    • N : 문서의 총 수
  • PageRank를 spark로 나타내면 다음과 같다.
// Load graph as an RDD of (URL, outlinks) pairs
val links = spark.textFile(...)
                 .map(...)
                 .persist()
val ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
  // Build an RDD of (targetURL, float) pairs
  // with the contributions sent by each page
  val contribs = links.join(ranks).flatMap {
    (url, (links, rank)) =>
      links.map(dest => (dest, rank/links.size))
  }
  // Sum contributions by URL and get new ranks
  ranks = contribs.reduceByKey((x,y) => x+y)
                  .mapValues(sum => a/N + (1-a)*sum)
}


image

  • 이 프로그램은 Figure 3의 RDD 계보 그래프로 이어진다.
  • 각 반복마다 이전 반복의 기여와 순위 및 정적 링크 데이터 셋을 기반으로 새로운 순위 데이터 셋을 생성한다.
  • 링크 데이터 집합의 파티션은 입력 파일의 블록(block)에서 맵을 다시 실행하면 효율적으로 재구성될 수 있으므로 복제할 필요가 없다.
  • 또한 RDD의 파티셔닝을 제어함으로써 PageRank의 통신을 최적화할 수 있다.
    • 링크에 대한 파티셔닝을 지정하면 동일한 방식으로 순위를 파티셔닝할 수 있으며, 링크와 순위 간의 join 연산이 통신을 필요로 하지 않도록 보장할 수 있다.





Referecnes

댓글남기기