Database

[PostgreSQL] Python으로 CSV 파일을 PostgreSQL에 테이블로 업로드하기

JustJunsu 2024. 9. 26. 17:51
728x90

1. 테이블 존재 여부 확인 (check_table 함수)

먼저, CSV 파일로 테이블을 생성하기 전에 해당 테이블이 이미 존재하는지 확인하는 과정이 필요합니다. 이를 위해 check_table 함수를 사용합니다.

def check_table(db_connect,table_name):

    check_table_query = f"""
    SELECT EXISTS (
        SELECT FROM pg_tables
        WHERE tablename = '{table_name}'
    );
    """
    with db_connect.cursor() as cur:
        cur.execute(check_table_query)
        result = cur.fetchone()[0]
    return result

 

  • 이 함수는 PostgreSQL에 연결된 db_connect를 이용해, 데이터베이스에 해당 table_name이 존재하는지 확인합니다.
  • SQL 쿼리에서 pg_tables 시스템 테이블을 조회하여 테이블의 존재 여부를 반환합니다.
  • 결과는 True 또는 False로 반환되며, 이를 바탕으로 테이블 생성 여부를 결정할 수 있습니다.

 

2. CSV 파일을 기반으로 테이블 생성 (create_table_from_csv 함수)

CSV 파일의 데이터를 기반으로 PostgreSQL 테이블을 생성하는 함수입니다. 각 열의 데이터 타입에 따라 PostgreSQL의 적절한 데이터 타입으로 변환하여 테이블을 만듭니다.

 

def create_table_from_csv(db_connect, df, table_name):

    columns = []
    for col, dtype in zip(df.columns, df.dtypes):
        if dtype == 'int64':
            col_type = 'BIGINT'
        elif dtype == 'float64':
            col_type = 'FLOAT'
        else:
            col_type = 'TEXT'
        columns.append(f'"{col}" {col_type}')

    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        {', '.join(columns)}
    );
    """

    with db_connect.cursor() as cur:
        cur.execute(create_table_query)
        db_connect.commit()
        print(f"Table {table_name} creation is successful.")

 

  • 이 함수는 CSV 파일을 pandas.DataFrame으로 읽은 후, 각 열의 데이터 타입을 확인하여 적절한 PostgreSQL 데이터 타입 (BIGINT, FLOAT, TEXT)으로 매핑합니다.
  • CSV 열 이름을 대문자로 저장하기 위해 열 이름을 쌍따옴표로 감쌉니다.
  • CREATE TABLE IF NOT EXISTS 쿼리를 통해 테이블을 생성합니다. 테이블이 존재하지 않으면 새로 생성하고, 이미 존재하면 건너뛰게 됩니다.

3. CSV 데이터를 테이블에 삽입 (insert_data_from_csv 함수)

테이블이 생성된 후, CSV 파일의 데이터를 테이블에 삽입하는 함수입니다.

 

def insert_data_from_csv(db_connect, df, table_name):

    columns = [', '.join([f"{col}" for col in df.columns])]

    insert_data_query = f"""
    INSERT INTO {table_name} ({', '.join(columns)}) VALUES %s
    """
    tuples = [tuple(x) for x in df.to_numpy()]

    with db_connect.cursor() as cur:
        extras.execute_values(cur,insert_data_query,tuples)
        db_connect.commit()
        print(f"Data insertion into {table_name} is successful.")
  • 이 함수는 pandas DataFrame을 PostgreSQL 테이블에 삽입하는 역할을 합니다. df.columns를 이용하여 열 이름을 추출하고, 이를 SQL INSERT INTO 쿼리에 삽입합니다.
  • pandas DataFrame의 데이터를 tuple로 변환한 후, psycopg2.extras.execute_values를 사용하여 다중 행 데이터를 한 번에 삽입합니다. 이는 대량의 데이터를 빠르게 삽입하는 데 유용합니다.
  • 데이터 삽입 후 commit을 통해 변경 사항을 PostgreSQL에 저장합니다.

4. 전체 코드

my_project/
├── create_table.py
├── insert_data.py
├── main.py
├── sample.csv
from create_table import create_table_from_csv, check_table
from insert_data import insert_data_from_csv
import psycopg2
import pandas as pd

if __name__ == "__main__":

    db_connect = psycopg2.connect(
        user="postgres_user",
        password="postgres_password",
        host="postgres_server",
        port=5432,
        database="postgres_db"
    )
    df = pd.read_csv("./sample.csv")
    if not check_table(db_connect,"sample_data"):
        create_table_from_csv(db_connect,df, "sample_data")
        insert_data_from_csv(db_connect,df, "sample_data")
    db_connect.close()

 

이 코드를 사용하면 CSV 파일의 데이터를 PostgreSQL에 쉽게 업로드할 수 있습니다. 테이블 생성부터 데이터 삽입까지의 과정을 자동화하여 대량의 데이터를 효율적으로 처리할 수 있습니다.

728x90

'Database' 카테고리의 다른 글

[Database] 인메모리 데이터베이스(In-Memory Database)란?  (2) 2024.10.22