1
1
#!/usr/bin/env python
2
2
3
+ from io import StringIO
4
+ from sys import maxunicode
3
5
4
6
from ..utils import floatToGoString
5
7
from ..validation import (
8
10
9
11
CONTENT_TYPE_LATEST = 'application/openmetrics-text; version=1.0.0; charset=utf-8'
"""Content type of the latest OpenMetrics text format"""

# NOTE(review): presumably the name of the content-negotiation parameter that
# selects an escaping scheme; it is not referenced in the visible part of this
# module, so confirm against the callers.
ESCAPING_HEADER_TAG = 'escaping'

# Identifiers of the name-escaping schemes understood by _escape(),
# escape_metric_name() and escape_label_name():
#   ALLOWUTF8   - names are kept as-is; only backslash, newline and
#                 double-quote are backslash-escaped.
#   UNDERSCORES - every character outside the legacy set becomes '_'.
#   DOTS        - '.' becomes '_dot_', '_' becomes '__', any other character
#                 outside the legacy set becomes '__'.
#   VALUES      - the name is prefixed with 'U__', '_' becomes '__' and every
#                 other character outside the legacy set becomes '_<hex>_'.
ALLOWUTF8 = 'allow-utf-8'
UNDERSCORES = 'underscores'
DOTS = 'dots'
VALUES = 'values'
11
20
12
21
13
22
def _is_valid_exemplar_metric (metric , sample ):
@@ -20,30 +29,36 @@ def _is_valid_exemplar_metric(metric, sample):
20
29
return False
21
30
22
31
23
- def generate_latest (registry ):
32
def generate_latest_fn(escaping):
    '''Returns a generate_latest function that will always use the given escaping.

    The returned callable accepts a single ``registry`` argument and forwards
    it, together with the captured ``escaping`` scheme, to generate_latest.
    '''
    return lambda registry: generate_latest(registry, escaping)
35
+
36
+
37
+ def generate_latest (registry , escaping ):
24
38
'''Returns the metrics from the registry in latest text format as a string.'''
25
39
output = []
26
40
for metric in registry .collect ():
27
41
try :
28
42
mname = metric .name
29
43
output .append ('# HELP {} {}\n ' .format (
30
- escape_metric_name (mname ), _escape (metric .documentation )))
31
- output .append (f'# TYPE { escape_metric_name (mname )} { metric .type } \n ' )
44
+ escape_metric_name (mname , escaping ), _escape (metric .documentation , ALLOWUTF8 , False )))
45
+ output .append (f'# TYPE { escape_metric_name (mname , escaping )} { metric .type } \n ' )
32
46
if metric .unit :
33
- output .append (f'# UNIT { escape_metric_name (mname )} { metric .unit } \n ' )
47
+ output .append (f'# UNIT { escape_metric_name (mname , escaping )} { metric .unit } \n ' )
34
48
for s in metric .samples :
35
- if not _is_valid_legacy_metric_name (s .name ):
36
- labelstr = escape_metric_name (s .name )
49
+ if escaping == ALLOWUTF8 and not _is_valid_legacy_metric_name (s .name ):
50
+ labelstr = escape_metric_name (s .name , escaping )
37
51
if s .labels :
38
52
labelstr += ', '
39
53
else :
40
54
labelstr = ''
41
55
42
56
if s .labels :
43
57
items = sorted (s .labels .items ())
58
+ # Label values always support UTF-8
44
59
labelstr += ',' .join (
45
60
['{}="{}"' .format (
46
- escape_label_name (k ), _escape (v ))
61
+ escape_label_name (k , escaping ), _escape (v , ALLOWUTF8 , False ))
47
62
for k , v in items ])
48
63
if labelstr :
49
64
labelstr = "{" + labelstr + "}"
@@ -71,9 +86,9 @@ def generate_latest(registry):
71
86
timestamp = ''
72
87
if s .timestamp is not None :
73
88
timestamp = f' { s .timestamp } '
74
- if _is_valid_legacy_metric_name (s .name ):
89
+ if ( escaping != ALLOWUTF8 ) or _is_valid_legacy_metric_name (s .name ):
75
90
output .append ('{}{} {}{}{}\n ' .format (
76
- s .name ,
91
+ _escape ( s .name , escaping , False ) ,
77
92
labelstr ,
78
93
floatToGoString (s .value ),
79
94
timestamp ,
@@ -94,24 +109,114 @@ def generate_latest(registry):
94
109
return '' .join (output ).encode ('utf-8' )
95
110
96
111
97
def escape_metric_name(s: str, escaping: str) -> str:
    """Escapes a metric name according to the given escaping scheme.

    Behaviour per scheme:

    * ALLOWUTF8: the name is backslash-escaped and additionally wrapped in
      double quotes iff it does not conform to the legacy Prometheus
      character set.
    * UNDERSCORES / VALUES: a name that already conforms to the legacy
      character set is returned unchanged; any other name is rewritten by
      _escape().
    * DOTS: the name is always rewritten by _escape(), because that scheme
      also rewrites '_', which is a legal legacy character.

    An empty name, or a name passed with an unrecognised scheme, is returned
    unchanged.
    """
    if not s:
        return s
    if escaping == ALLOWUTF8:
        if not _is_valid_legacy_metric_name(s):
            return '"{}"'.format(_escape(s, escaping, False))
        return _escape(s, escaping, False)
    elif escaping == UNDERSCORES:
        if _is_valid_legacy_metric_name(s):
            return s
        return _escape(s, escaping, False)
    elif escaping == DOTS:
        return _escape(s, escaping, False)
    elif escaping == VALUES:
        if _is_valid_legacy_metric_name(s):
            return s
        return _escape(s, escaping, False)
    return s
104
133
105
134
106
def escape_label_name(s: str, escaping: str) -> str:
    """Escapes a label name according to the given escaping scheme.

    Behaviour per scheme:

    * ALLOWUTF8: the name is backslash-escaped and additionally wrapped in
      double quotes iff it does not conform to the legacy Prometheus
      label-name character set.
    * UNDERSCORES / VALUES: a name that already conforms to the legacy
      label-name character set is returned unchanged; any other name is
      rewritten by _escape().
    * DOTS: the name is always rewritten by _escape(), because that scheme
      also rewrites '_', which is a legal legacy character.

    An empty name, or a name passed with an unrecognised scheme, is returned
    unchanged.
    """
    if not s:
        return s
    if escaping == ALLOWUTF8:
        if not _is_valid_legacy_labelname(s):
            return '"{}"'.format(_escape(s, escaping, True))
        return _escape(s, escaping, True)
    elif escaping == UNDERSCORES:
        if _is_valid_legacy_labelname(s):
            return s
        return _escape(s, escaping, True)
    elif escaping == DOTS:
        return _escape(s, escaping, True)
    elif escaping == VALUES:
        if _is_valid_legacy_labelname(s):
            return s
        return _escape(s, escaping, True)
    return s
113
156
114
157
115
def _escape(s: str, escaping: str, is_labelname: bool) -> str:
    """Rewrites ``s`` according to the given escaping scheme.

    * ALLOWUTF8: backslash-escapes backslash, newline and double-quote
      characters; everything else is kept.
    * UNDERSCORES: replaces every character that is not a valid legacy rune
      at its position with '_'.
    * DOTS: '_' becomes '__', '.' becomes '_dot_', other valid legacy runes
      are kept and anything else becomes '__'.
    * VALUES: the result starts with 'U__'; '_' becomes '__', other valid
      legacy runes are kept, surrogate code points become '_FFFD_' and any
      other character becomes '_<lowercase hex code point>_'.

    ``is_labelname`` is forwarded to _is_valid_legacy_rune(), where it decides
    whether ':' counts as a valid legacy character. An unrecognised scheme
    returns ``s`` unchanged.
    """
    if escaping == ALLOWUTF8:
        # Backslash must be handled first so the backslashes introduced by
        # the later replacements are not escaped a second time.
        return s.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')
    elif escaping == UNDERSCORES:
        escaped = StringIO()
        for i, b in enumerate(s):
            if _is_valid_legacy_rune(b, i, is_labelname):
                escaped.write(b)
            else:
                escaped.write('_')
        return escaped.getvalue()
    elif escaping == DOTS:
        escaped = StringIO()
        for i, b in enumerate(s):
            # '_' and '.' are tested before the legacy-rune check because
            # '_' is itself a valid legacy rune but must still be doubled.
            if b == '_':
                escaped.write('__')
            elif b == '.':
                escaped.write('_dot_')
            elif _is_valid_legacy_rune(b, i, is_labelname):
                escaped.write(b)
            else:
                escaped.write('__')
        return escaped.getvalue()
    elif escaping == VALUES:
        escaped = StringIO()
        escaped.write("U__")
        for i, b in enumerate(s):
            if b == '_':
                escaped.write("__")
            elif _is_valid_legacy_rune(b, i, is_labelname):
                escaped.write(b)
            elif not _is_valid_utf8(b):
                escaped.write("_FFFD_")
            else:
                # Encode the code point as lowercase hex between underscores.
                escaped.write('_')
                escaped.write(format(ord(b), 'x'))
                escaped.write('_')
        return escaped.getvalue()
    return s
198
+
199
+
200
+ def _is_valid_legacy_rune (b : str , i : int , is_labelname : bool ) -> bool :
201
+ if len (b ) != 1 :
202
+ raise ValueError ("Input 'b' must be a single character." )
203
+ if (
204
+ ('a' <= b <= 'z' )
205
+ or ('A' <= b <= 'Z' )
206
+ or (b == '_' )
207
+ or ('0' <= b <= '9' and i > 0 )
208
+ ):
209
+ return True
210
+ return not is_labelname and b == ':'
211
+
212
+
213
+ _SURROGATE_MIN = 0xD800
214
+ _SURROGATE_MAX = 0xDFFF
215
+
216
+
217
+ def _is_valid_utf8 (s : str ) -> bool :
218
+ if 0 <= ord (s ) < _SURROGATE_MIN :
219
+ return True
220
+ if _SURROGATE_MAX < ord (s ) <= maxunicode :
221
+ return True
222
+ return False
0 commit comments