ConnectorX

1. 소개

  • RDB 조회 결과를 Pandas, PyArrow, Polars 등 Dataframe 계열 객체로 빠르게 로드하는 라이브러리
  • SQL query를 실행하고, 결과를 Pandas DataFrame으로 빠르고 메모리 효율적으로 로딩한다.
  • 즉, 파이썬에서 RDB 데이터를 빠르게 DataFrame 으로 로딩하는 기능을 제공한다.

2. 사용법

(1) 설치

  • 아래와 같이 사용하며, pandas 또는 pyarrow 와 같이 DataFrame 형태를 다룰 수 있는 라이브러리를 함께 설치하길 권장
1
2
3
4
5
# pip
pip install connectorx

# uv
uv add connectorx

(2) 기본 문법

1
2
3
4
5
6
7
8
9
import connectorx as cx

db_url = f"mysql://{user}:{password}@{host}:{port}/{db}"
query = "SELECT * FROM table" 

df = cx.read_sql(
    conn = db_url, # 연결 DB URL
    query = query                          
)

(3) 병렬 읽기

  • auto_increment id 처럼 숫자형이고, NULL이 없는 column을 파티션 대상으로 지정해 병렬 읽기 수행이 가능하다.
  • partition_on 파라미터 : 병렬처리를 위해 파티션을 나눌 대상 컬럼 이름
  • partition_num 파라미터 : 데이터 전체를 몇 개의 파티션으로 나눌 것인지
  • partition_range 파라미터 : 파티션 대상 컬럼의 전체 값 범위. (선택적)
  • 대상 컬럼은 꼭 id 가 아니더라도 생성일시와 같은 연속적이고 분포가 고른 데이터면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
import connectorx as cx

db_url = f"mysql://{user}:{password}@{host}:{port}/{db}"
query = "SELECT * FROM table" 

df = cx.read_sql(
    conn = db_url, # 연결 DB URL
    query = query,
    partition_on = "id", 
    partition_num = 8,
    partition_range = (1, 500_000)              
)

3. 장단점과 적용 권장 상황

(1) 장점

장점 설명
빠른 속도 • partition 을 지정시 병렬 스레드로 데이터를 읽어 빠른 조회
메모리 효율 • Rust 기반 + zero-copy 원칙으로 더 낮은 메모리 사용량
여러 DB를 지원함 • MySQL, MariaDB, PostgreSQL, Oracle, SQLite 등 여러 RDB 지원
• 결과물로는 Pandas, PyArrow, Dask 등 지원
Pandas 와의 연계 • 결과물이 바로 DataFrame 형태로 나와 사용하기 쉬움

(2) 한계

한계 설명
쓰기 작업이 불가 • 읽기 전용 라이브러리로, 쓰기 작업 불가
모든 쿼리에서 항상 빠르지 않음 • 소량 데이터에서는 속도의 차이가 거의 없음
내부적인 추가 쿼리 • partition 확인 및 count 확인을 위한 추가 쿼리가 내부적으로 수행됨
• 이로 인한 약간의 오버헤드 발생 가능
• 또한, 데이터가 변경되는 중에 읽어들일시 일관성이 무너질 위험

(3) 적용을 권장하는 상황

  • 대용량 SELECT 결과를 Python DataFrame으로 빠르게 가져오고 싶을 때
  • pandas.read_sql이 느리거나 메모리를 많이 쓸 때
  • DB → pandas / polars / arrow 로딩이 병목일 때
  • 숫자형 partition column이 있어서 병렬 읽기가 가능할 때

(4) 적용을 권장하지 않는 상황

  • 소량 데이터 조회
  • INSERT / UPDATE / DELETE 작업 (불가능)
  • ORM이 필요한 애플리케이션 코드
  • transaction 제어가 중요한 작업
  • 실시간으로 계속 변경되는 OLTP 테이블의 일관 조회

4. RDB 데이터를 불러오는 방식 비교

(1) 기존

  • pandas.read_sql() , SQLAlchemy , pymysql , psycopg , sqlite 등을 많이 사용해왔음
  • 위 방법들로 RDB 데이터를 읽어와 DataFrame 형태로 변환할 때 아래와 같은 작업을 거침
1
2
3
4
5
6
DB에서 결과 조회
→ Python DB driver가 DB 결과 데이터를 읽음
→ DB 값을 Python 객체로 변환함
→ row를 tuple/list/dict 같은 Python 자료구조로 구성함
→ pandas DataFrame으로 변환함
→ pandas 내부 구조에 맞게 column 단위로 재배치함
  • (1) 데이터가 많을수록 DB driver fetch 횟수와 전송량이 증가함
  • (2) 각 row와 column 값을 Python 객체로 변환하는 비용이 발생함
  • (2) Python 객체로 변환된 데이터를 다시 DataFrame 구조로 바꾸는 비용이 발생
  • (3) 변환 중에는 동일한 데이터가 여러 형태로 메모리에 동시에 존재할 수 있음

(2) Connector X

  • 반복적인 데이터 복사와 Python 레벨에서의 오버헤드를 줄이는 데 집중함
  • Rust로 작성되어 있으며, DB에서 읽은 데이터를 Rust 내부에서 처리한 뒤 DataFrame으로 변환 가능한 형태로 Python에 전달함
  • 따라서 row들을 Python 객체로 먼저 만든 뒤 다시 DataFrame으로 조립하는 기존 방식보다 변환 비용과 복사 비용을 줄일 수 있음
1
2
3
4
DB에서 결과 조회
→ ConnectorX가 DB와 직접 연결해 데이터를 읽음
→ Rust 내부에서 데이터를 효율적으로 처리함
→ Python 객체 변환을 최소화하면서 DataFrame으로 전달함

5. 성능 이득 원리

(1) Partition

  • ConnectorX의 기능으로, 데이터를 몇 개의 덩어리로 분할한 뒤, 이를 병렬로 읽는 방법
  • 분할 기준은 id 컬럼과 같이 숫자형이면서, 되도록이면 연속적인 값을 가진 컬럼이 될 수 있다.
  • partition_range 를 직접 지정해주면, 내부적으로 min/max 조회를 하지 않아 성능 이득을 볼 수 있다.
  • 다만, 파티셔닝을 하는 데에도 추가적인 작업이 필요하므로, 데이터량에 따라 성능 이득을 볼 수도 있고, 오히려 성능이 떨어질 수도 있다.
1
2
3
partition_on = "분할 기준이 될 컬럼명"
partition_num = "몇 개의 덩어리로 나눌 것인지"
partition_range = "분할 기준 컬럼의 최소, 최대값 범위"

(2) Python Object 변환을 줄임

  • 기존 방식에서는 DB의 각 row와 column 값이 Python 객체로 변환됨
  • 반면 ConnectorX는 Rust 레벨에서 데이터를 처리하고, DataFrame이 사용할 수 있는 형태로 직접 구성함
  • 따라서 Python 인터프리터를 거치는 반복 작업이 줄으듦으로 속도에서 이득을 볼 수 있음

(2) Python Object 변환을 줄임

  • 기존 방식에서는 DB에서 읽은 각 row와 column 값이 Python 객체로 변환됨(예: int, str, datetime, tuple 등)
  • 이후 이 Python 객체 기반의 row 데이터를 다시 pandas DataFrame으로 변환함
  • 반면 ConnectorX는 Rust에서 대부분의 데이터처리를 거치고 DataFrame과 유사한 형태로 반환함
  • 따라서 대량의 row를 Python 객체로 반복 생성하는 비용이 줄어들어 읽기 성능에서 이점을 볼 수 있음

(3) 메모리 복사 횟수를 줄이는 구조

  • 기존 방식에서는 DB 결과가 Python 객체, list/tuple, pandas DataFrame 등 여러 단계를 거치며 변환됨
  • 이 과정에서 중간 자료구조가 생성되거나, DataFrame 내부 구조로 옮겨지는 과정에서 추가 복사가 발생할 수 있음
  • ConnectorX는 이러한 중간 변환 단계를 줄이는 것을 지향함
  • DB에서 읽은 데이터를 Rust 내부에서 처리하고, DataFrame으로 변환하기 좋은 형태로 전달함
  • 따라서 (2)번의 Python 객체 생성 비용뿐 아니라 중간 자료구조 생성 비용과 메모리 복사 비용도 줄일 수 있음

실습

0. 실습 코드

  • 아래 깃헙에서 확인 가능

study/00_Languages/04_Python/std_connectorx at main · whdrns2013/study

1. 설정값

1
2
3
4
5
6
7
8
9
10
11
12
# config/config.ini
[db]
baseurl=mysql+pymysql://
host={ip주소}
port={port번호}
user={DB유저명}
password={DB패스워드}
db={DB명}
table=dummy_data

[setting]
dummy_data_size=500000

2. 더미 데이터 테이블 스키마

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# models/dummy_data.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Integer, Float, DateTime
from config.config import config

class Base(DeclarativeBase):
    pass

class DummyData(Base):
    __tablename__ = config["db"]["table"]
    id          :Mapped[int]    = mapped_column(Integer, primary_key=True, autoincrement=True)
    name        :Mapped[str]    = mapped_column(String(50), nullable=True)
    email       :Mapped[str]    = mapped_column(String(50), nullable=True)
    age         :Mapped[int]    = mapped_column(Integer, nullable=True)
    country     :Mapped[str]    = mapped_column(String(2), nullable=True)
    score       :Mapped[float]  = mapped_column(Float, nullable=True)
    created_at  :Mapped[str]    = mapped_column(DateTime, nullable=False)

1. 더미 데이터 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# scripts/data_generator.py
from config.config import config
from models.dummy_data import DummyData
from sqlalchemy.orm import sessionmake

def generate_dummy_data(data_size:int = int(config["setting"]["dummy_data_size"])):
    countries = ["KR", "US", "JP", "CN", "DE", "FR", "GB", "IN"]
    rows = []
    for i in range(1, data_size + 1):
        rows.append((
            i,
            f"user_{i}",
            f"user_{i}@example.com",
            20 + (i % 50),
            countries[i % len(countries)],
            round((i % 10000) / 100, 2),
            f"2026-01-{(i % 28) + 1:02d} 12:00:00"
        ))
    return rows

def insert_dummy_data(data:list[tuple], engine=get_engine()):
    Session = sessionmaker(bind=engine)
    with Session() as s:
        for id, name, email, age, country, score, created_at in data:
            dummy = DummyData(
                id = id, name = name, email = email, age = age,
                country = country, score = score, created_at = created_at
            )
            s.add(dummy)
        s.commit()
    return
    

2. DB 생성 및 데이터 삽입

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# scripts/data_generator.py
from config.config import config
from sqlalchemy import create_engine
from urllib.parse import quote_plus
from models.dummy_data import DummyData

def get_engine(baseurl:str=config["db"]["baseurl"],
               user:str=config["db"]["user"],
               password:str=config["db"]["password"],
               host:str=config["db"]["host"],
               port:str=config["db"]["port"],
               db:str=config["db"]["db"]):
    url = f"{baseurl}{user}:{quote_plus(password)}@{host}:{port}/{db}"
    connect_args = {
        "connect_timeout": 30,
        "read_timeout": 600,
        "write_timeout": 600,
        "charset": "utf8mb4",
        }
    engine = create_engine(url, echo=False, connect_args=connect_args)
    return engine

def create_db(engine = get_engine()):
    DummyData.__table__.create(bind=engine)

def init_data():
    try:
        create_db()
        datas = generate_dummy_data()
        insert_dummy_data(datas)
    except:
        datas = generate_dummy_data()
        try:
            insert_dummy_data(datas)
        except:
            print("data insert failed")

3. 벤치마크 코드

  • 공통 import
1
2
3
4
5
6
7
8
9
10
11
# scripts/benchmark.py
import time
import pandas as pd
from config.config import config
from scripts.data_generator import get_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
from models.dummy_data import DummyData
import pymysql
from urllib.parse import quote_plus
import connectorx as cx
  • pandas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# scripts/benchmark.py
# =========================
# 벤치마크 1: pandas
# =========================

def benchmark_pandas(engine=None):
    if engine is None:
        engine = get_engine()
    
    query = f"SELECT * FROM {config['db']['table']}"
    start = time.perf_counter()
    df = pd.read_sql(query, engine)
    elapsed = time.perf_counter() - start

    print("\n[pandas.read_sql + pymysql connection]")
    print(f"rows: {len(df):,}")
    print(f"columns: {len(df.columns)}")
    print(f"time: {elapsed:.4f}초")

    return f"{elapsed:.4f}"
  • SQLAlchemy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# scripts/benchmark.py
# =========================
# 벤치마크 2: sqlalchemy
# =========================

def benchmark_sqlalchemy(engine=None):
    if engine is None:
        engine = get_engine()
    
    Session = sessionmaker(bind=engine)
    start = time.perf_counter()
    with Session() as s:
        smtm = select(DummyData)
        datas = s.scalars(smtm).all()
    elapsed = time.perf_counter() - start
    
    print("\n[sqlalchemy]")
    print(f"rows: {len(datas):,}")
    print(f"time: {elapsed:.4f}초")
    
    return f"{elapsed:.4f}"
  • pymysql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# scripts/benchmark.py
# =========================
# 벤치마크 3: pymysql
# =========================

def benchmark_pymysql():
    
    query = f"SELECT * FROM {config['db']['table']}"
    conn = pymysql.connect(
        host=config["db"]["host"],
        port=int(config["db"]["port"]),
        user=config["db"]["user"],
        password=config["db"]["password"],
        database=config["db"]["db"],
    )
    cur = conn.cursor()
    
    start = time.perf_counter()
    cur.execute(query)
    datas = cur.fetchall()
    elapsed = time.perf_counter() - start
    cur.close()
    conn.close()
    
    print("\n[pymysql]")
    print(f"rows: {len(datas):,}")
    print(f"time: {elapsed:.4f}초")
    
    return f"{elapsed:.4f}"
  • connectorx.read_sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# scripts/benchmark.py
# =========================
# 벤치마크 4: connectorx.read_sql
# =========================

def benchmark_connectorx():
    
    user = config["db"]["user"]
    password = quote_plus(config["db"]["password"])
    host = config["db"]["host"]
    port = config["db"]["port"]
    db = config["db"]["db"]
    url = f"mysql://{user}:{password}@{host}:{port}/{db}"
    
    query = f"SELECT * FROM {config['db']['table']}"
    
    start = time.perf_counter()
    df = cx.read_sql(url, query)
    elapsed = time.perf_counter() - start

    print("\n[connectorx.read_sql]")
    print(f"rows: {len(df):,}")
    print(f"columns: {len(df.columns)}")
    print(f"time: {elapsed:.4f}초")

    return f"{elapsed:.4f}"
  • connectorx.read_sql with partition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# scripts/benchmark.py
# =========================
# 벤치마크 5: connectorx.read_sql + partition
# =========================

def benchmark_connectorx_with_partition():
    
    user = config["db"]["user"]
    password = quote_plus(config["db"]["password"])
    host = config["db"]["host"]
    port = config["db"]["port"]
    db = config["db"]["db"]
    url = f"mysql://{user}:{password}@{host}:{port}/{db}"
    
    query = f"SELECT * FROM {config['db']['table']}"
    
    start = time.perf_counter()
    df = cx.read_sql(url, query, partition_on="id", partition_num=4, partition_range=(1,2_000_000))
    elapsed = time.perf_counter() - start

    print("\n[connectorx.read_sql with partition]")
    print(f"rows: {len(df):,}")
    print(f"columns: {len(df.columns)}")
    print(f"time: {elapsed:.4f}초")

    return f"{elapsed:.4f}"

4. 실행 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# main.py
from scripts.data_generator import *
from scripts.benchmark import *
import pandas as pd

def benchmarks(engine):
    times = {
        "pandas":list(),
        "sqlalchemy":list(),
        "pymysql":list(),
        "connectorx":list(),
        "connectorx_with_partition":list()
             }
    # 10회 반복
    for i in range(10):
        times["pandas"].append(benchmark_pandas())
        times["sqlalchemy"].append(benchmark_sqlalchemy())
        times["pymysql"].append(benchmark_pymysql())
        times["connectorx"].append(benchmark_connectorx())
        times["connectorx_with_partition"].append(benchmark_connectorx_with_partition())
    return times
    
def main():
    init_data()
    engine = get_engine()
    times = benchmarks(engine)
    pd.DataFrame(times).to_csv("time_check_result.csv")
    

if __name__ == "__main__":
    main()

5. 결과

  • 데이터량 : 50만건, 컬럼 7개
try pandas sqlalchemy pymysql cx cx with partition
1 7.6914 9.4381 7.5063 6.688 8.6599
2 8.1133 10.0019 8.4122 7.2162 6.9585
3 8.7253 10.287 8.4252 6.8618 7.4433
4 8.0078 9.8607 7.7773 6.5455 6.8261
5 8.5214 10.1245 9.5504 6.8186 7.1813
6 8.4877 10.74 7.6776 7.3612 8.1982
7 8.6856 10.0981 7.6401 6.6203 6.7870
8 9.2614 10.2867 7.3845 7.3345 7.2067
9 8.0289 10.0144 8.1619 7.5521 8.2916
10 8.0717 9.7355 8.6035 7.2553 7.4797
average 8.35945 10.05869 8.1139 7.02535 7.50323

6. 결과 고찰

(1) ConnectorX 방식이 다른 방식들보다 빠른 읽기 속도를 보임

  • 10회 반복 측정 결과, ConnectorX는 평균 7.025초로 가장 빠른 읽기 속도를 보임
  • ConnectorX는 pandas 대비 약 1.33초, SQLAlchemy 대비 약 3.03초, pymysql 대비 약 1.09초 정도 더 빠른 결과
  • 이러한 결과는 ConnectorX가 기존 방식보다 Python 레벨에서의 변환 비용을 줄이는 구조를 가지고 있기 때문으로 해석
  • 이 중 가장 느린 SQLAlchemy의 경우, ORM의 특징 때문으로 추정되는데, 단순히 데이터를 가져오는 것에서 끝나지 않고 Python 객체와의 매핑작업이 추가되기 때문으로 추정됨.

(2) partition을 나누지 않은 경우가 더 빠른 읽기 속도를 보임

  • 파티션을 하지 않은 경우가 7.025초, 파티션을 한 경우가 7.503초로, 하지 않은 경우가 더 빠른 속도를 보임
  • 이를 봤을 때, 파티셔닝을 하는 게 항상 빠르지 않다는 것을 알 수 있음
  • 이러한 현상의 원인으로는 파티셔닝에 따른 오버헤드 일 것으로 추정 : 애매하게 많은 데이터에서는 파티셔닝 작업이 오히려 방해가 됨
  • 예를 들어 파티셔닝을 하지 않는 경우와 하는 경우 아래와 같이 작업이 수행됨
1
2
3
4
5
6
7
8
9
10
# 파티셔닝을 하지 않는 경우
(1) DB connection 1개
(2) SELECT * FROM dummy_data
(3) 결과를 순차적으로 DataFrame 변환

# 파티셔닝을 하는 경우
(1) DB connection 여러 개
(2) id 범위별 쿼리 여러 개
(3) 결과 조각 여러 개 생성
(4) 마지막에 DataFrame 병합
  • 즉, SQL 쿼리문으로 볼 때, 아래와 같은 작업이 추가됨 (실제 쿼리라는 게 아니라 논리적인 예시)
1
2
3
4
5
SELECT * FROM dummy_data WHERE id >= 1 AND id < 125000;
SELECT * FROM dummy_data WHERE id >= 125000 AND id < 250000;
SELECT * FROM dummy_data WHERE id >= 250000 AND id < 375000;
SELECT * FROM dummy_data WHERE id >= 375000 AND id <= 500000;
-- 및 결과 통합
  • 이번 실습한 데이터는 50만건으로, 병렬화 이득보다 파티셔닝에 따른 오버헤드가 더 컸을 것으로 추정됨

6. 고찰에 따른 추가 실험

  • 파티셔닝의 효용성을 측정하기 위해, 데이터량을 더 늘려서 실험해봄
  • 기존 50만 건 → 200만 건
  • 결과는 여전히 connectorx 가 속도 우위
  • 데이터량을 더 늘려서 실험해보려 했으나, connection peer가 발생
  • 추후 여건을 다시 만들어서 실험해볼 필요 있음
try connectorx connectorx_with_partition
1 46.3719 44.4643
2 39.7806 39.2160
3 37.1590 40.9290
average 41.1038 41.5364

7. 실험 추가

  • 회사에서 업무상 1,800만건 정도의 데이터를 다룰 일이 생김
  • 파티셔닝을 하지 않았을 때 2시간여가 소요되었으나, 파티셔닝을 한 뒤 1시간여로 시간이 줄어듦

Reference

https://github.com/sfu-db/connector-x

Comments