Skip to content

Commit 0422d61

Browse files
committed
network_filter: Pass connection close from upstream to downstream
Monitor connection close events on the upstream filter and close the downstream connection if both connections have the same 5-tuple. This fixes the issue where a newly reopened upstream connection with the same 5-tuple causes the downstream connection to become non-functional. Signed-off-by: Jarno Rajahalme <[email protected]>
1 parent e5a8dbb commit 0422d61

File tree

2 files changed

+76
-9
lines changed

2 files changed

+76
-9
lines changed

cilium/network_filter.cc

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ Config::Config(const ::cilium::NetworkFilter& config, bool is_upstream,
115115
if (config.proxylib().length() > 0) {
116116
proxylib_ = std::make_shared<Cilium::GoFilter>(config.proxylib(), config.proxylib_params());
117117
}
118+
119+
ENVOY_LOG(debug, "cilium.network: {} filter configured",
120+
is_upstream_ ? "upstream" : "downstream");
118121
}
119122

120123
void Config::log(Cilium::AccessLog::Entry& entry, ::cilium::EntryType type) {
@@ -123,6 +126,61 @@ void Config::log(Cilium::AccessLog::Entry& entry, ::cilium::EntryType type) {
123126
}
124127
}
125128

129+
void Instance::onEvent(Network::ConnectionEvent event) {
130+
auto& conn = callbacks_->connection();
131+
switch (event) {
132+
case Network::ConnectionEvent::LocalClose:
133+
case Network::ConnectionEvent::RemoteClose:
134+
// close downstream connection when upstream is closed, but only if the original source address
135+
// and port are used on the upstream side.
136+
if (config_->is_upstream_) {
137+
auto& stream_info = conn.streamInfo();
138+
139+
// check if the downstream connection exists
140+
const auto downstream_connection_fs =
141+
stream_info.filterState()->getDataMutable<Network::Cilium::DownstreamConnection>(
142+
Network::Cilium::DownstreamConnection::key());
143+
if (downstream_connection_fs) {
144+
Network::Connection* ds_conn = downstream_connection_fs->connection_;
145+
if (ds_conn) {
146+
// check if upstream and downstream connections have the same source and destination
147+
// addresses, respectively
148+
if (*conn.connectionInfoProvider().remoteAddress() ==
149+
*ds_conn->connectionInfoProvider().localAddress() &&
150+
*conn.connectionInfoProvider().localAddress() ==
151+
*ds_conn->connectionInfoProvider().remoteAddress()) {
152+
ENVOY_CONN_LOG(
153+
debug,
154+
"Upstream connection closed, closing downstream connection due to the same 5-tuple",
155+
conn);
156+
ds_conn->close(Network::ConnectionCloseType::FlushWrite);
157+
} else {
158+
ENVOY_CONN_LOG(debug,
159+
"Upstream connection closed, but it has different 5-tuple, downstream "
160+
"connection not closed (src: {}/{}, dst: {}/{})",
161+
conn, ds_conn->connectionInfoProvider().remoteAddress()->asStringView(),
162+
conn.connectionInfoProvider().localAddress()->asStringView(),
163+
ds_conn->connectionInfoProvider().localAddress()->asStringView(),
164+
conn.connectionInfoProvider().remoteAddress()->asStringView());
165+
}
166+
} else {
167+
ENVOY_CONN_LOG(debug, "Upstream connection closed, downstream connection == nullptr",
168+
conn);
169+
}
170+
} else {
171+
ENVOY_CONN_LOG(
172+
debug, "Upstream connection closed, but no downstream connection filter state found",
173+
conn);
174+
}
175+
} else {
176+
ENVOY_CONN_LOG(debug, "Downstream filter ignoring connection close event", conn);
177+
}
178+
break;
179+
default:
180+
break;
181+
}
182+
}
183+
126184
bool Instance::enforceNetworkPolicy(const Cilium::CiliumPolicyFilterState* policy_fs,
127185
Cilium::CiliumDestinationFilterState* dest_fs,
128186
uint32_t destination_identity,
@@ -138,7 +196,7 @@ bool Instance::enforceNetworkPolicy(const Cilium::CiliumPolicyFilterState* polic
138196

139197
// Is there a pod egress policy?
140198
bool use_proxy_lib = false;
141-
if (policy_fs->pod_ip_.length() > 0) {
199+
if (!policy_fs->pod_ip_.empty()) {
142200
if (!policy_fs->enforcePodNetworkPolicy(conn, destination_identity, destination_port_, sni,
143201
use_proxy_lib, l7proto_)) {
144202
log_entry_.initFromConnection(policy_fs->pod_ip_, policy_fs->proxy_id_, false,
@@ -152,7 +210,7 @@ bool Instance::enforceNetworkPolicy(const Cilium::CiliumPolicyFilterState* polic
152210
}
153211

154212
// Is there an Ingress policy?
155-
if (policy_fs->ingress_policy_name_.length() > 0) {
213+
if (!policy_fs->ingress_policy_name_.empty()) {
156214
log_entry_.initFromConnection(policy_fs->ingress_policy_name_, policy_fs->proxy_id_, false,
157215
policy_fs->source_identity_,
158216
stream_info.downstreamAddressProvider().remoteAddress(),
@@ -186,16 +244,18 @@ bool Instance::enforceNetworkPolicy(const Cilium::CiliumPolicyFilterState* polic
186244
}
187245

188246
Network::FilterStatus Instance::onNewConnection() {
189-
// Upstream handling happens in onDestinationSelected() below.
190-
if (config_->is_upstream_) {
191-
return Network::FilterStatus::Continue;
192-
}
193-
194247
// If there is no upstream filter, onDestinationSelected for the upstream connection
195248
// will be called on the downstream filter instead, but after this call.
196249

197250
auto& conn = callbacks_->connection();
198-
ENVOY_CONN_LOG(debug, "cilium.network: onNewConnection (downstream)", conn);
251+
ENVOY_CONN_LOG(debug, "cilium.network: onNewConnection ({})", conn,
252+
config_->is_upstream_ ? "upstream" : "downstream");
253+
254+
// Upstream handling happens in onDestinationSelected() below.
255+
if (config_->is_upstream_) {
256+
conn.addConnectionCallbacks(*this);
257+
return Network::FilterStatus::Continue;
258+
}
199259

200260
// Buffer data until proxylib policy is available, if configured with proxylib
201261
if (config_->proxylib_.get() != nullptr) {

cilium/network_filter.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ using ConfigSharedPtr = std::shared_ptr<Config>;
5353
/**
5454
* Implementation of a Cilium network filter.
5555
*/
56-
class Instance : public Network::Filter, Logger::Loggable<Logger::Id::filter> {
56+
class Instance : public Network::Filter,
57+
public Network::ConnectionCallbacks,
58+
Logger::Loggable<Logger::Id::filter> {
5759
public:
5860
Instance(const ConfigSharedPtr& config) : config_(config) {}
5961

@@ -70,6 +72,11 @@ class Instance : public Network::Filter, Logger::Loggable<Logger::Id::filter> {
7072
// Network::WriteFilter
7173
Network::FilterStatus onWrite(Buffer::Instance&, bool end_stream) override;
7274

75+
// Network::ConnectionCallbacks
76+
void onEvent(Network::ConnectionEvent event) override;
77+
void onAboveWriteBufferHighWatermark() override {}
78+
void onBelowWriteBufferLowWatermark() override {}
79+
7380
private:
7481
// helper to be used either directly from onNewConnection (no L7 LB),
7582
// or from upstream callback (l7 lb)

0 commit comments

Comments
 (0)