[SPARK] SPARK에서 대량의 스몰 파일이 생성되는 문제 해결
서론
데이터를 적재하는 과정에서 하둡 네임노드 부하 방지 목적으로 HDFS 상의 대량으로 생성된 스몰 파일을 확인하여 삭제 요청을 받고 있다.
단순히 파일을 삭제하여 해결할수도 있겠지만, 스몰파일이란 무엇이며 왜 발생하고 어떠한 이유 때문에 문제가 되는지 알아보도록 하겠다.
0. 스몰 파일이란?
스몰 파일은 블록 크기보다 작은 파일을 의미한다. 하둡 클러스터의 블록 크기와 비교하여 판단되기 때문에, 스몰 파일의 크기 기준은 상대적이다. 기본적으로 하둡 클러스터에서는 블록(Block) 크기가 보통 64MB 또는 128MB로 설정되어 있다. 일부 경우에는 1MB 미만의 파일도 스몰 파일로 간주할 수 있으며, 다른 경우에는 10MB 이상의 파일부터 스몰 파일로 간주할 수도 있다.
1. 스몰 파일을 왜 지워야되나?
HDFS 상에서 파일 생성 시 파일 1개의 사이즈를 기본 블록 사이즈로 구성한다면 하둡 네임노드 부하 방지와 생성된 데이터의 조회 성능을 향상시킬 수 있다.
- 메타데이터 부하: 하둡 클러스터는 많은 수의 파일에 대한 메타데이터를 유지 관리해야 한다. 스몰 파일의 수가 많으면 각 파일에 대한 메타데이터를 관리하는 비용이 증가하고, 스몰 파일의 메타정보를 네임노드의 한정된 메모리에 보관 네임노드(Namenode)의 부하가 증가하게 된다. 이는 전체 클러스터의 성능에 영향을 주게 된다.
- 디스크 공간 낭비: 스몰 파일은 파일 시스템 블록 크기보다 작은 파일이므로 디스크 공간을 효율적으로 사용하지 못한다. 스몰 파일이 많이 쌓이면 디스크 공간을 낭비하게 되어 클러스터 용량을 효과적으로 활용하지 못하게 된다.
- I/O 작업 오버헤드: 스몰 파일은 개별 파일에 대한 I/O 작업이 늘어나게 된다. 파일 입출력 작업은 디스크 액세스와 네트워크 통신을 포함하므로, 스몰 파일이 많은 경우 전체적인 작업 시간이 길어지고 I/O 오버헤드가 발생할 수 있다. 스몰 파일을 읽는 후속 배치 잡은 노드간의 빈번한 통신으로 네트워크 대기시간 증가로 극심한 조회 성능 저하가 발생하게 된다.
- 태스크 병렬성 저하: 스몰 파일은 작은 크기의 파일로 이루어져 있으며, 많은 수의 스몰 파일이 있을 경우 맵(Map) 및 리듀스(Reduce) 태스크의 병렬성이 저하될 수 있으며, 이는 작업의 처리 속도를 저하시킬 수 있게된다.
2. 스몰 파일 병합 방법
스파크에서는 하이브와 같이 개발자 관점에서 합리적인 크기의 파일로 병합하는 기능이 없기 때문에, 휴리스틱하게 데이터 특징 및 환경에 따라서 아래의 방식 중 선택하여 개선 필요하다. (예시 - 전체 결과 건수 기반으로 계산 : 디스크에 있는 단일 레코드 크기 * 전체건수를 고려하여 전체 사이즈를 결정)
2-1. HIVE
-- TEZ 작업일 때 병합 수행
set hive.merge.tezfiles=true;
-- MR 작업일 때 병합 수행
set hive.merge.mapredfiles=true;
-- 리듀스 없이 매퍼 단독 작업일 때 파일 병합 수행
set hive.merge.mapfiles=true;
-- 병합된 파일을 묶을 때 기준 256MB
set hive.merge.size.per.task=268435456;
-- 출력 파일들의 평균 크기가 hive.merge.smallfiles.avgsize 보다 작으면 병합 (128MB)
set hive.merge.smallfiles.avgsize=134217728;
2-2. SPARK
Output Partition
관련 설정 : df.repartition(cnt), df.coalesce(cnt)
Output Partition은 파일을 저장할 때 생성하는 Partition입니다. 이 Partition의 수가 HDFS 상의 마지막 경로의 파일 수를 지정합니다. (참고 : Spark Shuffle Partition과 최적화)
1. 파일 병합(Merge) : coalesce 를 사용
- 예 : n개의 병렬 처리가 있었지만, 최종 1개의 파일을 쓴다고 할 때
# 스몰 파일 생성
DF.write.partitionBy('part_hour').mode('append').parquet(save_PATH + 'DF')
# 1개의 파일로 스몰 파일 병합
DF.coalesce(1).write.partitionBy('part_hour').mode('append').parquet(save_PATH + 'DF')
2. 출력 파일 크기 조정 : repartition를 사용
- 데이터프레임이나 RDD의 파티션 수를 변경하여 데이터의 재분배를 수행하는 메서드이며, 이 과정에서 셔플이 발생한다.
- 특정 컬럼을 기준으로 재분할하는 방식으로 Spark가 동일한 키를 가진 레코드를 동일한 파티션에 강제로 적용 시 유용하다.
- 고유한 값을 가진 컬럼을 생성하여 각 파티션을 동등하게 나눈 다음 해당 컬럼에서 다시 파티션하면, 각 작업이 단일 파티션을 로드하고 각 작업이 동일한 양의 데이터를 갖도록 하는 방식이다. 따라서 이 작업은 항상 네트워크를 통해 모든 데이터를 기반으로 셔플링을 수행한다.
파티션 재분배(repartition)와 병합(coalesce) - 스파크 완벽 가이드 책
파티션 재분배 과정은 셔플을 수반합니다. 하지만 셔플 작업은 클러스터 전체에 데이터가 균등하게 분배하므로 잡의 전체 실행 단계를 최적화할 수 있습니다.일반적으로 가능한 적은 양의 데이터를 셔플하는 것이 좋습니다. 따라서 셔플 대신 동일 노드에서 파티션을 합치는 coalesce 메서드를 실행하여 데이터프레임이나 RDD의 전체 파티션 수를 먼저 줄이는 것을 추천 드립니다. 이보다 느린 repartition 메서드는 부하를 분산하기 위하여 네트워크 기반에서 데이터를 셔플링합니다.
3. 아카이브(Archive) 파일 사용
데이터를 아카이브 파일로 변환 압축하여 저장하기
data.write.format("hadoop-archive").option("path", "output.har").save()
Shuffle Partition
Spark 성능에 가장 크게 영향을 미치는 Partition으로, Join, groupBy 등의 연산을 수행할 때 Shuffle Partition이 쓰입니다.
설정값은 spark.sql.shuffle.partitions이고, 이 설정값에 따라 Join, groupBy 수행 시 Partition의 수(또는 Task의 수)가 결정됩니다. (참고 : Spark Shuffle Partition과 최적화)
1. 스파크 SQL 상에서 분석 결과를 INSERT 수행 시 최종 생성될 파일 개수를 결정
- 셔플 작업 시 생성되는 중간 데이터의 파티션 수를 지정하는 파라미터.
- 예: SparkSession.config("spark.sql.shuffle.partitions", 1);
- spark.sql.shuffle.partitions 값은 join 또는 repartition 시에 파티션 수를 명시적으로 지정하지 않으면 기본 값을 사용
- 하나의 셔플 파티션 크기가 100~200MB 정도 사용되도록 spark.sql.shuffle.partitions 수 조절 필요
3. 결론
스몰 파일 병합과 파티셔닝은 파일 시스템의 효율적인 관리와 메타데이터 오버헤드 감소, 하둡 네임노드 부하 방지와 데이터의 조회 성능을 높일 수 있다는 장점이 있다.
어쨌든 namenode 메모리이든, spark.driver.memory와 spark.executor.memory이든 한정된 메모리에서 모델 업데이트를 빠르게 해야 하므로 코드 연산량을 줄이는 것도 중요하지만 자원을 어떻게 활용하는지가 더 중요하다는 깨달음을 얻게 되었다.
모델링을 하면서 항상 메모리가 부족하여 작업이 지연되고 에러발생 및 강제 종 경우가 많았는데, 앞으로는 데이터를 파티셔닝하여 Shuffle Spill을 최소화하는 작업도 추가 해봐야겠다.
>> 더 읽어보고 추가 : 1. Managing Spark Partitions with Coalesce and Repartition
>> 하둡
>>하둡 분산파일 시스템