Skip to content

Commit c7955e0

Browse files
kuzin57Roman Kuzin
andauthored
LOGBROKER-11013 Better fifo (#31159)
Co-authored-by: Roman Kuzin <[email protected]>
1 parent 7a46c62 commit c7955e0

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,7 @@ bool TConsumerActor::FetchMessagesIfNeeded() {
676676
LOG_D("Skip fetch: infly limit exceeded");
677677
return false;
678678
}
679-
if (metrics.InflightMessageCount >= Storage->MinMessages && metrics.UnprocessedMessageCount >= metrics.LockedMessageCount * 2) {
679+
if (!Config.GetKeepMessageOrder() && metrics.InflightMessageCount >= Storage->MinMessages && metrics.UnprocessedMessageCount >= metrics.LockedMessageCount * 2) {
680680
LOG_D("Skip fetch: there are enough messages. InflightMessageCount=" << metrics.InflightMessageCount
681681
<< ", UnprocessedMessageCount=" << metrics.UnprocessedMessageCount
682682
<< ", LockedMessageCount=" << metrics.LockedMessageCount);

ydb/public/lib/ydb_cli/commands/sqs_workload/sqs_json/sqs_json_client.cpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
#include <aws/core/http/HttpClientFactory.h>
66
#include <aws/core/utils/Array.h>
77
#include <aws/core/utils/HashingUtils.h>
8-
#include <aws/core/utils/UUID.h>
98
#include <aws/core/utils/memory/stl/SimpleStringStream.h>
109
#include <aws/sqs/model/DeleteMessageBatchRequest.h>
1110
#include <aws/sqs/model/MessageSystemAttributeNameForSends.h>
1211
#include <aws/sqs/model/ReceiveMessageRequest.h>
1312
#include <aws/sqs/model/SendMessageBatchRequest.h>
1413
#include <library/cpp/string_utils/url/url.h>
14+
#include <util/generic/guid.h>
1515
#include <util/generic/string.h>
1616
#include <util/string/cast.h>
1717

@@ -119,10 +119,14 @@ namespace NYdb::NConsoleClient {
119119
auto credentialsProvider =
120120
Aws::MakeShared<Aws::Auth::SimpleAWSCredentialsProvider>(
121121
"credentials-provider", credentials);
122+
const Aws::String signingRegion = clientConfiguration.region.empty()
123+
? Aws::String("ru-central1")
124+
: clientConfiguration.region;
125+
122126
Signer = Aws::MakeShared<Aws::Client::AWSAuthV4Signer>(
123-
"aws-auth-v4-signer", credentialsProvider, kServiceName, clientConfiguration.region,
127+
"aws-auth-v4-signer", credentialsProvider, kServiceName, signingRegion,
124128
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always, true,
125-
Aws::Auth::AWSSigningAlgorithm::ASYMMETRIC_SIGV4);
129+
Aws::Auth::AWSSigningAlgorithm::SIGV4);
126130
}
127131

128132
void TSQSJsonClient::AddHeaders(
@@ -135,8 +139,10 @@ namespace NYdb::NConsoleClient {
135139
request->SetHeaderValue(kContentTypeHeader, kContentTypeValue);
136140
request->SetHeaderValue(kAmzSdkRequestHeader, kAmzSdkRequestValue);
137141
request->SetHeaderValue(kXAmzAPIVersionHeader, kXAmzAPIVersionValue);
138-
request->SetHeaderValue(kAmzSdkInvocationIdHeader,
139-
Aws::Utils::UUID::RandomUUID());
142+
const TString invocationId = CreateGuidAsString();
143+
request->SetHeaderValue(
144+
kAmzSdkInvocationIdHeader,
145+
Aws::String(invocationId.c_str(), invocationId.size()));
140146
}
141147

142148
Aws::Utils::Json::JsonValue TSQSJsonClient::ReadResponseBody(

ydb/public/lib/ydb_cli/commands/sqs_workload/sqs_workload_writer.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,17 @@ namespace NYdb::NConsoleClient {
2222
}
2323

2424
Aws::Vector<Aws::SQS::Model::SendMessageBatchRequestEntry>
25-
CreateSendMessageBatchRequestEntries(const TSqsWorkloadWriterParams& params) {
25+
CreateSendMessageBatchRequestEntries(const TSqsWorkloadWriterParams& params, ui32 messageGroupID, ui32 messageDeduplicationID) {
2626
Aws::Vector<Aws::SQS::Model::SendMessageBatchRequestEntry> entries;
2727
for (ui32 i = 0; i < params.BatchSize; ++i) {
2828
Aws::String messageBody(params.MessageSize, 'a');
2929
Aws::SQS::Model::SendMessageBatchRequestEntry entry;
3030
entry.WithMessageBody(messageBody).WithId(fmt::format("{}", i));
3131
if (params.GroupsAmount > 0) {
32-
auto messageGroupID = std::rand() % params.GroupsAmount;
3332
entry.WithMessageGroupId(fmt::format("{}", messageGroupID));
3433
}
3534

3635
if (params.MaxUniqueMessages > 0) {
37-
auto messageDeduplicationID = std::rand() % params.MaxUniqueMessages;
3836
entry.WithMessageDeduplicationId(std::format("{}", messageDeduplicationID));
3937
}
4038

@@ -70,10 +68,14 @@ namespace NYdb::NConsoleClient {
7068

7169
void TSqsWorkloadWriter::RunLoop(const TSqsWorkloadWriterParams& params,
7270
TInstant endTime) {
71+
std::mt19937_64 rng(std::random_device{}());
72+
std::uniform_int_distribution<ui32> messageGroupsDistribution(0, params.GroupsAmount - 1);
73+
std::uniform_int_distribution<ui32> messageDeduplicationDistribution(0, params.MaxUniqueMessages - 1);
74+
7375
while (Now() < endTime && !params.ErrorFlag->load()) {
7476
Aws::SQS::Model::SendMessageBatchRequest sendMessageBatchRequest;
7577
sendMessageBatchRequest.SetQueueUrl(params.QueueUrl.c_str());
76-
sendMessageBatchRequest.SetEntries(CreateSendMessageBatchRequestEntries(params));
78+
sendMessageBatchRequest.SetEntries(CreateSendMessageBatchRequestEntries(params, messageGroupsDistribution(rng), messageDeduplicationDistribution(rng)));
7779
sendMessageBatchRequest.SetAdditionalCustomHeaderValue(
7880
AMZ_TARGET_HEADER, SQS_TARGET_SEND_MESSAGE_BATCH);
7981

0 commit comments

Comments
 (0)