Skip to content

Commit ee37a05

Browse files
committed
guarantee data source order on CS
1 parent 371f8ee commit ee37a05

File tree

19 files changed

+135
-81
lines changed

19 files changed

+135
-81
lines changed

ydb/core/protos/kqp.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,10 +750,12 @@ message TEvKqpScanCursor {
750750
message TColumnShardScanSimple {
751751
optional uint64 SourceIdx = 1;
752752
optional uint32 StartRecordIndex = 2;
753+
optional uint64 OptionalPortionId = 3;
753754
}
754755
message TColumnShardScanNotSortedSimple {
755756
optional uint64 SourceIdx = 1;
756757
optional uint32 StartRecordIndex = 2;
758+
optional uint64 OptionalPortionId = 3;
757759
}
758760
oneof Implementation {
759761
TColumnShardScanPlain ColumnShardPlain = 10;

ydb/core/tx/columnshard/engines/column_engine.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,19 @@ class IColumnEngine {
127127
}
128128
};
129129

130+
class TSelectedPortionInfo {
131+
private:
132+
YDB_READONLY_DEF(std::shared_ptr<TPortionInfo>, Portion);
133+
YDB_READONLY_DEF(bool, IsVisible);
134+
135+
public:
136+
TSelectedPortionInfo(const std::shared_ptr<TPortionInfo> portion, const bool isVisible)
137+
: Portion(portion)
138+
, IsVisible(isVisible)
139+
{
140+
}
141+
};
142+
130143
void FetchDataAccessors(const std::shared_ptr<TDataAccessorsRequest>& request) const;
131144

132145
static ui64 GetMetadataLimit();
@@ -146,7 +159,7 @@ class IColumnEngine {
146159
return DoRegisterTable(pathId);
147160
}
148161
virtual bool IsOverloadedByMetadata(const ui64 limit) const = 0;
149-
virtual std::vector<std::shared_ptr<TPortionInfo>> Select(
162+
virtual std::vector<TSelectedPortionInfo> Select(
150163
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& ownPortions) const = 0;
151164
virtual std::vector<std::shared_ptr<TColumnEngineChanges>> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0;
152165
virtual ui64 GetCompactionPriority(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const std::set<TInternalPathId>& pathIds,

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -459,9 +459,10 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool up
459459
}
460460
}
461461

462-
std::vector<std::shared_ptr<TPortionInfo>> TColumnEngineForLogs::Select(
463-
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& ownPortions) const {
464-
std::vector<std::shared_ptr<TPortionInfo>> out;
462+
std::vector<TColumnEngineForLogs::TSelectedPortionInfo> TColumnEngineForLogs::Select(TInternalPathId pathId, TSnapshot snapshot,
463+
const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting,
464+
const std::optional<THashSet<TInsertWriteId>>& ownPortions) const {
465+
std::vector<TSelectedPortionInfo> out;
465466
auto spg = GranulesStorage->GetGranuleOptional(pathId);
466467
if (!spg) {
467468
return out;
@@ -482,7 +483,8 @@ std::vector<std::shared_ptr<TPortionInfo>> TColumnEngineForLogs::Select(
482483
const bool takePortion = pkRangesFilter.IsUsed(*portion);
483484
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", takePortion ? "portion_selected" : "portion_skipped")("pathId", pathId)("portion", portion->DebugString());
484485
if (takePortion) {
485-
out.emplace_back(portion);
486+
AFL_VERIFY(nonconflicting != conflicting)("nonconflicting", nonconflicting);
487+
out.emplace_back(portion, nonconflicting);
486488
}
487489
}
488490
for (const auto& [_, portion] : spg->GetPortions()) {
@@ -505,7 +507,8 @@ std::vector<std::shared_ptr<TPortionInfo>> TColumnEngineForLogs::Select(
505507
const bool takePortion = pkRangesFilter.IsUsed(*portion);
506508
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", takePortion ? "portion_selected" : "portion_skipped")("pathId", pathId)("portion", portion->DebugString());
507509
if (takePortion) {
508-
out.emplace_back(portion);
510+
AFL_VERIFY(nonconflicting != conflicting)("nonconflicting", nonconflicting);
511+
out.emplace_back(portion, nonconflicting);
509512
}
510513
}
511514

ydb/core/tx/columnshard/engines/column_engine_logs.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class TColumnEngineForLogs: public IColumnEngine {
169169
void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
170170
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
171171

172-
std::vector<std::shared_ptr<TPortionInfo>> Select(
172+
std::vector<TSelectedPortionInfo> Select(
173173
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& withUncommittedOnlyForTheseWrites) const override;
174174

175175
bool IsPortionExists(const TInternalPathId pathId, const ui64 portionId) const {

ydb/core/tx/columnshard/engines/metadata_accessor.cpp

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,21 @@ std::unique_ptr<NReader::NCommon::ISourcesConstructor> TUserTableAccessor::Selec
4242
const NReader::TReadDescription& readDescription, const bool isPlain) const {
4343
AFL_VERIFY(readDescription.PKRangesFilter);
4444
// here we select portions for a read
45-
std::vector<std::shared_ptr<TPortionInfo>> portions =
46-
context.GetEngine().Select(
47-
PathId.InternalPathId,
48-
readDescription.GetSnapshot(),
49-
*readDescription.PKRangesFilter,
50-
readDescription.readNonconflictingPortions,
51-
readDescription.readConflictingPortions,
52-
readDescription.ownPortions
53-
);
45+
std::vector<IColumnEngine::TSelectedPortionInfo> portions =
46+
context.GetEngine().Select(PathId.InternalPathId, readDescription.GetSnapshot(), *readDescription.PKRangesFilter,
47+
readDescription.readNonconflictingPortions, readDescription.readConflictingPortions, readDescription.ownPortions);
5448
if (!isPlain) {
5549
std::deque<NReader::NSimple::TSourceConstructor> sources;
5650
for (auto&& i : portions) {
57-
sources.emplace_back(NReader::NSimple::TSourceConstructor(std::move(i), readDescription.GetSorting()));
51+
sources.emplace_back(NReader::NSimple::TSourceConstructor(i.GetPortion(), i.GetIsVisible(), readDescription.GetSorting()));
5852
}
5953
return std::make_unique<NReader::NSimple::TPortionsSources>(std::move(sources), readDescription.GetSorting());
6054
} else {
61-
return std::make_unique<NReader::NPlain::TPortionSources>(std::move(portions));
55+
std::vector<std::shared_ptr<TPortionInfo>> sources;
56+
for (auto&& i : portions) {
57+
sources.emplace_back(i.GetPortion());
58+
}
59+
return std::make_unique<NReader::NPlain::TPortionSources>(std::move(sources));
6260
}
6361
}
6462

ydb/core/tx/columnshard/engines/predicate/filter.h

Lines changed: 50 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -195,26 +195,53 @@ class IScanCursor {
195195
}
196196
};
197197

198-
class TSimpleScanCursor: public IScanCursor {
198+
class ISimpleScanCursor: public IScanCursor {
199+
protected:
200+
std::optional<ui32> SourceIdx;
201+
YDB_READONLY_PROTECT(ui32, RecordIndex, 0);
202+
YDB_READONLY_PROTECT_DEF(std::optional<ui64>, PortionId);
203+
204+
virtual bool IsInitialized() const override {
205+
return !!SourceIdx;
206+
}
207+
208+
virtual bool DoCheckSourceIntervalUsage(const ui32 sourceIdx, const ui32 indexStart, const ui32 recordsCount) const override {
209+
AFL_VERIFY(SourceIdx);
210+
AFL_VERIFY(sourceIdx == *SourceIdx);
211+
if (indexStart >= RecordIndex) {
212+
return true;
213+
}
214+
AFL_VERIFY(indexStart + recordsCount <= RecordIndex);
215+
return false;
216+
}
217+
218+
public:
219+
ISimpleScanCursor() = default;
220+
ISimpleScanCursor(const ui32 sourceIdx, const ui32 recordIndex, const std::optional<ui64>& portionId)
221+
: SourceIdx(sourceIdx)
222+
, RecordIndex(recordIndex)
223+
, PortionId(portionId)
224+
{
225+
}
226+
};
227+
228+
class TSimpleScanCursor: public ISimpleScanCursor {
199229
private:
200230
YDB_READONLY_DEF(std::shared_ptr<NArrow::TSimpleRow>, PrimaryKey);
201-
std::optional<ui32> SourceIdx;
202-
YDB_READONLY(ui32, RecordIndex, 0);
203231

204232
virtual void DoSerializeToProto(NKikimrKqp::TEvKqpScanCursor& proto) const override {
205233
AFL_VERIFY(SourceIdx);
206234
proto.MutableColumnShardSimple()->SetSourceIdx(*SourceIdx);
207235
proto.MutableColumnShardSimple()->SetStartRecordIndex(RecordIndex);
236+
if (PortionId) {
237+
proto.MutableColumnShardSimple()->SetOptionalPortionId(*PortionId);
238+
}
208239
}
209240

210241
virtual const std::shared_ptr<NArrow::TSimpleRow>& DoGetPKCursor() const override {
211242
return PrimaryKey;
212243
}
213244

214-
virtual bool IsInitialized() const override {
215-
return !!SourceIdx;
216-
}
217-
218245
virtual bool DoCheckEntityIsBorder(const ICursorEntity& entity, bool& usage) const override {
219246
AFL_VERIFY(SourceIdx);
220247
if (*SourceIdx != entity.GetEntityId()) {
@@ -241,50 +268,39 @@ class TSimpleScanCursor: public IScanCursor {
241268
return TConclusionStatus::Fail("incorrect record index for cursor initialization");
242269
}
243270
RecordIndex = proto.GetColumnShardSimple().GetStartRecordIndex();
244-
return TConclusionStatus::Success();
245-
}
246-
247-
virtual bool DoCheckSourceIntervalUsage(const ui32 sourceIdx, const ui32 indexStart, const ui32 recordsCount) const override {
248-
AFL_VERIFY(SourceIdx);
249-
AFL_VERIFY(sourceIdx == *SourceIdx);
250-
if (indexStart >= RecordIndex) {
251-
return true;
271+
if (proto.GetColumnShardSimple().HasOptionalPortionId()) {
272+
PortionId = proto.GetColumnShardSimple().GetOptionalPortionId();
252273
}
253-
AFL_VERIFY(indexStart + recordsCount <= RecordIndex);
254-
return false;
274+
return TConclusionStatus::Success();
255275
}
256276

257277
public:
258278
TSimpleScanCursor() = default;
259279

260-
TSimpleScanCursor(const std::shared_ptr<NArrow::TSimpleRow>& pk, const ui32 sourceIdx, const ui32 recordIndex)
261-
: PrimaryKey(pk)
262-
, SourceIdx(sourceIdx)
263-
, RecordIndex(recordIndex)
280+
TSimpleScanCursor(
281+
const std::shared_ptr<NArrow::TSimpleRow>& pk, const ui32 sourceIdx, const ui32 recordIndex, const std::optional<ui64>& optonalPortionId)
282+
: ISimpleScanCursor(sourceIdx, recordIndex, optonalPortionId)
283+
, PrimaryKey(pk)
264284
{
265285
}
266286
};
267287

268-
class TNotSortedSimpleScanCursor: public TSimpleScanCursor {
288+
class TNotSortedSimpleScanCursor: public ISimpleScanCursor {
269289
private:
270-
std::optional<ui32> SourceIdx;
271-
YDB_READONLY(ui32, RecordIndex, 0);
272-
273290
virtual void DoSerializeToProto(NKikimrKqp::TEvKqpScanCursor& proto) const override {
274291
auto& data = *proto.MutableColumnShardNotSortedSimple();
275292
AFL_VERIFY(SourceIdx);
276293
data.SetSourceIdx(*SourceIdx);
277294
data.SetStartRecordIndex(RecordIndex);
295+
if (PortionId) {
296+
data.SetOptionalPortionId(*PortionId);
297+
}
278298
}
279299

280300
virtual const std::shared_ptr<NArrow::TSimpleRow>& DoGetPKCursor() const override {
281301
return Default<std::shared_ptr<NArrow::TSimpleRow>>();
282302
}
283303

284-
virtual bool IsInitialized() const override {
285-
return !!SourceIdx;
286-
}
287-
288304
virtual bool DoCheckEntityIsBorder(const ICursorEntity& entity, bool& usage) const override {
289305
AFL_VERIFY(SourceIdx);
290306
if (*SourceIdx != entity.GetEntityId()) {
@@ -312,25 +328,17 @@ class TNotSortedSimpleScanCursor: public TSimpleScanCursor {
312328
return TConclusionStatus::Fail("incorrect record index for cursor initialization");
313329
}
314330
RecordIndex = data.GetStartRecordIndex();
315-
return TConclusionStatus::Success();
316-
}
317-
318-
virtual bool DoCheckSourceIntervalUsage(const ui32 sourceIdx, const ui32 indexStart, const ui32 recordsCount) const override {
319-
AFL_VERIFY(SourceIdx);
320-
AFL_VERIFY(sourceIdx == *SourceIdx);
321-
if (indexStart >= RecordIndex) {
322-
return true;
331+
if (data.HasOptionalPortionId()) {
332+
PortionId = data.GetOptionalPortionId();
323333
}
324-
AFL_VERIFY(indexStart + recordsCount <= RecordIndex);
325-
return false;
334+
return TConclusionStatus::Success();
326335
}
327336

328337
public:
329338
TNotSortedSimpleScanCursor() = default;
330339

331-
TNotSortedSimpleScanCursor(const ui32 sourceIdx, const ui32 recordIndex)
332-
: SourceIdx(sourceIdx)
333-
, RecordIndex(recordIndex)
340+
TNotSortedSimpleScanCursor(const ui32 sourceIdx, const ui32 recordIndex, const std::optional<ui64>& optionalPortionId)
341+
: ISimpleScanCursor(sourceIdx, recordIndex, optionalPortionId)
334342
{
335343
}
336344
};

ydb/core/tx/columnshard/engines/reader/common_reader/common/accessors_ordering.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class TDataSourceConstructor: public ICursorEntity, public TMoveOnly {
1111
private:
1212
TReplaceKeyAdapter Start;
1313
TReplaceKeyAdapter Finish;
14+
std::optional<ui64> SortingKey;
1415
ui32 SourceIdx = 0;
1516
bool SourceIdxInitialized = false;
1617

@@ -38,9 +39,10 @@ class TDataSourceConstructor: public ICursorEntity, public TMoveOnly {
3839
return std::move(Finish);
3940
}
4041

41-
TDataSourceConstructor(TReplaceKeyAdapter&& start, TReplaceKeyAdapter&& finish)
42+
TDataSourceConstructor(TReplaceKeyAdapter&& start, TReplaceKeyAdapter&& finish, const std::optional<ui64>& specialSortingKey)
4243
: Start(std::move(start))
4344
, Finish(std::move(finish))
45+
, SortingKey(specialSortingKey)
4446
{
4547
}
4648

@@ -82,6 +84,16 @@ class TDataSourceConstructor: public ICursorEntity, public TMoveOnly {
8284
}
8385

8486
bool operator()(const TDataSourceConstructor& l, const TDataSourceConstructor& r) const {
87+
if (l.SortingKey != r.SortingKey) {
88+
if (!!l.SortingKey && !!r.SortingKey) {
89+
return l.SortingKey > r.SortingKey;
90+
}
91+
if (!!l.SortingKey) {
92+
return true;
93+
}
94+
AFL_VERIFY(!!r.SortingKey);
95+
return false;
96+
}
8597
auto cmp = l.Start.Compare(r.Start);
8698
if (cmp == std::partial_ordering::less) {
8799
return false;

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,6 @@ class TExecutionContext {
7474
const std::shared_ptr<NArrow::NSSA::NGraph::NExecution::TExecutionVisitor>& GetExecutionVisitorVerified() const;
7575
};
7676

77-
class IPortionDataSource {
78-
public:
79-
virtual ui64 GetPortionId() const = 0;
80-
virtual ~IPortionDataSource() = default;
81-
};
82-
8377
class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource {
8478
public:
8579
enum class EType {
@@ -301,6 +295,8 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource {
301295
const TFetchedResult& GetStageResult() const;
302296

303297
TFetchedResult& MutableStageResult();
298+
299+
virtual std::optional<ui64> GetPortionIdOptional() const = 0;
304300
};
305301

306302
} // namespace NKikimr::NOlap::NReader::NCommon

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ class IDataSource: public NCommon::IDataSource {
191191
}
192192
};
193193

194-
class TPortionDataSource: public IDataSource, public NCommon::IPortionDataSource {
194+
class TPortionDataSource: public IDataSource {
195195
private:
196196
using TBase = IDataSource;
197197
const TPortionInfo::TConstPtr Portion;
@@ -292,7 +292,7 @@ class TPortionDataSource: public IDataSource, public NCommon::IPortionDataSource
292292
return Portion;
293293
}
294294

295-
ui64 GetPortionId() const override {
295+
std::optional<ui64> GetPortionIdOptional() const override {
296296
return Portion->GetPortionId();
297297
}
298298

ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ TEvRequestFilter::TEvRequestFilter(const TPortionDataSource& source, const std::
88
: ExternalTaskId(source.GetContext()->GetCommonContext()->GetReadMetadata()->GetScanIdentifier())
99
, MinPK(source.GetPortionInfo().IndexKeyStart())
1010
, MaxPK(source.GetPortionInfo().IndexKeyEnd())
11-
, PortionId(source.GetPortionId())
11+
, PortionId(source.GetPortionInfo().GetPortionId())
1212
, RecordsCount(source.GetRecordsCount())
1313
, MaxVersion(source.GetContext()->GetCommonContext()->GetReadMetadata()->GetRequestSnapshot())
1414
, Subscriber(subscriber)

0 commit comments

Comments
 (0)