diff --git a/src/native/eventpipe/ds-eventpipe-protocol.c b/src/native/eventpipe/ds-eventpipe-protocol.c index 4f5e6daf299d5a..bac8ee6df9932c 100644 --- a/src/native/eventpipe/ds-eventpipe-protocol.c +++ b/src/native/eventpipe/ds-eventpipe-protocol.c @@ -920,7 +920,7 @@ eventpipe_protocol_helper_collect_tracing ( payload->serialization_format, payload->rundown_keyword, payload->stackwalk_requested, - payload->session_type == EP_SESSION_TYPE_IPCSTREAM ? ds_ipc_stream_get_stream_ref (stream) : NULL, + ds_ipc_stream_get_stream_ref (stream), NULL, NULL, user_events_data_fd); diff --git a/src/native/eventpipe/ds-ipc-pal-namedpipe.c b/src/native/eventpipe/ds-ipc-pal-namedpipe.c index 2444cb763c6496..0da47f5c772b14 100644 --- a/src/native/eventpipe/ds-ipc-pal-namedpipe.c +++ b/src/native/eventpipe/ds-ipc-pal-namedpipe.c @@ -213,7 +213,7 @@ ds_ipc_poll ( handles [i] = poll_handles_data [i].ipc->overlap.hEvent; if (handles [i] == INVALID_HANDLE_VALUE) { // Invalid handle, wait will fail. Signal error - poll_handles_data [i].events = DS_IPC_POLL_EVENTS_ERR; + poll_handles_data [i].events = IPC_POLL_EVENTS_ERR; } } else { // CLIENT @@ -238,7 +238,7 @@ ds_ipc_poll ( handles [i] = poll_handles_data [i].stream->overlap.hEvent; break; case ERROR_PIPE_NOT_CONNECTED: - poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP; + poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_HANGUP; result = -1; ep_raise_error (); default: @@ -288,7 +288,7 @@ ds_ipc_poll ( // check if we abandoned something DWORD abandonedIndex = wait - WAIT_ABANDONED_0; if (abandonedIndex > 0 || abandonedIndex < (poll_handles_data_len - 1)) { - poll_handles_data [abandonedIndex].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP; + poll_handles_data [abandonedIndex].events = (uint8_t)IPC_POLL_EVENTS_HANGUP; result = -1; ep_raise_error (); } else { @@ -325,20 +325,20 @@ ds_ipc_poll ( if (!success) { DWORD error = GetLastError(); if (error == ERROR_PIPE_NOT_CONNECTED || error == ERROR_BROKEN_PIPE) { - poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP; + poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_HANGUP; } else { if (callback) callback ("Client connection error", error); - poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR; + poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_ERR; result = -1; ep_raise_error (); } } else { - poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED; + poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED; } } else { // SERVER - poll_handles_data [index].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED; + poll_handles_data [index].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED; } result = 1; @@ -694,7 +694,7 @@ ipc_stream_read_func ( DWORD error = GetLastError (); if (error == ERROR_IO_PENDING) { // if we're waiting infinitely, only make one syscall - if (timeout_ms == DS_IPC_TIMEOUT_INFINITE) { + if (timeout_ms == IPC_TIMEOUT_INFINITE) { DS_ENTER_BLOCKING_PAL_SECTION; success = GetOverlappedResult ( ipc_stream->pipe, // pipe @@ -765,7 +765,7 @@ ipc_stream_write_func ( DWORD error = GetLastError (); if (error == ERROR_IO_PENDING) { // if we're waiting infinitely, only make one syscall - if (timeout_ms == DS_IPC_TIMEOUT_INFINITE) { + if (timeout_ms == IPC_TIMEOUT_INFINITE) { DS_ENTER_BLOCKING_PAL_SECTION; success = GetOverlappedResult ( ipc_stream->pipe, // pipe @@ -834,12 +834,24 @@ ipc_stream_close_func (void *object) return ds_ipc_stream_close (ipc_stream, NULL); } +static +IpcPollEvents +ipc_stream_poll_func ( + void *object, + uint32_t timeout_ms) +{ + EP_ASSERT (!"ipc_stream_poll_func needs to be implemented for NamedPipes"); + // TODO: Implement ipc_stream_poll_func for NamedPipes + return IPC_POLL_EVENTS_UNKNOWN; +} + static IpcStreamVtable ipc_stream_vtable = { ipc_stream_free_func, ipc_stream_read_func, ipc_stream_write_func, ipc_stream_flush_func, - ipc_stream_close_func }; + ipc_stream_close_func, + ipc_stream_poll_func }; static DiagnosticsIpcStream * diff --git a/src/native/eventpipe/ds-ipc-pal-socket.c b/src/native/eventpipe/ds-ipc-pal-socket.c index 4a16fb5f81f253..590daa9e7035cb 100644 --- a/src/native/eventpipe/ds-ipc-pal-socket.c +++ b/src/native/eventpipe/ds-ipc-pal-socket.c @@ -588,7 +588,7 @@ ipc_socket_connect ( // the server hasn't called `accept`, so no need to check for timeout or connect error. #if defined(DS_IPC_PAL_AF_INET) || defined(DS_IPC_PAL_AF_INET6) - if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) { + if (timeout_ms != IPC_TIMEOUT_INFINITE) { // Set socket to none blocking. ipc_socket_set_blocking (s, false); } @@ -601,7 +601,7 @@ ipc_socket_connect ( DS_EXIT_BLOCKING_PAL_SECTION; #if defined(DS_IPC_PAL_AF_INET) || defined(DS_IPC_PAL_AF_INET6) - if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) { + if (timeout_ms != IPC_TIMEOUT_INFINITE) { if (result_connect == DS_IPC_SOCKET_ERROR) { if (ipc_get_last_error () == DS_IPC_SOCKET_ERROR_WOULDBLOCK) { ds_ipc_pollfd_t pfd; @@ -627,7 +627,7 @@ ipc_socket_connect ( } } - if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) { + if (timeout_ms != IPC_TIMEOUT_INFINITE) { // Reset socket to blocking. int last_error = ipc_get_last_error (); ipc_socket_set_blocking (s, true); @@ -1146,15 +1146,15 @@ ds_ipc_poll ( // check for hangup first because a closed socket // will technically meet the requirements for POLLIN // i.e., a call to recv/read won't block - poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_HANGUP; + poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_HANGUP; } else if ((poll_fds [i].revents & (POLLERR|POLLNVAL))) { if (callback) callback ("Poll error", (uint32_t)poll_fds [i].revents); - poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR; + poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_ERR; } else if (poll_fds [i].revents & (POLLIN|POLLPRI)) { - poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED; + poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED; } else { - poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_UNKNOWN; + poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_UNKNOWN; if (callback) callback ("unknown poll response", (uint32_t)poll_fds [i].revents); } @@ -1401,7 +1401,7 @@ ipc_stream_read_func ( DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object; ssize_t total_bytes_read = 0; - if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) { + if (timeout_ms != IPC_TIMEOUT_INFINITE) { ds_ipc_pollfd_t pfd; pfd.fd = ipc_stream->client_socket; pfd.events = POLLIN; @@ -1445,7 +1445,7 @@ ipc_stream_write_func ( DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object; ssize_t total_bytes_written = 0; - if (timeout_ms != DS_IPC_TIMEOUT_INFINITE) { + if (timeout_ms != IPC_TIMEOUT_INFINITE) { ds_ipc_pollfd_t pfd; pfd.fd = ipc_stream->client_socket; pfd.events = POLLOUT; @@ -1489,12 +1489,24 @@ ipc_stream_close_func (void *object) return ds_ipc_stream_close (ipc_stream, NULL); } +static +IpcPollEvents +ipc_stream_poll_func ( + void *object, + uint32_t timeout_ms) +{ + EP_ASSERT (object != NULL); + DiagnosticsIpcStream *ipc_stream = (DiagnosticsIpcStream *)object; + return ds_ipc_stream_poll (ipc_stream, timeout_ms); +} + static IpcStreamVtable ipc_stream_vtable = { ipc_stream_free_func, ipc_stream_read_func, ipc_stream_write_func, ipc_stream_flush_func, - ipc_stream_close_func }; + ipc_stream_close_func, + ipc_stream_poll_func }; static DiagnosticsIpcStream * @@ -1668,6 +1680,44 @@ ds_ipc_stream_to_string ( return (result > 0 && result < (int32_t)buffer_len) ? result : 0; } +IpcPollEvents +ds_ipc_stream_poll ( + DiagnosticsIpcStream *ipc_stream, + uint32_t timeout_ms) +{ + EP_ASSERT (ipc_stream != NULL); + + if (ipc_stream->client_socket == DS_IPC_INVALID_SOCKET) + return IPC_POLL_EVENTS_HANGUP; + + ds_ipc_pollfd_t pfd; + pfd.fd = ipc_stream->client_socket; + pfd.events = POLLIN | POLLPRI | POLLOUT; + + int result_poll; + result_poll = ipc_poll_fds (&pfd, 1, timeout_ms); + + if (result_poll < 0) + return IPC_POLL_EVENTS_ERR; + + if (result_poll == 0) + return IPC_POLL_EVENTS_NONE; + + if (pfd.revents == 0) + return IPC_POLL_EVENTS_NONE; + + if (pfd.revents & POLLHUP) + return IPC_POLL_EVENTS_HANGUP; + + if (pfd.revents & (POLLERR | POLLNVAL)) + return IPC_POLL_EVENTS_ERR; + + if (pfd.revents & (POLLIN | POLLPRI | POLLOUT)) + return IPC_POLL_EVENTS_SIGNALED; + + return IPC_POLL_EVENTS_UNKNOWN; +} + #endif /* ENABLE_PERFTRACING */ #ifndef DS_INCLUDE_SOURCE_FILES diff --git a/src/native/eventpipe/ds-ipc-pal-types.h b/src/native/eventpipe/ds-ipc-pal-types.h index d623fb22e400c1..7b75e998b3f0e4 100644 --- a/src/native/eventpipe/ds-ipc-pal-types.h +++ b/src/native/eventpipe/ds-ipc-pal-types.h @@ -22,21 +22,12 @@ typedef struct _DiagnosticsIpcStream DiagnosticsIpcStream; * Diagnostics IPC PAL Enums. */ -typedef enum { - DS_IPC_POLL_EVENTS_NONE = 0x00, // no events - DS_IPC_POLL_EVENTS_SIGNALED = 0x01, // ready for use - DS_IPC_POLL_EVENTS_HANGUP = 0x02, // connection remotely closed - DS_IPC_POLL_EVENTS_ERR = 0x04, // error - DS_IPC_POLL_EVENTS_UNKNOWN = 0x80 // unknown state -} DiagnosticsIpcPollEvents; - typedef enum { DS_IPC_CONNECTION_MODE_CONNECT, DS_IPC_CONNECTION_MODE_LISTEN } DiagnosticsIpcConnectionMode; #define DS_IPC_MAX_TO_STRING_LEN 128 -#define DS_IPC_TIMEOUT_INFINITE (uint32_t)-1 #define DS_IPC_POLL_TIMEOUT_FALLOFF_FACTOR (float)1.25 #define DS_IPC_POLL_TIMEOUT_MIN_MS (uint32_t)10 diff --git a/src/native/eventpipe/ds-ipc-pal-websocket.c b/src/native/eventpipe/ds-ipc-pal-websocket.c index 97eb89025a468a..a50fa1bfd4fb11 100644 --- a/src/native/eventpipe/ds-ipc-pal-websocket.c +++ b/src/native/eventpipe/ds-ipc-pal-websocket.c @@ -248,11 +248,11 @@ ds_ipc_poll ( int client_socket = poll_handles_data [i].stream->client_socket; int pending = ds_rt_websocket_poll (client_socket); if (pending < 0){ - poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_ERR; + poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_ERR; return 1; } if (pending > 0){ - poll_handles_data [i].events = (uint8_t)DS_IPC_POLL_EVENTS_SIGNALED; + poll_handles_data [i].events = (uint8_t)IPC_POLL_EVENTS_SIGNALED; return 1; } } @@ -425,12 +425,24 @@ ipc_stream_close_func (void *object) return ds_ipc_stream_close (ipc_stream, NULL); } +static +IpcPollEvents +ipc_stream_poll_func ( + void *object, + uint32_t timeout_ms) +{ + EP_ASSERT (!"ipc_stream_poll_func needs to be implemented for WebSockets"); + // TODO: Implement ipc_stream_poll_func for WebSockets + return IPC_POLL_EVENTS_UNKNOWN; +} + static IpcStreamVtable ipc_stream_vtable = { ipc_stream_free_func, ipc_stream_read_func, ipc_stream_write_func, ipc_stream_flush_func, - ipc_stream_close_func }; + ipc_stream_close_func, + ipc_stream_poll_func }; static DiagnosticsIpcStream * diff --git a/src/native/eventpipe/ds-ipc-pal.h b/src/native/eventpipe/ds-ipc-pal.h index 362ea3925f92ba..3761099fb4ba66 100644 --- a/src/native/eventpipe/ds-ipc-pal.h +++ b/src/native/eventpipe/ds-ipc-pal.h @@ -140,5 +140,10 @@ ds_ipc_stream_to_string ( ep_char8_t *buffer, uint32_t buffer_len); +IpcPollEvents +ds_ipc_stream_poll ( + DiagnosticsIpcStream *ipc_stream, + uint32_t timeout_ms); + #endif /* ENABLE_PERFTRACING */ #endif /* __DIAGNOSTICS_IPC_PAL_H__ */ diff --git a/src/native/eventpipe/ds-ipc.c b/src/native/eventpipe/ds-ipc.c index 6527d5bbce706d..cb2b051cb9ed20 100644 --- a/src/native/eventpipe/ds-ipc.c +++ b/src/native/eventpipe/ds-ipc.c @@ -122,7 +122,7 @@ inline uint32_t ipc_stream_factory_get_next_timeout (uint32_t current_timeout_ms) { - if (current_timeout_ms == DS_IPC_TIMEOUT_INFINITE) + if (current_timeout_ms == IPC_TIMEOUT_INFINITE) return DS_IPC_POLL_TIMEOUT_MIN_MS; else return (current_timeout_ms >= DS_IPC_POLL_TIMEOUT_MAX_MS) ? @@ -361,7 +361,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call DiagnosticsIpcStream *stream = NULL; - uint32_t poll_timeout_ms = DS_IPC_TIMEOUT_INFINITE; + uint32_t poll_timeout_ms = IPC_TIMEOUT_INFINITE; bool connect_success = true; uint32_t poll_attempts = 0; @@ -382,7 +382,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call } DN_VECTOR_PTR_FOREACH_END; poll_timeout_ms = connect_success ? - DS_IPC_TIMEOUT_INFINITE : + IPC_TIMEOUT_INFINITE : ipc_stream_factory_get_next_timeout (poll_timeout_ms); int32_t ret_val; @@ -392,7 +392,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call ipc_log_poll_handles (&ipc_poll_handles); ret_val = ds_ipc_poll (dn_vector_data_t (&ipc_poll_handles, DiagnosticsIpcPollHandle), dn_vector_size (&ipc_poll_handles), poll_timeout_ms, callback); } else { - if (poll_timeout_ms == DS_IPC_TIMEOUT_INFINITE) + if (poll_timeout_ms == IPC_TIMEOUT_INFINITE) poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MAX_MS; DS_LOG_DEBUG_1 ("ds_ipc_stream_factory_get_next_available_stream - Nothing to poll, sleeping using timeout: %dms.", poll_timeout_ms); ep_rt_thread_sleep ((uint64_t)poll_timeout_ms * NUM_NANOSECONDS_IN_1_MS); @@ -406,13 +406,13 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call DN_VECTOR_FOREACH_BEGIN (DiagnosticsIpcPollHandle, ipc_poll_handle, &ipc_poll_handles) { DiagnosticsPort *port = (DiagnosticsPort *)ipc_poll_handle.user_data; switch (ipc_poll_handle.events) { - case DS_IPC_POLL_EVENTS_HANGUP: + case IPC_POLL_EVENTS_HANGUP: EP_ASSERT (port != NULL); ds_port_reset_vcall (port, callback); DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - HUP :: Poll attempt: %d, connection %d hung up. Connect is reset.", poll_attempts, connection_id); poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MIN_MS; break; - case DS_IPC_POLL_EVENTS_SIGNALED: + case IPC_POLL_EVENTS_SIGNALED: EP_ASSERT (port != NULL); if (!stream) { // only use first signaled stream; will get others on subsequent calls stream = ds_port_get_connected_stream_vcall (port, callback); @@ -422,12 +422,12 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call } DS_LOG_DEBUG_2 ("ds_ipc_stream_factory_get_next_available_stream - SIG :: Poll attempt: %d, connection %d signalled.", poll_attempts, connection_id); break; - case DS_IPC_POLL_EVENTS_ERR: + case IPC_POLL_EVENTS_ERR: ds_port_reset_vcall ((DiagnosticsPort *)ipc_poll_handle.user_data, callback); DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - ERR :: Poll attempt: %d, connection %d errored. Connection is reset.", poll_attempts, connection_id); saw_error = true; break; - case DS_IPC_POLL_EVENTS_NONE: + case IPC_POLL_EVENTS_NONE: DS_LOG_INFO_2 ("ds_ipc_stream_factory_get_next_available_stream - NON :: Poll attempt: %d, connection %d had no events.", poll_attempts, connection_id); break; default: @@ -444,7 +444,7 @@ ds_ipc_stream_factory_get_next_available_stream (ds_ipc_error_callback_func call if (!stream && saw_error) { // Some errors can cause the poll to return instantly, we want to delay if we see an error to avoid // runaway CPU usage. - if (poll_timeout_ms == DS_IPC_TIMEOUT_INFINITE) + if (poll_timeout_ms == IPC_TIMEOUT_INFINITE) poll_timeout_ms = DS_IPC_POLL_TIMEOUT_MAX_MS; DS_LOG_DEBUG_1 ("ds_ipc_stream_factory_get_next_available_stream - Saw error, sleeping using timeout: %dms.", poll_timeout_ms); ep_rt_thread_sleep ((uint64_t)poll_timeout_ms * NUM_NANOSECONDS_IN_1_MS); diff --git a/src/native/eventpipe/ep-ipc-pal-types.h b/src/native/eventpipe/ep-ipc-pal-types.h index a353b8a3c104c8..574654fd64bfc3 100644 --- a/src/native/eventpipe/ep-ipc-pal-types.h +++ b/src/native/eventpipe/ep-ipc-pal-types.h @@ -11,5 +11,19 @@ #include "ep-ipc-pal-types-forward.h" +/* + * Shared Diagnostics/EventPipe IPC PAL Enums. + */ + +typedef enum { + IPC_POLL_EVENTS_NONE = 0x00, // no events + IPC_POLL_EVENTS_SIGNALED = 0x01, // ready for use + IPC_POLL_EVENTS_HANGUP = 0x02, // connection remotely closed + IPC_POLL_EVENTS_ERR = 0x04, // error + IPC_POLL_EVENTS_UNKNOWN = 0x80 // unknown state +} IpcPollEvents; + +#define IPC_TIMEOUT_INFINITE (uint32_t)-1 + #endif /* ENABLE_PERFTRACING */ #endif /* __EVENTPIPE_IPC_PAL_TYPES_H__ */ diff --git a/src/native/eventpipe/ep-ipc-stream.h b/src/native/eventpipe/ep-ipc-stream.h index eeb615675de4dc..633edfa976dc83 100644 --- a/src/native/eventpipe/ep-ipc-stream.h +++ b/src/native/eventpipe/ep-ipc-stream.h @@ -21,6 +21,7 @@ typedef bool (*IpcStreamReadFunc)(void *object, uint8_t *buffer, uint32_t bytes_ typedef bool (*IpcStreamWriteFunc)(void *object, const uint8_t *buffer, uint32_t bytes_to_write, uint32_t *bytes_written, uint32_t timeout_ms); typedef bool (*IpcStreamFlushFunc)(void *object); typedef bool (*IpcStreamCloseFunc)(void *object); +typedef IpcPollEvents (*IpcStreamPollFunc)(void *object, uint32_t timeout_ms); struct _IpcStreamVtable { IpcStreamFreeFunc free_func; @@ -28,6 +29,7 @@ struct _IpcStreamVtable { IpcStreamWriteFunc write_func; IpcStreamFlushFunc flush_func; IpcStreamCloseFunc close_func; + IpcStreamPollFunc poll_func; }; #if defined(EP_INLINE_GETTER_SETTER) || defined(EP_IMPL_IPC_STREAM_GETTER_SETTER) || defined(DS_IMPL_IPC_PAL_NAMEDPIPE_GETTER_SETTER) || defined(DS_IMPL_IPC_PAL_SOCKET_GETTER_SETTER) @@ -77,5 +79,8 @@ ep_ipc_stream_flush_vcall (IpcStream *ipc_stream); bool ep_ipc_stream_close_vcall (IpcStream *ipc_stream); +IpcPollEvents +ep_ipc_stream_poll_vcall (IpcStream *ipc_stream, uint32_t timeout_ms); + #endif /* ENABLE_PERFTRACING */ #endif /* __EVENTPIPE_IPC_STREAM_H__ */ diff --git a/src/native/eventpipe/ep-session.c b/src/native/eventpipe/ep-session.c index ac9cf1d7e36320..56786d0e6f2681 100644 --- a/src/native/eventpipe/ep-session.c +++ b/src/native/eventpipe/ep-session.c @@ -9,7 +9,9 @@ #include "ep-config.h" #include "ep-event.h" #include "ep-file.h" +#include "ep-ipc-stream.h" #include "ep-session.h" +#include "ep-stream.h" #include "ep-event-payload.h" #include "ep-rt.h" @@ -76,6 +78,10 @@ session_tracepoint_write_event ( ep_rt_thread_handle_t event_thread, EventPipeStackContents *stack); +static +bool +session_is_stream_connection_closed (IpcStream *stream); + /* * EventPipeSession. */ @@ -91,7 +97,7 @@ EP_RT_DEFINE_THREAD_FUNC (streaming_thread) ep_rt_thread_params_t *thread_params = (ep_rt_thread_params_t *)data; EventPipeSession *const session = (EventPipeSession *)thread_params->thread_params; - if (session->session_type != EP_SESSION_TYPE_IPCSTREAM && session->session_type != EP_SESSION_TYPE_FILESTREAM) + if (!ep_session_type_uses_streaming_thread (session->session_type)) return 1; if (!thread_params->thread || !ep_rt_thread_has_started (thread_params->thread)) @@ -100,28 +106,44 @@ EP_RT_DEFINE_THREAD_FUNC (streaming_thread) session->streaming_thread = thread_params->thread; bool success = true; - ep_rt_wait_event_handle_t *wait_event = ep_session_get_wait_event (session); ep_rt_volatile_store_uint32_t (&session->started, 1); EP_GCX_PREEMP_ENTER - while (ep_session_get_streaming_enabled (session)) { - bool events_written = false; - if (!ep_session_write_all_buffers_to_file (session, &events_written)) { - success = false; - break; - } + if (ep_session_type_uses_buffer_manager (session->session_type)) { + ep_rt_wait_event_handle_t *wait_event = ep_session_get_wait_event (session); + while (ep_session_get_streaming_enabled (session)) { + bool events_written = false; + if (!ep_session_write_all_buffers_to_file (session, &events_written)) { + success = false; + break; + } - if (!events_written) { - // No events were available, sleep until more are available - ep_rt_wait_event_wait (wait_event, EP_INFINITE_WAIT, false); + if (!events_written) { + // No events were available, sleep until more are available + ep_rt_wait_event_wait (wait_event, EP_INFINITE_WAIT, false); + } + + // Wait until it's time to sample again. + const uint32_t timeout_ns = 100000000; // 100 msec. + ep_rt_thread_sleep (timeout_ns); } + } else if (session->session_type == EP_SESSION_TYPE_USEREVENTS) { + // In a user events session we only monitor the stream to stop the session if it closes. + while (ep_session_get_streaming_enabled (session)) { + EP_ASSERT (session->stream != NULL); + if (session_is_stream_connection_closed (session->stream)) { + success = false; + break; + } - // Wait until it's time to sample again. - const uint32_t timeout_ns = 100000000; // 100 msec. - ep_rt_thread_sleep (timeout_ns); + // Wait until it's time to poll again. + const uint32_t timeout_ns = 100000000; // 100 msec. + ep_rt_thread_sleep (timeout_ns); + } + } else { + EP_UNREACHABLE ("Unsupported session type for streaming thread."); } - session->streaming_thread = NULL; ep_rt_wait_event_set (&session->rt_thread_shutdown_event); EP_GCX_PREEMP_EXIT @@ -159,19 +181,19 @@ void session_create_streaming_thread (EventPipeSession *session) { EP_ASSERT (session != NULL); - EP_ASSERT (session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM); + EP_ASSERT (ep_session_type_uses_streaming_thread (session->session_type)); ep_requires_lock_held (); ep_session_set_streaming_enabled (session, true); ep_rt_wait_event_alloc (&session->rt_thread_shutdown_event, true, false); if (!ep_rt_wait_event_is_valid (&session->rt_thread_shutdown_event)) - EP_UNREACHABLE ("Unable to create stream flushing thread shutdown event."); + EP_UNREACHABLE ("Unable to create streaming thread shutdown event."); #ifndef PERFTRACING_DISABLE_THREADS ep_rt_thread_id_t thread_id = ep_rt_uint64_t_to_thread_id_t (0); if (!ep_rt_thread_create ((void *)streaming_thread, (void *)session, EP_THREAD_TYPE_SESSION, &thread_id)) - EP_UNREACHABLE ("Unable to create stream flushing thread."); + EP_UNREACHABLE ("Unable to create streaming thread."); #else ep_session_inc_ref (session); ep_rt_volatile_store_uint32_t (&session->started, 1); @@ -183,18 +205,19 @@ static void session_disable_streaming_thread (EventPipeSession *session) { - EP_ASSERT (session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM); + EP_ASSERT (ep_session_type_uses_streaming_thread (session->session_type)); EP_ASSERT (ep_session_get_streaming_enabled (session)); EP_ASSERT (!ep_rt_process_detach ()); - EP_ASSERT (session->buffer_manager != NULL); + EP_ASSERT (!ep_session_type_uses_buffer_manager (session->session_type) || session->buffer_manager != NULL); // The streaming thread will watch this value and exit // when profiling is disabled. ep_session_set_streaming_enabled (session, false); // Thread could be waiting on the event that there is new data to read. - ep_rt_wait_event_set (ep_buffer_manager_get_rt_wait_event_ref (session->buffer_manager)); + if (ep_session_type_uses_buffer_manager (session->session_type)) + ep_rt_wait_event_set (ep_buffer_manager_get_rt_wait_event_ref (session->buffer_manager)); // Wait for the streaming thread to clean itself up. ep_rt_wait_event_handle_t *rt_thread_shutdown_event = &session->rt_thread_shutdown_event; @@ -202,6 +225,15 @@ session_disable_streaming_thread (EventPipeSession *session) ep_rt_wait_event_free (rt_thread_shutdown_event); } +static +bool +session_is_stream_connection_closed (IpcStream *stream) +{ + EP_ASSERT (stream != NULL); + IpcPollEvents poll_event = ep_ipc_stream_poll_vcall (stream, IPC_TIMEOUT_INFINITE); + return poll_event == IPC_POLL_EVENTS_HANGUP || poll_event == IPC_POLL_EVENTS_ERR; +} + /* * session_user_events_tracepoints_init * @@ -282,6 +314,8 @@ ep_session_alloc ( instance->rundown_keyword = rundown_keyword; instance->synchronous_callback = sync_callback; instance->callback_additional_data = callback_additional_data; + instance->user_events_data_fd = -1; + instance->stream = NULL; // Hard coded 10MB for now, we'll probably want to make // this configurable later. @@ -320,6 +354,7 @@ ep_session_alloc ( case EP_SESSION_TYPE_USEREVENTS: // With the user_events_data file, register tracepoints for each provider's tracepoint configurations ep_raise_error_if_nok (session_user_events_tracepoints_init (instance, user_events_data_fd)); + instance->stream = stream; break; default: @@ -550,7 +585,7 @@ ep_session_start_streaming (EventPipeSession *session) if (session->file != NULL) ep_file_initialize_file (session->file); - if (session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM) + if (ep_session_type_uses_streaming_thread (session->session_type)) session_create_streaming_thread (session); if (session->session_type == EP_SESSION_TYPE_SYNCHRONOUS) { @@ -558,7 +593,7 @@ ep_session_start_streaming (EventPipeSession *session) EP_ASSERT (!ep_session_get_streaming_enabled (session)); } - if (session->session_type != EP_SESSION_TYPE_IPCSTREAM && session->session_type != EP_SESSION_TYPE_FILESTREAM) + if (!ep_session_type_uses_streaming_thread (session->session_type)) ep_rt_volatile_store_uint32_t_without_barrier (&session->started, 1); ep_requires_lock_held (); @@ -590,7 +625,7 @@ ep_session_disable (EventPipeSession *session) { EP_ASSERT (session != NULL); - if ((session->session_type == EP_SESSION_TYPE_IPCSTREAM || session->session_type == EP_SESSION_TYPE_FILESTREAM) && ep_session_get_streaming_enabled (session)) + if ((ep_session_type_uses_streaming_thread (session->session_type)) && ep_session_get_streaming_enabled (session)) session_disable_streaming_thread (session); if (session->session_type == EP_SESSION_TYPE_USEREVENTS) @@ -945,8 +980,8 @@ ep_session_get_next_event (EventPipeSession *session) EP_ASSERT (session != NULL); ep_requires_lock_not_held (); - if (!session->buffer_manager) { - EP_ASSERT (!"Shouldn't call get_next_event on a synchronous session."); + if (!ep_session_type_uses_buffer_manager (session->session_type)) { + EP_ASSERT (!"Shouldn't call get_next_event on a session that doesn't use the buffer manager."); return NULL; } @@ -958,8 +993,8 @@ ep_session_get_wait_event (EventPipeSession *session) { EP_ASSERT (session != NULL); - if (!session->buffer_manager) { - EP_ASSERT (!"Shouldn't call get_wait_event on a synchronous session."); + if (!ep_session_type_uses_buffer_manager (session->session_type)) { + EP_ASSERT (!"Shouldn't call get_wait_event on a session that doesn't use the buffer manager."); return NULL; } @@ -1032,6 +1067,12 @@ ep_session_type_uses_buffer_manager (EventPipeSessionType session_type) return (session_type != EP_SESSION_TYPE_SYNCHRONOUS && session_type != EP_SESSION_TYPE_USEREVENTS); } +bool +ep_session_type_uses_streaming_thread (EventPipeSessionType session_type) +{ + return (session_type == EP_SESSION_TYPE_IPCSTREAM || session_type == EP_SESSION_TYPE_FILESTREAM || session_type == EP_SESSION_TYPE_USEREVENTS); +} + #endif /* !defined(EP_INCLUDE_SOURCE_FILES) || defined(EP_FORCE_INCLUDE_SOURCE_FILES) */ #endif /* ENABLE_PERFTRACING */ diff --git a/src/native/eventpipe/ep-session.h b/src/native/eventpipe/ep-session.h index 56af80bf3438ed..0617a876287862 100644 --- a/src/native/eventpipe/ep-session.h +++ b/src/native/eventpipe/ep-session.h @@ -71,6 +71,9 @@ struct _EventPipeSession_Internal { volatile uint32_t ref_count; // The user_events_data file descriptor to register Tracepoints and write user_events to. int user_events_data_fd; + // The IPC continuation stream from initializing the session through the diagnostic server + // Currently only initialized for user_events sessions. + IpcStream *stream; }; #if !defined(EP_INLINE_GETTER_SETTER) && !defined(EP_IMPL_SESSION_GETTER_SETTER) @@ -220,5 +223,8 @@ ep_session_has_started (EventPipeSession *session); bool ep_session_type_uses_buffer_manager (EventPipeSessionType session_type); +bool +ep_session_type_uses_streaming_thread (EventPipeSessionType session_type); + #endif /* ENABLE_PERFTRACING */ #endif /* __EVENTPIPE_SESSION_H__ */ diff --git a/src/native/eventpipe/ep-stream.c b/src/native/eventpipe/ep-stream.c index 76b0c3f0db20d5..cb31ddccef84cc 100644 --- a/src/native/eventpipe/ep-stream.c +++ b/src/native/eventpipe/ep-stream.c @@ -538,6 +538,18 @@ ep_ipc_stream_close_vcall (IpcStream *ipc_stream) return vtable->close_func (ipc_stream); } +IpcPollEvents +ep_ipc_stream_poll_vcall (IpcStream *ipc_stream, uint32_t timeout_ms) +{ + EP_ASSERT (ipc_stream != NULL); + + EP_ASSERT (ipc_stream->vtable != NULL); + IpcStreamVtable *vtable = ipc_stream->vtable; + + EP_ASSERT (vtable->poll_func != NULL); + return vtable->poll_func (ipc_stream, timeout_ms); +} + /* * IpcStreamWriter. */