refactor(hash-aggr): Simplify aggregate hash table#23309
Conversation
alamb
left a comment
There was a problem hiding this comment.
Thank you @2010YOUY01 -- I think this looks like a nice cleanup / consolidation to me
| } | ||
|
|
||
| /// Marker for ordered raw rows -> partial state aggregation. | ||
| pub(in crate::aggregates) struct PartialMarker; |
There was a problem hiding this comment.
do you plan to remove this PartialMarker for the ordered streams too? Maybe we could do the same simplification
There was a problem hiding this comment.
Yes, this can be done similarly.
But I think there is a better fast path to implement for fully ordered case (query group by a,b, input order order by a,b), this idea can be implemented in a completely separated path without depending on the aggregate hash tables; while the partially ordered case still relies on hash table, and only do early emit optimization.
So I think probably its easier to do after we have further optimized the full order fast path.
Filed #23318 to track.
| // table semantics. Consider remove `AggregateMode` and only use `AggregateTableMode` | ||
| // after the refactor has finished. | ||
| // | ||
| // Issue: <https://github.com/apache/datafusion/pull/22729> |
There was a problem hiding this comment.
This seems like a PR (not an issue) -- is it the link you intended?
Note I agree with the fact that AggregateMode seems overly complicated. I vaguely remember trying to remove it once but I can't find the PR now (maybe I never pushed it)
There was a problem hiding this comment.
Yes, that's a wrong link. Fixed
| .iter_mut() | ||
| .zip(evaluated_batch.accumulator_args.iter()) | ||
| { | ||
| match mode { |
There was a problem hiding this comment.
it probably doesn't matter as this is one match per column, but I wonder if we should do the match outside and the loop inside 🤔
There was a problem hiding this comment.
I agree, this way seems a little bit easier to read: 6871dc7
|
|
||
| /// Hash table used only for converting raw input rows directly into partial | ||
| /// aggregate state rows after partial aggregation has been skipped. | ||
| pub(in crate::aggregates) struct PartialSkipHashTable { |
There was a problem hiding this comment.
i like that this has a named newtype wrapper
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing split-aggr-simplify-ht (7f68c23) to e4aa41d (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing split-aggr-simplify-ht (7f68c23) to e4aa41d (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing split-aggr-simplify-ht (7f68c23) to e4aa41d (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
| .zip(evaluated_batch.accumulator_args.iter()) | ||
| { | ||
| match mode { | ||
| AggregateTableMode::Partial | AggregateTableMode::Single => { |
There was a problem hiding this comment.
I am concerned codes here seem switching to the way like row_hash.rs: all logic in one place, and using if else to decide where we go.
There was a problem hiding this comment.
I wonder if it would help to factor these decisions into other abstractions? Like maybe a method on AggregateTableMode 🤔
| // Filters apply only when the table consumes raw input rows. Final and | ||
| // partial-reduce modes consume partial states, so their filters are not | ||
| // applicable. | ||
| let filters = match mode { |
There was a problem hiding this comment.
Same concern like what in aggregate_batch
| /// - `Final`: merge_batch + evaluate | ||
| /// - `PartialReduce`: merge_batch + state | ||
| /// - `Single`: update_batch + evaluate | ||
| pub(in crate::aggregates) struct AggregateHashTable { |
There was a problem hiding this comment.
I think maybe we should keep different AggregateHashTables for different usage, and remove duplicated codes based on this?
Due to :
- Concerns in
aggregate_batchand other methods. - Some method like
partial_skip_table, it is logic for partial aggr and seems not common.
There was a problem hiding this comment.
For Proposed alternative
I have considered this approach: separating aggregate hash tables by mode and extracting shared internal utilities.
My concern is that this would create a shallow module. To fully understand the implementation, we would still need to reason about the same underlying mode differences, while the external AggregateHashTable variants would add complexity with almost no functional contribution.
So I think keeping the mode as an internal flag is simpler overall.
Complexity concern
For the concern about growing internal complexity, see the rationale in the issue. I think this should remain manageable in the long term.
Partial skip table
Yes, this is a valid concern. The partial skip table logic could be moved out of this module entirely to make the structure cleaner. However, I think this can be delayed a bit, and I plan to do it as a follow-up.
#23113
There was a problem hiding this comment.
My concern is that this would create a shallow module. To fully understand the implementation, we would still need to reason about the same underlying mode differences, while the external AggregateHashTable variants would add complexity with almost no functional contribution.
One idea could be to leave the structure that is already on main, and refactor the common behavior out as templated functions on the base class
So for example, leave AggregateHashTable<AggrMode>
But then in the parts that are very similar like aggregate_batch instead of
impl AggregateHashTable<PartialReduceMarker> {
...
pub(in crate::aggregates) fn aggregate_batch(
&mut self,
batch: &RecordBatch,
) -> Result<()> {
let evaluated_batch = self.evaluate_batch(batch)?;
let state = self.state.building_mut();
let timer = self.group_by_metrics.aggregation_time.timer();
for group_values in &evaluated_batch.grouping_set_args {
state
.group_values
.intern(group_values, &mut state.batch_group_indices)?;
let group_indices = &state.batch_group_indices;
let total_num_groups = state.group_values.len();
for (acc, values) in state
.accumulators
.iter_mut()
.zip(evaluated_batch.accumulator_args.iter())
{
acc.merge_batch(values, group_indices, total_num_groups)?;
}
}
drop(timer);
Ok(())
}
..
}It could be structured something so that the specialization was called
impl AggregateHashTable<PartialReduceMarker> {
...
pub(in crate::aggregates) fn aggregate_batch(
&mut self,
batch: &RecordBatch,
) -> Result<()> {
// call a templated function with a closure with the hash table specific functionality
self.aggregate_batch_inner(batch, |acc, values, group_indices, total_num_groups| {
acc.merge_batch(values, group_indices, total_num_groups)
})?;
}
...
}And then move the common logic into the generic method
/// Implement in "base" class (not the specialization)
impl AggregateHashTable<AggrMode> {
...
fn <F: Fn(...) -> Result<()>)aggregate_batch_inner(
batch: &RecordBatch,
agg_function: F,
) -> Result<()> {
let evaluated_batch = self.evaluate_batch(batch)?;
let state = self.state.building_mut();
let timer = self.group_by_metrics.aggregation_time.timer();
for group_values in &evaluated_batch.grouping_set_args {
state
.group_values
.intern(group_values, &mut state.batch_group_indices)?;
let group_indices = &state.batch_group_indices;
let total_num_groups = state.group_values.len();
for (acc, values) in state
.accumulators
.iter_mut()
.zip(evaluated_batch.accumulator_args.iter())
{
// *** Note here call the generic function agg_function: F
agg_function(values, group_indices, total_num_groups)?; <---------
}
}
drop(timer);
Ok(())
}
}
}There was a problem hiding this comment.
The templated-function approach seems like a good idea. I’ll give it a try and see how it looks.
If the templated function is not expressive enough in some places/at some point, we can still fully separate the implementation by aggregation variant.
|
Thanks for the review @Rachelint and @alamb If there are other ideas for making the structure simpler and easier to understand, I’d be happy to discuss and experiment further. I think managing complexity is a hard but important problem, and it’s interesting to figure out methodologies to address it. |
Which issue does this PR close?
Rationale for this change
Part of #22710
Addressing @alamb ’s comment from #23181 (review)
The motivation behind the EPIC was that the existing aggregation code had too many internal flags and was reused by many different execution paths, making it hard to maintain. The previous refactor made two main changes:
Now that the refactor is almost done, with only single aggregation mode and spilling behavior left, I think decision 1 was still solid, while decision 2 was an overcorrection.
A module usually becomes hard to maintain when it has many tightly coupled internal flags. In the old
GroupsHashAggregateStreamcase, there were ~10 such flags. But a small number of flags, I think <= 3, is still clean and manageable.This PR unifies the aggregate hash tables used by different modes into a single implementation. The behavioral differences are controlled by one internal
modeflag. See the comments incommon.rs: struct AggregateHashTable, for details.Since this refactor is close to completion, I can confirm the internal complexity of
AggregateHashTableis not likely to bloat in the long term, so unifying these implementations seems like the better design.What changes are included in this PR?
Replace separated aggregate hash table implementation with a unified one
Are these changes tested?
Existing tests
Are there any user-facing changes?