Programming/Python

Python - AWS Athena 쿼리실행방법 PyAthena vs Boto3

wave35 2024. 9. 3. 23:54

 

python에서 AWS Athena의 query를 활용하는데는 보편적으로 2가지 방법이 있다.

하나는 pyathena 라이브러리의 connect.cursor()를 사용하는 방법이고, 

다른 하나는 Boto3의 start_query_execution API를 사용하는 방법이다.

 

 

pyathena.connect 사용

pyathena는 AWS Athena와의 상호작용을 단순화한 라이브러리로,

SQL 쿼리 실행 및 결과 조회를 간편하게 할 수 있다.

from pyathena import connect
import pandas as pd

# Athena에 연결
conn = connect(
    s3_staging_dir='s3://your-bucket/path/',  # 쿼리 결과를 저장할 S3 버킷 경로
    region_name='your-region',                # AWS 리전
    aws_access_key_id='ABCDE',
    aws_secret_access_key='123AB436'
)

# 쿼리 실행
as_df = False
if as_df: # as_df 변수가 True면 결과 값을 DataFrame으로 받음
    df = pd.read_sql(query, connection)
    return df
else:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM your_table")
        rows = cursor.fetchall()

# 결과 출력
for row in result:
    print(row)

위의 예시는 쿼리문을 실행하고 해당 결과를 데이터프레임 또는 리스트 튜플로 받는다.

 

장점

간편함 :

pyathena는 Python의 표준 DB-API 2.0에 맞춰 구현되어 있어,

SQL 쿼리를 날리고 결과를 받는 작업이 매우 직관적이다.

 

편리한 데이터 처리:

fetchall(), fetchone() 등 표준 데이터베이스 API 메서드를 지원하고,

Pandas와 같은 라이브러리와 연동하기에 용이하다.

 

자동 관리:

connect() 메서드를 통해 세션을 관리할 수 있으며,

쿼리 실행 및 결과 처리의 모든 과정을 자동으로 관리한다.

 

단점

성능 한계:

매우 큰 쿼리 결과를 처리할 때는 메모리 사용량이 증가할 수 있습니다. 

특히 fetchall()로 모든 데이터를 한 번에 가져올 경우 문제가 될 수 있습니다.

 

제한된 제어: 

Boto3 API에 비해 복잡한 쿼리 실행 상태 모니터링이나 고급 기능이 제한적일 수 있습니다.

 

 

boto3의 start_query_execution 사용

boto3는 AWS의 공식 Python SDK로, 

Athena 쿼리 실행과 결과 조회를 보다 세밀하게 제어할 수 있습니다.

import boto3
import time

client = boto3.client(
    'athena', 
    region_name='ap-northeast-2',
    aws_access_key_id='ABCDE',
    aws_secret_access_key='123AB436'
)

# 쿼리 실행 함수
def execute_query(query, async=True, get_result=True):
    client = create_athena_client()
    
    # 쿼리 실행 시작
    response_query_execution_id = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': 'your_database'},
        ResultConfiguration={'OutputLocation': 's3://your-bucket/path/'}
    )
    query_execution_id = response_query_execution_id['QueryExecutionId']
    
    # 비동기일 때
    if not async: 
        return query_execution_id, True

    # 쿼리 완료까지 대기
    status = 'RUNNING'
    max_attempts = 50
    attempt = 0

    while attempt < max_attempts:
        attempt += 1
        response_get_query_details = client.get_query_execution(
            QueryExecutionId=query_execution_id
        )
        status = response['QueryExecution']['Status']['State']

        if status in ['FAILED', 'CANCELLED']:
            print(f"Query failed or was cancelled: {response}")
            return False, False
        elif status == 'SUCCEEDED':
            result_location = 
              response['QueryExecution']['ResultConfiguration']['OutputLocation']
            if get_result:
                return fetch_query_results(client, query_execution_id)
            return result_location, None
        else:
            time.sleep(5)

    print("Query timed out")
    return False, False

# 쿼리 결과 가져오기 함수
def fetch_query_results(client, query_execution_id):
    response = client.get_query_results(
        QueryExecutionId=query_execution_id
    )
    rows = response['ResultSet']['Rows']

    if len(rows) > 1:
        header = [col['VarCharValue'] for col in rows[0]['Data']]
        result = [
            dict(zip(header, [col['VarCharValue'] for col in row['Data']]))
            for row in rows[1:]
        ]
        return response['ResultSet']['Rows'][0], result

    return response['ResultSet']['Rows'][0], None

# 사용 예제
query = 'SELECT * FROM your_table'
result_location, result = execute_query(query, async=True, get_result=True)

print("Result Location:", result_location)
print("Result:", result)

위의 예시는 동기/비동기 쿼리실행을 제어하며, 데이터결과를 받는 리턴값을 조정하다.

파이썬 실행 메모리에 따라 다르지만 쿼리 결과 값이 크다면(rows가 많다면),

result 변수에 전부 담기지 않고 생략된다.

 

장점

세부 제어 가능: 

쿼리 실행의 각 단계(실행, 상태 확인, 결과 조회)를 세밀하게 제어할 수 있다. 

특히 비동기식으로 쿼리를 실행할 수 있어, 대규모 쿼리를 처리하는 데 유리하다.

 

범용성: 

Boto3는 AWS SDK이므로, Athena뿐만 아니라 다른 AWS 서비스와도 연동하기에 좋다.

 

큰 데이터 처리 가능: 

매우 큰 결과 집합을 처리할 때 메모리 관리를 직접 할 수 있다. 

쿼리결과를 저장하는 S3에서 필요한 부분만 읽어들일 수 있다.

 

에러 처리: 

쿼리 실행 상태를 수동으로 확인하고, 

실패 또는 취소된 쿼리에 대해 적절한 에러 처리가 가능합니다.

 

단점

복잡함: 

쿼리 실행과 상태 확인, 결과 처리 등이 명시적으로 나누어져 있어,

코드가 상대적으로 복잡해진다.

 

불편한 데이터 처리: 

쿼리 결과를 직접 S3에서 읽어오고 파싱해야 하므로, 추가적인 작업이 필요할 수 있다.

 

 

요약

일반적인 SQL 작업 및 간단한 쿼리 처리를 위해서는

pyathena의 connect.cursor()를 사용하는 것이 더 편리하고 권장된다.

 

하지만 조금이라도 데이터가 커지면 대규모 데이터 처리, 비동기 작업이 가능한

Boto3의 start_query_execution API를 사용하는 것이 더 적합하다.