ConnectorX
1. 소개
- RDB 조회 결과를 Pandas, PyArrow, Polars 등 Dataframe 계열 객체로 빠르게 로드하는 라이브러리
- SQL query를 실행하고, 결과를 Pandas DataFrame으로 빠르고 메모리 효율적으로 로딩한다.
- 즉, 파이썬에서 RDB 데이터를 빠르게 DataFrame 으로 로딩하는 기능을 제공한다.
2. 사용법
(1) 설치
- 아래와 같이 사용하며,
pandas또는pyarrow와 같이 DataFrame 형태를 다룰 수 있는 라이브러리를 함께 설치하길 권장
1
2
3
4
5
# pip
pip install connectorx
# uv
uv add connectorx
(2) 기본 문법
1
2
3
4
5
6
7
8
9
import connectorx as cx
db_url = f"mysql://{user}:{password}@{host}:{port}/{db}"
query = "SELECT * FROM table"
df = cx.read_sql(
conn = db_url, # 연결 DB URL
query = query
)
(3) 병렬 읽기
- auto_increment id 처럼 숫자형이고, NULL이 없는 column을 파티션 대상으로 지정해 병렬 읽기 수행이 가능하다.
partition_on파라미터 : 병렬처리를 위해 파티션을 나눌 대상 컬럼 이름partition_num파라미터 : 데이터 전체를 몇 개의 파티션으로 나눌 것인지partition_range파라미터 : 파티션 대상 컬럼의 전체 값 범위. (선택적)- 대상 컬럼은 꼭 id 가 아니더라도 생성일시와 같은 연속적이고 분포가 고른 데이터면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
import connectorx as cx
db_url = f"mysql://{user}:{password}@{host}:{port}/{db}"
query = "SELECT * FROM table"
df = cx.read_sql(
conn = db_url, # 연결 DB URL
query = query,
partition_on = "id",
partition_num = 8,
partition_range = (1, 500_000)
)
3. 장단점과 적용 권장 상황
(1) 장점
| 장점 | 설명 |
|---|---|
| 빠른 속도 | • partition 을 지정시 병렬 스레드로 데이터를 읽어 빠른 조회 |
| 메모리 효율 | • Rust 기반 + zero-copy 원칙으로 더 낮은 메모리 사용량 |
| 여러 DB를 지원함 | • MySQL, MariaDB, PostgreSQL, Oracle, SQLite 등 여러 RDB 지원 • 결과물로는 Pandas, PyArrow, Dask 등 지원 |
| Pandas 와의 연계 | • 결과물이 바로 DataFrame 형태로 나와 사용하기 쉬움 |
(2) 한계
| 한계 | 설명 |
|---|---|
| 쓰기 작업이 불가 | • 읽기 전용 라이브러리로, 쓰기 작업 불가 |
| 모든 쿼리에서 항상 빠르지 않음 | • 소량 데이터에서는 속도의 차이가 거의 없음 |
| 내부적인 추가 쿼리 | • partition 확인 및 count 확인을 위한 추가 쿼리가 내부적으로 수행됨 • 이로 인한 약간의 오버헤드 발생 가능 • 또한, 데이터가 변경되는 중에 읽어들일시 일관성이 무너질 위험 |
(3) 적용을 권장하는 상황
- 대용량 SELECT 결과를 Python DataFrame으로 빠르게 가져오고 싶을 때
- pandas.read_sql이 느리거나 메모리를 많이 쓸 때
- DB → pandas / polars / arrow 로딩이 병목일 때
- 숫자형 partition column이 있어서 병렬 읽기가 가능할 때
(4) 적용을 권장하지 않는 상황
- 소량 데이터 조회
- INSERT / UPDATE / DELETE 작업 (불가능)
- ORM이 필요한 애플리케이션 코드
- transaction 제어가 중요한 작업
- 실시간으로 계속 변경되는 OLTP 테이블의 일관 조회
4. RDB 데이터를 불러오는 방식 비교
(1) 기존
pandas.read_sql(),SQLAlchemy,pymysql,psycopg,sqlite등을 많이 사용해왔음- 위 방법들로 RDB 데이터를 읽어와 DataFrame 형태로 변환할 때 아래와 같은 작업을 거침
1
2
3
4
5
6
DB에서 결과 조회
→ Python DB driver가 DB 결과 데이터를 읽음
→ DB 값을 Python 객체로 변환함
→ row를 tuple/list/dict 같은 Python 자료구조로 구성함
→ pandas DataFrame으로 변환함
→ pandas 내부 구조에 맞게 column 단위로 재배치함
- (1) 데이터가 많을수록 DB driver fetch 횟수와 전송량이 증가함
- (2) 각 row와 column 값을 Python 객체로 변환하는 비용이 발생함
- (2) Python 객체로 변환된 데이터를 다시 DataFrame 구조로 바꾸는 비용이 발생
- (3) 변환 중에는 동일한 데이터가 여러 형태로 메모리에 동시에 존재할 수 있음
(2) Connector X
- 반복적인 데이터 복사와 Python 레벨에서의 오버헤드를 줄이는 데 집중함
- Rust로 작성되어 있으며, DB에서 읽은 데이터를 Rust 내부에서 처리한 뒤 DataFrame으로 변환 가능한 형태로 Python에 전달함
- 따라서 row들을 Python 객체로 먼저 만든 뒤 다시 DataFrame으로 조립하는 기존 방식보다 변환 비용과 복사 비용을 줄일 수 있음
1
2
3
4
DB에서 결과 조회
→ ConnectorX가 DB와 직접 연결해 데이터를 읽음
→ Rust 내부에서 데이터를 효율적으로 처리함
→ Python 객체 변환을 최소화하면서 DataFrame으로 전달함
5. 성능 이득 원리
(1) Partition
- ConnectorX의 기능으로, 데이터를 몇 개의 덩어리로 분할한 뒤, 이를 병렬로 읽는 방법
- 분할 기준은 id 컬럼과 같이 숫자형이면서, 되도록이면 연속적인 값을 가진 컬럼이 될 수 있다.
partition_range를 직접 지정해주면, 내부적으로 min/max 조회를 하지 않아 성능 이득을 볼 수 있다.- 다만, 파티셔닝을 하는 데에도 추가적인 작업이 필요하므로, 데이터량에 따라 성능 이득을 볼 수도 있고, 오히려 성능이 떨어질 수도 있다.
1
2
3
partition_on = "분할 기준이 될 컬럼명"
partition_num = "몇 개의 덩어리로 나눌 것인지"
partition_range = "분할 기준 컬럼의 최소, 최대값 범위"
(2) Python Object 변환을 줄임
- 기존 방식에서는 DB의 각 row와 column 값이 Python 객체로 변환됨
- 반면 ConnectorX는 Rust 레벨에서 데이터를 처리하고, DataFrame이 사용할 수 있는 형태로 직접 구성함
- 따라서 Python 인터프리터를 거치는 반복 작업이 줄으듦으로 속도에서 이득을 볼 수 있음
(2) Python Object 변환을 줄임
- 기존 방식에서는 DB에서 읽은 각 row와 column 값이 Python 객체로 변환됨(예:
int,str,datetime,tuple등) - 이후 이 Python 객체 기반의 row 데이터를 다시 pandas DataFrame으로 변환함
- 반면 ConnectorX는 Rust에서 대부분의 데이터처리를 거치고 DataFrame과 유사한 형태로 반환함
- 따라서 대량의 row를 Python 객체로 반복 생성하는 비용이 줄어들어 읽기 성능에서 이점을 볼 수 있음
(3) 메모리 복사 횟수를 줄이는 구조
- 기존 방식에서는 DB 결과가 Python 객체, list/tuple, pandas DataFrame 등 여러 단계를 거치며 변환됨
- 이 과정에서 중간 자료구조가 생성되거나, DataFrame 내부 구조로 옮겨지는 과정에서 추가 복사가 발생할 수 있음
- ConnectorX는 이러한 중간 변환 단계를 줄이는 것을 지향함
- DB에서 읽은 데이터를 Rust 내부에서 처리하고, DataFrame으로 변환하기 좋은 형태로 전달함
- 따라서 (2)번의 Python 객체 생성 비용뿐 아니라 중간 자료구조 생성 비용과 메모리 복사 비용도 줄일 수 있음
실습
0. 실습 코드
- 아래 깃헙에서 확인 가능
study/00_Languages/04_Python/std_connectorx at main · whdrns2013/study
1. 설정값
1
2
3
4
5
6
7
8
9
10
11
12
# config/config.ini
[db]
baseurl=mysql+pymysql://
host={ip주소}
port={port번호}
user={DB유저명}
password={DB패스워드}
db={DB명}
table=dummy_data
[setting]
dummy_data_size=500000
2. 더미 데이터 테이블 스키마
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# models/dummy_data.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Integer, Float, DateTime
from config.config import config
class Base(DeclarativeBase):
pass
class DummyData(Base):
__tablename__ = config["db"]["table"]
id :Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name :Mapped[str] = mapped_column(String(50), nullable=True)
email :Mapped[str] = mapped_column(String(50), nullable=True)
age :Mapped[int] = mapped_column(Integer, nullable=True)
country :Mapped[str] = mapped_column(String(2), nullable=True)
score :Mapped[float] = mapped_column(Float, nullable=True)
created_at :Mapped[str] = mapped_column(DateTime, nullable=False)
1. 더미 데이터 생성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# scripts/data_generator.py
from config.config import config
from models.dummy_data import DummyData
from sqlalchemy.orm import sessionmake
def generate_dummy_data(data_size:int = int(config["setting"]["dummy_data_size"])):
countries = ["KR", "US", "JP", "CN", "DE", "FR", "GB", "IN"]
rows = []
for i in range(1, data_size + 1):
rows.append((
i,
f"user_{i}",
f"user_{i}@example.com",
20 + (i % 50),
countries[i % len(countries)],
round((i % 10000) / 100, 2),
f"2026-01-{(i % 28) + 1:02d} 12:00:00"
))
return rows
def insert_dummy_data(data:list[tuple], engine=get_engine()):
Session = sessionmaker(bind=engine)
with Session() as s:
for id, name, email, age, country, score, created_at in data:
dummy = DummyData(
id = id, name = name, email = email, age = age,
country = country, score = score, created_at = created_at
)
s.add(dummy)
s.commit()
return
2. DB 생성 및 데이터 삽입
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# scripts/data_generator.py
from config.config import config
from sqlalchemy import create_engine
from urllib.parse import quote_plus
from models.dummy_data import DummyData
def get_engine(baseurl:str=config["db"]["baseurl"],
user:str=config["db"]["user"],
password:str=config["db"]["password"],
host:str=config["db"]["host"],
port:str=config["db"]["port"],
db:str=config["db"]["db"]):
url = f"{baseurl}{user}:{quote_plus(password)}@{host}:{port}/{db}"
connect_args = {
"connect_timeout": 30,
"read_timeout": 600,
"write_timeout": 600,
"charset": "utf8mb4",
}
engine = create_engine(url, echo=False, connect_args=connect_args)
return engine
def create_db(engine = get_engine()):
DummyData.__table__.create(bind=engine)
def init_data():
try:
create_db()
datas = generate_dummy_data()
insert_dummy_data(datas)
except:
datas = generate_dummy_data()
try:
insert_dummy_data(datas)
except:
print("data insert failed")
3. 벤치마크 코드
- 공통 import
1
2
3
4
5
6
7
8
9
10
11
# scripts/benchmark.py
import time
import pandas as pd
from config.config import config
from scripts.data_generator import get_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
from models.dummy_data import DummyData
import pymysql
from urllib.parse import quote_plus
import connectorx as cx
- pandas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# scripts/benchmark.py
# =========================
# 벤치마크 1: pandas
# =========================
def benchmark_pandas(engine=None):
if engine is None:
engine = get_engine()
query = f"SELECT * FROM {config['db']['table']}"
start = time.perf_counter()
df = pd.read_sql(query, engine)
elapsed = time.perf_counter() - start
print("\n[pandas.read_sql + pymysql connection]")
print(f"rows: {len(df):,}")
print(f"columns: {len(df.columns)}")
print(f"time: {elapsed:.4f}초")
return f"{elapsed:.4f}"
- SQLAlchemy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# scripts/benchmark.py
# =========================
# 벤치마크 2: sqlalchemy
# =========================
def benchmark_sqlalchemy(engine=None):
if engine is None:
engine = get_engine()
Session = sessionmaker(bind=engine)
start = time.perf_counter()
with Session() as s:
smtm = select(DummyData)
datas = s.scalars(smtm).all()
elapsed = time.perf_counter() - start
print("\n[sqlalchemy]")
print(f"rows: {len(datas):,}")
print(f"time: {elapsed:.4f}초")
return f"{elapsed:.4f}"
- pymysql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# scripts/benchmark.py
# =========================
# 벤치마크 3: pymysql
# =========================
def benchmark_pymysql():
query = f"SELECT * FROM {config['db']['table']}"
conn = pymysql.connect(
host=config["db"]["host"],
port=int(config["db"]["port"]),
user=config["db"]["user"],
password=config["db"]["password"],
database=config["db"]["db"],
)
cur = conn.cursor()
start = time.perf_counter()
cur.execute(query)
datas = cur.fetchall()
elapsed = time.perf_counter() - start
cur.close()
conn.close()
print("\n[pymysql]")
print(f"rows: {len(datas):,}")
print(f"time: {elapsed:.4f}초")
return f"{elapsed:.4f}"
- connectorx.read_sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# scripts/benchmark.py
# =========================
# 벤치마크 4: connectorx.read_sql
# =========================
def benchmark_connectorx():
user = config["db"]["user"]
password = quote_plus(config["db"]["password"])
host = config["db"]["host"]
port = config["db"]["port"]
db = config["db"]["db"]
url = f"mysql://{user}:{password}@{host}:{port}/{db}"
query = f"SELECT * FROM {config['db']['table']}"
start = time.perf_counter()
df = cx.read_sql(url, query)
elapsed = time.perf_counter() - start
print("\n[connectorx.read_sql]")
print(f"rows: {len(df):,}")
print(f"columns: {len(df.columns)}")
print(f"time: {elapsed:.4f}초")
return f"{elapsed:.4f}"
- connectorx.read_sql with partition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# scripts/benchmark.py
# =========================
# 벤치마크 5: connectorx.read_sql + partition
# =========================
def benchmark_connectorx_with_partition():
user = config["db"]["user"]
password = quote_plus(config["db"]["password"])
host = config["db"]["host"]
port = config["db"]["port"]
db = config["db"]["db"]
url = f"mysql://{user}:{password}@{host}:{port}/{db}"
query = f"SELECT * FROM {config['db']['table']}"
start = time.perf_counter()
df = cx.read_sql(url, query, partition_on="id", partition_num=4, partition_range=(1,2_000_000))
elapsed = time.perf_counter() - start
print("\n[connectorx.read_sql with partition]")
print(f"rows: {len(df):,}")
print(f"columns: {len(df.columns)}")
print(f"time: {elapsed:.4f}초")
return f"{elapsed:.4f}"
4. 실행 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# main.py
from scripts.data_generator import *
from scripts.benchmark import *
import pandas as pd
def benchmarks(engine):
times = {
"pandas":list(),
"sqlalchemy":list(),
"pymysql":list(),
"connectorx":list(),
"connectorx_with_partition":list()
}
# 10회 반복
for i in range(10):
times["pandas"].append(benchmark_pandas())
times["sqlalchemy"].append(benchmark_sqlalchemy())
times["pymysql"].append(benchmark_pymysql())
times["connectorx"].append(benchmark_connectorx())
times["connectorx_with_partition"].append(benchmark_connectorx_with_partition())
return times
def main():
init_data()
engine = get_engine()
times = benchmarks(engine)
pd.DataFrame(times).to_csv("time_check_result.csv")
if __name__ == "__main__":
main()
5. 결과
- 데이터량 : 50만건, 컬럼 7개
| try | pandas | sqlalchemy | pymysql | cx | cx with partition |
|---|---|---|---|---|---|
| 1 | 7.6914 | 9.4381 | 7.5063 | 6.688 | 8.6599 |
| 2 | 8.1133 | 10.0019 | 8.4122 | 7.2162 | 6.9585 |
| 3 | 8.7253 | 10.287 | 8.4252 | 6.8618 | 7.4433 |
| 4 | 8.0078 | 9.8607 | 7.7773 | 6.5455 | 6.8261 |
| 5 | 8.5214 | 10.1245 | 9.5504 | 6.8186 | 7.1813 |
| 6 | 8.4877 | 10.74 | 7.6776 | 7.3612 | 8.1982 |
| 7 | 8.6856 | 10.0981 | 7.6401 | 6.6203 | 6.7870 |
| 8 | 9.2614 | 10.2867 | 7.3845 | 7.3345 | 7.2067 |
| 9 | 8.0289 | 10.0144 | 8.1619 | 7.5521 | 8.2916 |
| 10 | 8.0717 | 9.7355 | 8.6035 | 7.2553 | 7.4797 |
| average | 8.35945 | 10.05869 | 8.1139 | 7.02535 | 7.50323 |
6. 결과 고찰
(1) ConnectorX 방식이 다른 방식들보다 빠른 읽기 속도를 보임
- 10회 반복 측정 결과, ConnectorX는 평균 7.025초로 가장 빠른 읽기 속도를 보임
- ConnectorX는 pandas 대비 약 1.33초, SQLAlchemy 대비 약 3.03초, pymysql 대비 약 1.09초 정도 더 빠른 결과
- 이러한 결과는 ConnectorX가 기존 방식보다 Python 레벨에서의 변환 비용을 줄이는 구조를 가지고 있기 때문으로 해석
- 이 중 가장 느린 SQLAlchemy의 경우, ORM의 특징 때문으로 추정되는데, 단순히 데이터를 가져오는 것에서 끝나지 않고 Python 객체와의 매핑작업이 추가되기 때문으로 추정됨.
(2) partition을 나누지 않은 경우가 더 빠른 읽기 속도를 보임
- 파티션을 하지 않은 경우가 7.025초, 파티션을 한 경우가 7.503초로, 하지 않은 경우가 더 빠른 속도를 보임
- 이를 봤을 때, 파티셔닝을 하는 게 항상 빠르지 않다는 것을 알 수 있음
- 이러한 현상의 원인으로는
파티셔닝에 따른 오버헤드일 것으로 추정 : 애매하게 많은 데이터에서는 파티셔닝 작업이 오히려 방해가 됨 - 예를 들어 파티셔닝을 하지 않는 경우와 하는 경우 아래와 같이 작업이 수행됨
1
2
3
4
5
6
7
8
9
10
# 파티셔닝을 하지 않는 경우
(1) DB connection 1개
(2) SELECT * FROM dummy_data
(3) 결과를 순차적으로 DataFrame 변환
# 파티셔닝을 하는 경우
(1) DB connection 여러 개
(2) id 범위별 쿼리 여러 개
(3) 결과 조각 여러 개 생성
(4) 마지막에 DataFrame 병합
- 즉, SQL 쿼리문으로 볼 때, 아래와 같은 작업이 추가됨 (실제 쿼리라는 게 아니라 논리적인 예시)
1
2
3
4
5
SELECT * FROM dummy_data WHERE id >= 1 AND id < 125000;
SELECT * FROM dummy_data WHERE id >= 125000 AND id < 250000;
SELECT * FROM dummy_data WHERE id >= 250000 AND id < 375000;
SELECT * FROM dummy_data WHERE id >= 375000 AND id <= 500000;
-- 및 결과 통합
- 이번 실습한 데이터는 50만건으로, 병렬화 이득보다 파티셔닝에 따른 오버헤드가 더 컸을 것으로 추정됨
6. 고찰에 따른 추가 실험
- 파티셔닝의 효용성을 측정하기 위해, 데이터량을 더 늘려서 실험해봄
- 기존 50만 건 → 200만 건
- 결과는 여전히 connectorx 가 속도 우위
- 데이터량을 더 늘려서 실험해보려 했으나, connection peer가 발생
- 추후 여건을 다시 만들어서 실험해볼 필요 있음
| try | connectorx | connectorx_with_partition |
|---|---|---|
| 1 | 46.3719 | 44.4643 |
| 2 | 39.7806 | 39.2160 |
| 3 | 37.1590 | 40.9290 |
| average | 41.1038 | 41.5364 |
7. 실험 추가
- 회사에서 업무상 1,800만건 정도의 데이터를 다룰 일이 생김
- 파티셔닝을 하지 않았을 때 2시간여가 소요되었으나, 파티셔닝을 한 뒤 1시간여로 시간이 줄어듦
Reference
https://github.com/sfu-db/connector-x
Comments