Skip to content
26 changes: 26 additions & 0 deletions be/src/column/hash_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,32 @@ class EqualOnSliceWithHash {
bool operator()(const SliceWithHash& x, const SliceWithHash& y) const {
// by comparing hash value first, we can avoid comparing real data
// which may touch another memory area and has bad cache locality.
if (x.size != y.size) return false;
return x.hash == y.hash && memequal(x.data, x.size, y.data, y.size);
}
};

// A Slice that carries a hash value alongside its data pointer and length.
// The template parameter `seed` selects the SliceHashWithSeed variant used
// when the hash has to be computed here.
template <PhmapSeed seed>
class TSliceWithHash : public Slice {
public:
    // Hash value associated with this slice.
    size_t hash;

    // Wraps `src` (same data pointer and size) and computes its hash with
    // SliceHashWithSeed<seed>.
    TSliceWithHash(const Slice& src) : Slice(src.data, src.size), hash(SliceHashWithSeed<seed>()(src)) {}

    // Wraps the byte range [p, p + s) and stores the caller-supplied hash `h` as-is.
    TSliceWithHash(const uint8_t* p, size_t s, size_t h) : Slice(p, s), hash(h) {}
};

// Hash functor for TSliceWithHash<seed>: it does no hashing of its own and
// simply hands back the hash value already stored in the slice.
template <PhmapSeed seed>
class THashOnSliceWithHash {
public:
    std::size_t operator()(const TSliceWithHash<seed>& slice) const {
        const std::size_t stored_hash = slice.hash;
        return stored_hash;
    }
};

// Equality functor for TSliceWithHash<seed>.
template <PhmapSeed seed>
class TEqualOnSliceWithHash {
public:
    // Returns true when `x` and `y` have the same size, the same stored hash
    // and memequal() reports their data as equal.
    bool operator()(const TSliceWithHash<seed>& x, const TSliceWithHash<seed>& y) const {
        // Reject on the cheap fields (size, then hash) before comparing the
        // real data, which may touch another memory area and has bad cache
        // locality.
        if (x.size != y.size || x.hash != y.hash) {
            return false;
        }
        return memequal(x.data, x.size, y.data, y.size);
    }
};
Expand Down
31 changes: 0 additions & 31 deletions be/src/exec/vectorized/aggregate/agg_hash_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,6 @@ using DateAggHashSet = phmap::flat_hash_set<DateValue, StdHashWithSeed<DateValue
// Hash set of TimestampValue keys built on phmap::flat_hash_set, hashed with
// StdHashWithSeed<TimestampValue, seed>.
template <PhmapSeed seed>
using TimeStampAggHashSet = phmap::flat_hash_set<TimestampValue, StdHashWithSeed<TimestampValue, seed>>;

// By storing the hash value inside the slice, we save the cost of:
// 1. re-calculating the hash value of the slice;
// 2. touching the slice's memory area, which may cause high memory-access latency.
// The tradeoff is that we allocate an extra 8 bytes per slice for the hash value.
// However, all slice data is now allocated from a single memory pool (4K per allocation),
// so the internal fragmentation can offset these 8 bytes.

// Slice extended with a `hash` field so the hash travels together with the
// data pointer and size. `seed` picks the SliceHashWithSeed variant used by
// the hashing constructor.
template <PhmapSeed seed>
class TSliceWithHash : public Slice {
public:
    // Hash value kept next to the slice fields.
    size_t hash;

    // Copies the pointer/size of `src` and hashes `src` with SliceHashWithSeed<seed>.
    TSliceWithHash(const Slice& src) : Slice(src.data, src.size), hash(SliceHashWithSeed<seed>()(src)) {}

    // Uses the given pointer `p`, length `s` and hash `h` without recomputing anything.
    TSliceWithHash(const uint8_t* p, size_t s, size_t h) : Slice(p, s), hash(h) {}
};

// Hasher for TSliceWithHash<seed> keys; returns the `hash` member of the key
// instead of hashing the bytes again.
template <PhmapSeed seed>
class THashOnSliceWithHash {
public:
    std::size_t operator()(const TSliceWithHash<seed>& slice) const {
        const std::size_t cached = slice.hash;
        return cached;
    }
};

// Equality functor for TSliceWithHash<seed>.
template <PhmapSeed seed>
class TEqualOnSliceWithHash {
public:
    // Returns true only when `x` and `y` have equal size, equal stored hash
    // and memequal() reports their data as equal.
    bool operator()(const TSliceWithHash<seed>& x, const TSliceWithHash<seed>& y) const {
        // Slices of different length can never be equal; reject them before
        // looking at anything else.
        if (x.size != y.size) return false;
        // by comparing hash value first, we can avoid comparing real data
        // which may touch another memory area and has bad cache locality.
        return x.hash == y.hash && memequal(x.data, x.size, y.data, y.size);
    }
};

// Hash set of TSliceWithHash<seed> keys built on phmap::flat_hash_set.
// Hashing is done by THashOnSliceWithHash<seed> and key comparison by
// TEqualOnSliceWithHash<seed>.
template <PhmapSeed seed>
using SliceAggHashSet =
phmap::flat_hash_set<TSliceWithHash<seed>, THashOnSliceWithHash<seed>, TEqualOnSliceWithHash<seed>>;
Expand Down
114 changes: 109 additions & 5 deletions be/src/exprs/agg/distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "exprs/agg/aggregate.h"
#include "exprs/agg/sum.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "udf/udf_internal.h"
#include "util/phmap/phmap_dump.h"

Expand All @@ -51,6 +50,8 @@ struct DistinctAggregateState<PT, FixedLengthPTGuard<PT>> {
return pair.second * phmap::item_serialize_size<HashSet<T>>::value;
}

// Forwards a prefetch request for `key` to the underlying hash set.
void prefetch(T key) {
    set.prefetch(key);
}

// Number of keys currently stored in the set, i.e. the distinct count.
int64_t disctint_count() const {
    const auto stored_keys = set.size();
    return stored_keys;
}

// Returns whatever set.dump_bound() reports (presumably an upper bound, in
// bytes, on the dumped representation of the set; confirm against phmap_dump.h).
size_t serialize_size() const {
    const size_t bound = set.dump_bound();
    return bound;
}
Expand Down Expand Up @@ -155,10 +156,80 @@ struct DistinctAggregateState<PT, BinaryPTGuard<PT>> {
SliceHashSet set;
};

template <PrimitiveType PT, AggDistinctType DistinctType, typename T = RunTimeCppType<PT>>
class DistinctAggregateFunction final
: public AggregateFunctionBatchHelper<DistinctAggregateState<PT>,
DistinctAggregateFunction<PT, DistinctType, T>> {
// Alternative distinct-aggregate state that uses a different way to do
// serialization to gain performance.
// The primary template is an empty placeholder; the usable definitions are the
// partial specializations chosen through the second (guard) template argument.
template <PrimitiveType PT, typename = guard::Guard>
struct DistinctAggregateStateV2 {};

// Distinct-aggregate state for the primitive types selected by FixedLengthPTGuard.
//
// Keys are kept in a HashSet<T>. The serialized form written by serialize()
// and consumed by deserialize_and_merge() is a flat byte sequence:
//
//   [size_t count][key 0][key 1]...[key count-1]
//
// where each key occupies exactly sizeof(T) bytes, copied with memcpy.
template <PrimitiveType PT>
struct DistinctAggregateStateV2<PT, FixedLengthPTGuard<PT>> {
    using T = RunTimeCppType<PT>;
    using SumType = RunTimeCppType<SumResultPT<PT>>;
    static constexpr size_t item_size = sizeof(T);
    using MyHashSet = HashSet<T>;

    // Inserts `key` into the set.
    // Returns item_size when the key was newly inserted and 0 when it was
    // already present.
    size_t update(T key) {
        auto pair = set.insert(key);
        return pair.second * item_size;
    }

    // Forwards a prefetch request for `key` to the underlying hash set.
    void prefetch(T key) { set.prefetch(key); }

    // Number of distinct keys currently stored.
    int64_t disctint_count() const { return set.size(); }

    // Exact number of bytes serialize() writes: the count header plus one
    // sizeof(T) record per stored key.
    size_t serialize_size() const { return set.size() * item_size + sizeof(size_t); }

    // Writes the count header followed by every key to `dst`.
    // `dst` must have room for serialize_size() bytes.
    void serialize(uint8_t* dst) const {
        size_t size = set.size();
        memcpy(dst, &size, sizeof(size));
        dst += sizeof(size);
        for (const auto& key : set) {
            memcpy(dst, &key, sizeof(key));
            dst += sizeof(T);
        }
    }

    // Reads a buffer produced by serialize() and inserts all of its keys into
    // this set.
    //
    // `src` points at the count header. `len` is not consulted here, so the
    // caller must guarantee that the buffer really holds `count` keys.
    // Returns count * item_size, i.e. every key in the buffer is counted,
    // including keys that were already present in the set.
    size_t deserialize_and_merge(const uint8_t* src, size_t len) {
        size_t size = 0;
        memcpy(&size, src, sizeof(size));
        set.rehash(set.size() + size);

        src += sizeof(size);
        constexpr size_t prefetch_dist = 4;
        for (size_t i = 0; i < size; i++) {
            if ((i + prefetch_dist) < size) {
                set.prefetch(read_key(src + (i + prefetch_dist) * item_size));
            }
            set.insert(read_key(src + i * item_size));
        }
        return size * item_size;
    }

    // Sums all distinct keys. For date/time sum types the result is a
    // value-initialized SumType.
    SumType sum_distinct() const {
        SumType sum{};
        // Sum distinct doesn't support timestamp and date type
        if constexpr (IsDateTime<SumType>) {
            return sum;
        } else {
            // Kept in the else branch so the summation is not instantiated at
            // all for date/time types.
            for (const auto& key : set) {
                sum += key;
            }
            return sum;
        }
    }

    // Copies one key out of a byte buffer.
    // `p` is a plain byte pointer with no alignment guarantee for T, so the
    // key is loaded with memcpy instead of being read through a `const T*`.
    static T read_key(const uint8_t* p) {
        T key;
        memcpy(&key, p, sizeof(T));
        return key;
    }

    MyHashSet set;
};

// For the types selected by BinaryPTGuard there is no separate V2
// implementation: this specialization adds nothing of its own and inherits
// everything from DistinctAggregateState<PT>.
template <PrimitiveType PT>
struct DistinctAggregateStateV2<PT, BinaryPTGuard<PT>> : public DistinctAggregateState<PT> {};

// Dear god this template class as template parameter kills me!
template <PrimitiveType PT, template <PrimitiveType X> class TDistinctAggState, AggDistinctType DistinctType,
typename T = RunTimeCppType<PT>>
class TDistinctAggregateFunction
: public AggregateFunctionBatchHelper<TDistinctAggState<PT>,
TDistinctAggregateFunction<PT, TDistinctAggState, DistinctType, T>> {
public:
using ColumnType = RunTimeColumnType<PT>;

Expand All @@ -173,6 +244,33 @@ class DistinctAggregateFunction final
ctx->impl()->add_mem_usage(mem_usage);
}

// Feeds the first `batch_size` rows of columns[0] into the single aggregate
// state located at `state`, then reports the summed return values of the
// state's update() calls to the function context as memory usage.
void update_batch_single_state(FunctionContext* ctx, size_t batch_size, const Column** columns,
                               AggDataPtr state) const override {
    const auto* input = down_cast<const ColumnType*>(columns[0]);
    auto& distinct_state = this->data(state);
    size_t added_bytes = 0;

    if constexpr (IsSlice<T>) {
        // Slice keys: the state's update() takes the context's memory pool
        // together with each row's slice.
        MemPool* pool = ctx->impl()->mem_pool();
        size_t row = 0;
        while (row < batch_size) {
            added_bytes += distinct_state.update(pool, input->get_slice(row));
            ++row;
        }
    } else {
        // Benchmarks showed a large gain from prefetching for count(distinct),
        // but only when there is no group by clause or the number of groups is
        // low; with many groups prefetching has a downside.
        constexpr size_t lookahead = 4;
        auto* values = input->get_data().data();
        for (size_t row = 0; row < batch_size; ++row) {
            const size_t ahead = row + lookahead;
            if (ahead < batch_size) {
                distinct_state.prefetch(values[ahead]);
            }
            added_bytes += distinct_state.update(values[row]);
        }
    }

    ctx->impl()->add_mem_usage(added_bytes);
}

void merge(FunctionContext* ctx, const Column* column, AggDataPtr state, size_t row_num) const override {
DCHECK(column->is_binary());
const auto* input_column = down_cast<const BinaryColumn*>(column);
Expand Down Expand Up @@ -259,4 +357,10 @@ class DistinctAggregateFunction final
}
};

// Distinct aggregate function that uses DistinctAggregateState as its
// per-group state. Adds no members of its own; all behaviour comes from
// TDistinctAggregateFunction.
template <PrimitiveType PT, AggDistinctType DistinctType, typename T = RunTimeCppType<PT>>
class DistinctAggregateFunction : public TDistinctAggregateFunction<PT, DistinctAggregateState, DistinctType, T> {};

// Distinct aggregate function that uses DistinctAggregateStateV2 as its
// per-group state. Adds no members of its own; all behaviour comes from
// TDistinctAggregateFunction.
template <PrimitiveType PT, AggDistinctType DistinctType, typename T = RunTimeCppType<PT>>
class DistinctAggregateFunctionV2 : public TDistinctAggregateFunction<PT, DistinctAggregateStateV2, DistinctType, T> {};

} // namespace starrocks::vectorized