AWS Athena(아테나)와 Python(파이썬)을 활용한 데이터 Query 및 Pandas(판다스) DataFrame(df)으로의 변환

준비사항

  • AWS 계정
  • boto3 (Python의 AWS SDK)
  • pandas

1. Athena 설정

먼저, boto3를 사용하여 Athena 클라이언트를 설정한다. 이를 위해 AWS 액세스 키 및 비밀 키가 필요하다.

주의: AWS 액세스 키와 비밀 키는 민감한 정보이므로 코드 내에 직접 입력하지 말고, 환경 변수나 AWS Configuration을 사용하여 관리하는 것이 좋다.

1
2
3
4
5
6
import boto3

athena = boto3.client('athena',
                      aws_access_key_id=ACCESS_KEY, 
                      aws_secret_access_key=SECRET_KEY, 
                      region_name="ap-northeast-2")

2. Athena에서 쿼리 실행

Athena는 쿼리의 결과를 S3 버킷에 저장한다. 따라서 결과를 저장할 S3 경로를 지정한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
query = "SELECT * FROM table_name"
output_location = 's3://your-output-bucket/path/'

response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': 'database_name'
    },
    ResultConfiguration={
        'OutputLocation': output_location,
    }
)

query_execution_id = response['QueryExecutionId']

3. 쿼리 실행 상태 체크

실행 중인 쿼리의 상태를 주기적으로 확인하여 결과가 준비되었는지 알 수 있다.

1
2
3
4
5
6
7
import time

while True:
    response = athena.get_query_execution(QueryExecutionId=query_execution_id)
    if response['QueryExecution']['Status']['State'] in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
        break
    time.sleep(5)

4. 결과를 Pandas DataFrame으로 변환

쿼리가 성공적으로 완료되면 S3에서 결과를 다운로드하여 Pandas DataFrame으로 읽는다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pandas as pd

result_location = output_location + query_execution_id + '.csv'

s3 = boto3.client('s3', 
                  aws_access_key_id=ACCESS_KEY, 
                  aws_secret_access_key=SECRET_KEY, 
                  region_name="ap-northeast-2")

result_file = 'result.csv'
bucket, key = result_location.replace("s3://", "").split("/", 1)
s3.download_file(bucket, key, result_file)

df = pd.read_csv(result_file)
print(df)

결론

Python과 AWS Athena를 조합하면 대규모 데이터셋에 대한 인터랙티브한 쿼리를 수행하고, 그 결과를 편리하게 분석할 수 있다. 위 가이드를 통해 자신만의 데이터 처리 파이프라인을 구축해보길 바란다.

전체 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import boto3
import pandas as pd

# 1. Athena 설정
athena = boto3.client('athena', 
                      aws_access_key_id='ACCESS_KEY',
                      aws_secret_access_key='SECRET_KEY',
                      region_name="ap-northeast-2")

# 2. 쿼리 실행
query = "SELECT * FROM table_name"  # 여기에 원하는 쿼리를 작성
output_location = 's3://output-bucket/path/'  # 여기에 결과를 저장할 S3 경로를 지정
response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        'Database': 'database_name'   # 여기에 사용할 데이터베이스 이름을 지정
    },
    ResultConfiguration={
        'OutputLocation': output_location,
    }
)

query_execution_id = response['QueryExecutionId']

# 쿼리가 완료될 때까지 대기 (빠른 쿼리의 경우에만 사용하세요)
import time

while True:
    response = athena.get_query_execution(QueryExecutionId=query_execution_id)
    if response['QueryExecution']['Status']['State'] in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
        break
    time.sleep(5)

# 3. S3에서 결과 가져오기
result_location = output_location + query_execution_id + '.csv'
s3 = boto3.client('s3', 
                  aws_access_key_id='ACCESS_KEY', 
                  aws_secret_access_key='SECRET_KEY', 
                  region_name="ap-northeast-2")

result_file = 'result.csv'
bucket, key = result_location.replace("s3://", "").split("/", 1)
s3.download_file(bucket, key, result_file)

df = pd.read_csv(result_file)
print(df)

함수로 사용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import boto3
import pandas as pd
import time

def fetch_data_from_athena(query, database, output_s3_location, aws_access_key, aws_secret_key, region="ap-northeast-2"):
    # 1. Athena 설정
    athena = boto3.client('athena', 
                          aws_access_key_id=aws_access_key,
                          aws_secret_access_key=aws_secret_key,
                          region_name=region)

    # 2. 쿼리 실행
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': output_s3_location,
        }
    )
    
    query_execution_id = response['QueryExecutionId']

    # 쿼리가 완료될 때까지 대기 (빠른 쿼리의 경우에만 사용하세요)
    while True:
        response = athena.get_query_execution(QueryExecutionId=query_execution_id)
        if response['QueryExecution']['Status']['State'] in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(5)

    # 3. S3에서 결과 가져오기
    result_location = output_s3_location + query_execution_id + '.csv'
    s3 = boto3.client('s3', 
                      aws_access_key_id=aws_access_key, 
                      aws_secret_access_key=aws_secret_key, 
                      region_name=region)

    result_file = 'result.csv'
    bucket, key = result_location.replace("s3://", "").split("/", 1)
    s3.download_file(bucket, key, result_file)

    df = pd.read_csv(result_file)
    return df

# Usage:
df = fetch_data_from_athena(query="SELECT * FROM table_name", 
                            database='database_name', 
                            output_s3_location='s3://output-bucket/path/', 
                            aws_access_key='YOUR_ACCESS_KEY', 
                            aws_secret_key='YOUR_SECRET_KEY')
print(df)

0%