Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions elfo-network/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use elfo_core::{errors::RequestError, tracing::TraceId, AnyMessage, RequestId};
use elfo_utils::likely;

use crate::codec::format::{
NetworkAddr, NetworkEnvelope, NetworkEnvelopePayload, FLAG_IS_LAST_RESPONSE, KIND_MASK,
KIND_REGULAR, KIND_REQUEST_ALL, KIND_REQUEST_ANY, KIND_RESPONSE_FAILED, KIND_RESPONSE_IGNORED,
KIND_RESPONSE_OK,
NetworkAddr, NetworkEnvelope, NetworkEnvelopePayload, FLAG_IS_LAST_RESPONSE,
FLAG_IS_UNBOUNDED_SEND, KIND_MASK, KIND_REGULAR, KIND_REQUEST_ALL, KIND_REQUEST_ANY,
KIND_RESPONSE_FAILED, KIND_RESPONSE_IGNORED, KIND_RESPONSE_OK,
};

#[derive(Default)]
Expand Down Expand Up @@ -266,5 +266,6 @@ fn do_decode(frame: &mut Cursor<&[u8]>) -> Result<NetworkEnvelope, DecodeError>
recipient,
trace_id,
payload,
bounded: flags & FLAG_IS_UNBOUNDED_SEND == 0,
})
}
8 changes: 6 additions & 2 deletions elfo-network/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use elfo_core::{errors::RequestError, scope, Message};
use elfo_utils::likely;

use crate::codec::format::{
NetworkEnvelope, NetworkEnvelopePayload, FLAG_IS_LAST_RESPONSE, KIND_REGULAR, KIND_REQUEST_ALL,
KIND_REQUEST_ANY, KIND_RESPONSE_FAILED, KIND_RESPONSE_IGNORED, KIND_RESPONSE_OK,
NetworkEnvelope, NetworkEnvelopePayload, FLAG_IS_LAST_RESPONSE, FLAG_IS_UNBOUNDED_SEND,
KIND_REGULAR, KIND_REQUEST_ALL, KIND_REQUEST_ANY, KIND_RESPONSE_FAILED, KIND_RESPONSE_IGNORED,
KIND_RESPONSE_OK,
};

#[derive(Debug, Display, From)]
Expand Down Expand Up @@ -105,6 +106,9 @@ fn do_encode(
if is_last_response {
flags |= FLAG_IS_LAST_RESPONSE;
}
if !envelope.bounded {
flags |= FLAG_IS_UNBOUNDED_SEND;
}
dst.write_u8(flags | kind)?;

// sender
Expand Down
2 changes: 2 additions & 0 deletions elfo-network/src/codec/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use elfo_utils::likely;

// Flags are shifted by 4 bits to the left because of the kind.
pub(crate) const FLAG_IS_LAST_RESPONSE: u8 = 1 << 7;
pub(crate) const FLAG_IS_UNBOUNDED_SEND: u8 = 1 << 6;

pub(crate) const KIND_MASK: u8 = 0xF;
pub(crate) const KIND_REGULAR: u8 = 0;
Expand All @@ -56,6 +57,7 @@ pub(crate) const KIND_RESPONSE_IGNORED: u8 = 5;
#[derive(Debug)]
pub(crate) struct NetworkEnvelope {
pub(crate) sender: NetworkAddr,
pub(crate) bounded: bool,
pub(crate) recipient: NetworkAddr,
pub(crate) trace_id: TraceId,
pub(crate) payload: NetworkEnvelopePayload,
Expand Down
1 change: 1 addition & 0 deletions elfo-network/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod tests {
payload: NetworkEnvelopePayload::Regular {
message: AnyMessage::new(message),
},
bounded: true,
}
}

Expand Down
1 change: 1 addition & 0 deletions elfo-network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ async fn send_regular<M: Message>(
payload: NetworkEnvelopePayload::Regular {
message: AnyMessage::new(message),
},
bounded: false,
};

let send_future = socket.write.send(&envelope);
Expand Down
1 change: 1 addition & 0 deletions elfo-network/src/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ mod tests {
payload: NetworkEnvelopePayload::Regular {
message: AnyMessage::new(TestSocketMessage("a".repeat(i * 10))),
},
bounded: true,
};

feed_frame(&mut client_socket, &envelope);
Expand Down
103 changes: 93 additions & 10 deletions elfo-network/src/worker/flows_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use fxhash::FxHashMap;
use metrics::{decrement_gauge, increment_gauge};
use tracing::{debug, info};

use elfo_core::{addr::NodeNo, Addr, Envelope};
use elfo_core::{addr::NodeNo, errors::RequestError, Addr, Envelope, ResponseToken};

use super::flow_control::RxFlowControl;
use crate::{codec::format::NetworkAddr, protocol::internode};
Expand Down Expand Up @@ -41,11 +41,24 @@ impl Drop for RxFlows {
}
}

#[derive(Debug)]
pub(super) enum RxFlowEvent {
Message {
envelope: Envelope,
bounded: bool,
},
Response {
token: ResponseToken,
response: Result<Envelope, RequestError>,
},
}

struct RxFlowData {
control: RxFlowControl,
/// If actor's mailbox is full, the message is queued here.
/// If actor's mailbox is full, the events are queued here.
/// The second element of the tuple is `true` if message was routed.
queue: Option<VecDeque<(Envelope, bool)>>,
queue: Option<VecDeque<(RxFlowEvent, bool)>>,

/// The number of routed envelopes in `queue`.
routed: i32,
}
Expand Down Expand Up @@ -115,7 +128,7 @@ impl RxFlows {
})
}

pub(super) fn dequeue(&mut self, addr: Addr) -> Option<(Envelope, bool)> {
pub(super) fn dequeue(&mut self, addr: Addr) -> Option<(RxFlowEvent, bool)> {
debug_assert!(addr.is_local());

let flow = self.map.get_mut(&addr)?;
Expand Down Expand Up @@ -173,10 +186,6 @@ pub(super) struct RxFlow<'a> {
}

impl RxFlow<'_> {
pub(super) fn is_stable(&self) -> bool {
self.flow.queue.is_none()
}

pub(super) fn acquire_direct(&mut self, tx_knows: bool) {
self.flow.control.do_acquire(tx_knows);
}
Expand All @@ -191,19 +200,93 @@ impl RxFlow<'_> {
})
}

pub(super) fn enqueue(self, envelope: Envelope, routed: bool) {
/// Enqueue response to the event queue. Returns `Err` if response must be
/// processed in real-time.
pub(super) fn try_enqueue_response(
self,
token: ResponseToken,
response: Result<Envelope, RequestError>,
) -> Result<(), (ResponseToken, Result<Envelope, RequestError>)> {
let Some(queue) = self.flow.queue.as_mut() else {
return Err((token, response));
};

queue.push_back((RxFlowEvent::Response { token, response }, false));

Ok(())
}

/// Enqueue unbounded send. Returns `Err` if message must be processed in real-time.
pub(super) fn try_enqueue_unbounded(
self,
envelope: Envelope,
routed: bool,
) -> Result<(), Envelope> {
let Some(queue) = self.flow.queue.as_mut() else {
return Err(envelope);
};

queue.push_back((
RxFlowEvent::Message {
envelope,
bounded: false,
},
routed,
));

if routed {
self.flow.routed += 1;
}

Ok(())
}

/// Enqueue send. Returns `Err(..)` if there's no pusher.
pub(super) fn try_enqueue_send(
mut self,
envelope: Envelope,
routed: bool,
) -> Result<(), Envelope> {
let Some(queue) = self.flow.queue.as_mut() else {
return Err(envelope);
};

queue.push_back((
RxFlowEvent::Message {
envelope,
bounded: true,
},
routed,
));
self.acquire_direct(!routed);

Ok(())
}

/// Enqueue send. Returns `true` if pusher must be spawned.
pub(super) fn enqueue_send(self, envelope: Envelope, routed: bool) -> bool {
let mut spawn_pusher = false;
let addr = self.addr;

self.flow
.queue
.get_or_insert_with(|| {
info!(addr = %addr, "destination actor is full, queueing");
spawn_pusher = true;
VecDeque::new()
})
.push_back((envelope, routed));
.push_back((
RxFlowEvent::Message {
envelope,
bounded: true,
},
routed,
));

if routed {
self.flow.routed += 1;
}

spawn_pusher
}
}
Loading
Loading