Eventstream Refactor #816

Open
wants to merge 59 commits into
base: main

@bretambrose bretambrose commented Jun 19, 2025

This PR is a substantial rewrite of the eventstream RPC bindings as well as the code-generated clients for eventstream-based services. This refactor was necessitated by a variety of deadlock and race condition problems with the original implementation. The complexity of the original implementation made targeted fixes nearly impossible to apply.

Refactor Goals

  • No blocking in destructors. To maintain behavioral compatibility with the previous implementation, we synchronously simulate the asynchronous events that would happen during a blocking destroy.
  • Simplified synchronization model where
    • User callbacks are never invoked inside a lock
    • C APIs are never invoked inside a lock
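
A minimal sketch of the "no user callbacks inside a lock" rule, assuming a hypothetical CallbackHolder class that is not part of this PR: the callback is copied out while the lock is held and invoked only after the lock is released, so a callback that re-enters the object cannot deadlock.

```cpp
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical class for illustration only; not a type from this PR.
class CallbackHolder
{
  public:
    void SetCallback(std::function<void(int)> callback)
    {
        std::lock_guard<std::mutex> lock(m_lock);
        m_callback = std::move(callback);
    }

    void Notify(int value)
    {
        std::function<void(int)> callback;
        {
            // Hold the lock only long enough to copy the callback out.
            std::lock_guard<std::mutex> lock(m_lock);
            callback = m_callback;
        }
        // The lock is released here, so the callback may re-enter this object.
        if (callback)
        {
            callback(value);
        }
    }

  private:
    std::mutex m_lock;
    std::function<void(int)> m_callback;
};
```

With this shape, a callback that calls SetCallback on the same object from inside Notify does not self-deadlock on the non-recursive mutex.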

Public API Changes

The original implementation exposed a large amount of unnecessary details in the public API. As part of the refactor, we make a number of publicly visible changes that, while technically breaking, we believe should not be user-impacting. We consider a change to be user-impacting if it is a breaking change to a type that is used during service client interaction.

We detail each change below, along with the reasoning for why we think it is safe. If you were mocking any of these changed type contracts, the changes will be breaking for you.

  • All OperationModelContext subclasses have been made private. These types were used internally by the service model and there is no reason to expose them.
  • ContinuationCallbackData - Removed. It was not user-facing and is not needed after the refactor.
  • ClientContinuationHandler - Public functions that were only for internal use have been removed. The class no longer serves a purpose but has been retained in case users were tracking operations by it.
  • ClientContinuation - Internal type that has been re-implemented as the private type ClientContinuationImpl
  • ClientOperation
    • Constructor type signature has changed - This type is only constructed internally by generated code
    • GetOperationResult API removed - This function could not be called externally without triggering exceptions by multi-consuming a promise's future
    • WithLaunchMode - This function persists but no longer does anything useful. Launch mode is no longer relevant to the processing of operations and was a mistake to include originally.
  • ClientConnection - This is an internal type used by generated service clients.
    • Constructor signature changed.
    • SendPing and SendPingResponse removed.
    • Connect and NewStream signatures changed.
    • bool operator removed
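
As a generic standard-library illustration of the promise/future hazard mentioned for GetOperationResult (this is not the removed code, and the function name below is hypothetical): a std::promise hands out its future exactly once, and a second get_future() call throws std::future_error.

```cpp
#include <future>

// Returns true if asking the same promise for its future a second time throws
// std::future_error with the future_already_retrieved code.
bool SecondGetFutureThrows()
{
    std::promise<int> promise;
    std::future<int> first = promise.get_future();
    try
    {
        (void)promise.get_future(); // second retrieval from the same promise
    }
    catch (const std::future_error &error)
    {
        return error.code() == std::future_errc::future_already_retrieved;
    }
    return false;
}
```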

Additional Changes

We now launch an EchoTest RPC server in CI and run a much larger suite of tests against it.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Bret Ambrose added 30 commits April 11, 2025 13:55
@bretambrose bretambrose changed the title Eventstream refactor2 Eventstream Refactor Jun 19, 2025
@@ -493,13 +416,16 @@ namespace Aws
private:
union AWS_EVENTSTREAMRPC_API OperationResult

I'd refactor OperationResult (e.g. replace it with Crt::Variant), because this is too error-prone in its current implementation. For example, the move assignment operator for TaggedResult can potentially cause a memory leak, because it doesn't call destructors for m_operationResult.m_response / m_operationResult.m_error:

TaggedResult tr;
tr = std::move(other_tr);
tr = std::move(one_more_tr);  // m_response or m_error from other_tr leak here
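
A minimal sketch of the fix for the manual tagged-union approach, assuming hypothetical Tracked and TaggedSlot types that stand in for the real response/error members and TaggedResult: the move assignment operator destroys the currently active member before constructing the new one. Replacing the union with a variant type, as suggested above, would avoid writing this by hand.

```cpp
#include <new>
#include <utility>

// Counts live instances so that a leaked object shows up as a nonzero count.
struct Tracked
{
    static int s_live;
    Tracked() { ++s_live; }
    Tracked(Tracked &&) noexcept { ++s_live; }
    ~Tracked() { --s_live; }
};
int Tracked::s_live = 0;

// Hypothetical stand-in for TaggedResult with a single union member.
class TaggedSlot
{
  public:
    TaggedSlot() noexcept : m_hasValue(false) {}
    explicit TaggedSlot(Tracked value) : m_hasValue(true)
    {
        new (&m_storage.m_value) Tracked(std::move(value));
    }
    ~TaggedSlot() { Reset(); }

    TaggedSlot &operator=(TaggedSlot &&rhs) noexcept
    {
        if (this != &rhs)
        {
            Reset(); // destroy whatever this object currently holds
            if (rhs.m_hasValue)
            {
                new (&m_storage.m_value) Tracked(std::move(rhs.m_storage.m_value));
                m_hasValue = true;
            }
        }
        return *this;
    }

  private:
    void Reset() noexcept
    {
        if (m_hasValue)
        {
            m_storage.m_value.~Tracked();
            m_hasValue = false;
        }
    }

    union Storage
    {
        Storage() {}
        ~Storage() {}
        Tracked m_value;
    };
    Storage m_storage;
    bool m_hasValue;
};

// Repeats the two-assignment sequence from the comment and reports what is still alive.
int LiveAfterTwoMoveAssignments()
{
    {
        TaggedSlot target;
        TaggedSlot first{Tracked{}};
        TaggedSlot second{Tracked{}};
        target = std::move(first);
        target = std::move(second); // Reset() destroys the value taken from first
    }
    return Tracked::s_live;
}
```

Without the Reset() call in the move assignment, the object moved in from first would never be destroyed and the count would stay above zero.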

MessageType messageType,
uint32_t messageFlags,
OnMessageFlushCallback onMessageFlushCallback) noexcept;
std::shared_ptr<ClientContinuationImpl> NewStream() noexcept;

Having ClientContinuationImpl in the public API doesn't seem right. ClientConnection users can't do anything meaningful with this method unless they have access to this private class, which effectively limits its possible callers to ClientOperation.

However, I don't see a good solution right away. In essence, ClientConnection creates a new instance of ClientOperation, since ClientOperation is a thin wrapper around ClientContinuationImpl, so from a technical perspective a new method like ClientOperation createNewClientOperation(...); would make more sense. But it would probably look ugly in the ClientOperation subclasses, and it looks weird from a logical perspective: a connection creating an operation.
A better alternative might be to make NewStream() private and mark ClientOperation as a friend of ClientConnection.
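
A minimal sketch of the friend-based alternative, using hypothetical stand-in names (StreamImpl, Connection, Operation) rather than the real classes:

```cpp
#include <memory>

// Hypothetical stand-in for ClientContinuationImpl.
class StreamImpl
{
};

// Hypothetical stand-in for ClientConnection.
class Connection
{
    friend class Operation; // only Operation may create streams

  private:
    std::shared_ptr<StreamImpl> NewStream() { return std::make_shared<StreamImpl>(); }
};

// Hypothetical stand-in for ClientOperation.
class Operation
{
  public:
    explicit Operation(Connection &connection) : m_stream(connection.NewStream()) {}
    bool HasStream() const { return m_stream != nullptr; }

  private:
    std::shared_ptr<StreamImpl> m_stream;
};
```

One caveat: friendship is not inherited in C++, so the call to NewStream() has to stay in the base operation class; generated subclasses would not gain access on their own.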

Comment on lines +572 to +574
* @deprecated This no longer does anything useful. Launch policy was added because of a
* mistake in the implementation that was attempting to chain two promises together.
*

Trivial: Maybe remove all WithLaunchMode calls from our codebase?

m_sharedState.m_onFlushPromise = std::move(flushPromise);
}

~OnMessageFlushCallbackContainer() = default;

Trivial: This destructor can be safely removed

{
}

~OnMessageFlushCallbackContainerWrapper() = default;

Trivial: This destructor can be safely removed

m_valueByteBuf =
Crt::ByteBufNewCopy(lhs.m_allocator, lhs.m_valueByteBuf.buffer, lhs.m_valueByteBuf.len);
m_underlyingHandle = lhs.m_underlyingHandle;
m_underlyingHandle.header_value.variable_len_val = m_valueByteBuf.buffer;

Shouldn't we check for AWS_EVENT_STREAM_HEADER_STRING and AWS_EVENT_STREAM_HEADER_BYTE_BUF for this assignment to make sense?

m_sharedState.m_desiredState = ClientState::Connected;
m_sharedState.m_currentState = ClientState::PendingConnect;
m_sharedState.m_callbackContext = {
std::move(disconnectCallback), std::move(errorCallback), std::move(connectionSuccessCallback)};

nit: The corresponding ctor takes const refs, so std::move is just adding noise here.
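
A small sketch of why the std::move has no effect, using a hypothetical CallbackContext type with a const-reference constructor (the real callback context differs): the rvalue binds to the const reference and the member is copy-constructed, so the source is left untouched.

```cpp
#include <string>
#include <utility>

// Hypothetical type; the only relevant detail is the const-reference constructor.
struct CallbackContext
{
    explicit CallbackContext(const std::string &name) : m_name(name) {}
    std::string m_name;
};

// Returns true if the source string still holds its value after being "moved".
bool SourceSurvivesMove()
{
    std::string name = "on-disconnect";
    CallbackContext context(std::move(name)); // binds to const&, so this copies
    return name == "on-disconnect" && context.m_name == name;
}
```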

Comment on lines +998 to +1000
AWS_LOGF_INFO(
AWS_LS_COMMON_GENERAL, "EventStreamClient2 - Host name %d bytes long", (int)hostName.length());
AWS_LOGF_INFO(AWS_LS_COMMON_GENERAL, "EventStreamClient2 - Host name : %s", hostName.c_str());

This is logged above already


bool ClientConnectionImpl::IsOpen() const noexcept
{
std::lock_guard<std::mutex> lock(const_cast<ClientConnectionImpl *>(this)->m_sharedStateLock);

nit: make m_sharedStateLock mutable maybe?
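
A minimal sketch of the mutable-mutex idiom, using a hypothetical ConnectionState class: declaring the mutex mutable lets a const member function lock it without a const_cast.

```cpp
#include <mutex>

// Hypothetical class for illustration; not ClientConnectionImpl itself.
class ConnectionState
{
  public:
    bool IsOpen() const
    {
        // Locking is allowed in a const function because m_lock is mutable.
        std::lock_guard<std::mutex> lock(m_lock);
        return m_open;
    }

    void SetOpen(bool open)
    {
        std::lock_guard<std::mutex> lock(m_lock);
        m_open = open;
    }

  private:
    mutable std::mutex m_lock;
    bool m_open = false;
};
```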

*
* (1) Synchronous failure to initiate a connection
* (2) Connection setup failure (async callback)
* (3) Connnection (that was successfully established) shutdown completion (async callback)

Typo: "Connnection" should be "Connection".

Comment on lines +1031 to 1036
msg_args->payload = messageAmendment.GetPayload().has_value()
? (aws_byte_buf *)(&(messageAmendment.GetPayload().value()))
: nullptr;
msg_args->message_type = AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT;
msg_args->message_flags = 0U;
}

messageAmendment.m_payload will be destroyed at the end of the block, so msg_args->payload will be left dangling and cause a use-after-free.

Comment on lines +2190 to +2200
if (contentHeader->GetValueAsString(contentType) && contentType != CONTENT_TYPE_APPLICATION_JSON)
{
/* Missing required content type header. */
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"The content type (%s) header was specified with an unsupported value (%s).",
CONTENT_TYPE_HEADER,
contentType.c_str());
result.m_statusCode = EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE;
return result;
}

@sfod sfod Jul 3, 2025

I think this function should also fail if the content type header is not of string type.
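
A sketch of the stricter check, assuming a hypothetical FakeHeader type whose GetValueAsString returns false for non-string headers, and assuming the JSON content type constant is "application/json" (neither is confirmed by the quoted diff): a header that is not a string is rejected instead of falling through.

```cpp
#include <string>

// Hypothetical stand-in for the header type; only what the check needs.
struct FakeHeader
{
    bool m_isString;
    std::string m_value;

    // Assumed contract: returns false when the header does not hold a string.
    bool GetValueAsString(std::string &out) const
    {
        if (!m_isString)
        {
            return false;
        }
        out = m_value;
        return true;
    }
};

bool IsJsonContentType(const FakeHeader &header)
{
    std::string contentType;
    if (!header.GetValueAsString(contentType))
    {
        return false; // non-string header: reject instead of falling through
    }
    return contentType == "application/json"; // assumed constant value
}
```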

ContinuationSharedState();

ContinuationStateType m_currentState;
ContinuationStateType m_desiredState;

m_desiredState never seems to be used.

}

void StreamResponseHandler::OnStreamEvent(Crt::ScopedResource<AbstractShapeBase> response) {}
// We use a task to release the C continuation to make it impossible to trigger the termination callback
// in a callstack that includes ClientContinationImpl methods or state.

Typo: ClientContinationImpl should be ClientContinuationImpl.

Comment on lines +2331 to +2332
default:
AWS_FATAL_ASSERT(false);

@sfod sfod Jul 3, 2025

This assert can fire when this function receives a Response while ClientContinuationImpl::Close is called in parallel.
That happens because Close sets m_sharedState.m_currentState = ContinuationStateType::PendingClose, which makes isResponse == false.
I think also checking for PendingClose when determining the value of isResponse should resolve this issue.

Crt::List<EventStreamHeader> continuationMessageHeaders;
for (size_t i = 0; i < rawMessage->headers_count; ++i)
{
continuationMessageHeaders.emplace_back(EventStreamHeader(rawMessage->headers[i], m_allocator));

Explicitly specifying EventStreamHeader here causes an unnecessary move constructor call (which actually copies the data). Not a big deal, but if it can be easily avoided, why not:

continuationMessageHeaders.emplace_back(rawMessage->headers[i], m_allocator);

There are a few other places where the type shouldn't be spelled out with emplace_back.

You can take a look at godbolt example
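
A self-contained sketch of the difference, using a hypothetical Counted type that counts its move constructions (this is not the original godbolt example): passing a constructed temporary to emplace_back costs one extra move, while forwarding the constructor arguments builds the element in place.

```cpp
#include <list>

// Hypothetical type that records how often it is move-constructed.
struct Counted
{
    static int s_moves;
    explicit Counted(int) {}
    Counted(Counted &&) noexcept { ++s_moves; }
};
int Counted::s_moves = 0;

int MovesWithExplicitTemporary()
{
    Counted::s_moves = 0;
    std::list<Counted> items;
    items.emplace_back(Counted(1)); // temporary is built, then moved into the node
    return Counted::s_moves;
}

int MovesWithForwardedArguments()
{
    Counted::s_moves = 0;
    std::list<Counted> items;
    items.emplace_back(1); // element is constructed directly inside the node
    return Counted::s_moves;
}
```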

Comment on lines +2284 to +2288
// Horribly awkward cast due to the infuriating original API design
Crt::Allocator *allocator = m_allocator;
auto errorResponse = Crt::ScopedResource<OperationError>(
static_cast<OperationError *>(result.m_message.value().m_shape.release()),
[allocator](OperationError *shape) { Crt::Delete(shape, allocator); });

@sfod sfod Jul 4, 2025

This cast from ScopedResource<Base> to ScopedResource<Derived> does indeed look awkward. A similar cast happens in a couple of other places in this source, and in a lot of places in the generated code.

We could define CastToBase and CastToDerived for ScopedResource in aws-crt-cpp like this:

template <
    typename Derived,
    typename Base,
    typename std::enable_if<std::is_base_of<Base, Derived>::value, bool>::type = true>
ScopedResource<Base> CastToBase(ScopedResource<Derived> derived)
{
    const auto &deleter = derived.get_deleter();
    return ScopedResource<Base>(
        derived.release(), [deleter](Base *base) { deleter(static_cast<Derived *>(base)); });
}

template <
    typename Base,
    typename Derived,
    typename std::enable_if<std::is_base_of<Base, Derived>::value, bool>::type = true>
ScopedResource<Derived> CastToDerived(ScopedResource<Base> base)
{
    return ScopedResource<Derived>(static_cast<Derived *>(base.release()), base.get_deleter());
}

// plus specializations for std::is_base_of<Base, Derived> == false, to print human-readable errors
// maybe also add second parameter for a custom deleter

and then these awkward blocks would turn into something still somewhat verbose, but more readable:

auto errorResponse = Crt::CastToDerived<AbstractShapeBase, OperationError>(
    std::move(result.m_message->m_shape));

and

auto errorShape = m_operationModelContext->AllocateOperationErrorFromPayload(
    modelName, payloadStringView, m_allocator);
auto shape = Crt::CastToBase<OperationError, AbstractShapeBase>(std::move(errorShape));
result.m_message = MessageDeserialization{
    EventStreamMessageRoutingType::Error,
    std::move(shape)
};
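
A standalone sketch to try the downcast helper outside the CRT. It assumes ScopedResource is an alias for std::unique_ptr with a std::function deleter (defined locally here so the sketch compiles alone), and Shape and ErrorShape are hypothetical stand-ins for AbstractShapeBase and OperationError. Unlike the version above, it copies the deleter before calling release() so that argument evaluation order cannot matter.

```cpp
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>

// Assumed shape of the alias; defined locally for this sketch.
template <typename T> using ScopedResource = std::unique_ptr<T, std::function<void(T *)>>;

template <
    typename Base,
    typename Derived,
    typename std::enable_if<std::is_base_of<Base, Derived>::value, bool>::type = true>
ScopedResource<Derived> CastToDerived(ScopedResource<Base> base)
{
    // Copy the deleter first, then transfer ownership of the raw pointer.
    std::function<void(Base *)> deleter = base.get_deleter();
    return ScopedResource<Derived>(
        static_cast<Derived *>(base.release()), [deleter](Derived *derived) { deleter(derived); });
}

// Hypothetical stand-ins for AbstractShapeBase and OperationError.
struct Shape
{
    virtual ~Shape() = default;
};
struct ErrorShape : Shape
{
    int code = 42;
};

// Downcasts a base-typed resource, reads a derived member, and lets the
// original deleter run when the derived handle goes out of scope.
int DemoCast(bool &deleted)
{
    ScopedResource<Shape> base(new ErrorShape(), [&deleted](Shape *shape) {
        deleted = true;
        delete shape;
    });
    ScopedResource<ErrorShape> derived = CastToDerived<Shape, ErrorShape>(std::move(base));
    return derived->code;
}
```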
