아래 글은 EMR Serverless + Step Functions: Python to Spark
마이그레이션 작업을 위해 실험을 했던 과정을 기록한 것으로 잘못된 부분이 있거나, 최적화 되지 못한 부분이 존재할 수 있습니다.
1. 개요
기존에는 물리 서버에 구축된 jupyter hub환경에서 python으로 데이터 처리 후 아래와 같은 구조로 작업을 진행했다.
기존 환경에서는 빠른 개발과 테스트가 가능했지만, 확장성과 유지보수 측면에서 한계가 있었다.
또한, Python에서도 multiprocessing 같은 라이브러리를 활용하면 멀티프로세싱을 통해 병렬 처리를 수행할 수 있다. 하지만, 이러한 방식은 기본적으로 단일 머신의 CPU 리소스에 제한 되기 때문에, 대량의 데이터를 다룰 때는 결국 클러스터 기반의 분산 처리가 필요했고, 이에 따라 Spark로의 마이그레이션을 고려하게 되었다.
처음 Spark로의 전환을 고민할 때는 EMR Serverless를 바로 선택한 것이 아니라, 기존 EMR(EC2 기반)을 먼저 도입하여 Spark 마이그레이션을 진행해보자는 접근 방식을 택했다. 기존 EMR을 활용하면 EC2 인스턴스를 사용하여 YARN 기반의 Spark 클러스터를 직접 운영할 수 있고, 기존 Python 코드를 점진적으로 PySpark로 변환하는 작업을 진행할 수 있었다. 이 방식은 기존의 온프레미스 환경에서 클러스터를 운영하는 것과 비슷한 구조이기 때문에 비교적 익숙한 방식으로 진행할 수 있었다.
그러나 EMR을 운영하면서 몇 가지 문제점이 나타났다.
✅ 첫 번째로, 비용 문제였다. EMR(EC2 기반)에서는 클러스터가 항상 실행되고 있어야 했기 때문에, 작업을 수행하지 않는 유휴 시간에도 인스턴스 비용이 지속적으로 발생했다. 또한, 스팟 인스턴스를 활용하여 비용을 절감할 수는 있었지만, 장기적으로 볼 때 EMR을 계속 운영하는 것은 비용적으로 부담이 되는 구조였다.
✅ 두 번째 문제는 운영 관리의 복잡성이었다. EMR 클러스터는 Spark 설정, YARN 리소스 관리, Auto-Scaling 설정 등 여러 가지 튜닝이 필요했다. 작은 규모에서는 문제가 되지 않았지만, Spark Job을 여러 개 실행하거나 데이터 처리량이 변동될 경우 적절한 리소스 관리를 위해 클러스터를 조정해야 하는 부담이 컸다.
이러한 문제를 해결하기 위해 EMR Serverless로 전환을 결정했다. EMR Serverless는 클러스터를 직접 관리할 필요 없이, Spark Job을 실행할 때 필요한 만큼만 리소스를 할당하고 작업이 끝나면 자동으로 리소스를 해제하는 방식을 제공했다. 덕분에 비용이 사용한 만큼만 과금되며, 관리 부담도 최소화할 수 있었다.
또한, Step Functions을 활용하여 EMR Serverless + Step Functions + Spark로의 전환으로 데이터 처리 워크플로우를 자동화 해보기로 했다.
이번 블로그 시리즈에서는 기존 Python 기반의 데이터 처리 방식을 EMR Serverless 환경으로 마이그레이션하는 과정 중에서 EMR Serverless를 StepFunction에서 실행하고 그 과정에서 고려해야 할 사항 등을 정리해볼 예정이다.
2. EMR Serverless에서 Spark 구조
참고로 내가 spark 작업을 하면서 헷갈렸던 부분인 deploy mode에 대해서 먼저 정리를 하고 가겠다! Spark에서는 deploy-mode를 선택하는 것이 매우 중요한데, 이는 Driver의 실행 위치와 클러스터 자원 할당 방식이 결정되기 때문이다.
현재 로컬 환경에서 Standalone Mode로 Client Mode로 실행하여 디버깅한 뒤, 최종적으로 EMR Serverless에서 Spark Job을 실행하는 방식으로 운영 중인데, emr serverless에서는 당연하게도 Client Mode 지원이 되지 않는다.
- 이유는 Driver를 사용자 머신에서 실행하는 방식(Client Mode)이 서버리스 환경과 맞지 않기 때문.
- spark-submit을 실행할 때 기본적으로 Cluster Mode로 동작하며, Driver가 EMR Serverless 내부에서 실행됨.
즉, EMR Serverless에서는 모든 것이 Worker로 실행되며, Worker 내부에서 Driver와 Executor가 동작하는 구조이다.
따라서 Master Node를 관리할 필요가 없으며 그에 따른 YARN, Kubernetes, Standalone 등 클러스터 매니저 설정을 신경 쓸 필요 없다.
3. EMR Serverless, Step Functions에서 실행하기 위해 필요한 조건 - IAM Role 설정
- State Machine IAM Role
- 목적 : Step Functions 실행 및 EMR Serverless 실행 관리
- 참고 문서:
https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/setting-up.html#setting-up-iam
#Action부분만 기재 (참고용)
"states:StartExecution",
"states:DescribeExecution",
"states:StopExecution",
"states:GetExecutionHistory",
"emr-serverless:CreateApplication",
"emr-serverless:StartApplication",
"emr-serverless:StartJobRun",
"emr-serverless:GetJobRun",
"emr-serverless:StopApplication",
"dynamodb:ImportTable",
"dynamodb:DescribeTable",
"dynamodb:ListTables",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"iam:PassRole"
- Step Functions Execution Role
- 목적 : EMR Serverless에서 Spark Job 실행 및 S3/DynamoDB 접근
#Action부분만 기재 (참고용)
"s3:GetObject",
"s3:PutObject",
"s3:ListBucket",
"dynamodb:PutItem",
"dynamodb:GetItem",
"dynamodb:Scan",
"dynamodb:Query",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
- (선택) stepfuntion에서 dynamo import table을 위한
- + logs:GetLogEvents
- 참고 문서:
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataImport.Requesting.html
4. Step Functions 워크플로우 설계
(1) 기본 템플릿 생성
예시로 emr serverless 작업 실행을 선택해준다.
바로 생성하려고 하면 executionrole이 설정이 안되어있다고 하는데 위에서
EMR Serverless에서 Spark Job 실행 및 S3/DynamoDB 접근을 위해 생성한 role을 기재해준다.
기존에 stepfunction 실행을 위한 role을 만들어놨다면 선택해주고 그게 아니라면 생성도 가능하다.
최종적으로 생성한 stepfunction이다.
(2) 실제 설계한 워크플로우 예시
참고로 step function에서는 InitialCapacity를 명시적으로 설정하지 않으면 기본값이 할당된다.
특히 스토리지는 20GB 이하로 설정 시 에러가 나니 무조건 20GB 이상으로 설정해야한다.
- EMR Serverless의 InitialCapacity 기본값
Component | 기본 vCPU | 기본 Memory | 기본 Disk | Worker |
Driver / Executor | 4 vCPU | 16 GB | 20 GB | 1 |
주요 제한 사항
참고:https://aws.amazon.com/ko/emr/faqs/?nc=sn&loc=5
- CPU(vCPU)
- 각 작업자는 1, 2, 4, 8, 16 vCPU 중 하나만 설정 가능
- 메모리(GB)
- CPU 개수에 따라 최소/최대 메모리 설정 가능
- 1 vCPU → 최소 2GB, 최대 8GB (1GB 단위 증분)
- 16 vCPU → 최소 32GB, 최대 120GB (8GB 단위 증분)
- 디스크(Disk, GB)
- 최소 20GB, 최대 200GB
- 기본적으로 20GB는 무료이며, 20GB 초과 부분만 비용 부과됨
(3) Step Functions JSON 정의 예시 (job run 부분만)
아래 부분에서 기본 템플릿과 다른 점은 job 실패 시 바로 stop application으로 가도록 설정한 부분이다.
실제 실행할 spark job 스크립트를 s3에 업로드 후 해당 경로를 EntryPoint에 입력해준다.
"EntryPoint": "s3://S3-BUCKET/scripts/my_spark_job.py"
뿐만 아니라 중간에 잡이 실패할 경우 디버깅이 필요하므로 log를 기록할 경로도 기재해준다.
S3에 로그를 기록하는 기본 옵션을 택했다.
"LogUri": "s3://S3-BUCKET/logs/"
(참고로 Spark 작업에서 job의 에러 로그 및 print() 되는 출력은 항상 Driver에서 확인해야하므로 SPARK_DRIVER/stdout.gz 에서 확인하면 된다.)
"B2B_GP_Count_Processing": {
"Type": "Task",
"Resource": "arn:aws:states:::emr-serverless:startJobRun.sync",
"Arguments": {
"ApplicationId": "{% $ApplicationId %}",
"ExecutionRoleArn": "arn:aws:iam::YOUR_ACCOUNT_ID:role/emr_serverless_execution_role": {
"SparkSubmit": {
"EntryPoint": "s3://S3-BUCKET/scripts/my_spark_job.py"
}
},
"ConfigurationOverrides": {
"MonitoringConfiguration": {
"S3MonitoringConfiguration": {
"LogUri": "s3://S3-BUCKET/logs/"
}
}
}
},
"Assign": {
"FirstJobId": "{% $states.result.JobRunId %}"
},
"Next": "ImportTable",
"Catch": [
{
"ErrorEquals": [
"States.TaskFailed"
],
"Next": "Stop Application",
"Comment": "Stop application on any occasion"
}
]
},
5. 로컬에서 Step Functions 자동 배포 및 실행
조금 더 정리를 했지만 로컬에서 stepfunction을 바로바로 수정 실행 할 수 있게 deploy.sh 라는
스크립트를 만들어 조금 더 빠르게 반영될 수 있게 하였다.
#!/bin/bash
# ✅ S3 버킷 및 상태 머신 JSON 파일 설정
BUCKET_NAME="your-s3-bucket/scripts"
JSON_FILE="your_state_machine.asl.json"
STATE_MACHINE_ARN="arn:aws:states:your-region:your-account-id:stateMachine:your-state-machine"
# ✅ S3 Key Prefix 및 DynamoDB 테이블명 설정
end_ym=$(date -d "$(date +%Y-%m-01) -2 month" +%Y%m) # 현재 날짜 기준 2달 전 연월 (YYYYMM)
mmdd=$(date +%m%d) # 현재 날짜 (MMDD)
NEW_S3_KEY_PREFIX="dynamo_import_path/gzip_${end_ym}"
NEW_TABLE_NAME="dynamo_table_${end_ym}v${mmdd}"
# ✅ 인자값 확인 (PySpark 작업이 2개 실행되므로 스크립트 2개가 필요함)
if [ "$#" -ne 2 ]; then
echo "Usage: ./deploy.sh <PYSPARK_SCRIPT_1> <PYSPARK_SCRIPT_2>"
exit 1
fi
PYSPARK_SCRIPT_1=$1
PYSPARK_SCRIPT_2=$2
# ✅ PySpark 스크립트 S3 업로드
echo "Uploading PySpark scripts to S3..."
aws s3 cp "/path/to/your/scripts/$PYSPARK_SCRIPT_1" "s3://$BUCKET_NAME/"
aws s3 cp "/path/to/your/scripts/$PYSPARK_SCRIPT_2" "s3://$BUCKET_NAME/"
# ✅ 상태 머신 JSON 업데이트
echo "Updating state machine JSON file..."
# 1️⃣ 첫 번째 PySpark 스크립트 (`B2B_GP_Processing` 단계 업데이트)
sed -i '/"B2B_GP_Processing"/,/^ *}/ {
s|"EntryPoint": *"s3://[^"]*\.py"|\"EntryPoint\": \"s3://'"$BUCKET_NAME"'/'"$PYSPARK_SCRIPT_1"'\"|
}' "$JSON_FILE"
# 2️⃣ 두 번째 PySpark 스크립트 (`B2B_GP_Save_DynamoDBjson` 단계 업데이트)
sed -i '/"B2B_GP_Save_DynamoDBjson"/,/^ *}/ {
s|"EntryPoint": *"s3://[^"]*\.py"|\"EntryPoint\": \"s3://'"$BUCKET_NAME"'/'"$PYSPARK_SCRIPT_2"'\"|
}' "$JSON_FILE"
# 3️⃣ DynamoDB 테이블 및 S3 경로 업데이트
sed -i "s|\"S3KeyPrefix\": \".*\"|\"S3KeyPrefix\": \"${NEW_S3_KEY_PREFIX}\"|" "$JSON_FILE"
sed -i "s|\"TableName\": \".*\"|\"TableName\": \"${NEW_TABLE_NAME}\"|" "$JSON_FILE"
# ✅ Step Functions 상태 머신 업데이트
echo "Updating Step Functions state machine..."
aws stepfunctions update-state-machine \
--state-machine-arn "$STATE_MACHINE_ARN" \
--definition file://"$JSON_FILE"
# ✅ Step Function 실행
echo "Starting Step Function execution..."
aws stepfunctions start-execution --state-machine-arn "$STATE_MACHINE_ARN"
echo "Deployment complete!"
이후에는 step function으로 설계했던 것을
airflow를 활용하여 워크플로우를 설계하고 스케쥴링 될 수 있게 제작을 진행하여 step function과 위 deploy.sh는 더이상 사용하지는 않지만 기록용으로 기재한다.
6. 결론
EMR Serverless와 Step Functions을 활용해서 PySpark 데이터 파이프라인을 자동화하는 과정을 정리해봤다.
EMR Serverless를 활용하면 클러스터를 직접 관리할 필요 없이 Spark Job을 실행할 수 있고, Step Functions을 통해 데이터 파이프라인을 자동화할 수 있다. 이를 위해 IAM Role과 iam:PassRole 설정이 필수이며, 상태 머신(JSON)을 활용해 실행할 PySpark 스크립트를 동적으로 변경할 수 있다. 배포를 자동화하기 위해 deploy.sh 스크립트를 사용하면 PySpark 스크립트를 S3에 업로드하고 Step Functions을 업데이트 및 실행할 수 있다. 결과적으로, 반복적인 작업을 자동화하고 운영 부담을 줄일 수 있으며, 확장성과 유지보수성이 향상된다.
다음에는 Spark 성능 최적화 및 dynamoDB JSON 관련한 내용을 다뤄보도록 하겠다.
1. 참고 : https://github.com/aws-samples/emr-serverless-samples
'STUDY > Data Engineering' 카테고리의 다른 글
04장 부호화와 발전 (0) | 2024.03.31 |
---|---|
03장 저장소와 검색DB란 (0) | 2024.03.03 |
02장 데이터 모델과 질의언어 (1) | 2024.02.18 |
[CKA] 2024 시험 합격 후기 (5) | 2024.02.04 |
[CKA] 실습/개념 정리 (0) | 2024.01.20 |