25
25
import com .ibm .mq .headers .pcf .PCFMessage ;
26
26
import io .opentelemetry .api .common .AttributeKey ;
27
27
import io .opentelemetry .api .common .Attributes ;
28
+ import io .opentelemetry .api .metrics .LongGauge ;
28
29
import java .util .Arrays ;
29
30
import java .util .List ;
30
31
import java .util .Set ;
31
- import org .jetbrains .annotations .NotNull ;
32
32
import org .slf4j .Logger ;
33
33
import org .slf4j .LoggerFactory ;
34
34
@@ -37,15 +37,94 @@ public final class ChannelMetricsCollector implements Runnable {
37
37
38
38
private static final Logger logger = LoggerFactory .getLogger (ChannelMetricsCollector .class );
39
39
40
- private final MetricCreator metricCreator ;
41
40
private final MetricsCollectorContext context ;
41
+ private final LongGauge activeChannelsGauge ;
42
+ private final LongGauge channelStatusGauge ;
43
+ private final LongGauge receivedCountGauge ;
44
+ private final LongGauge byteSentGauge ;
45
+ private final LongGauge byteReceivedGauge ;
46
+ private final LongGauge buffersSentGauge ;
47
+ private final LongGauge buffersReceivedGauge ;
48
+ private final LongGauge currentSharingConvsGauge ;
49
+ private final LongGauge maxSharingConvsGauge ;
42
50
43
51
/*
44
52
* The Channel Status values are mentioned here http://www.ibm.com/support/knowledgecenter/SSFKSJ_7.5.0/com.ibm.mq.ref.dev.doc/q090880_.htm
45
53
*/
46
- public ChannelMetricsCollector (MetricsCollectorContext context , MetricCreator metricCreator ) {
54
+ public ChannelMetricsCollector (MetricsCollectorContext context ) {
47
55
this .context = context ;
48
- this .metricCreator = metricCreator ;
56
+ this .activeChannelsGauge =
57
+ context
58
+ .getMetricWriteHelper ()
59
+ .getMeter ()
60
+ .gaugeBuilder ("mq.manager.active.channels" )
61
+ .ofLongs ()
62
+ .setUnit ("1" )
63
+ .build ();
64
+ this .channelStatusGauge =
65
+ context
66
+ .getMetricWriteHelper ()
67
+ .getMeter ()
68
+ .gaugeBuilder ("mq.status" )
69
+ .ofLongs ()
70
+ .setUnit ("1" )
71
+ .build ();
72
+ this .receivedCountGauge =
73
+ context
74
+ .getMetricWriteHelper ()
75
+ .getMeter ()
76
+ .gaugeBuilder ("mq.message.received.count" )
77
+ .ofLongs ()
78
+ .setUnit ("1" )
79
+ .build ();
80
+ this .byteSentGauge =
81
+ context
82
+ .getMetricWriteHelper ()
83
+ .getMeter ()
84
+ .gaugeBuilder ("mq.byte.sent" )
85
+ .ofLongs ()
86
+ .setUnit ("1" )
87
+ .build ();
88
+ this .byteReceivedGauge =
89
+ context
90
+ .getMetricWriteHelper ()
91
+ .getMeter ()
92
+ .gaugeBuilder ("mq.byte.received" )
93
+ .ofLongs ()
94
+ .setUnit ("1" )
95
+ .build ();
96
+ this .buffersSentGauge =
97
+ context
98
+ .getMetricWriteHelper ()
99
+ .getMeter ()
100
+ .gaugeBuilder ("mq.buffers.sent" )
101
+ .ofLongs ()
102
+ .setUnit ("1" )
103
+ .build ();
104
+ this .buffersReceivedGauge =
105
+ context
106
+ .getMetricWriteHelper ()
107
+ .getMeter ()
108
+ .gaugeBuilder ("mq.buffers.received" )
109
+ .ofLongs ()
110
+ .setUnit ("1" )
111
+ .build ();
112
+ this .currentSharingConvsGauge =
113
+ context
114
+ .getMetricWriteHelper ()
115
+ .getMeter ()
116
+ .gaugeBuilder ("mq.current.sharing.conversations" )
117
+ .ofLongs ()
118
+ .setUnit ("1" )
119
+ .build ();
120
+ this .maxSharingConvsGauge =
121
+ context
122
+ .getMetricWriteHelper ()
123
+ .getMeter ()
124
+ .gaugeBuilder ("mq.max.sharing.conversations" )
125
+ .ofLongs ()
126
+ .setUnit ("1" )
127
+ .build ();
49
128
}
50
129
51
130
@ Override
@@ -57,6 +136,7 @@ public void run() {
57
136
new int [] {
58
137
CMQCFC .MQCACH_CHANNEL_NAME ,
59
138
CMQCFC .MQCACH_CONNECTION_NAME ,
139
+ CMQCFC .MQIACH_CHANNEL_TYPE ,
60
140
CMQCFC .MQIACH_MSGS ,
61
141
CMQCFC .MQIACH_CHANNEL_STATUS ,
62
142
CMQCFC .MQIACH_BYTES_SENT ,
@@ -108,10 +188,10 @@ public void run() {
108
188
109
189
for (PCFMessage message : messages ) {
110
190
String channelName = MessageBuddy .channelName (message );
191
+ String channelType = MessageBuddy .channelType (message );
111
192
112
193
logger .debug ("Pulling out metrics for channel name {}" , channelName );
113
- List <Metric > responseMetrics = getMetrics (message , channelName , activeChannels );
114
- context .transformAndPrintMetrics (responseMetrics );
194
+ updateMetrics (message , channelName , channelType , activeChannels );
115
195
}
116
196
} catch (PCFException pcfe ) {
117
197
if (pcfe .getReason () == MQRCCF_CHL_STATUS_NOT_FOUND ) {
@@ -135,35 +215,40 @@ public void run() {
135
215
136
216
logger .info (
137
217
"Active Channels in queueManager {} are {}" , context .getQueueManagerName (), activeChannels );
138
- Metric activeChannelsCountMetric =
139
- metricCreator .createMetric (
140
- "mq.manager.active.channels" , activeChannels .size (), Attributes .empty ());
141
- context .transformAndPrintMetric (activeChannelsCountMetric );
218
+ activeChannelsGauge .set (
219
+ activeChannels .size (),
220
+ Attributes .of (AttributeKey .stringKey ("queue.manager" ), context .getQueueManagerName ()));
142
221
143
222
long exitTime = System .currentTimeMillis () - entryTime ;
144
223
logger .debug ("Time taken to publish metrics for all channels is {} milliseconds" , exitTime );
145
224
}
146
225
147
- private @ NotNull List < Metric > getMetrics (
148
- PCFMessage message , String channelName , List <String > activeChannels ) throws PCFException {
149
- List < Metric > responseMetrics = Lists . newArrayList ();
226
+ private void updateMetrics (
227
+ PCFMessage message , String channelName , String channelType , List <String > activeChannels )
228
+ throws PCFException {
150
229
{
151
230
int received = message .getIntParameterValue (CMQCFC .MQIACH_MSGS );
152
- Metric metric =
153
- metricCreator .createMetric (
154
- "mq.message.received.count" ,
155
- received ,
156
- Attributes .of (AttributeKey .stringKey ("channel.name" ), channelName ));
157
- responseMetrics .add (metric );
231
+ receivedCountGauge .set (
232
+ received ,
233
+ Attributes .of (
234
+ AttributeKey .stringKey ("channel.name" ),
235
+ channelName ,
236
+ AttributeKey .stringKey ("channel.type" ),
237
+ channelType ,
238
+ AttributeKey .stringKey ("queue.manager" ),
239
+ context .getQueueManagerName ()));
158
240
}
159
241
{
160
242
int status = message .getIntParameterValue (CMQCFC .MQIACH_CHANNEL_STATUS );
161
- Metric metric =
162
- metricCreator .createMetric (
163
- "mq.status" ,
164
- status ,
165
- Attributes .of (AttributeKey .stringKey ("channel.name" ), channelName ));
166
- responseMetrics .add (metric );
243
+ channelStatusGauge .set (
244
+ status ,
245
+ Attributes .of (
246
+ AttributeKey .stringKey ("channel.name" ),
247
+ channelName ,
248
+ AttributeKey .stringKey ("channel.type" ),
249
+ channelType ,
250
+ AttributeKey .stringKey ("queue.manager" ),
251
+ context .getQueueManagerName ()));
167
252
// We follow the definition of active channel as documented in
168
253
// https://www.ibm.com/docs/en/ibm-mq/9.2.x?topic=states-current-active
169
254
if (status != CMQCFC .MQCHS_RETRYING
@@ -174,64 +259,81 @@ public void run() {
174
259
}
175
260
{
176
261
int bytesSent = message .getIntParameterValue (CMQCFC .MQIACH_BYTES_SENT );
177
- Metric metric =
178
- metricCreator .createMetric (
179
- "mq.byte.sent" ,
180
- bytesSent ,
181
- Attributes .of (AttributeKey .stringKey ("channel.name" ), channelName ));
182
- responseMetrics .add (metric );
262
+ byteSentGauge .set (
263
+ bytesSent ,
264
+ Attributes .of (
265
+ AttributeKey .stringKey ("channel.name" ),
266
+ channelName ,
267
+ AttributeKey .stringKey ("channel.type" ),
268
+ channelType ,
269
+ AttributeKey .stringKey ("queue.manager" ),
270
+ context .getQueueManagerName ()));
183
271
}
184
272
{
185
273
int bytesReceived = message .getIntParameterValue (CMQCFC .MQIACH_BYTES_RECEIVED );
186
- Metric metric =
187
- metricCreator .createMetric (
188
- "mq.byte.received" ,
189
- bytesReceived ,
190
- Attributes .of (AttributeKey .stringKey ("channel.name" ), channelName ));
191
- responseMetrics .add (metric );
274
+ byteReceivedGauge .set (
275
+ bytesReceived ,
276
+ Attributes .of (
277
+ AttributeKey .stringKey ("channel.name" ),
278
+ channelName ,
279
+ AttributeKey .stringKey ("channel.type" ),
280
+ channelType ,
281
+ AttributeKey .stringKey ("queue.manager" ),
282
+ context .getQueueManagerName ()));
192
283
}
193
284
{
194
285
int buffersSent = message .getIntParameterValue (CMQCFC .MQIACH_BUFFERS_SENT );
195
- Metric metric =
196
- metricCreator .createMetric (
197
- "mq.buffers.sent" ,
198
- buffersSent ,
199
- Attributes .of (AttributeKey .stringKey ("channel.name" ), channelName ));
200
- responseMetrics .add (metric );
286
+ buffersSentGauge .set (
287
+ buffersSent ,
288
+ Attributes .of (
289
+ AttributeKey .stringKey ("channel.name" ),
290
+ channelName ,
291
+ AttributeKey .stringKey ("channel.type" ),
292
+ channelType ,
293
+ AttributeKey .stringKey ("queue.manager" ),
294
+ context .getQueueManagerName ()));
201
295
}
202
296
{
203
297
int buffersReceived = message .getIntParameterValue (CMQCFC .MQIACH_BUFFERS_RECEIVED );
204
- Metric metric =
205
- metricCreator .createMetric (
206
- "mq.buffers.received" ,
207
- buffersReceived ,
208
- Attributes .of (AttributeKey .stringKey ("channel.name" ), channelName ));
209
- responseMetrics .add (metric );
298
+ buffersReceivedGauge .set (
299
+ buffersReceived ,
300
+ Attributes .of (
301
+ AttributeKey .stringKey ("channel.name" ),
302
+ channelName ,
303
+ AttributeKey .stringKey ("channel.type" ),
304
+ channelType ,
305
+ AttributeKey .stringKey ("queue.manager" ),
306
+ context .getQueueManagerName ()));
210
307
}
211
308
{
212
309
int currentSharingConvs = 0 ;
213
310
if (message .getParameter (CMQCFC .MQIACH_CURRENT_SHARING_CONVS ) != null ) {
214
311
currentSharingConvs = message .getIntParameterValue (CMQCFC .MQIACH_CURRENT_SHARING_CONVS );
215
312
}
216
- Metric metric =
217
- metricCreator .createMetric (
218
- "mq.current.sharing.conversations" ,
219
- currentSharingConvs ,
220
- Attributes .of (AttributeKey .stringKey ("channel.name" ), channelName ));
221
- responseMetrics .add (metric );
313
+ currentSharingConvsGauge .set (
314
+ currentSharingConvs ,
315
+ Attributes .of (
316
+ AttributeKey .stringKey ("channel.name" ),
317
+ channelName ,
318
+ AttributeKey .stringKey ("channel.type" ),
319
+ channelType ,
320
+ AttributeKey .stringKey ("queue.manager" ),
321
+ context .getQueueManagerName ()));
222
322
}
223
323
{
224
324
int maxSharingConvs = 0 ;
225
325
if (message .getParameter (CMQCFC .MQIACH_MAX_SHARING_CONVS ) != null ) {
226
326
maxSharingConvs = message .getIntParameterValue (CMQCFC .MQIACH_MAX_SHARING_CONVS );
227
327
}
228
- Metric metric =
229
- metricCreator .createMetric (
230
- "mq.max.sharing.conversations" ,
231
- maxSharingConvs ,
232
- Attributes .of (AttributeKey .stringKey ("channel.name" ), channelName ));
233
- responseMetrics .add (metric );
328
+ maxSharingConvsGauge .set (
329
+ maxSharingConvs ,
330
+ Attributes .of (
331
+ AttributeKey .stringKey ("channel.name" ),
332
+ channelName ,
333
+ AttributeKey .stringKey ("channel.type" ),
334
+ channelType ,
335
+ AttributeKey .stringKey ("queue.manager" ),
336
+ context .getQueueManagerName ()));
234
337
}
235
- return responseMetrics ;
236
338
}
237
339
}
0 commit comments