개발/Spark

[Spark DEBUG] partiton path load 오류

wonpick 2023. 2. 26. 16:04

문제 상황

  • 참조하는 테이블은 컬럼 기반의 데이터 저장 방식인 parquet 포맷. max partition이 25개로 설정되어 있음. 
    path를 확인하는 시점과 데이터프레임을 load하는 시점에서의 리스트가 달라 path가 없다는 오류 발생. 
    -> 25 제한에 따라 가장 오래 된 파티션이 삭제 됨으로 해당 파티션이 없다는 오류 발생

시도한 것들

http://www.corejavaguru.com/bigdata/hadoop/hdfs-file-read

  • 기존 방법 (기존 코드를 그대로 따라 갔었는데, 해당 방법은 전체 메타데이터를 로드하는 방법임을 깨달음 stackoverflow
    • 배열로가져오고 맥스값 가져오기. collect가 한번 로드되면 목록 외로 데이터 전체가 로드 되는 줄 알았는데 아니었음.
      • max_updatetime = spark.sql("""select max(updatetime) from db.table""").collect()[0][0]
    • 일단 원본 테이블만 불러와서 min/max값을 확인하려고 했는데 처음부터 refresh error가 남.
    • refresh sql문과 query 자체에 max값을 가져오도록 load하기
      • -> 테이블 메타데이터에서 직접 파티션 열 값을 추출 하므로 전체 파티션 로드 문제 발생
      • where 파티션 = (select max(파티션) 
    • 동적표현식 사용하지 않는 해결방안?
      • 상수형 문자열로 필터링을 한다. 그리고 SQL 쿼리 실행 순서 과정을 고려하여 where구문 순서를 배치
        FROM - WHERE GROUP BY - HAVING - SELECT - ORDER BY
      • 2시간 이내의 파티션 중 Max(최신) 파티션 로드
        where 파티션 > cast(DATE_FORMAT(current_timestamp()- interval 2 hour,'yyyyMMddHHmm') as string)\ and 파티션 = (select max(파티션) from 디비.테이블명) \

  • temporary table로 올려놓고 사용하기
    • -> 메모리 문제 
    • CreateOrReplaceTempView
      • DataFrame을 DB처럼 사용하는 방법(createOrReplaceTempView 함수로 temporary view만들기)
      • 캐싱해서 쓸거임 -> persist 실행시, job 이 끝나도 메모리에 데이터가 상주해있음 따라서 unpersist 로 해제해야함.
    • Global Temporary View
  • 데이터를 메모리에 올리지 않은 상황에서 HDFS 경로에서 최대값 찾은 뒤 로드 
    • 최신 파티션 1개만 찾은 뒤 로드하기 
hdfs_path = 'hdfs://~~'
file_list = [str(f.getPath()) for f in fs.get(conf).listStatus(Path(hdfs_path))]
max_partition = max(file_list).split(f'updateTime=')[-1]

df = spark.sql("select col from db.table where updateTime ='%s'"%(max_partition))

 


참고) 파티션 프루닝 권장사항

파티션 프루닝이란? 하드파싱이나 실행 시점에서 SQL 조건절을 분석하여 읽지 않아도 되는 파티션 세그먼트를 엑세스 대상에서 제외시키는 기능. -> 파티션 나눠서 sql 수행시 데이터 없는 파티션 안읽는 것. (있는지 없는지에 대한 구분은 조건절 분석으로 파악함) 

 

상수 필터 표현식 사용

쿼리에서 검색되는 파티션을 제한하려면 필터에서 상수 표현식을 사용해야 한다.
쿼리 필터에서 동적 표현식을 사용할 경우, BigQuery는 모든 파티션을 불러오므로 지양해야된다.

예를 들어 다음 쿼리에서는 필터에 상수 표현식이 포함되어 있으므로 파티션을 프루닝하게 된다.

SELECT
  t1.name,
  t2.category
FROM
  table1 AS t1
INNER JOIN
  table2 AS t2
ON t1.id_field = t2.field2
WHERE
  t1.ts = CURRENT_TIMESTAMP()

그러나  WHERE t1.ts = (SELECT timestamp from table where key = 2) 
필터가 상수 표현식이 아니므로 파티션을 프루닝하지 않게되고, timestamp  key 필드의 동적 값에 따라 달라지게 된다.

따라서 첫번째 쿼리대로 사용을 해야함.

SELECT
  t1.name,
  t2.category
FROM
  table1 AS t1
INNER JOIN
  table2 AS t2
ON
  t1.id_field = t2.field2
WHERE
  t1.ts = (SELECT timestamp from table3 where key = 2)

기존에 있는 모든 코드들이 다 collect로 해당 테이블을 불러오고 있었는데 배치잡이 데일리로 진행되어서 그런 것인지 hourly로 진행되는 배치잡과 계속 빌드 시간이 겹쳐 우리 잡에서만 에러가 나고 있다. 마지막 방법이 진짜 해결 방안인거 같은데 ㅠㅠ 부디 에러가 나지 않길.. 

- [Apache Spark] Partition Pruning과 Predicate Pushdown

- 왜 파티션은 문자열일까?

- Automatically Updating a Hive View Daily

- spark temporary view에 대해

- 하둡 기초 정리 (파일 시스템)

- 스파크 최신 파티션을 찾아가는 방법