728x90
1. 테이블 존재 여부 확인 (check_table 함수)
먼저, CSV 파일로 테이블을 생성하기 전에 해당 테이블이 이미 존재하는지 확인하는 과정이 필요합니다. 이를 위해 check_table 함수를 사용합니다.
def check_table(db_connect,table_name):
check_table_query = f"""
SELECT EXISTS (
SELECT FROM pg_tables
WHERE tablename = '{table_name}'
);
"""
with db_connect.cursor() as cur:
cur.execute(check_table_query)
result = cur.fetchone()[0]
return result
- 이 함수는 PostgreSQL에 연결된 db_connect를 이용해, 데이터베이스에 해당 table_name이 존재하는지 확인합니다.
- SQL 쿼리에서 pg_tables 시스템 테이블을 조회하여 테이블의 존재 여부를 반환합니다.
- 결과는 True 또는 False로 반환되며, 이를 바탕으로 테이블 생성 여부를 결정할 수 있습니다.
2. CSV 파일을 기반으로 테이블 생성 (create_table_from_csv 함수)
CSV 파일의 데이터를 기반으로 PostgreSQL 테이블을 생성하는 함수입니다. 각 열의 데이터 타입에 따라 PostgreSQL의 적절한 데이터 타입으로 변환하여 테이블을 만듭니다.
def create_table_from_csv(db_connect, df, table_name):
columns = []
for col, dtype in zip(df.columns, df.dtypes):
if dtype == 'int64':
col_type = 'BIGINT'
elif dtype == 'float64':
col_type = 'FLOAT'
else:
col_type = 'TEXT'
columns.append(f'"{col}" {col_type}')
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
{', '.join(columns)}
);
"""
with db_connect.cursor() as cur:
cur.execute(create_table_query)
db_connect.commit()
print(f"Table {table_name} creation is successful.")
- 이 함수는 CSV 파일을 pandas.DataFrame으로 읽은 후, 각 열의 데이터 타입을 확인하여 적절한 PostgreSQL 데이터 타입 (BIGINT, FLOAT, TEXT)으로 매핑합니다.
- CSV 열 이름을 대문자로 저장하기 위해 열 이름을 쌍따옴표로 감쌉니다.
- CREATE TABLE IF NOT EXISTS 쿼리를 통해 테이블을 생성합니다. 테이블이 존재하지 않으면 새로 생성하고, 이미 존재하면 건너뛰게 됩니다.
3. CSV 데이터를 테이블에 삽입 (insert_data_from_csv 함수)
테이블이 생성된 후, CSV 파일의 데이터를 테이블에 삽입하는 함수입니다.
def insert_data_from_csv(db_connect, df, table_name):
columns = [', '.join([f"{col}" for col in df.columns])]
insert_data_query = f"""
INSERT INTO {table_name} ({', '.join(columns)}) VALUES %s
"""
tuples = [tuple(x) for x in df.to_numpy()]
with db_connect.cursor() as cur:
extras.execute_values(cur,insert_data_query,tuples)
db_connect.commit()
print(f"Data insertion into {table_name} is successful.")
- 이 함수는 pandas DataFrame을 PostgreSQL 테이블에 삽입하는 역할을 합니다. df.columns를 이용하여 열 이름을 추출하고, 이를 SQL INSERT INTO 쿼리에 삽입합니다.
- pandas DataFrame의 데이터를 tuple로 변환한 후, psycopg2.extras.execute_values를 사용하여 다중 행 데이터를 한 번에 삽입합니다. 이는 대량의 데이터를 빠르게 삽입하는 데 유용합니다.
- 데이터 삽입 후 commit을 통해 변경 사항을 PostgreSQL에 저장합니다.
4. 전체 코드
my_project/
├── create_table.py
├── insert_data.py
├── main.py
├── sample.csv
from create_table import create_table_from_csv, check_table
from insert_data import insert_data_from_csv
import psycopg2
import pandas as pd
if __name__ == "__main__":
db_connect = psycopg2.connect(
user="postgres_user",
password="postgres_password",
host="postgres_server",
port=5432,
database="postgres_db"
)
df = pd.read_csv("./sample.csv")
if not check_table(db_connect,"sample_data"):
create_table_from_csv(db_connect,df, "sample_data")
insert_data_from_csv(db_connect,df, "sample_data")
db_connect.close()
이 코드를 사용하면 CSV 파일의 데이터를 PostgreSQL에 쉽게 업로드할 수 있습니다. 테이블 생성부터 데이터 삽입까지의 과정을 자동화하여 대량의 데이터를 효율적으로 처리할 수 있습니다.
728x90
'Database' 카테고리의 다른 글
[Database] 인메모리 데이터베이스(In-Memory Database)란? (2) | 2024.10.22 |
---|