23
23
24
24
import asyncio
25
25
import functools
26
- from typing import Any , Iterable , Optional , Union
26
+ import logging
27
+ from typing import Any , Iterable , Optional , Union , Tuple
27
28
28
29
import _pulsar
29
30
30
31
import pulsar
31
- from pulsar import _listener_wrapper
32
+ from pulsar import Message , _listener_wrapper
32
33
34
+ from pulsar import schema
35
+ _schema = schema
33
36
34
37
class PulsarException (BaseException ):
35
38
"""
@@ -163,15 +166,25 @@ def topic(self) -> str:
163
166
164
167
165
168
class Consumer :
166
- def __init__ (self , consumer : _pulsar .Consumer ) -> None :
169
+ # BUG: schema stuff doesn´t work at all because 90% of the methods are missing
170
+ def __init__ (self , consumer : _pulsar .Consumer ):
167
171
self ._consumer : _pulsar .Consumer = consumer
168
172
173
+ def _prepare_logger (logger ):
174
+ import logging
175
+ def log (level , message ):
176
+ old_threads = logging .logThreads
177
+ logging .logThreads = False
178
+ logger .log (logging .getLevelName (level ), message )
179
+ logging .logThreads = old_threads
180
+ return log
181
+
169
182
async def acknowledge (self , msg : pulsar .Message ) -> None :
170
183
"""
171
184
Acknowledge the reception of a single message.
172
185
"""
173
186
future = asyncio .get_running_loop ().create_future ()
174
- self ._consumer .acknowledge_async (msg , functools .partial (_set_future , future ))
187
+ self ._consumer .acknowledge_async (msg . _message , functools .partial (_set_future , future ))
175
188
await future
176
189
177
190
async def acknowledge_cumulative (self , msg : pulsar .Message ) -> None :
@@ -186,7 +199,7 @@ async def negative_acknowledge(self, msg: pulsar.Message) -> None:
186
199
"""
187
200
Acknowledge the failure to process a single message.
188
201
"""
189
- self ._consumer .negative_acknowledge (msg )
202
+ self ._consumer .negative_acknowledge (msg . _message )
190
203
191
204
async def batch_receive (self ) -> Iterable [pulsar .Message ]:
192
205
"""
@@ -203,7 +216,12 @@ async def receive(self) -> pulsar.Message:
203
216
future = asyncio .get_running_loop ().create_future ()
204
217
205
218
self ._consumer .receive_async (functools .partial (_set_future , future ))
206
- return await future
219
+ msg = await future
220
+
221
+ m = Message ()
222
+ m ._message = msg
223
+ m ._schema = self ._schema
224
+ return m
207
225
208
226
async def close (self ):
209
227
"""
@@ -213,7 +231,7 @@ async def close(self):
213
231
self ._consumer .close_async (functools .partial (_set_future , future , value = None ))
214
232
await future
215
233
216
- async def seek (self , position : tuple [int , int , int , int ] | pulsar .MessageId ):
234
+ async def seek (self , position : Tuple [int , int , int , int ] | pulsar .MessageId ):
217
235
"""
218
236
Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. ...
219
237
"""
@@ -300,6 +318,7 @@ def __init__(self, service_url, **kwargs) -> None:
300
318
"""
301
319
assert service_url .startswith ('pulsar://' ), "The service url must start with 'pulsar://'"
302
320
self ._client = pulsar .Client (service_url , ** kwargs )._client
321
+ self ._consumers = []
303
322
304
323
async def subscribe (self , topic , subscription_name ,
305
324
consumer_type : _pulsar .ConsumerType = _pulsar .ConsumerType .Exclusive ,
@@ -325,10 +344,14 @@ async def subscribe(self, topic, subscription_name,
325
344
batch_index_ack_enabled = False ,
326
345
regex_subscription_mode : _pulsar .RegexSubscriptionMode = _pulsar .RegexSubscriptionMode .PersistentOnly ,
327
346
dead_letter_policy : Union [None , pulsar .ConsumerDeadLetterPolicy ] = None ,) -> Consumer :
347
+ print ("subscribe called" )
328
348
conf = _pulsar .ConsumerConfiguration ()
329
349
conf .consumer_type (consumer_type )
330
350
conf .regex_subscription_mode (regex_subscription_mode )
331
351
conf .read_compacted (is_read_compacted )
352
+
353
+ print ("core conf set" )
354
+
332
355
if message_listener :
333
356
conf .message_listener (_listener_wrapper (message_listener , schema ))
334
357
conf .receiver_queue_size (receiver_queue_size )
@@ -363,18 +386,36 @@ async def subscribe(self, topic, subscription_name,
363
386
if dead_letter_policy :
364
387
conf .dead_letter_policy (dead_letter_policy .policy ())
365
388
389
+ print ("opt conf set" )
390
+
366
391
future = asyncio .get_running_loop ().create_future ()
367
392
393
+ print ("future created" )
394
+
395
+ c = Consumer (None )
368
396
if isinstance (topic , str ):
397
+ print ("single" )
369
398
self ._client .subscribe_async (topic , subscription_name , conf , functools .partial (_set_future , future ))
399
+ c ._consumer = await future
370
400
elif isinstance (topic , list ):
401
+ print ("multi" )
371
402
self ._client .subscribe_topics_async (topic , subscription_name , conf , functools .partial (_set_future , future ))
403
+ c ._consumer = await future
372
404
elif isinstance (topic , pulsar ._retype ):
405
+ print ("regex" )
373
406
self ._client .subscribe_pattern_async (topic , subscription_name , conf , functools .partial (_set_future , future ))
407
+ c ._consumer = await future
374
408
else :
375
409
raise ValueError ("Argument 'topic' is expected to be of a type between (str, list, re.pattern)" )
376
410
377
- return Consumer (await future )
411
+ c ._client = self
412
+ c ._schema = schema
413
+ c ._schema .attach_client (self ._client )
414
+
415
+ print ("consumer created" )
416
+ self ._consumers .append (c )
417
+
418
+ return c
378
419
379
420
async def create_producer (self , topic ,
380
421
producer_name = None ,
0 commit comments