@@ -176,25 +176,52 @@ def refine_batch_metrics(stats)
176176 # current is a tuple of [event_count, byte_size] store the reference locally to avoid repeatedly
177177 # reading and retrieve unrelated values
178178 current_data_point = stats [ :batch ] [ :current ]
179- {
179+ # average return a FlowMetric which and we need to invoke getValue to obtain the map with metric details.
180+ event_count_average_flow_metric = stats [ :batch ] [ :event_count ] [ :average ] . value
181+ event_count_average_lifetime = event_count_average_flow_metric [ "lifetime" ] ? event_count_average_flow_metric [ "lifetime" ] . round : 0
182+ byte_size_average_flow_metric = stats [ :batch ] [ :byte_size ] [ :average ] . value
183+ byte_size_average_lifetime = byte_size_average_flow_metric [ "lifetime" ] ? byte_size_average_flow_metric [ "lifetime" ] . round : 0
184+ result = {
180185 :event_count => {
181186 # current_data_point is an instance of org.logstash.instrument.metrics.gauge.LazyDelegatingGauge so need to invoke getValue() to obtain the actual value
182187 :current => current_data_point . value [ 0 ] ,
183188 :average => {
184- # average return a FlowMetric which and we need to invoke getValue to obtain the map with metric details.
185- :lifetime => stats [ :batch ] [ :event_count ] [ :average ] . value [ "lifetime" ] ? stats [ :batch ] [ :event_count ] [ :average ] . value [ "lifetime" ] . round : 0
189+ :lifetime => event_count_average_lifetime
186190 }
187191 } ,
188192 :byte_size => {
189193 :current => current_data_point . value [ 1 ] ,
190194 :average => {
191- :lifetime => stats [ :batch ] [ :byte_size ] [ :average ] . value [ "lifetime" ] ? stats [ :batch ] [ :byte_size ] [ :average ] . value [ "lifetime" ] . round : 0
195+ :lifetime => byte_size_average_lifetime
192196 }
193197 }
194198 }
199+ # Enrich byte_size and event_count averages with the last 1, 5, 15 minutes averages if available
200+ publish_average_count_flow_metric ( event_count_average_flow_metric , result , :last_1_minute )
201+ publish_average_count_flow_metric ( event_count_average_flow_metric , result , :last_5_minutes )
202+ publish_average_count_flow_metric ( event_count_average_flow_metric , result , :last_15_minutes )
203+ publish_average_size_flow_metric ( byte_size_average_flow_metric , result , :last_1_minute )
204+ publish_average_size_flow_metric ( byte_size_average_flow_metric , result , :last_5_minutes )
205+ publish_average_size_flow_metric ( byte_size_average_flow_metric , result , :last_15_minutes )
206+ result
195207 end
196208 private :refine_batch_metrics
197209
210+
211+ def publish_average_count_flow_metric ( average_flow_metric , result , time_window )
212+ if average_flow_metric [ time_window . to_s ]
213+ result [ :event_count ] [ :average ] [ time_window ] = average_flow_metric [ time_window . to_s ] . round
214+ end
215+ end
216+ private :publish_average_count_flow_metric
217+
218+ def publish_average_size_flow_metric ( average_flow_metric , result , time_window )
219+ if average_flow_metric [ time_window . to_s ]
220+ result [ :byte_size ] [ :average ] [ time_window ] = average_flow_metric [ time_window . to_s ] . round
221+ end
222+ end
223+ private :publish_average_size_flow_metric
224+
198225 def report ( stats , extended_stats = nil , opts = { } )
199226 ret = {
200227 :events => stats [ :events ] ,
0 commit comments