Creating Parquet files
Parquet is a great format for storing dataframes. But we can leverage it even more by using row groups.
If you are unfamiliar with these concepts then check out a tutorial on the workings of Parquet.
To my knowledge there are two benefits of using row groups: they can be processed in parallel, and they can carry metadata that, for example, lets us skip row groups that don't contain the data we need.
Packages
I am using Polars for everything related to dataframes, and pyarrow to get more out of the Parquet statistics.
Both packages can write row group metadata, but I am using pyarrow to inspect it from within Python.
More specifically, I want to see the min_value/max_value of a column. These were previously called min/max. Polars writes min_value/max_value, while pyarrow writes both but only reads min/max when showing the metadata statistics.
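For reference, Polars can write row group statistics itself; a minimal sketch, assuming a reasonably recent Polars version and a dataframe df like the one constructed in the next section:
# Sketch: write statistics and fixed-size row groups directly from Polars.
# The rest of this post uses pyarrow's writer instead.
df.write_parquet("data_polars.parquet", statistics=True, row_group_size=100_000)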
Dataset
Consider an example where we have a dataset with 20 million rows and 2 columns.
The first column is the one we will filter on and the second column holds the values.
from pprint import pprint
from random import uniform
from uuid import uuid4
import polars as pl
import polars.selectors as cs
number_of_ids = 100_000
unique_ids = [str(uuid4()) for _ in range(number_of_ids)]
values_per_id = 200
df = pl.DataFrame({
    "id": [id for _ in range(values_per_id) for id in unique_ids],
    "value": [100 * uniform(0, 1) for _ in range(values_per_id * number_of_ids)]
})

with pl.Config(fmt_str_lengths=50):
    pprint(df.head(3))
shape: (3, 2)
┌──────────────────────────────────────┬───────────┐
│ id ┆ value │
│ --- ┆ --- │
│ str ┆ f64 │
╞══════════════════════════════════════╪═══════════╡
│ fadb6f25-cd72-43a3-bf8b-2b70451ffc51 ┆ 4.799587 │
│ 23814f93-62eb-41f2-8ae7-0b0764d8d99c ┆ 49.216103 │
│ f2865c7a-eacc-411e-a520-e951ae9e8587 ┆ 76.454863 │
└──────────────────────────────────────┴───────────┘
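For a sense of scale, Polars can estimate the in-memory size of the dataframe (a quick check; the exact number will vary):
# Rough in-memory footprint of the 20 million rows
print(f"{df.estimated_size('mb'):.0f} MB")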
I am going to demonstrate the performance improvements using a single filtering operation on the id column, where the number of matching rows is small.
Materializing the result into a dataframe is therefore not too time consuming.
Measuring performance can be tricky, and with a task involving I/O like in this post, caching can be deceptive.
In particular, the first run can be substantially slower than any remaining runs because the dataset gets cached in main memory.
So if we make repeated measurements and consider the average run time, the first, un-cached run is swamped by its subsequent, cached counterparts.
Here I only measure the filtering operation once (per file creation/update).
Running the measurements in a different order does not affect the results.
For more info on such a topic, see for instance Matt Dowle’s talk on the original H2O benchmarks.
First we use pyarrow’s default Parquet writer to also write statistics.
import os
from pyarrow.parquet import ParquetFile, ParquetWriter, write_table
filename = "data.parquet"
write_table(df.to_arrow(), filename, compression='lz4', write_statistics=True)
There is only a single row group containing all the data.
parquet_file = ParquetFile(filename)
parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x126a596c0>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 2
num_rows: 20000000
num_row_groups: 1
format_version: 2.6
serialized_size: 637
Let us see how fast it is to filter on the id column.
from timeit import timeit
id = df.item(0, "id")
query = pl.scan_parquet(filename)
timeit(lambda: query.filter(pl.col("id") == id).collect(), number=1)
0.48111404199153185
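As an aside, timeit.repeat reports per-run timings instead of an average, which makes the caching effect described above visible (a sketch; the exact numbers will differ on your machine):
from timeit import repeat

# Per-run timings: the first, un-cached run typically stands out
# from the following, cached runs.
runs = repeat(lambda: query.filter(pl.col("id") == id).collect(), number=1, repeat=5)
print([round(t, 3) for t in runs])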
Multiple row groups
Let us see how performance is affected by writing multiple row groups.
A row group should neither be too small nor too big.
I have seen recommendations for sizes between 100_000 and 1_000_000 rows, but try it out to see the effect.
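One way to try it out is a quick loop that rewrites the file with different row group sizes and times the same filter once per size; a sketch reusing the names defined above (writing with row_group_size is shown right below):
# Sketch: compare a few candidate row group sizes.
for size in [100_000, 500_000, 1_000_000]:
    write_table(df.to_arrow(), filename, compression="lz4", row_group_size=size)
    q = pl.scan_parquet(filename)
    print(size, round(timeit(lambda: q.filter(pl.col("id") == id).collect(), number=1), 3))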
import os
from pyarrow.parquet import ParquetFile, ParquetWriter, write_table
write_table(df.to_arrow(), filename, compression='lz4', row_group_size=100_000)
Now we have multiple row groups.
parquet_file = ParquetFile(filename)
parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x10673dcb0>
created_by: parquet-cpp-arrow version 11.0.0
num_columns: 2
num_rows: 20000000
num_row_groups: 200
format_version: 2.6
serialized_size: 57877
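Each row group should hold about 100_000 rows, which we can confirm from the metadata:
# Number of rows in the first few row groups
print([parquet_file.metadata.row_group(i).num_rows for i in range(3)])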
Let us see an example of what the row group statistics look like for the id column:
parquet_file.metadata.row_group(0).column(0).statistics
<pyarrow._parquet.Statistics object at 0x14a6c49a0>
has_min_max: True
min: 00004554-a342-49a8-98df-131d65a4a730
max: ffff23a0-8662-41ca-ad83-4376e64fcf20
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
When filtering, Polars can read the metadata of each row group and use the min and max statistics to decide whether or not to read the actual data in the row group.
This provides quite a speed-up:
query = pl.scan_parquet(filename)
timeit(lambda: query.filter(pl.col("id") == id).collect(), number=1)
0.11861637502443045
Not bad for just adding an extra argument.
But wait! There is more.
Controlled row groups
When writing row groups in an ad hoc manner, there is no guarantee that ids that are lexicographically close end up in the same row group.
That is, a single row group may cover ids from a to z, which does not help narrow down which groups to read and which to skip.
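We can check this with the statistics we already have: count how many row groups could, judging by their min/max values, contain our id (a quick sketch; with randomly ordered ids, expect nearly all 200 groups to qualify):
# Row groups whose [min, max] range contains the id we filter on.
metadata = parquet_file.metadata
stats = [metadata.row_group(i).column(0).statistics for i in range(metadata.num_row_groups)]
print(sum(s.min <= id <= s.max for s in stats))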
We can facilitate more relevant grouping by explicitly creating row groups.
I would like the row groups to be of comparable sizes – again containing around 100_000 rows each.
(In this example every id has the same number of observations, but to generalize this I do a bit of rounding below.)
row_group_association = (
    df.group_by("id")
    .agg(pl.count())
    .sort("id")
    .with_columns(
        (pl.col("count").cum_sum() // 100_000).alias("row_group")
    )
    .select(~cs.by_name("count"))
)
Breaking this down: I compute the number of rows for each id and sort by the id column.
Then every 100_000-ish consecutive rows are binned into a row group.
It is important to sort before binning.
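As a sanity check, we can count how many rows fall into each bin; every count should be close to 100_000:
# Rows per planned row group
print(
    df.join(row_group_association, on="id")
    .group_by("row_group")
    .agg(pl.count())
    .sort("row_group")
    .head(3)
)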
Here the dataframe fits in my computer's memory, so we can split it into smaller chunks based on the row group binning.
split_df_by_group = (
    df
    .join(row_group_association, on="id")
    .partition_by("row_group", include_key=False)
)
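A quick check: the number of chunks should be close to 200 (the exact count depends on the rounding above).
print(len(split_df_by_group))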
With pyarrow we can now write each of the smaller dataframes into a separate row group in a new file.
save_schema = split_df_by_group[0].to_arrow().schema
with ParquetWriter(filename, save_schema, compression="LZ4", version="2.6") as writer:
    for frame in split_df_by_group:
        writer.write_table(frame.to_arrow())
Now we can inspect the statistics again.
parquet_file = ParquetFile(filename)
print(parquet_file.metadata.row_group(0).column(0).statistics)
<pyarrow._parquet.Statistics object at 0x14a6c7ce0>
has_min_max: True
min: fab07261-63f9-4e92-8176-5317da0511cd
max: fc103817-c993-4dc5-ac9f-c7a946592ea5
null_count: 0
distinct_count: 0
num_values: 100000
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
Note that a single row group now contains a narrow range of ids, and by construction each id is only in a single row group.
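Repeating the candidate count from before should now point to just one row group (again a quick check):
# With sorted, non-overlapping row groups the statistics narrow the
# search down to a single candidate row group for this id.
metadata = parquet_file.metadata
stats = [metadata.row_group(i).column(0).statistics for i in range(metadata.num_row_groups)]
print(sum(s.min <= id <= s.max for s in stats))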
This has a big relative impact on the filtering:
query = pl.scan_parquet(filename)
timeit(lambda: query.filter(pl.col("id") == id).collect(), number=1)
0.002844249946065247
Closing thoughts
So the Parquet format can be tweaked to offer better performance without any change in your Polars code.
But this kind of effort is probably only worth it if you are going to read the file multiple times.
Finally, if data is already split into multiple files and organized with Hive partitioning, then there is no point in trying to collect everything into a single big Parquet file – Polars can leverage the Hive partitioning directly.
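For example, scanning a Hive-partitioned dataset could look like the following sketch, assuming a recent Polars version and a hypothetical directory layout with a year partition column:
# Hypothetical layout: data/year=2023/part-0.parquet, data/year=2024/part-0.parquet, ...
# Polars picks up the partition column and can prune partitions when filtering on it.
lazy = pl.scan_parquet("data/**/*.parquet", hive_partitioning=True)
result = lazy.filter(pl.col("year") == 2023).collect()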