39
39
import org .elasticsearch .script .ScriptStats ;
40
40
import org .elasticsearch .threadpool .ThreadPoolStats ;
41
41
import org .elasticsearch .transport .TransportStats ;
42
+
42
43
import java .util .HashMap ;
44
+ import java .util .List ;
43
45
import java .util .Map ;
46
+
44
47
import io .prometheus .client .Summary ;
45
48
46
49
/**
@@ -619,6 +622,7 @@ private void updateThreadPoolMetrics(ThreadPoolStats tps) {
619
622
}
620
623
}
621
624
625
+ @ SuppressWarnings ("checkstyle:LineLength" )
622
626
private void registerIngestMetrics () {
623
627
catalog .registerNodeGauge ("ingest_total_count" , "Ingestion total number" );
624
628
catalog .registerNodeGauge ("ingest_total_time_seconds" , "Ingestion total time in seconds" );
@@ -629,22 +633,40 @@ private void registerIngestMetrics() {
629
633
catalog .registerNodeGauge ("ingest_pipeline_total_time_seconds" , "Ingestion total time in seconds" , "pipeline" );
630
634
catalog .registerNodeGauge ("ingest_pipeline_total_current" , "Ingestion total current" , "pipeline" );
631
635
catalog .registerNodeGauge ("ingest_pipeline_total_failed_count" , "Ingestion total failed" , "pipeline" );
636
+
637
+ catalog .registerNodeGauge ("ingest_pipeline_processor_total_count" , "Ingestion total number" , "pipeline" , "processor" );
638
+ catalog .registerNodeGauge ("ingest_pipeline_processor_total_time_seconds" , "Ingestion total time in seconds" , "pipeline" , "processor" );
639
+ catalog .registerNodeGauge ("ingest_pipeline_processor_total_current" , "Ingestion total current" , "pipeline" , "processor" );
640
+ catalog .registerNodeGauge ("ingest_pipeline_processor_total_failed_count" , "Ingestion total failed" , "pipeline" , "processor" );
632
641
}
633
642
643
+ @ SuppressWarnings ("checkstyle:LineLength" )
634
644
private void updateIngestMetrics (IngestStats is ) {
635
645
if (is != null ) {
636
646
catalog .setNodeGauge ("ingest_total_count" , is .getTotalStats ().getIngestCount ());
637
647
catalog .setNodeGauge ("ingest_total_time_seconds" , is .getTotalStats ().getIngestTimeInMillis () / 1000.0 );
638
648
catalog .setNodeGauge ("ingest_total_current" , is .getTotalStats ().getIngestCurrent ());
639
649
catalog .setNodeGauge ("ingest_total_failed_count" , is .getTotalStats ().getIngestFailedCount ());
640
650
641
- for (Map . Entry < String , IngestStats .Stats > entry : is .getStatsPerPipeline (). entrySet ()) {
642
- String pipeline = entry . getKey ();
643
- catalog .setNodeGauge ("ingest_pipeline_total_count" , entry . getValue ().getIngestCount (), pipeline );
644
- catalog .setNodeGauge ("ingest_pipeline_total_time_seconds" , entry . getValue ().getIngestTimeInMillis () / 1000.0 ,
651
+ for (IngestStats .PipelineStat st : is .getPipelineStats ()) {
652
+ String pipeline = st . getPipelineId ();
653
+ catalog .setNodeGauge ("ingest_pipeline_total_count" , st . getStats ().getIngestCount (), pipeline );
654
+ catalog .setNodeGauge ("ingest_pipeline_total_time_seconds" , st . getStats ().getIngestTimeInMillis () / 1000.0 ,
645
655
pipeline );
646
- catalog .setNodeGauge ("ingest_pipeline_total_current" , entry .getValue ().getIngestCurrent (), pipeline );
647
- catalog .setNodeGauge ("ingest_pipeline_total_failed_count" , entry .getValue ().getIngestFailedCount (), pipeline );
656
+ catalog .setNodeGauge ("ingest_pipeline_total_current" , st .getStats ().getIngestCurrent (), pipeline );
657
+ catalog .setNodeGauge ("ingest_pipeline_total_failed_count" , st .getStats ().getIngestFailedCount (), pipeline );
658
+
659
+ List <IngestStats .ProcessorStat > pss = is .getProcessorStats ().get (pipeline );
660
+ if (pss != null ) {
661
+ for (IngestStats .ProcessorStat ps : pss ) {
662
+ String processor = ps .getName ();
663
+ catalog .setNodeGauge ("ingest_pipeline_processor_total_count" , ps .getStats ().getIngestCount (), pipeline , processor );
664
+ catalog .setNodeGauge ("ingest_pipeline_processor_total_time_seconds" , ps .getStats ().getIngestTimeInMillis () / 1000.0 ,
665
+ pipeline , processor );
666
+ catalog .setNodeGauge ("ingest_pipeline_processor_total_current" , ps .getStats ().getIngestCurrent (), pipeline , processor );
667
+ catalog .setNodeGauge ("ingest_pipeline_processor_total_failed_count" , ps .getStats ().getIngestFailedCount (), pipeline , processor );
668
+ }
669
+ }
648
670
}
649
671
}
650
672
}
0 commit comments