Skip to content

Commit a529a85

Browse files
committed
[EventPipe] Close user_events session upon continuation_stream closure
1 parent e8a2546 commit a529a85

File tree

6 files changed

+97
-29
lines changed

6 files changed

+97
-29
lines changed

src/native/eventpipe/ds-eventpipe-protocol.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,7 @@ eventpipe_protocol_helper_collect_tracing (
920920
payload->serialization_format,
921921
payload->rundown_keyword,
922922
payload->stackwalk_requested,
923-
payload->session_type == EP_SESSION_TYPE_IPCSTREAM ? ds_ipc_stream_get_stream_ref (stream) : NULL,
923+
ds_ipc_stream_get_stream_ref (stream),
924924
NULL,
925925
NULL,
926926
user_events_data_fd);

src/native/eventpipe/ep-session.c

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ EP_RT_DEFINE_THREAD_FUNC (streaming_thread)
9191
ep_rt_thread_params_t *thread_params = (ep_rt_thread_params_t *)data;
9292

9393
EventPipeSession *const session = (EventPipeSession *)thread_params->thread_params;
94-
if (session->session_type != EP_SESSION_TYPE_IPCSTREAM && session->session_type != EP_SESSION_TYPE_FILESTREAM)
94+
if (!ep_session_type_uses_streaming_thread (session->session_type))
9595
return 1;
9696

9797
if (!thread_params->thread || !ep_rt_thread_has_started (thread_params->thread))
@@ -100,28 +100,44 @@ EP_RT_DEFINE_THREAD_FUNC (streaming_thread)
100100
session->streaming_thread = thread_params->thread;
101101

102102
bool success = true;
103-
ep_rt_wait_event_handle_t *wait_event = ep_session_get_wait_event (session);
104103

105104
ep_rt_volatile_store_uint32_t (&session->started, 1);
106105

107106
EP_GCX_PREEMP_ENTER
108-
while (ep_session_get_streaming_enabled (session)) {
109-
bool events_written = false;
110-
if (!ep_session_write_all_buffers_to_file (session, &events_written)) {
111-
success = false;
112-
break;
113-
}
107+
if (ep_session_type_uses_buffer_manager (session->session_type)) {
108+
ep_rt_wait_event_handle_t *wait_event = ep_session_get_wait_event (session);
109+
while (ep_session_get_streaming_enabled (session)) {
110+
bool events_written = false;
111+
if (!ep_session_write_all_buffers_to_file (session, &events_written)) {
112+
success = false;
113+
break;
114+
}
115+
116+
if (!events_written) {
117+
// No events were available, sleep until more are available
118+
ep_rt_wait_event_wait (wait_event, EP_INFINITE_WAIT, false);
119+
}
114120

115-
if (!events_written) {
116-
// No events were available, sleep until more are available
117-
ep_rt_wait_event_wait (wait_event, EP_INFINITE_WAIT, false);
121+
// Wait until it's time to sample again.
122+
const uint32_t timeout_ns = 100000000; // 100 msec.
123+
ep_rt_thread_sleep (timeout_ns);
118124
}
125+
} else if (session->session_type == EP_SESSION_TYPE_USEREVENTS) {
126+
// User events session, write all user events tracepoints to the file.
127+
while (ep_session_get_streaming_enabled (session)) {
128+
EP_ASSERT (session->continuation_stream != NULL);
129+
if (ep_ipc_continuation_stream_connection_closed (session->continuation_stream)) {
130+
success = false;
131+
break;
132+
}
119133

120-
// Wait until it's time to sample again.
121-
const uint32_t timeout_ns = 100000000; // 100 msec.
122-
ep_rt_thread_sleep (timeout_ns);
134+
// Wait until it's time to sample again.
135+
const uint32_t timeout_ns = 100000000; // 100 msec.
136+
ep_rt_thread_sleep (timeout_ns);
137+
}
138+
} else {
139+
EP_UNREACHABLE ("Unsupported session type for streaming thread.");
123140
}
124-
125141
session->streaming_thread = NULL;
126142
ep_rt_wait_event_set (&session->rt_thread_shutdown_event);
127143
EP_GCX_PREEMP_EXIT
@@ -159,19 +175,19 @@ void
159175
session_create_streaming_thread (EventPipeSession *session)
160176
{
161177
EP_ASSERT (session != NULL);
162-
EP_ASSERT (session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM);
178+
EP_ASSERT (ep_session_type_uses_streaming_thread (session->session_type));
163179

164180
ep_requires_lock_held ();
165181

166182
ep_session_set_streaming_enabled (session, true);
167183
ep_rt_wait_event_alloc (&session->rt_thread_shutdown_event, true, false);
168184
if (!ep_rt_wait_event_is_valid (&session->rt_thread_shutdown_event))
169-
EP_UNREACHABLE ("Unable to create stream flushing thread shutdown event.");
185+
EP_UNREACHABLE ("Unable to create streaming thread shutdown event.");
170186

171187
#ifndef PERFTRACING_DISABLE_THREADS
172188
ep_rt_thread_id_t thread_id = ep_rt_uint64_t_to_thread_id_t (0);
173189
if (!ep_rt_thread_create ((void *)streaming_thread, (void *)session, EP_THREAD_TYPE_SESSION, &thread_id))
174-
EP_UNREACHABLE ("Unable to create stream flushing thread.");
190+
EP_UNREACHABLE ("Unable to create streaming thread.");
175191
#else
176192
ep_session_inc_ref (session);
177193
ep_rt_volatile_store_uint32_t (&session->started, 1);
@@ -183,18 +199,19 @@ static
183199
void
184200
session_disable_streaming_thread (EventPipeSession *session)
185201
{
186-
EP_ASSERT (session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM);
202+
EP_ASSERT (ep_session_type_uses_streaming_thread (session->session_type));
187203
EP_ASSERT (ep_session_get_streaming_enabled (session));
188204

189205
EP_ASSERT (!ep_rt_process_detach ());
190-
EP_ASSERT (session->buffer_manager != NULL);
206+
EP_ASSERT (!ep_session_type_uses_buffer_manager (session->session_type) || session->buffer_manager != NULL);
191207

192208
// The streaming thread will watch this value and exit
193209
// when profiling is disabled.
194210
ep_session_set_streaming_enabled (session, false);
195211

196212
// Thread could be waiting on the event that there is new data to read.
197-
ep_rt_wait_event_set (ep_buffer_manager_get_rt_wait_event_ref (session->buffer_manager));
213+
if (ep_session_type_uses_buffer_manager (session->session_type))
214+
ep_rt_wait_event_set (ep_buffer_manager_get_rt_wait_event_ref (session->buffer_manager));
198215

199216
// Wait for the streaming thread to clean itself up.
200217
ep_rt_wait_event_handle_t *rt_thread_shutdown_event = &session->rt_thread_shutdown_event;
@@ -282,6 +299,8 @@ ep_session_alloc (
282299
instance->rundown_keyword = rundown_keyword;
283300
instance->synchronous_callback = sync_callback;
284301
instance->callback_additional_data = callback_additional_data;
302+
instance->user_events_data_fd = -1;
303+
instance->continuation_stream = NULL;
285304

286305
// Hard coded 10MB for now, we'll probably want to make
287306
// this configurable later.
@@ -320,6 +339,7 @@ ep_session_alloc (
320339
case EP_SESSION_TYPE_USEREVENTS:
321340
// With the user_events_data file, register tracepoints for each provider's tracepoint configurations
322341
ep_raise_error_if_nok (session_user_events_tracepoints_init (instance, user_events_data_fd));
342+
instance->continuation_stream = (IpcContinuationStream *)stream;
323343
break;
324344

325345
default:
@@ -550,15 +570,15 @@ ep_session_start_streaming (EventPipeSession *session)
550570
if (session->file != NULL)
551571
ep_file_initialize_file (session->file);
552572

553-
if (session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM)
573+
if (ep_session_type_uses_streaming_thread (session->session_type))
554574
session_create_streaming_thread (session);
555575

556576
if (session->session_type == EP_SESSION_TYPE_SYNCHRONOUS) {
557577
EP_ASSERT (session->file == NULL);
558578
EP_ASSERT (!ep_session_get_streaming_enabled (session));
559579
}
560580

561-
if (session->session_type != EP_SESSION_TYPE_IPCSTREAM && session->session_type != EP_SESSION_TYPE_FILESTREAM)
581+
if (!ep_session_type_uses_streaming_thread (session->session_type))
562582
ep_rt_volatile_store_uint32_t_without_barrier (&session->started, 1);
563583

564584
ep_requires_lock_held ();
@@ -590,7 +610,7 @@ ep_session_disable (EventPipeSession *session)
590610
{
591611
EP_ASSERT (session != NULL);
592612

593-
if ((session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM) && ep_session_get_streaming_enabled (session))
613+
if ((ep_session_type_uses_streaming_thread (session->session_type)) && ep_session_get_streaming_enabled (session))
594614
session_disable_streaming_thread (session);
595615

596616
if (session->session_type == EP_SESSION_TYPE_USEREVENTS)
@@ -945,8 +965,8 @@ ep_session_get_next_event (EventPipeSession *session)
945965
EP_ASSERT (session != NULL);
946966
ep_requires_lock_not_held ();
947967

948-
if (!session->buffer_manager) {
949-
EP_ASSERT (!"Shouldn't call get_next_event on a synchronous session.");
968+
if (!ep_session_type_uses_buffer_manager (session->session_type)) {
969+
EP_ASSERT (!"Shouldn't call get_next_event on a session that doesn't use the buffer manager.");
950970
return NULL;
951971
}
952972

@@ -958,8 +978,8 @@ ep_session_get_wait_event (EventPipeSession *session)
958978
{
959979
EP_ASSERT (session != NULL);
960980

961-
if (!session->buffer_manager) {
962-
EP_ASSERT (!"Shouldn't call get_wait_event on a synchronous session.");
981+
if (!ep_session_type_uses_buffer_manager (session->session_type)) {
982+
EP_ASSERT (!"Shouldn't call get_wait_event on a session that doesn't use the buffer manager.");
963983
return NULL;
964984
}
965985

@@ -1032,6 +1052,12 @@ ep_session_type_uses_buffer_manager (EventPipeSessionType session_type)
10321052
return (session_type != EP_SESSION_TYPE_SYNCHRONOUS && session_type != EP_SESSION_TYPE_USEREVENTS);
10331053
}
10341054

1055+
bool
1056+
ep_session_type_uses_streaming_thread (EventPipeSessionType session_type)
1057+
{
1058+
return (session_type == EP_SESSION_TYPE_IPCSTREAM || session_type == EP_SESSION_TYPE_FILESTREAM || session_type == EP_SESSION_TYPE_USEREVENTS);
1059+
}
1060+
10351061
#endif /* !defined(EP_INCLUDE_SOURCE_FILES) || defined(EP_FORCE_INCLUDE_SOURCE_FILES) */
10361062
#endif /* ENABLE_PERFTRACING */
10371063

src/native/eventpipe/ep-session.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ struct _EventPipeSession_Internal {
7171
volatile uint32_t ref_count;
7272
// The user_events_data file descriptor to register Tracepoints and write user_events to.
7373
int user_events_data_fd;
74+
// The IPC continuation stream from initializing the session through the diagnostic server
75+
IpcContinuationStream *continuation_stream;
7476
};
7577

7678
#if !defined(EP_INLINE_GETTER_SETTER) && !defined(EP_IMPL_SESSION_GETTER_SETTER)
@@ -220,5 +222,8 @@ ep_session_has_started (EventPipeSession *session);
220222
bool
221223
ep_session_type_uses_buffer_manager (EventPipeSessionType session_type);
222224

225+
bool
226+
ep_session_type_uses_streaming_thread (EventPipeSessionType session_type);
227+
223228
#endif /* ENABLE_PERFTRACING */
224229
#endif /* __EVENTPIPE_SESSION_H__ */

src/native/eventpipe/ep-stream.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,21 @@ ep_stream_writer_write (
708708
bytes_written);
709709
}
710710

711+
/*
712+
* IpcContinuationStream.
713+
*/
714+
715+
bool
716+
ep_ipc_continuation_stream_connection_closed (IpcContinuationStream *ipc_continuation_stream)
717+
{
718+
EP_ASSERT (ipc_continuation_stream != NULL);
719+
720+
uint32_t bytes_read = 0;
721+
EventPipeIpcPollEvents poll_event = ep_ipc_stream_poll_vcall ((IpcStream *)ipc_continuation_stream, EP_IPC_POLL_TIMEOUT_MIN_MS);
722+
723+
return poll_event == EP_IPC_POLL_EVENTS_HANGUP;
724+
}
725+
711726
#endif /* !defined(EP_INCLUDE_SOURCE_FILES) || defined(EP_FORCE_INCLUDE_SOURCE_FILES) */
712727
#endif /* ENABLE_PERFTRACING */
713728

src/native/eventpipe/ep-stream.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,5 +355,26 @@ ep_ipc_stream_writer_write (
355355
uint32_t bytes_to_write,
356356
uint32_t *bytes_written);
357357

358+
/*
359+
* IpcContinuationStream
360+
*/
361+
362+
#if defined(EP_INLINE_GETTER_SETTER) || defined(EP_IMPL_STREAM_GETTER_SETTER)
363+
struct _IpcContinuationStream {
364+
#else
365+
struct _IpcContinuationStream_Internal {
366+
#endif
367+
IpcStream ipc_stream;
368+
};
369+
370+
#if !defined(EP_INLINE_GETTER_SETTER) && !defined(EP_IMPL_STREAM_GETTER_SETTER)
371+
struct _IpcContinuationStream {
372+
uint8_t _internal [sizeof (struct _IpcContinuationStream_Internal)];
373+
};
374+
#endif
375+
376+
bool
377+
ep_ipc_continuation_stream_connection_closed (IpcContinuationStream *ipc_continuation_stream);
378+
358379
#endif /* ENABLE_PERFTRACING */
359380
#endif /* __EVENTPIPE_STREAM_H__ */

src/native/eventpipe/ep-types-forward.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ typedef struct _FastSerializableObjectVtable FastSerializableObjectVtable;
5555
typedef struct _FastSerializer FastSerializer;
5656
typedef struct _FileStream FileStream;
5757
typedef struct _FileStreamWriter FileStreamWriter;
58+
typedef struct _IpcContinuationStream IpcContinuationStream;
5859
typedef struct _IpcStreamWriter IpcStreamWriter;
5960
typedef struct _StackHashEntry StackHashEntry;
6061
typedef struct _StackHashKey StackHashKey;

0 commit comments

Comments
 (0)