Skip to content

Commit d1c0de7

Browse files
authored
Merge 5cb08cd into d8e4b72
2 parents d8e4b72 + 5cb08cd commit d1c0de7

File tree

3 files changed

+120
-5
lines changed

3 files changed

+120
-5
lines changed

ydb/core/persqueue/pqrb/read_balancer__balancing.cpp

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,8 @@ void TPartitionFamily::InactivatePartition(ui32 partitionId) {
386386
}
387387

388388
void TPartitionFamily::ChangePartitionCounters(ssize_t active, ssize_t inactive) {
389-
Y_VERIFY_DEBUG((ssize_t)ActivePartitionCount + active >= 0);
390-
Y_VERIFY_DEBUG((ssize_t)InactivePartitionCount + inactive >= 0);
389+
Y_VERIFY_DEBUG((ssize_t)ActivePartitionCount + active >= 0, "ActivePartitionCount: %lu, active: %ld", ActivePartitionCount, active);
390+
Y_VERIFY_DEBUG((ssize_t)InactivePartitionCount + inactive >= 0, "InactivePartitionCount: %lu, inactive: %ld", InactivePartitionCount, inactive);
391391

392392
ActivePartitionCount += active;
393393
InactivePartitionCount += inactive;
@@ -1155,11 +1155,12 @@ void TConsumer::FinishReading(TEvPersQueue::TEvReadingPartitionFinishedRequest::
11551155

11561156
auto& partition = Partitions[partitionId];
11571157

1158-
if (partition.SetFinishedState(r.GetScaleAwareSDK(), r.GetStartedReadingFromEndOffset())) {
1158+
const bool wasInactive = partition.IsInactive();
1159+
if (partition.SetFinishedState(r.GetScaleAwareSDK(), r.GetStartedReadingFromEndOffset()) || wasInactive) {
11591160
PQ_LOG_D("Reading of the partition " << partitionId << " was finished by " << r.GetConsumer()
11601161
<< ", firstMessage=" << r.GetStartedReadingFromEndOffset() << ", " << GetSdkDebugString0(r.GetScaleAwareSDK()));
11611162

1162-
if (ProccessReadingFinished(partitionId, false, ctx)) {
1163+
if (ProccessReadingFinished(partitionId, wasInactive, ctx)) {
11631164
ScheduleBalance(ctx);
11641165
}
11651166
} else if (!partition.IsInactive()) {
@@ -1835,6 +1836,62 @@ void TBalancer::ProcessPendingStats(const TActorContext& ctx) {
18351836
PendingUpdates.clear();
18361837
}
18371838

1839+
<<<<<<< HEAD
1840+
=======
1841+
void TBalancer::Handle(TEvPersQueue::TEvBalancingSubscribe::TPtr& ev, const TActorContext& ctx) {
1842+
auto& record = ev->Get()->Record;
1843+
PQ_LOG_D("Handle TEvPersQueue::TEvBalancingSubscribe " << record.ShortDebugString());
1844+
1845+
auto sender = ActorIdFromProto(record.GetSourceActor());
1846+
auto status = Consumers.contains(record.GetConsumer()) ?
1847+
NKikimrPQ::TEvBalancingSubscribeNotify::BALANCING : NKikimrPQ::TEvBalancingSubscribeNotify::FREE;
1848+
Notify(sender, record.GetConsumer(), status, ctx);
1849+
1850+
Subscriptions[ev->Sender].emplace_back(std::move(sender), std::move(*record.MutableConsumer()));
1851+
}
1852+
1853+
void TBalancer::Handle(TEvPersQueue::TEvBalancingUnsubscribe::TPtr& ev, const TActorContext&) {
1854+
auto& record = ev->Get()->Record;
1855+
PQ_LOG_D("Handle TEvPersQueue::TEvBalancingUnsubscribe " << record.ShortDebugString());
1856+
1857+
auto sender = ActorIdFromProto(record.GetSourceActor());
1858+
auto& consumer = record.GetConsumer();
1859+
1860+
auto it = Subscriptions.find(ev->Sender);
1861+
if (it == Subscriptions.end()) {
1862+
return;
1863+
}
1864+
1865+
std::vector<TSubscription>& subscriptions = it->second;
1866+
std::vector<TSubscription> actualSubscriptions;
1867+
actualSubscriptions.resize(subscriptions.size());
1868+
1869+
for (auto& [existsSender, existsConsumer] : subscriptions) {
1870+
if (sender == existsSender && consumer == existsConsumer) {
1871+
continue;
1872+
}
1873+
1874+
actualSubscriptions.emplace_back(std::move(existsSender), std::move(existsConsumer));
1875+
}
1876+
1877+
subscriptions = std::move(actualSubscriptions);
1878+
}
1879+
1880+
void TBalancer::Notify(const TString& consumer, NKikimrPQ::TEvBalancingSubscribeNotify::EStatus status, const TActorContext& ctx) {
1881+
for (auto& [_, subscriptions] : Subscriptions) {
1882+
for (auto& subscription : subscriptions) {
1883+
if (subscription.Consumer == consumer) {
1884+
Notify(subscription.Sender, consumer, status, ctx);
1885+
}
1886+
}
1887+
}
1888+
}
1889+
1890+
void TBalancer::Notify(const TActorId subscriber, const TString& consumer, NKikimrPQ::TEvBalancingSubscribeNotify::EStatus status, const TActorContext& ctx) {
1891+
ctx.Send(subscriber, new TEvPersQueue::TEvBalancingSubscribeNotify(TabletGeneration(), ++NotifyCookie, TopicPath(), consumer, status));
1892+
}
1893+
1894+
>>>>>>> 45f5616be25 (EXT-1793 Fix when reading from the end of uncommitted partition (#30854))
18381895
TString TBalancer::LogPrefix() const {
18391896
return TStringBuilder() << "[" << TopicActor.TabletID() << "][" << Topic() << "] ";
18401897
}

ydb/core/persqueue/pqrb/read_balancer__balancing.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ struct TConsumer;
1616
class TBalancer;
1717

1818
struct TPartition {
19-
// Client had commited rad offset equals EndOffset of the partition
19+
// Client had commited read offset equals EndOffset of the partition
2020
bool Commited = false;
2121
// ReadSession reach EndOffset of the partition
2222
bool ReadingFinished = false;

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,64 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
622622
readSession2->Close();
623623
}
624624

625+
Y_UNIT_TEST(PartitionSplit_ManySessions_NoCommits_AutoscaleAwareSDK) {
626+
TTopicSdkTestSetup setup = CreateSetup();
627+
TTopicClient client = setup.MakeClient();
628+
629+
TCreateTopicSettings createSettings;
630+
createSettings
631+
.BeginConfigurePartitioningSettings()
632+
.MinActivePartitions(1)
633+
.MaxActivePartitions(100)
634+
.BeginConfigureAutoPartitioningSettings()
635+
.UpUtilizationPercent(2)
636+
.DownUtilizationPercent(1)
637+
.StabilizationWindow(TDuration::Seconds(2))
638+
.Strategy(EAutoPartitioningStrategy::ScaleUp)
639+
.EndConfigureAutoPartitioningSettings()
640+
.EndConfigurePartitioningSettings();
641+
642+
TConsumerSettings<TCreateTopicSettings> consumers(createSettings, setup.GetConsumerName(TEST_CONSUMER));
643+
createSettings.AppendConsumers(consumers);
644+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
645+
646+
auto msg = TString(1_MB, 'a');
647+
648+
auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, TString{TEST_TOPIC}, false);
649+
auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, TString{TEST_TOPIC}, false);
650+
651+
{
652+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 1)));
653+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 2)));
654+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 3)));
655+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 4)));
656+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 5)));
657+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, 6)));
658+
Sleep(TDuration::Seconds(15));
659+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
660+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
661+
}
662+
663+
auto readSession1 = CreateTestReadSession({ .Name="Session-1", .Setup=setup, .Sdk = SdkVersion::Topic, .ExpectedMessagesCount = 6, .AutoCommit = false, .AutoPartitioningSupport = true });
664+
readSession1->WaitAndAssertPartitions({0, 1, 2}, "Must read all partition");
665+
readSession1->WaitAllMessages();
666+
667+
auto readSession2 = CreateTestReadSession({ .Name="Session-2", .Setup=setup, .Sdk = SdkVersion::Topic, .ExpectedMessagesCount = 0, .AutoCommit = false, .AutoPartitioningSupport = true });
668+
669+
Sleep(TDuration::Seconds(1));
670+
671+
readSession1->Close();
672+
673+
auto yetAnotherReadSession1 = CreateTestReadSession({ .Name="YetAnotherSession-1", .Setup=setup, .Sdk = SdkVersion::Topic, .ExpectedMessagesCount = 0, .AutoCommit = false, .AutoPartitioningSupport = true });
674+
yetAnotherReadSession1->SetOffset(0, 6); // read from end offset
675+
676+
Sleep(TDuration::Seconds(1));
677+
678+
readSession2->Close();
679+
680+
yetAnotherReadSession1->WaitAndAssertPartitions({0, 1, 2}, "Must read all children partitions");
681+
}
682+
625683
Y_UNIT_TEST(ControlPlane_CreateAlterDescribe) {
626684
auto autoscalingTestTopic = "autoscalit-topic";
627685
TTopicSdkTestSetup setup = CreateSetup();

0 commit comments

Comments
 (0)