11 분 소요

image

“GraphX: Graph Processing in a Distributed Dataflow Framework” 논문을 개인 공부 및 리뷰를 위해 쓴 글입니다.

논문 출처 pdf : graphx paper
논문의 이해를 돕기 위해 GraphX에 대한 ppt 링크도 첨부하겠습니다.
GraphX ppt in standford.edu 링크 : GraphX - Graph Analytics in Spark
논문에 나온 프로그래밍 언어는 스칼라(Scala)로 기재했습니다.




1. Introduction

  • 프리겔(Pregel)같은 시스템은 그래프별 최적화를 지원하는 특수 추상화(abstraction)를 노출함으로써 수십억 개의 정점(vertex)과 에지(edge)가 있는 그래프에서 PageRank 같은 반복 그래프 알고리즘을 효율적으로 실행할 수 있다.
  • 그 결과, 그래프 처리 시스템은 일반적으로 하둡 맵리듀스(MapReduce)와 같은 범용 분산 데이터 흐름 프레임워크를 훨씬 능가한다.
  • 사실 그래프는 비정형 및 표 형식의 데이터와 결합하는 더 큰 분석 프로세스의 일부일 뿐이다.
  • 결과적으로, 분석 파이프라인(Fig 11)은 여러 시스템을 구성하도록 강요되어 복잡성을 증가시키고 불필요한 데이터 이동과 복제를 초래한다.

image


  • 맵리듀스, 스파크, 드라이어드는 풍부한 데이터 흐름 연산자(map, reduce, groupBy, join)를 노출하며, 비정형 및 표 형식의 데이터를 분석하는 데 적합하고 널리 채택된다.
  • 그러나 데이터 흐름 연산자를 사용하여 반복 그래프 알고리즘을 직접 구현하는 것은 어려울 수 있으며, 여러 단계의 복잡한 결합이 필요하다.
  • 또한 분산 데이터 흐름 프레임워크에 정의된 조인 및 집계 전략은 반복 그래프 알고리즘에서 공통 패턴과 구조를 활용하지 않으므로 중요한 최적화 기회를 놓친다.


  • 역사적으로 그래프 처리 시스템은 몇 가지 이유로 분산 데이터 흐름 프레임워크와 별도로 진화했다.
  • 첫째로, 분산 데이터 흐름 프레임워크(예: 맵리듀스)에서 싱글 스테이지 계산 및 디스크 내 처리에 대한 초기 강조로 그래프의 서브셋(subset)에 반복적으로 랜덤하게 액세스하는 반복 그래프 알고리즘에 대한 적용성(applicability)이 제한되었다.
  • 둘째, 초기 분산 데이터 흐름 프레임워크는 데이터 파티셔닝(partitioning)에 대한 세밀한 제어를 노출하지 않아 그래프 파티셔닝 기술의 적용을 방해했다.


  • 본 논문에서는 그래프 계산에서 필수적인 데이터 흐름 패턴을 식별하고 그래프 처리 시스템의 최적화를 데이터 흐름 최적화로 재구성함으로써 범용 분산 데이터 흐름 프레임워크 내에서 전문화된 그래프 처리 시스템의 장점을 복구할 수 있다고 주장한다.
  • 이 주장을 뒷받침하기 위해 저자들은 스파크에 내장된 효율적인 그래프 처리 프레임워크인 GraphX를 소개한다.

image

  • GraphX API는 기존 그래프 처리 시스템과 달리 비정형 및 표 형식의 데이터로 그래프를 구성할 수 있으며 데이터 이동이나 중복 없이 동일한 물리적 데이터를 그래프와 컬렉션으로 모두 볼 수 있다.
    • 예를 들어, GraphX를 사용하면 사용자 코멘트와 함께 소셜 그래프를 결합하고, 그래프 알고리즘을 적용하고, 결과를 collect 또는 그래프 시각화 또는 롤업(roll-up)하는 것이 쉽다.
  • 결과적으로, GraphX를 통해 사용자는 성능이나 유연성을 희생하지 않고도 현재 작업에 가장 적합한 계산 패턴을 채택할 수 있다.
  • GraphX는 spark를 수정할 필요가 없으므로 분산 데이터 흐름 프레임 워크 내에 그래프 계산을 내장하고 그래프 계산을 특정 join-map-group-by 데이터 흐름 패턴으로 증류(distill)하는 일반적인 방법을 보여준다.
  • 그래프 계산을 특정 패턴으로 줄임으로써 시스템 최적화를 위한 중요한 경로를 식별한다.


  • 유연한 정점 절단(vertex-cut) 파티셔닝은 그래프를 수평으로 분할된 컬렉션(collection)으로 인코딩하고 분산 그래프 파티셔닝의 최신 기술과 일치시키는 데 사용된다.
  • GraphX는 그래프 처리 시스템의 맥락에서 개발된 시스템 최적화를 조인 최적화 및 구체화된 뷰 유지 관리로 재구성하고 이러한 기술을 스파크 데이터 흐름 연산자에 적용한다.
  • GraphX는 논리적 파티셔닝과 리니지(lineage)를 활용하여 저렴한 장애 허용(fault tolerance)을 달성한다.
  • 마지막으로, 불변성(immutability)을 이용하여 GraphX는 그래프 및 수집 보기와 여러 반복에 걸쳐 인덱스를 재사용하여 메모리 오버헤드를 줄이고 시스템 성능을 향상시킨다.



2. Background

  • 고수준에서, 그래프 처리 시스템은 정점과 이웃의 세분성에서 계산을 정의하고 그래프에 의해 미리 정의된 희소 종속 구조(sparse dependency structure)를 활용한다.


이미지출처 1


2.1 The Property Graph Data Model

  • 그래프 처리 시스템은 그래프 구조 데이터를 속성(property) 그래프로 표현하며, 이는 사용자 정의 속성을 각 정점 및 에지와 연관시킨다.
    • 속성에는 메타 데이터와 프로그램 상태가 포함될 수 있다.
    • 속성 그래프(property graph) : 각 정점과 에지에 연결된 사용자 정의 객체를 포함하는 방향 멀티 그래프이다. 2
  • 그래프 처리 시스템의 연산은 일반적으로 미리 선언된 희소 구조를 가진 싱글 속성 그래프와 관련하여 정의된다.
    • 희소 그래프(sparse graph) : 간선의 수가 적은 그래프 3
  • 이러한 제한된 초점은 다양한 최적화를 용이하게 하지만, 여러 그래프와 하위 그래프에 걸쳐 있을 수 있는 분석 작업의 표현을 복잡하게 만든다.


2.2 The Graph-Parallel Abstraction

  • 페이지랭크(PageRank) 알고리즘은 인접한 정점과 에지의 속성을 기반으로 정점 속성을 반복적으로 변환한다.
  • 이 반복적인 로컬 변환(transformation)의 공통 패턴은 그래프 병렬 추상화의 기초를 형성한다.
  • 그래프 병렬 추상화에서 사용자 정의 정점 프로그램은 각 정점에 대해 동시에 인스턴스화되고 메시지(ex. Pregel) 또는 공유 상태(ex. PowerGraph)를 통해 인접한 정점 프로그램과 상호 작용한다.


// Listing 1: PageRank in Pregel
def PageRank(v: Id, msgs: List[Double]) {
    // 메시지 합계(sum)를 계산한다.
    var msgSum = 0
    for (m <- msgs) { msgSum += m}
    // 페이지랭크를 업데이트한다.
    PR(v) = 0.15 + 0.85 * msgSum
    // 새로운 PR로 메시지를 브로드새크스한다.
    for (j <- OutNbrs(v)) {
        msg = PR(v) / NumLinks(v)
        send_msg(to=j, msg)
    }
    // 종료를 위한 체크를 한다.
    if (converged(PR(v))) voteToHalt(v)
}

더 자세히 알고 싶다면
프리겔 논문 정리 링크 : [논문 리뷰] Pregel: A System for Large-Scale Graph Processing - poeun

  • 그래프 병렬 추상화는 그래프의 정적 이웃 구조같은 반복 그래프 알고리즘에 적합하지만, 분리된(disconnected) 정점이 상호 작용하거나 계산이 그래프 구조를 변경하는 계산을 표현하는 데는 적합하지 않다.
  • 예를 들어, raw 텍스트, 비정형 데이터에서 그래프 구성 또는 그래프 조대화(coarsening), 여러 그래프에 걸친 분석 등의 작업은 정점 중심 프로그래밍 모델에서 표현하기 어렵다.


2.3 Graph System Optimizations

GAS 분해(Decomposition)

  • 대부분의 정점 프로그램은 일반화된 교환(commutative) 결합(associative) 합계의 형태로 메시지를 수집한 다음 병렬 루프에서 새로운 메시지를 브로드캐스트함으로써 이웃 정점과 상호 작용한다.
  • Gonzalez et al는 정점 프로그램을 세 개의 데이터 병렬 스테이지로 나누는 GAS 분해를 제안하였다. (PowerGraph) 4
    • Gather, Apply, Scatter
    def Gather(a: Double, b: Double) = a + b
    def Apply(v, msgSum) {
        PR(v) = 0.15 + 0.85 * msgSum
        if (converged(PR(v))) voteToHalt(v)
    }
    def scatter(v, j) = PR(v) / NumLinks(v)
    
  • GAS 분해는 메시지 계산의 pull 기반 모델로 이어진다.
  • 시스템은 사용자가 정점 프로그램에서 직접 메시지를 보내는 대신 인접한 정점 사이의 메시지 값을 정점 프로그램에 묻는다.
  • 결과적으로 GAS 분해는 정점 절단 파티셔닝, 작업 균형 개선, 직렬 에지 반복 및 데이터 이동 감소를 가능하게 한다.
  • 그러나 GAS 분해는 그래프에서 인접하지 않은 정점 간의 직접적인 통신을 금지하므로, 보다 일반적인 통신 패턴의 표현을 방해한다.


Graph Partitioning

  • 그래프 처리 시스템은 통신을 최소화하고 계산을 균형 있게 하기 위해 다양한 그래프-파티셔닝 알고리즘을 적용한다.
  • 정점 절단 파티셔닝은 각 정점이 절단되는 횟수를 최소화하는 방식으로 머신에 에지를 균등하게 할당한다.
  • Mirror Vertices
    • 종종 고차원의 정점은 동일한 원격 시스템에 여러 개의 이웃이 있을 것이다.
    • 동일한 여러 개의 메시지를 네트워크를 통해 전송하는 대신, 그래프 처리 시스템은 싱글 메시지를 미러로 보낸 다음 모든 이웃으로 전달하는 미러링 기술을 채택한다.
    • 그래프 처리 시스템은 정적(static) 그래프 구조를 활용하여 미러 데이터 구조를 재사용한다.
  • Active Vertices
    • 그래프 알고리즘이 진행됨에 따라 그래프 내의 정점 프로그램은 서로 다른 속도로 수렴되어 활성 정점 프로그램의 집합이 급격히 축소된다.
    • 최근 시스템은 활성 정점을 추적하고 수렴된 정점에 대한 데이터 이동과 불필요한 계산을 제거한다.


2.4 Distributed Dataflow Frameworks

  • 분산 데이터 흐름 프레임워크의 세부 사항은 일반적으로 다음과 같은 특성을 만족한다.
    1. 유형(typed) 컬렉션으로 구성된 데이터 모델
    2. 컬렉션을 변환하는 결정론적 연산자(ex. map, groupBy, join)로 구성된 데이터 정렬 프로그래밍 모델
    3. 각 작업을 DAG(Direct Cyclic Graph)로 분할하는 스케줄러
    4. 다시 시작하지 않고 스트래글러(straggler)와 부분(partial) 클러스터 장애를 허용할 수 있는 런타임


  • 스파크는 GraphX를 위해 매력적인 특징들을 가지고 있다.
    1. RDD(Resilient Distributed Datasets)라고 하는 spark 스토리지 추상화는 애플리케이션이 데이터를 메모리에 유지할 수 있도록 하며, 이는 반복 그래프 알고리즘에 필수적이다.
    2. RDD는 사용자 정의 데이터 파티셔닝을 허용하며 실행 엔진은 이를 활용하여 데이터 이동을 피하기 위해 RDD를 공동 파티셔닝하고 작업을 공동 스케줄링할 수 있다.
      • 이것은 파티셔닝된 그래프를 인코딩하는 데 필수적이다.
    3. 스파크는 RDD를 구축하는 데 사용되는 작업 리니지를 기록하여 장애 발생 시 손실된 파티션을 자동으로 재구성할 수 있다.
      • 리니지 그래프는 장시간 실행되는 애플리케이션에서도 상대적으로 작기 때문에 체크포인트와 달리 런타임 오버헤드를 무시할 수 있다.
    4. 스파크는 쉽게 확장할 수 있는 scala의 고급 API를 제공한다.


3. The GraphX Programming Abstraction

3.1 Property Graphs as Collections


이미지출처 1

  • 속성 그래프는 한 쌍의 정점과 에지 속성 컬렉션으로 논리적으로 표현될 수 있다.
  • 정점 컬렉션은 정점 식별자에 의해 고유하게 키가 지정됨 정점 속성을 포함한다.
  • GraphX 시스템에서 정점 식별자는 64비트 정수이며 외부(ex. 사용자 ID)에서 파생되거나 정점 속성(ex. 페이지 URL)에 해시 함수를 적용하여 파생될 수 있다.
  • 에지 컬렉션에는 소스 및 목적지 정점 식별자에 의해 키가 지정된 에지 속성이 포함되어 있다.
  • 속성 그래프를 컬렉션 쌍으로 줄임으로써 분산 데이터흐름 프레임워크에서 다른 컬렉션과 함께 그래프를 구성할 수 있다.


3.2 Graph Computation as Dataflow Ops

  • 속성 그래프를 정점 및 에지 속성 컬렉션 쌍으로 정규화하면 분산 데이터 흐름 프레임워크에 그래프를 포함할 수 있다.
  • 분산 데이터 흐름 프레임워크에서 그래프 병렬 계산을 맵 연산에 의해 중단되는 join 단계와 groupBy 단계의 시퀀스로 표현할 수 있다.
  • 조인(join) 단계에서, 정점과 에지 속성은 각 에지와 그에 상응하는 소스 및 목적지 정점 속성으로 구성된 triplets 뷰를 형성하기 위해 조인된다.
CREATE VIEW triplets AS
SELECT s.Id, d.Id, s.P, e.P, d.P
FROM edges AS e
JOIN vertices AS s JOIN vertices AS d
ON e.srcId = s.Id AND e.dstId = d.Id



이미지출처 1

  • triplets는 소스 또는 목적지 정점별로 그룹화되어 각 정점의 이웃을 구성하고 집계를 계산한다.
  • 예를 들어, 정점의 페이지랭크를 계산하기 위해 다음과 같이 실행한다.
    • 아래 쿼리를 반복적으로 적용하여 수렴할 때까지 정점 속성을 업데이트함으로써 각 정점의 페이지랭크를 계산할 수 있다.
SELECT t.dstId, 0.15+0.85*sum(t.srcP*t.eP)
FROM triplets AS t GROUP BY t.dstId


  • 위 두 단계는 GAS 분해를 보여준다.
    • groupBy 단계는 동일한 정점으로 향하는 메시지를 수집하고, 개입 map 연산은 메시지 합을 적용하여 정점 속성을 업데이트하며, join 단계는 새로운 정점 속성을 인접한 모든 정점으로 분산시킨다.


3.3 GraphX Operators

  • 그래프 생성자(constructor)는 한 쌍의 정점 및 에지 속성 컬렉션을 논리적으로 결합하여 속성 그래프를 만든다.
    • 또한 무결성(integrity) 제약 조건도 확인한다.


이미지출처 1

  • mrTriplets (Map Reduce Triplets) 연산자는 3.2절에 정의된 그래프 병렬 계산의 필수적인 2단계 프로세스를 인코딩한다.
    • mrTriplets 연산자는 트리플렛 뷰에서 맵 및 그룹별 데이터 흐름 연산자의 구성이다.
  • 사용자 정의 맵 함수는 각 트리플렛에 적용되어 값을 산출하고 사용자 정의 집계 함수를 사용하여 대상 정점에서 집계된다.
SELECT t.dstId, reduceF(mapF(t)) AS msgSum
FROM triplets AS t GROUP BY t.dstId


  • mrTriplets 연산자는 목적지 정점 식별자에 의해 키가 지정된 인바운드 메시지의 합계를 포함하는 컬렉션을 생성한다.

image

val graph: Graph[User, Double]
def mapUDF(t: Triplet[User, Double]) =
  if (t.src.age > t.dst.age) 1 else 0
def reduceUDF(a: Int, b: Int): Int = a + b
val seniors: Collection[(Id, Int)] = 
  graph.mrTriplets(mapUDF, reduceUDF)


  • 마지막으로, listing 4에는 정점 또는 에지 컬렉션에서 데이터 흐름 연산을 간단히 수행하는 여러 함수가 포함되어 있다.
// listing 4

// Graph Operators
class Graph[V, E] {
  // 생성자
  def Graph(v: Collection[(Id, V)],
            e: Collection[(Id, Id, E)])
  // 컬렉션 뷰로 초기화
  def vertices: Collection[(Id, V)]
  def edges: Collection[(Id, Id, E)]
  def triplets: Collection[Triplet]
  // 그래프 병렬 계산
  def mrTriplets(f: (Triplet) => M,
      sum: (M, M) => M): Collection[(Id, M)]
  // 다음은 편의 함수들이다.
  def mapV(f: (Id, V) => V): Graph[V, E]
  def mapE(f: (Id, Id, E) => E): Graph[V, E]
  def leftJoinV(v: Collection[(Id, V)],
      f: (Id, V, V) => V): Graph[V, E]
  def leftJoinE(e: Collection[(Id, Id, E)],
      f: (Id, Id, E, E) => E): Graph[V, E]
  def subgraph(vPred: (Id, V) => Boolean,
      ePred: (Triplet) => Boolean): Graph[V, E]
  def reverse: Graph[V, E]
}


  • 예를 들어 mapV는 다음과 같이 정의된다.
g.mapV(f) = Graph(g.vertices.map(f), g.edges)


  • 다음은 GraphX API를 사용하여 프리겔 추상화의 GAS 분해를 구현한다.
// GraphX Enhanced Pregel
def Pregel(g: Graph[V, E],
      vprog: (Id, V, M) => V, // vprog = vertex program
      sendMsg: (Triplet) => M,
      gather: (M, M) => M): Collection[V] = {
  // Set all vertices as active
  g = g.mapV((id, v) => (v, halt=false))
  // Loop until convergence
  while (g.vertices.exists(v => !v.halt)) {
    // Compute the messages
    val msgs: Collection[(Id, M)] =
    // Restrict to edges with active source
    g.subgraph(ePred=(s,d,sP,eP,dP)=>!sP.halt)
    // 활성 정점이 있는 동안 메시지는 mrTriplets 연산자를 사용하여 계산되고,
    // 정점 프로그램은 결과 메시지 합계에 적용된다.
      .mrTriplets(sendMsg, gather)
    // Receive messages and run vertex program
    g = g.leftJoinV(msgs).mapV(vprog)
  }
  return g.vertices
}


  • 다음은 연결 구성 요소(connected components) 알고리즘을 구현하기 위해 프리겔의 GraphX 변형을 사용한다.
    • 연결 구성 요소(connected components)은 각 정점에 대해 도달 가능한 가장 낮은 정점 ID를 계산한다.
def ConnectedComp(g: Graph[V, E]) = {
  g = g.mapV(v => v.id) // Initialize vertices
  def vProg(v: Id, m: Id): Id = {
    if (v == m) voteToHalt(v)
    return min(v, m)
  }
  def sendMsg(t: Triplet): Id = 
    if (t.src.cc < t.dst.cc) t.src.cc
    else None // No message required
  def gatherMsg(a: Id, b: Id): Id = min(a,b)|
  return Pregel(g, vProg, sendMsg, gatherMsg)
}


Combining Graph and Collection Operators

  • 종종 연결 정점의 그룹은 싱글 정점으로 더 잘 모델링된다.
  • 이러한 경우, 공통 특성(ex. 웹 도메인)을 공유하는 연결(connected) 정점을 집계하여 새로운 그래프(ex. 도메인 그래프)를 도출하는 것을 그래프를 coarsen하게 만드는 데 도움이 될 수 있다.
    • Coarsen : coarsening 연산자는 에지 predicate(=함수)를 만족하는 에지로 연결된 정점을 merge한다.
  • 다음 예제에서 그래프 coarsening를 구현하기 위해 GraphX 추상화를 사용한다.
def coarsen(g: Graph[V, E],
            pred: (Id,Id,V,E,V) => Boolean,
            reduce: (V,V) => V) = {
  // Restrict graph to contractable edges
  val subG = g.subgraph(v => True, pred)
  // Compute connected component id for all V
  val cc: Collection[(Id, ccId)] = ConnectedComp(subG).vertices
  // Merge all vertices in same component
  val superV: Collection[(ccId,V)] = g.vertices.leftJoin(cc).groupBy(CC_ID, reduce)
  // Link remaining edges between components
  val invG = g.subgraph(ePred = t => !pred(t))
  val remainingE: Collection[(ccId,ccId,E)] = invG.leftJoin(cc).triplets.map {
    e => (e.src.cc, e.dst.cc, e.attr)
  }
  // Return the final graph
  Graph(superV, remainingE)
}



4. The GraphX System

  • 이 섹션에서는 indexing, incremental view maintenance, join optimization를 포함한 기존 데이터베이스 시스템의 고전적 기법의 맥락에서 이러한 최적화를 설명한다.


4.1 Distributed Graph Representation

  • GraphX는 내부적으로 스파크 RDD 추상화에 구축된 정점 및 에지 컬렉션 쌍으로 그래프를 나타낸다.
  • 이러한 컬렉션에는 인덱싱(indexing) 및 그래프별 파티셔닝이 RDD 위에 레이어로 도입된다.
  • 다음 그림은 수평으로(horizontally) 파티셔닝된 정점 및 에지 컬렉션과 그 인덱스의 물리적 표현을 보여준다.

image


  • 정점 컬렉션(vertex collection)은 정점 ID에 의해 해시(hash) 파티셔닝된다.
    • 정점 컬렉션 간 빈번한 조인을 지원하기 위해 정점은 각 파티션 내의 로컬 해시 인덱스에 저장된다.
    • 또한 비트마스크(bitmask)는 각 정점의 가시성(visibility)을 저장하여 인덱스 재사용을 촉진하기 위해 부드러운 삭제를 가능하게 한다.
      • 비트마스크(bitmask) : bit(0, 1)에 관련된, 즉 정수의 이진수 표현을 활용한 기법이다. 5
  • 에지 컬렉션(edge collection)은 사용자 정의 파티션 함수에 의해 수평으로 파티셔닝된다.
  • GraphX는 소셜 네트워크 및 웹 그래프와 같은 것에서 통신을 최소화하는 정점 절단(vertex-cut) 파티셔닝을 가능하게 한다.
  • 기본적으로 에지는 입력 컬렉션의 파티셔닝에 따라 파티션에 할당된다.
  • 그러나 GraphX는 mrTriplets와 같은 연산자의 통신 복잡성에 대한 강력한 상한(bound)을 가진 2D 해시 파티셔너를 포함하여 다양한 내장 파티셔닝 기능을 제공한다.
  • 에지 배치의 이러한 유연성은 routing table에 의해 활성화된다.
  • 소스 및 목적지 정점에 의한 에지의 효율적인 조회를 위해 파티션 내의 에지는 압축 희소 로우(CSR, compressed sparse row) 표현을 사용하여 소스 정점 ID로 클러스터링되고 목적지 ID에 의해 해시 인덱싱된다.


  • GraphX는 분산 그래프 파티셔닝에 정점 절단 접근법을 채택했다. 6


Index Reuse

  • GraphX는 스파크의 불변성(immutability)을 계승하므로 모든 그래프 연산자는 기존 컬렉션을 수정하지 않고 논리적으로 새로운 컬렉션을 생성한다.
  • 결과적으로 파생된 정점과 에지 컬렉션은 종종 인덱스를 공유하여 메모리 오버헤드를 줄이고 로컬 그래프 작업을 가속화할 수 있다.
    • 예를 들어, 정점의 해시 인덱스는 빠른 집계를 가능하게 하고, 결과 집계는 원래 정점과 인덱스를 공유한다.


  • GraphX 연산자는 인덱스 재사용을 극대화하려고 한다.
  • 그래프 구조를 수정하지 않는 연산자(ex. mapV)는 자동으로 인덱스를 보존한다.
  • 그래프 구조(ex. subgraph)를 제한하는 작업에 인덱스를 재사용하기 위해 GraphX는 비트마스크를 사용하여 제한된 뷰(view)를 구성한다.
  • 인덱스 재사용이 효율성 저하로 이어질 수 있는 경우(ex. 그래프가 고도로 필터링된 경우) GraphX는 reindex 연산자를 사용하여 새 인덱스를 작성한다.



4.2 Implementing the Triplets View

  • 그래프 계산의 핵심 단계는 소스 및 목적지 정점 속성과 에지 속성 사이의 3방향 조인으로 구성된 트리플렛(triplets) 뷰를 구성하고 유지하는 것이다.


Vertex Mirroring

  • 정점 및 에지 속성 컬렉션은 독립적으로 분할되기 때문에 조인에는 데이터 이동이 필요하다.
  • GraphX는 네트워크를 통해 정점 속성을 에지로 전달하여 3방향 조인을 수행하며, 따라서 에지 파티션을 조인 사이트로 설정한다.
  • 이 접근법은 두 가지 이유로 통신을 상당히 감소시킨다.
    1. 실제 그래프는 일반적으로 정점보다 훨씬 많은 에지를 갖는다.
    2. 하나의 정점은 동일한 파티션에 많은 에지를 가질 수 있어 정점 속성의 실실적인 재사용을 가능하게 한다.


Multicast Join

  • 모든 정점이 각 에지 파티션으로 전송되는 브로드캐스트 조인(broadcast join)은 에지 파티션에서 조인이 발생하도록 보장하지만, 대부분의 파티션에서 조인을 완료하기 위해 정점의 서브셋(subset)만 필요하므로 여전히 비효율적일 수 있다.
  • 따라서, GraphX는 각 정점 속성이 인접한 에지를 포함하는 에지 파티션에만 전송되는 멀티캐스트 조인을 도입한다.
    • 각 정점에 대해 GraphX는 인접한 에지가 있는 에지 파티션 집합을 유지한다.
    • 이 조인 사이트 정보는 정점 컬렉션을 가지고 파티셔닝된 라우팅 테이블에 저장된다.
    • 라우팅 테이블(routing table)은 에지 수집과 연관되어 있으며, 트리플렛 뷰의 첫 번째 인스턴스화 시 lazy하게 구성된다.


Partial Materialization

  • 정점 복제(replication)은 정점 속성이 변경될 때 수행되지만, 중복(duplication)을 피하기 위해 에지 파티션의 로컬 조인은 구체화되지 않은(unmaterialized) 상태로 남아 있다.
  • 대신 미러링된 정점 속성은 각 에지 파티션의 해시 맵에 저장되고 트리플렛 구성 시 참조된다.


Incremental View Maintenance

  • 반복 그래프 알고리즘은 종종 각 반복에서 정점 속성의 서브셋만 수정한다.
  • 따라서 변경되지 않은 데이터의 불필요한 이동을 방지하기 위해 트리플렛 뷰에 증분 보기 유지(incremental view maintenance)를 적용한다.
  • 각 그래프 작업 후, 트리플렛 뷰가 마지막으로 구성된 이후 변경된 정점 속성을 추적한다.
  • 트리플렛 뷰에 다음에 액세스하면 변경된 정점만 에지 파티션 결합 사이트로 재라우팅되고, 변경되지 않은 정점의 로컬 미러링 값이 재사용된다.
    • 이 기능은 그래프 연산자에 의해 자동으로 관리된다.

image



4.3 Optimizations to mrTriplets

  • GraphX는 mrTriplets 연산자에 의해 필터링된 인덱스 스캔(filtered index scanning)자동 조인 제거(automatic join elimination)라는 두 가지 추가 쿼리 최적화를 통합한다.


4.3.1 Filtered Index Scanning

  • 애플리케이션은 서브그래프(subgraph) 연산자를 사용하여 그래프를 제한하여 현재 활성 셋을 표현한다.
  • 정점 술어(predicate)는 에지 파티션으로 푸시(push)되며, 여기서 소스 정점 ID의 CSR 인덱스를 사용하여 트리플렛 필터링에 사용할 수 있다.
    • 정점 술어의 선택성(selectivity)을 측정하고 선택성이 0.8 미만일 때 시퀀셜 스캔에서 클러스터 인덱스 스캔으로 전환한다.

image


4.3.2 Automatic Join Elimination

  • 일부 경우, 트리플렛 뷰의 작업은 정점 속성 중 하나만 액세스하거나 전혀 액세스하지 못할 수 있다.
  • GraphX는 JVM 바이트 코드 분석기를 사용하여 런타임에 사용자 정의 함수를 검사하고 소스 또는 목적지 정점 속성이 참조되는지 여부를 확인한다.
  • 하나의 속성만 참조되고 트리플렛 뷰가 아직 구체화되지 않은(unmaterialized) 경우, GraphX는 3방향 조인에서 2방향 조인까지 트리플렛 뷰를 생성하기 위한 쿼리 계획를 자동으로 다시 작성한다.
  • 정점 속성이 참조되지 않으면 GraphX는 조인을 완전히 제거한다.
    • 이러한 수정은 트리플렛 뷰가 스파크에서 RDD의 느긋한(lazy) 의미론을 따르기 때문에 가능하다.
  • 사용자가 트리프렛 뷰에 액세스하지 않으면 구체화되지 않는다.
  • 따라서 mrTriplets에 대한 호출은 트리플렛 뷰의 관련 부분을 생성하는 데 필요한 조인을 다시 작성할 수 있다.

image





References

댓글남기기