Streaming large files¶
Jetliner is designed for streaming large Avro files with minimal memory overhead. This guide covers memory-efficient processing techniques.
Architecture¶
Jetliner reads Avro files block-by-block rather than loading entire files into memory:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Avro File  │ ──► │   Buffer    │ ──► │  DataFrame  │
│  (blocks)   │     │ (prefetch)  │     │  (batches)  │
└─────────────┘     └─────────────┘     └─────────────┘
This streaming architecture enables processing files larger than available RAM.
Using AvroReader for streaming control¶
The AvroReader API gives you direct control over batch processing:
import jetliner
with jetliner.AvroReader("large_file.avro") as reader:
for batch in reader:
# Process each batch individually
# Memory is released after each iteration
process(batch)
Processing without accumulation¶
For true streaming (constant memory usage):
import jetliner
total_amount = 0
row_count = 0
with jetliner.AvroReader("huge_file.avro") as reader:
for batch in reader:
# Aggregate without keeping data in memory
total_amount += batch["amount"].sum()
row_count += batch.height
print(f"Total: {total_amount}, Rows: {row_count}")
Writing results incrementally¶
Stream results to disk without accumulating in memory:
import jetliner
with jetliner.AvroReader("input.avro") as reader:
for i, batch in enumerate(reader):
# Process and write each batch
processed = batch.filter(batch["amount"] > 0)
processed.write_parquet(f"output/part_{i:04d}.parquet")
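The written parts can later be treated as a single dataset, for example by lazily scanning them with Polars (a sketch; the glob matches the part files written above):
import polars as pl

# Lazily scan all written parts as one dataset; nothing is loaded until collect()
parts = pl.scan_parquet("output/part_*.parquet")
print(parts.select(pl.len()).collect())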
Buffer configuration¶
Jetliner uses prefetching to overlap I/O with processing. Configure buffers based on your environment:
Parameters¶
| Parameter | Default | Description |
|---|---|---|
| buffer_blocks | 4 | Number of Avro blocks to prefetch |
| buffer_bytes | 64 MB | Maximum bytes to buffer |
High-throughput settings¶
For maximum speed when memory is available:
import jetliner
# More prefetching, larger buffer
df = jetliner.scan_avro(
    "data.avro",
    buffer_blocks=8,
    buffer_bytes=128 * 1024 * 1024,  # 128 MB
).collect()
Memory-constrained settings¶
For environments with limited memory (Lambda, containers):
import jetliner
# Less prefetching, smaller buffer
with jetliner.AvroReader(
    "data.avro",
    buffer_blocks=2,
    buffer_bytes=16 * 1024 * 1024,  # 16 MB
) as reader:
    for batch in reader:
        process(batch)
Batch size control¶
Control the number of records per batch:
import jetliner
# Smaller batches for fine-grained control
with jetliner.AvroReader("data.avro", batch_size=10_000) as reader:
for batch in reader:
assert batch.height <= 10_000
process(batch)
# Larger batches for better throughput
with jetliner.AvroReader("data.avro", batch_size=500_000) as reader:
for batch in reader:
process(batch)
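If you are unsure which batch size fits your memory budget, one option is to sample a few batches and check their in-memory footprint with Polars' DataFrame.estimated_size(). A rough sketch (the 50,000-row starting point is arbitrary):
import jetliner

# Inspect the first few batches to gauge per-batch memory usage
with jetliner.AvroReader("data.avro", batch_size=50_000) as reader:
    for i, batch in enumerate(reader):
        print(f"batch {i}: {batch.height} rows, ~{batch.estimated_size('mb'):.1f} MB")
        if i == 2:  # a few samples are enough
            break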
Progress tracking¶
Track progress during streaming:
import jetliner
with jetliner.AvroReader("large_file.avro") as reader:
total_rows = 0
batch_count = 0
for batch in reader:
batch_count += 1
total_rows += batch.height
if batch_count % 10 == 0:
print(f"Processed {batch_count} batches, {total_rows:,} rows")
process(batch)
print(f"Complete: {batch_count} batches, {total_rows:,} rows")
With tqdm¶
import jetliner
from tqdm import tqdm
with jetliner.AvroReader("large_file.avro") as reader:
for batch in tqdm(reader, desc="Processing"):
process(batch)
Streaming with scan_avro()¶
The scan_avro() API streams internally and collects the result at the end. Combined with a selective filter, predicate pushdown keeps memory usage low: rows that do not match the filter are discarded during the read rather than accumulated:
import jetliner
import polars as pl
# Memory-efficient: only matching rows are accumulated
df = (
    jetliner.scan_avro("large_file.avro")
    .filter(pl.col("status") == "active")  # Selective filter
    .collect()
)
To write large results to disk without accumulating them in memory, use one of Polars' streaming sinks.
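For example, a filtered result can be streamed straight to a single Parquet file with sink_parquet (a sketch; the filter and output path are illustrative):
import jetliner
import polars as pl

# Rows stream from the Avro file to the Parquet file;
# the full result is never materialized in memory
(
    jetliner.scan_avro("large_file.avro")
    .filter(pl.col("status") == "active")
    .sink_parquet("active_rows.parquet")
)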
For batch-by-batch processing with query optimization, Polars provides two streaming methods on LazyFrame. Both apply projection pushdown, predicate pushdown, and early stopping before delivering data in batches:
sink_batches() pushes batches to a callback:
import jetliner
import polars as pl
(
    jetliner.scan_avro("large_file.avro")
    .select(["user_id", "amount"])
    .filter(pl.col("amount") > 100)
    .sink_batches(lambda batch: process(batch))
)
collect_batches() returns an iterator you pull from:
import jetliner
import polars as pl
lf = (
    jetliner.scan_avro("large_file.avro")
    .select(["user_id", "amount"])
    .filter(pl.col("amount") > 100)
)

for batch in lf.collect_batches():
    process(batch)
When to use AvroReader instead¶
sink_batches and collect_batches give you Polars query composition before batches are delivered, but the reader is managed internally. AvroReader and MultiAvroReader expose the reading process directly: error inspection (.errors, .error_count), schema access, progress tracking (.rows_read, .is_finished), and early termination with resource cleanup. Choose the Polars streaming methods when you need query composition; choose the iterator APIs when you need control over the reader itself.
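As a sketch of that reader-level control, using the attributes named above (the one-million-row cutoff is arbitrary):
import jetliner

with jetliner.AvroReader("large_file.avro") as reader:
    for batch in reader:
        process(batch)
        # Stop early; the context manager still cleans up the reader
        if reader.rows_read >= 1_000_000:
            break

    # Check how many decode errors the reader recorded
    if reader.error_count:
        print(f"Finished with {reader.error_count} decode errors")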
Next steps¶
- Query Optimization - Reduce data read
- Data Sources - Paths, S3, codecs
- Error Handling - Handle failures gracefully