Apache Spark의 Accumulator는 분산 환경에서 변수를 안전하게 업데이트할 수 있도록 설계된 공유 변수이다. 주로 카운터 및 합계와 같은 집계 작업을 수행하는 데 사용되며, 트랜스포메이션 연산 내에서 사용될 경우 중복 업데이트가 발생할 수 있으므로 주의해야 한다.
개요
Accumulator는 Spark의 분산 연산에서 값이 안전하게 증가하거나 변경될 수 있도록 지원하는 특별한 변수이다. 각 실행 노드(Executor)에서 로컬 값을 업데이트할 수 있지만, 최종적으로 드라이버에서만 읽을 수 있다. 즉, 실행 노드에서의 업데이트는 오직 누적만 가능하며, 읽기는 불가능하다.
특징
- 누적 연산만 가능 (예: 합계, 카운트)
- 드라이버에서만 최종 값 확인 가능
- 트랜스포메이션 내에서 사용할 경우 중복 업데이트 발생 가능
- Spark UI에서 Accumulator 값을 확인할 수 있음
사용 방법
Spark에서 Accumulator를 사용하는 기본적인 방법은 다음과 같다.
Accumulator 생성
Accumulator는 SparkContext를 사용하여 생성할 수 있다.
val spark = SparkSession.builder.appName("AccumulatorExample").getOrCreate()
val sc = spark.sparkContext
// 정수형 Accumulator 생성
val acc = sc.longAccumulator("My Accumulator")
Accumulator 값 업데이트
각 노드에서 실행되는 코드에서 Accumulator 값을 증가시킬 수 있다.
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd.foreach(x => acc.add(x))
// 최종 Accumulator 값 확인
println(s"Accumulator Value: ${acc.value}") // 결과: 15
트랜스포메이션 내에서의 문제점
트랜스포메이션(`map`, `filter` 등) 내에서 Accumulator를 사용하면 중복 업데이트가 발생할 수 있다. 예를 들어, `map` 연산은 Lazy Evaluation(지연 실행)되므로 여러 번 실행될 가능성이 있다.
val rdd2 = rdd.map(x => {
acc.add(x) // 이 코드가 여러 번 실행될 수 있음
x * 2
})
println(acc.value) // 예상한 값보다 클 수 있음 (중복 실행 가능성)
위 코드에서 `map` 연산이 여러 번 실행될 경우, Accumulator 값이 중복 증가할 수 있다. 따라서 Accumulator는 `foreach`와 같은 액션에서 사용하는 것이 안전하다.
Accumulator 유형
Spark는 여러 유형의 Accumulator를 지원한다.
- `LongAccumulator` – 정수 값을 누적
- `DoubleAccumulator` – 실수 값을 누적
- 사용자 정의 Accumulator – 복잡한 데이터 구조를 저장할 수 있음
사용자 정의 Accumulator는 `AccumulatorV2` 클래스를 확장하여 구현할 수 있다.
Accumulator 사용 시 주의점
- **드라이버에서만 읽기 가능:** 실행 노드에서 Accumulator의 값을 읽을 수 없으며, 오직 드라이버에서만 `value` 속성을 통해 확인할 수 있다.
- **트랜스포메이션에서 중복 실행 가능성:** Lazy Evaluation으로 인해 변환 연산 내에서 Accumulator가 여러 번 업데이트될 수 있음.
- **체크포인트 및 캐싱 시 주의:** RDD가 체크포인트되거나 캐싱될 경우, Accumulator 값이 예상과 다를 수 있음.