Batch Operations: Bulk Insert, COPY, and Batch Size Tuning
Loading or updating large volumes of data row-by-row is prohibitively slow. Batch operations reduce overhead by orders of magnitude. This article covers bulk insert techniques, PostgreSQL's COPY command, batch updates, and the art of choosing the right batch size.
Row-by-Row is Slow
Inserting one row at a time incurs overhead for each statement:
* Parse SQL
* Plan query
* Execute plan
* Commit (if auto-commit)
* Network round trip
# SLOW: one round trip per row
for row in dataset:
    cursor.execute("INSERT INTO users (email, name) VALUES (%s, %s)", row)
For 100,000 rows, that is 100,000 round trips. Batch operations reduce this to one.
Bulk Insert with Multi-Row VALUES
The simplest batch insert sends multiple rows in a single statement:
INSERT INTO users (email, name) VALUES
('alice@example.com', 'Alice'),
('bob@example.com', 'Bob'),
('carol@example.com', 'Carol');
Using psycopg2 execute_values
from psycopg2.extras import execute_values

data = [
    ("alice@example.com", "Alice"),
    ("bob@example.com", "Bob"),
    # ... 1000 rows
]

execute_values(
    cursor,
    "INSERT INTO users (email, name) VALUES %s",
    data,
    template="(%s, %s)",
    page_size=1000,
)
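Note that execute_values rewrites the statement with a multi-row VALUES list, sending up to page_size rows per statement; with 1,000 rows and page_size=1000, the whole load is a single round trip.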
Using asyncpg
import asyncpg

# executemany with prepared statement reuse
await conn.executemany(
    "INSERT INTO users (email, name) VALUES ($1, $2)",
    [("alice@example.com", "Alice"), ("bob@example.com", "Bob")],
)
The COPY Command
COPY is PostgreSQL's most efficient data loading mechanism. It streams rows in text, CSV, or binary format directly into a table, avoiding the per-statement parse, plan, and execute overhead of individual INSERTs:
-- From file
COPY users (email, name) FROM '/path/to/users.csv' WITH (FORMAT CSV, HEADER true);
-- From standard input (via driver)
COPY users (email, name) FROM STDIN WITH (FORMAT CSV);
Python with COPY
import io
import csv

buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerows([
    ("alice@example.com", "Alice"),
    ("bob@example.com", "Bob"),
])
buffer.seek(0)

cursor.copy_expert(
    "COPY users (email, name) FROM STDIN WITH CSV",
    buffer
)
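With asyncpg there is no need to build a CSV buffer: the driver exposes COPY directly through copy_records_to_table. A minimal sketch, reusing the same table and columns as above:
import asyncpg

async def load_users(conn: asyncpg.Connection) -> None:
    # Sends all rows over a single COPY ... FROM STDIN in PostgreSQL's
    # binary format -- typically the fastest driver-level load path.
    await conn.copy_records_to_table(
        "users",
        records=[
            ("alice@example.com", "Alice"),
            ("bob@example.com", "Bob"),
        ],
        columns=["email", "name"],
    )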
Performance Comparison
| Method | Time for 1M rows | Network Round Trips |
|--------|------------------|---------------------|
| Row-by-row INSERT | ~120 seconds | 1,000,000 |
| Batch INSERT (1000 rows) | ~8 seconds | 1,000 |
| COPY (binary) | ~1.5 seconds | 1 |
| COPY (CSV) | ~2 seconds | 1 |
Batch Updates
Updating rows in bulk follows a different pattern: join the target table against the new values, supplied either through UNNEST arrays or a temporary table.
Using UNNEST
UPDATE users SET email = data.email
FROM (SELECT UNNEST(%s::int[]) AS id, UNNEST(%s::text[]) AS email) AS data
WHERE users.id = data.id;
user_ids = [1, 2, 3, 4, 5]
emails = ["alice@new.com", "bob@new.com", "carol@new.com", "dave@new.com", "eve@new.com"]
cursor.execute("""
UPDATE users SET email = data.email
FROM (SELECT UNNEST(%s::int[]) AS id, UNNEST(%s::text[]) AS email) AS data
WHERE users.id = data.id
""", (user_ids, emails))
Using a Temporary Table
# Create temp table
cursor.execute("""
    CREATE TEMP TABLE tmp_updates (
        id INTEGER PRIMARY KEY,
        email TEXT,
        name TEXT
    ) ON COMMIT DROP
""")

# COPY into temp table
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerows(update_data)
buffer.seek(0)
cursor.copy_expert("COPY tmp_updates FROM STDIN WITH CSV", buffer)

# Join update
cursor.execute("""
    UPDATE users u
    SET email = t.email, name = t.name
    FROM tmp_updates t
    WHERE u.id = t.id
""")
Batch Size Tuning
The optimal batch size depends on row width, network latency, and available memory.
General Guidelines
| Row Width | Recommended Batch Size |
|-----------|------------------------|
| Narrow (few columns) | 1000-5000 |
| Medium (10-20 columns) | 500-2000 |
| Wide (JSONB, TEXT, many columns) | 100-500 |
Finding the Sweet Spot
import time

def benchmark_batch_size(cursor, data, batch_sizes):
    for batch_size in batch_sizes:
        # Clear the table between runs so each batch size starts from the same state
        cursor.execute("TRUNCATE users")
        start = time.time()
        for i in range(0, len(data), batch_size):
            batch = data[i:i+batch_size]
            execute_values(
                cursor,
                "INSERT INTO users (email, name) VALUES %s",
                batch,
                page_size=batch_size,
            )
        elapsed = time.time() - start
        print(f"Batch size {batch_size}: {elapsed:.2f}s")
Typical results for narrow rows:
Batch size 100: 3.2s
Batch size 500: 1.8s
Batch size 1000: 1.2s
Batch size 5000: 1.5s # Diminishing returns begin
Batch size 10000: 2.1s # Worse: memory pressure
Error Handling
All-or-Nothing (Transactional)
import psycopg2

conn = psycopg2.connect(dsn)
try:
    cursor = conn.cursor()
    execute_values(cursor, "INSERT INTO users (email) VALUES %s", data)
    conn.commit()
except Exception as e:
    conn.rollback()
    print(f"Batch failed: {e}")
Savepoint per Batch
for i, batch in enumerate(batches):
    try:
        cursor.execute("SAVEPOINT batch_sp")
        execute_values(cursor, "INSERT INTO users (email) VALUES %s", batch)
        cursor.execute("RELEASE SAVEPOINT batch_sp")
    except Exception as e:
        cursor.execute("ROLLBACK TO SAVEPOINT batch_sp")
        print(f"Batch {i} failed, skipping: {e}")
ON CONFLICT for Idempotent Loads
INSERT INTO users (id, email, name) VALUES %s
ON CONFLICT (id) DO UPDATE SET
email = EXCLUDED.email,
name = EXCLUDED.name;
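With execute_values, the upsert clause simply follows the %s placeholder; a sketch using illustrative rows (any ON CONFLICT target must match a unique constraint on the table):
execute_values(
    cursor,
    """
    INSERT INTO users (id, email, name) VALUES %s
    ON CONFLICT (id) DO UPDATE SET
        email = EXCLUDED.email,
        name = EXCLUDED.name
    """,
    [(1, "alice@example.com", "Alice"), (2, "bob@example.com", "Bob")],
)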
Best Practices
1. **Use COPY for initial loads** and bulk inserts where latency is acceptable.
2. **Use batch INSERTs (execute_values)** for operational bulk operations.
3. **Disable triggers and indexes** temporarily for very large loads, then re-enable:
ALTER TABLE users SET (autovacuum_enabled = false);
ALTER TABLE users DISABLE TRIGGER ALL;
DROP INDEX idx_users_email;
-- Perform batch load
-- Note: CREATE INDEX CONCURRENTLY cannot run inside a transaction block
CREATE INDEX CONCURRENTLY idx_users_email ON users (email);
ALTER TABLE users ENABLE TRIGGER ALL;
ALTER TABLE users SET (autovacuum_enabled = true);
4. **Increase max_wal_size** for large batch operations to avoid excessive checkpoints.
5. **Use UNLOGGED tables** for staging (data is lost on a crash, but loads are much faster):
CREATE UNLOGGED TABLE staging_users (LIKE users INCLUDING ALL);
-- Load into staging with COPY or batch INSERTs, then move into the target table:
INSERT INTO users SELECT * FROM staging_users;
Batch operations are essential for ETL pipelines, data migrations, and any workflow that moves large volumes of data. Measure your batch sizes, use COPY when possible, and always handle errors gracefully.