Apache Spark의 Pair RDD는 (Key, Value) 형식의 데이터를 저장하는 RDD로, 키를 기반으로 그룹화, 조인, 집계 등의 연산을 수행할 수 있도록 제공하는 RDD 유형이다.
개요
Pair RDD는 일반 RDD와 달리 키-값 쌍을 처리하는 데 최적화된 연산을 제공한다. 예를 들어, 같은 키를 가진 데이터를 그룹화하거나, 키를 기준으로 조인하는 작업을 쉽게 수행할 수 있다.
Pair RDD는 일반적으로 `map` 또는 `flatMap` 등의 변환 연산을 통해 생성된다.
Pair RDD 생성
Pair RDD를 생성하는 일반적인 방법은 `(Key, Value)` 형태의 튜플 데이터를 포함하는 RDD를 만드는 것이다.
val spark = SparkSession.builder.appName("PairRDDExample").getOrCreate()
val sc = spark.sparkContext
// (Key, Value) 튜플을 포함하는 RDD 생성
val data = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5))
val pairRDD = sc.parallelize(data)
Pair RDD 연산
Pair RDD는 일반 RDD에서 제공하는 연산 외에도 키를 기준으로 데이터를 처리하는 추가 연산을 제공한다.
reduceByKey
같은 키를 가진 값들을 합산하는 연산이다.
val reducedRDD = pairRDD.reduceByKey(_ + _)
reducedRDD.collect().foreach(println)
// 출력 결과
// (a,4)
// (b,6)
// (c,5)
groupByKey
같은 키를 가진 값들을 그룹화하는 연산이다.
val groupedRDD = pairRDD.groupByKey()
groupedRDD.collect().foreach(println)
// 출력 결과
// (a,CompactBuffer(1, 3))
// (b,CompactBuffer(2, 4))
// (c,CompactBuffer(5))
mapValues
키를 유지하면서 값에 대한 변환을 적용하는 연산이다.
val mappedRDD = pairRDD.mapValues(_ * 10)
mappedRDD.collect().foreach(println)
// 출력 결과
// (a,10)
// (b,20)
// (a,30)
// (b,40)
// (c,50)
sortByKey
키를 기준으로 데이터를 정렬하는 연산이다.
val sortedRDD = pairRDD.sortByKey()
sortedRDD.collect().foreach(println)
// 출력 결과
// (a,1)
// (a,3)
// (b,2)
// (b,4)
// (c,5)
join
두 개의 Pair RDD를 키를 기준으로 조인하는 연산이다.
val otherData = Array(("a", 100), ("b", 200), ("c", 300))
val otherRDD = sc.parallelize(otherData)
val joinedRDD = pairRDD.join(otherRDD)
joinedRDD.collect().foreach(println)
// 출력 결과
// (a,(1,100))
// (a,(3,100))
// (b,(2,200))
// (b,(4,200))
// (c,(5,300))
==Pair RDD 사용 시 주의점==*groupByKey 대신 reduceByKey 사용 추천: `groupByKey`는 데이터 셔플링이 많이 발생하여 성능이 저하될 수 있으므로 `reduceByKey`를 사용하는 것이 효율적이다.
- 키의 개수가 너무 많으면 성능 저하 가능: 많은 수의 키를 가진 Pair RDD는 실행 노드 간 데이터 이동이 많아 성능 문제가 발생할 수 있다.
- 메모리 사용량 관리 필요: 큰 데이터를 처리할 경우 메모리 문제를 방지하기 위해 `persist()` 또는 `checkpoint()`를 활용할 수 있다.