Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
21023fd
Refactor batch_write in async_store
willdealtry Jun 30, 2023
1abf14c
Stray character
willdealtry Aug 3, 2023
342d466
Add none (empty) type which is promotable to any other type.
Jul 24, 2023
88ffa19
Register empty type handler and fix compilation errors
Jul 24, 2023
5eb6282
Fix indentation
Jul 26, 2023
a81118d
Add comments for the type handling classes
Jul 26, 2023
037eb21
Change if/else ordering when checking for empty type vs UTF_DYNAMIC
Jul 27, 2023
95ba288
Add BSL headers
Jul 27, 2023
2089c0d
Remove #include duplication
Jul 27, 2023
2f00339
Add static assert
Jul 27, 2023
39a7bf8
Indentation format fix
Jul 27, 2023
a2c60ba
Create py::none after taking the lock. Though it's allowed to create …
Jul 27, 2023
a64feaf
Every type must be able to become empty type
Jul 27, 2023
83882e5
Add line break
Jul 27, 2023
25d0871
Change aggregator_set_data so that it does not rely on work done in t…
Jul 27, 2023
3f4decf
Add explicit constexpr branch for numerical and empty types in column…
Jul 28, 2023
bd63ff2
Make Empty type trivially compatible with each other type
Jul 28, 2023
3daf150
Make changes to the Buffer class so that it can support EmptyType
Aug 9, 2023
5689bfd
Do not handle NaNs the same way Nones are handled
Aug 9, 2023
296fa1e
Fix the order of checks for nan and is_unicode
Aug 9, 2023
f5315be
Fix logical error in checking the type of a column
Aug 10, 2023
cc5cde7
Address review comments
Aug 10, 2023
a4f6658
Addressing review comments
Aug 10, 2023
311543c
Address review comments
Aug 11, 2023
957c2c9
Test if EmptyType is trivially compatible with each other type and vi…
Aug 14, 2023
0be4972
Apply suggestions from code review
vasil-pashov Aug 14, 2023
513ca95
Fix has_valid_type_promotion
Aug 14, 2023
a85ed88
Add comment about ASCII_DYNAMIC
Aug 15, 2023
5ee6169
Fix trivially_compatible_types
Aug 15, 2023
31641b5
Merge branch 'master' into dev/vasil.pashov/add-none-type
vasil-pashov Aug 16, 2023
159bcf3
Fix compilation error after fixing merge conflict
Aug 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ pybind11_add_module(arcticdb_ext MODULE
python/arctic_version.hpp
python/reader.hpp
python/adapt_read_dataframe.hpp
python/python_handlers.hpp
python/python_handlers.cpp
)

set (additional_link_flags "")
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/async/async_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ std::pair<entity::VariantKey, std::optional<Segment>> lookup_match_in_dedup_map(
return std::make_pair(de_dup_key.value(), std::nullopt);
}
}
}
}
9 changes: 7 additions & 2 deletions cpp/arcticdb/codec/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ std::pair<size_t, size_t> ColumnEncoder::max_compressed_size(

while (auto block = column_data.next<TDT>()) {
const auto nbytes = block.value().nbytes();
util::check(nbytes, "Zero-sized block");
uncompressed_bytes += nbytes;
if constexpr(!is_empty_type(TDT::DataTypeTag::data_type)) {
util::check(nbytes > 0, "Zero-sized block");
uncompressed_bytes += nbytes;
}
// For the empty type the column will contain 0 bytes of user data; however, the encoder might need to add some
// encoder specific data to the buffer, thus the uncompressed size will be 0 but the max_compressed_bytes
// might be non-zero.
max_compressed_bytes += Encoder::max_compressed_size(codec_opts, block.value());
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/arcticdb/codec/codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ struct ColumnEncoder {
ARCTICDB_TRACE(log::codec(), "Column data has {} blocks", column_data.num_blocks());

while (auto block = column_data.next<TDT>()) {
util::check(block.value().nbytes(), "Zero-sized block");
if constexpr(!is_empty_type(TDT::DataTypeTag::data_type)) {
util::check(block.value().nbytes() > 0, "Zero-sized block");
}
Encoder::encode(codec_opts, block.value(), field, out, pos);
}
});
Expand Down
48 changes: 24 additions & 24 deletions cpp/arcticdb/column_store/column_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@ inline py::array array_at(const SegmentInMemory& frame, std::size_t col_pos, py:
using TypeTag = std::decay_t<decltype(tag)>;
constexpr auto data_type = TypeTag::DataTypeTag::data_type;
std::string dtype;
py::array::ShapeContainer shapes{0};
ssize_t esize = get_type_size(data_type);
constexpr ssize_t esize = is_sequence_type(data_type) && is_fixed_string_type(data_type) ? 1 : get_type_size(data_type);
if constexpr (is_sequence_type(data_type)) {
if(is_fixed_string_type(data_type)) {
if constexpr (is_fixed_string_type(data_type)) {
dtype = data_type == DataType::ASCII_FIXED64 ? "<S0" : "<U0";
esize = 1;
}
else {
} else {
dtype = "O";
}
} else {
} else if constexpr(is_numeric_type(data_type) || is_bool_type(data_type)) {
constexpr auto dim = TypeTag::DimensionTag::value;
util::check(dim == Dimension::Dim0, "Only scalars supported, {}", data_type);
if constexpr (data_type == DataType::NANOSECONDS_UTC64) {
Expand All @@ -47,34 +44,35 @@ inline py::array array_at(const SegmentInMemory& frame, std::size_t col_pos, py:
} else {
dtype = fmt::format("{}{:d}", get_dtype_specifier(data_type), esize);
}
} else if constexpr (is_empty_type(data_type)) {
dtype= "O";
Comment on lines +47 to +48
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note related to this PR: we will need adaptation on the Python layer because float is used for empty series when pandas < 2 is used.

} else {
static_assert(!sizeof(data_type), "Unhandled data type");
}
py::array::StridesContainer strides{esize};
return py::array{py::dtype{dtype}, std::move(shapes), std::move(strides)};
return py::array{py::dtype{dtype}, py::array::ShapeContainer{0}, py::array::StridesContainer{esize}};
});
}
return visit_field(frame.field(col_pos), [&, frame=frame, col_pos=col_pos] (auto &&tag) {
using TypeTag = std::decay_t<decltype(tag)>;
constexpr auto dt = TypeTag::DataTypeTag::data_type;
auto &buffer = frame.column(col_pos).data().buffer();
constexpr auto data_type = TypeTag::DataTypeTag::data_type;
const auto &buffer = frame.column(col_pos).data().buffer();
std::string dtype;
std::vector<shape_t> shapes;
shapes.push_back(frame.row_count());
ssize_t esize = get_type_size(dt);
if constexpr (is_sequence_type(dt)) {
if (is_fixed_string_type(dt)) {
ssize_t esize = get_type_size(data_type);
if constexpr (is_sequence_type(data_type)) {
if (is_fixed_string_type(data_type)) {
esize = buffer.bytes() / frame.row_count();
auto char_count = esize;
if (dt == DataType::UTF_FIXED64) {
if (data_type == DataType::UTF_FIXED64) {
char_count /= UNICODE_WIDTH;
}
dtype = fmt::format((dt == DataType::ASCII_FIXED64 ? "<S{:d}" : "<U{:d}"), char_count);
dtype = fmt::format((data_type == DataType::ASCII_FIXED64 ? "<S{:d}" : "<U{:d}"), char_count);
} else {
dtype = "O";
}
} else {
} else if constexpr(is_numeric_type(data_type) || is_bool_type(data_type)) {
constexpr auto dim = TypeTag::DimensionTag::value;
util::check(dim == Dimension::Dim0, "Only scalars supported, {}", frame.field(col_pos));
if constexpr (dt == DataType::NANOSECONDS_UTC64) {
if constexpr (data_type == DataType::NANOSECONDS_UTC64) {
// NOTE: this is safe as of Pandas < 2.0 because `datetime64` _always_ has been using nanosecond resolution,
// i.e. Pandas < 2.0 _always_ provides `datetime64[ns]` and ignores any other resolution.
// Yet, this has changed in Pandas 2.0 and other resolution can be used,
Expand All @@ -84,15 +82,17 @@ inline py::array array_at(const SegmentInMemory& frame, std::size_t col_pos, py:
// rely uniquely on the resolution-less 'M' specifier if it this doable.
dtype = "datetime64[ns]";
} else {
dtype = fmt::format("{}{:d}", get_dtype_specifier(dt), esize);
dtype = fmt::format("{}{:d}", get_dtype_specifier(data_type), esize);
}
} else if constexpr (is_empty_type(data_type)) {
dtype = "O";
} else {
static_assert(!sizeof(data_type), "Unhandled data type");
}
std::vector<stride_t> strides;
strides.push_back(esize);
// Note how base is passed to the array to register the data owner.
// It's especially important to keep the frame data object alive for as long as the array is alive
// so that regular python ref counting logic handles the liveness
return py::array(py::dtype{dtype}, std::move(shapes), std::move(strides), buffer.data(), anchor);
return py::array(py::dtype{dtype}, {frame.row_count()}, {esize}, buffer.data(), anchor);
});
}

Expand Down
10 changes: 10 additions & 0 deletions cpp/arcticdb/entity/type_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ inline std::optional<entity::TypeDescriptor> has_valid_type_promotion(entity::Ty
if (source == target)
return target;

// Empty type is coercible to any type
if(is_empty_type(source.data_type())) {
return target;
}

// Nothing is coercible to the empty type.
if(is_empty_type(target.data_type())) {
return std::nullopt;
}

auto source_type = source.data_type();
auto target_type = target.data_type();
auto source_size = slice_bit_size(source_type);
Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/entity/types-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ auto visit_dim(DataType dt, Callable &&c) {
DT_CASE(ASCII_DYNAMIC64)
DT_CASE(UTF_FIXED64)
DT_CASE(UTF_DYNAMIC64)
DT_CASE(EMPTYVAL)
#undef DT_CASE
default: util::raise_rte("Invalid dtype '{}' in visit dim", datatype_to_str(dt));
}
Expand All @@ -64,6 +65,7 @@ auto visit_type(DataType dt, Callable &&c) {
DT_CASE(ASCII_DYNAMIC64)
DT_CASE(UTF_FIXED64)
DT_CASE(UTF_DYNAMIC64)
DT_CASE(EMPTYVAL)
#undef DT_CASE
default: util::raise_rte("Invalid dtype '{}' in visit type", datatype_to_str(dt));
}
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/entity/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ std::string_view datatype_to_str(const DataType dt) {
TO_STR(ASCII_DYNAMIC64)
TO_STR(UTF_FIXED64)
TO_STR(UTF_DYNAMIC64)
TO_STR(EMPTYVAL)
// TO_STR(UTF8_STRING)
// TO_STR(BYTES)
// TO_STR(PICKLE)
Expand Down
14 changes: 14 additions & 0 deletions cpp/arcticdb/entity/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ enum class ValueType : uint8_t {

UTF_DYNAMIC = 11,
ASCII_DYNAMIC = 12,
/// Used to represent null types. Each type can be converted to Empty and Empty can be converted to each type.
EMPTY = 13,
COUNT // Not a real value type, should not be added to proto descriptor. Used to count the number of items in the enum
};

// Sequence types are composed of more than one element
Expand Down Expand Up @@ -152,12 +155,17 @@ constexpr bool is_utf_type(ValueType v) {
return v == ValueType::UTF8_FIXED || v == ValueType::UTF_DYNAMIC;
}

/// True when the value type is the special EMPTY (null-only) type.
constexpr bool is_empty_type(ValueType v) {
    return ValueType::EMPTY == v;
}

enum class SizeBits : uint8_t {
UNKNOWN_SIZE_BITS = 0,
S8 = 1,
S16 = 2,
S32 = 3,
S64 = 4,
COUNT
};

constexpr SizeBits get_size_bits(uint8_t size) {
Expand Down Expand Up @@ -196,6 +204,7 @@ enum class DataType : uint8_t {
UTF_FIXED64 = detail::combine_val_bits(ValueType::UTF8_FIXED, SizeBits::S64),
UTF_DYNAMIC64 = detail::combine_val_bits(ValueType::UTF_DYNAMIC, SizeBits::S64),
BYTES_DYNAMIC64 = detail::combine_val_bits(ValueType::BYTES, SizeBits::S64),
EMPTYVAL = detail::combine_val_bits(ValueType::EMPTY, SizeBits::S64),
#undef DT_COMBINE
UNKNOWN = 0,
};
Expand Down Expand Up @@ -282,6 +291,10 @@ constexpr bool is_utf_type(DataType v){
return is_utf_type(slice_value_type(v));
}

/// True when the data type's value-type component is the special EMPTY type.
constexpr bool is_empty_type(DataType v){
    const auto value_type = slice_value_type(v);
    return is_empty_type(value_type);
}

static_assert(slice_value_type((DataType::UINT16)) == ValueType(1));
static_assert(get_type_size(DataType::UINT32) == 4);
static_assert(get_type_size(DataType::UINT64) == 8);
Expand Down Expand Up @@ -371,6 +384,7 @@ DATA_TYPE_TAG(ASCII_FIXED64, std::uint64_t)
DATA_TYPE_TAG(ASCII_DYNAMIC64, std::uint64_t)
DATA_TYPE_TAG(UTF_FIXED64, std::uint64_t)
DATA_TYPE_TAG(UTF_DYNAMIC64, std::uint64_t)
DATA_TYPE_TAG(EMPTYVAL, std::uint64_t)
#undef DATA_TYPE_TAG

enum class Dimension : uint8_t {
Expand Down
15 changes: 7 additions & 8 deletions cpp/arcticdb/pipeline/frame_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,23 +133,20 @@ void aggregator_set_data(
for (size_t s = 0; s < rows_to_write; ++s, ++ptr_data) {
if (*ptr_data == none.ptr()) {
agg.set_no_string_at(col, s, not_a_string());
}
else if(is_py_nan(*ptr_data)){
} else if(is_py_nan(*ptr_data)){
agg.set_no_string_at(col, s, nan_placeholder());
}
else {
} else {
if constexpr (is_utf_type(slice_value_type(dt))) {
auto wrapper= convert::py_unicode_to_buffer(*ptr_data);
agg.set_string_at(col, s, wrapper.buffer_, wrapper.length_);
}
else {
} else {
auto wrapper = convert::pystring_to_buffer(*ptr_data);
agg.set_string_at(col, s, wrapper.buffer_, wrapper.length_);
}
}
}
}
} else {
} else if constexpr (is_numeric_type(dt) || is_bool_type(dt)) {
auto ptr = tensor.template ptr_cast<RawType>(row);
if (sparsify_floats) {
if constexpr (is_floating_point_type(dt)) {
Expand All @@ -172,6 +169,8 @@ void aggregator_set_data(
agg.set_array(col, t);
}
}
} else if constexpr (!is_empty_type(dt)) {
static_assert(!sizeof(dt), "Unknown data type");
Comment on lines +172 to +173
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a else clause and raise if it's reached?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of doing the same thing isn't it? We should do nothing in case of empty type is passed and if dt is not empty then we have unhandled type and the code should not compile at all. This is a shorthand for:

...
else if constexpr (is_empty_type(dt)) {
} else {
            static_assert(!sizeof(dt), "Unknown data type");
}

and it's better than throwing since if we have type which is not handled we won't even compile.

}
});
}
Expand Down Expand Up @@ -202,4 +201,4 @@ size_t get_slice_rowcounts(
std::pair<size_t, size_t> offset_and_row_count(
const std::shared_ptr<pipelines::PipelineContext>& context);

} //namespace arcticdb
} //namespace arcticdb
15 changes: 13 additions & 2 deletions cpp/arcticdb/pipeline/read_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ void advance_skipped_cols(
}
}

/// Returns true when every field still to be decoded (from `it` onwards) is of
/// the empty (EMPTYVAL) type, i.e. no further encoded payload bytes are expected
/// for any of them. Used to decide whether exhausting the input block before all
/// fields are decoded is legitimate rather than an error.
/// The iterator is taken by value so the caller's position is not disturbed.
template<typename IteratorType>
bool remaining_fields_empty(IteratorType it, const PipelineContextRow& context) {
    // The descriptor does not change per field; look it up once.
    const StreamDescriptor& stream_desc = context.descriptor();
    while(it.has_next()) {
        const Field& field = stream_desc.fields(it.source_field_pos());
        if(!is_empty_type(field.type().data_type())) {
            return false;
        }
        // Move to the next field; without this the loop spins forever on the
        // first empty-typed field. NOTE(review): assumes the column-mapping
        // iterator exposes advance() — confirm against its definition.
        it.advance();
    }
    return true;
}

void decode_into_frame_static(
SegmentInMemory &frame,
PipelineContextRow &context,
Expand Down Expand Up @@ -338,7 +350,7 @@ void decode_into_frame_static(
it.dest_col(),
m.frame_field_descriptor_.name());

util::check(data != end, "Reached end of input block with {} fields to decode", it.remaining_fields());
util::check(data != end || remaining_fields_empty(it, context), "Reached end of input block with {} fields to decode", it.remaining_fields());
decode_or_expand(data, buffer.data() + m.offset_bytes_, encoded_field, m.source_type_desc_, m.dest_bytes_, buffers);
ARCTICDB_TRACE(log::codec(), "Decoded column {} to position {}", field_name, data - begin);

Expand Down Expand Up @@ -1121,7 +1133,6 @@ folly::Future<std::vector<VariantKey>> fetch_data(
decode_into_frame_dynamic(frame, row, std::move(key_seg.segment()), buffers);
else
decode_into_frame_static(frame, row, std::move(key_seg.segment()), buffers);

return std::get<AtomKey>(key_seg.variant_key());
});
}
Expand Down
5 changes: 5 additions & 0 deletions cpp/arcticdb/processing/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ struct OutputType<DataTypeTag<DataType::NANOSECONDS_UTC64>, void> {
using type = ScalarTagType<DataTypeTag<DataType::NANOSECONDS_UTC64>>;
};

/// Aggregation output-type trait for the empty (EMPTYVAL) data type: an
/// aggregation over an all-empty column keeps the empty type as its output.
template<>
struct OutputType<DataTypeTag<DataType::EMPTYVAL>, void> {
    using type = ScalarTagType<DataTypeTag<DataType::EMPTYVAL>>;
};

void add_data_type_impl(DataType data_type, std::optional<DataType>& current_data_type);

struct MinMaxAggregatorData {
Expand Down
27 changes: 27 additions & 0 deletions cpp/arcticdb/processing/test/test_has_valid_type_promotion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,30 @@ TEST(HasValidTypePromotion, FloatFloat) {
EXPECT_FALSE(has_valid_type_promotion(float64, float32));
}

// The EMPTY type must be promotable to every (value type, size bits) combination.
TEST(HasValidTypePromotion, EmptyToEverything) {
    using namespace arcticdb;
    using namespace arcticdb::entity;
    const TypeDescriptor source(ValueType::EMPTY, SizeBits::S64, Dimension::Dim0);
    for(int vt = int(ValueType::UNKNOWN_VALUE_TYPE); vt < int(ValueType::COUNT); ++vt) {
        for(int sb = int(SizeBits::UNKNOWN_SIZE_BITS); sb < int(SizeBits::COUNT); ++sb) {
            const TypeDescriptor target{ValueType(vt), SizeBits(sb), Dimension::Dim0};
            ASSERT_EQ(has_valid_type_promotion(source, target), target);
        }
    }
}

// No non-empty type may be promoted to EMPTY; only EMPTY itself maps to it.
TEST(HasValidTypePromotion, EverythingToEmpty) {
    using namespace arcticdb;
    using namespace arcticdb::entity;
    const TypeDescriptor target(ValueType::EMPTY, SizeBits::S64, Dimension::Dim0);
    for(int vt = int(ValueType::UNKNOWN_VALUE_TYPE); vt < int(ValueType::COUNT); ++vt) {
        for(int sb = int(SizeBits::UNKNOWN_SIZE_BITS); sb < int(SizeBits::COUNT); ++sb) {
            const TypeDescriptor source{ValueType(vt), SizeBits(sb), Dimension::Dim0};
            if(is_empty_type(source.data_type())) {
                ASSERT_EQ(has_valid_type_promotion(source, target), target);
            } else {
                ASSERT_FALSE(has_valid_type_promotion(source, target).has_value());
            }
        }
    }
}
30 changes: 30 additions & 0 deletions cpp/arcticdb/python/python_handlers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
#include <python/python_handlers.hpp>
#include <python/gil_lock.hpp>

namespace arcticdb {
/// Decodes an empty-type (EMPTYVAL) column into a Python object buffer: every
/// row slot in `dest` is filled with a pointer to the None singleton. The
/// source data pointer is ignored — an empty column carries no payload.
void EmptyHandler::handle_type(
const uint8_t*&,
uint8_t* dest,
const VariantField&,
const entity::TypeDescriptor&,
size_t dest_bytes,
std::shared_ptr<BufferHolder>
) {
ARCTICDB_SAMPLE(HandleEmpty, 0);
util::check(dest != nullptr, "Got null destination pointer");
// Each row slot holds exactly one PyObject*; the static_assert below ensures
// the division yields an exact row count.
const size_t num_rows = dest_bytes / get_type_size(DataType::EMPTYVAL);
static_assert(get_type_size(DataType::EMPTYVAL) == sizeof(PyObject*));
const PyObject** ptr_dest = reinterpret_cast<const PyObject**>(dest);
// NOTE(review): py::none construction and inc_ref() go through the Python C
// API and therefore require the GIL — presumably the caller holds it
// (gil_lock.hpp is included above); confirm before calling off-thread.
py::none none = {};
for(auto row = 0u; row < num_rows; ++row) {
// Bump None's refcount once per stored pointer: raw PyObject* slots bypass
// pybind11's automatic reference management.
none.inc_ref();
*ptr_dest++ = none.ptr();
}
}
}
Loading