Skip to content

Commit 3378620

Browse files
committed
Reorganize event models and add comprehensive test coverage
Move InboxEvent and OutboxEvent to models directory for better code organization. Update all require references to reflect new paths. Add extensive test suites for: - Consumer components (inbox processor, repository, subscription manager) - Publisher outbox repository - Topology management (stream, overlap guard) - Core utilities (debug helper, model codec setup) Enhance publisher with improved event handling and validation. Update RuboCop configuration and spec helper. All tests passing with 83.83% code coverage.
1 parent 2272e82 commit 3378620

21 files changed

+3144
-63
lines changed

.rubocop.yml

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,23 @@ plugins:
77

88
AllCops:
99
NewCops: enable
10-
TargetRubyVersion: 3.2 # match gemspec minimum to avoid false positives
10+
TargetRubyVersion: 3.2 # match gemspec minimum to avoid false positives
1111
SuggestExtensions: false
1212
Exclude:
13-
- 'bin/*'
14-
- 'pkg/**/*'
15-
- 'tmp/**/*'
16-
- 'vendor/**/*'
17-
- 'spec/fixtures/**/*'
13+
- "bin/*"
14+
- "pkg/**/*"
15+
- "tmp/**/*"
16+
- "vendor/**/*"
17+
- "spec/fixtures/**/*"
1818

1919
# ---------- Layout & Style ----------
2020
Layout/LineLength:
2121
Max: 120
2222
Exclude:
23-
- 'spec/**/*' # allow longer expectation/setup lines
23+
- "spec/**/*" # allow longer expectation/setup lines
2424

2525
Style/Documentation:
26-
Enabled: false # don’t force top-level docs for every class/module in a gem
26+
Enabled: false # don’t force top-level docs for every class/module in a gem
2727

2828
Style/FrozenStringLiteralComment:
2929
Enabled: true
@@ -41,28 +41,28 @@ Style/HashSyntax:
4141
# ---------- Metrics (balanced for libs) ----------
4242
Metrics/BlockLength:
4343
Exclude:
44-
- 'spec/**/*'
45-
- 'Rakefile'
46-
- 'tasks/**/*.rake'
47-
- 'lib/jetstream_bridge/tasks/**/*.rake'
48-
- 'jetstream_bridge.gemspec'
44+
- "spec/**/*"
45+
- "Rakefile"
46+
- "tasks/**/*.rake"
47+
- "lib/jetstream_bridge/tasks/**/*.rake"
48+
- "jetstream_bridge.gemspec"
4949

5050
Metrics/MethodLength:
5151
Max: 25
5252
Exclude:
53-
- 'lib/jetstream_bridge/**/publisher*.rb'
54-
- 'lib/jetstream_bridge/consumer/**/*.rb'
55-
- 'lib/jetstream_bridge/core/debug_helper.rb'
56-
- 'lib/jetstream_bridge/topology/stream.rb'
57-
- 'lib/generators/jetstream_bridge/health_check/health_check_generator.rb'
53+
- "lib/jetstream_bridge/**/publisher*.rb"
54+
- "lib/jetstream_bridge/consumer/**/*.rb"
55+
- "lib/jetstream_bridge/core/debug_helper.rb"
56+
- "lib/jetstream_bridge/topology/stream.rb"
57+
- "lib/generators/jetstream_bridge/health_check/health_check_generator.rb"
5858

5959
Metrics/AbcSize:
6060
Max: 25
6161
Exclude:
62-
- 'lib/jetstream_bridge/**/publisher*.rb'
63-
- 'lib/jetstream_bridge/consumer/**/*.rb'
64-
- 'lib/jetstream_bridge/core/config.rb'
65-
- 'lib/jetstream_bridge/core/debug_helper.rb'
62+
- "lib/jetstream_bridge/**/publisher*.rb"
63+
- "lib/jetstream_bridge/consumer/**/*.rb"
64+
- "lib/jetstream_bridge/core/config.rb"
65+
- "lib/jetstream_bridge/core/debug_helper.rb"
6666

6767
Metrics/CyclomaticComplexity:
6868
Max: 10
@@ -71,11 +71,11 @@ Metrics/PerceivedComplexity:
7171
Max: 10
7272

7373
Metrics/ClassLength:
74-
Max: 150
74+
Max: 155
7575

7676
Metrics/ParameterLists:
7777
Max: 5
78-
CountKeywordArgs: false # don't count keyword args for initialization methods
78+
CountKeywordArgs: false # allow more keyword args for clarity
7979

8080
# ---------- Naming ----------
8181
Naming/PredicateMethod:
@@ -86,23 +86,23 @@ Naming/PredicateMethod:
8686

8787
# ---------- Lint ----------
8888
Lint/IneffectiveAccessModifier:
89-
Enabled: false # Allow class methods after private
89+
Enabled: false # Allow class methods after private
9090

9191
Lint/UselessConstantScoping:
9292
Enabled: false
9393

9494
# ---------- Style ----------
9595
Style/MultilineBlockChain:
9696
Exclude:
97-
- 'spec/**/*' # allow chaining in tests for better readability
97+
- "spec/**/*" # allow chaining in tests for better readability
9898

9999
# Optional: relax just for low-level I/O integration code
100100
# (you can delete this once you refactor)
101101

102102
# ---------- Packaging / Gemspec ----------
103103
# These ship with RuboCop core and rubocop-packaging
104104
Gemspec/RequiredRubyVersion:
105-
Enabled: true # aligns with your gemspec `required_ruby_version`
105+
Enabled: true # aligns with your gemspec `required_ruby_version`
106106

107107
Gemspec/DevelopmentDependencies:
108108
Enabled: true

lib/jetstream_bridge.rb

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
require_relative 'jetstream_bridge/railtie' if defined?(Rails::Railtie)
1313

1414
# Load gem-provided models from lib/
15-
require_relative 'jetstream_bridge/inbox_event'
16-
require_relative 'jetstream_bridge/outbox_event'
15+
require_relative 'jetstream_bridge/models/inbox_event'
16+
require_relative 'jetstream_bridge/models/outbox_event'
1717

1818
# JetstreamBridge main module.
1919
module JetstreamBridge
@@ -101,6 +101,74 @@ def stream_info
101101
fetch_stream_info
102102
end
103103

104+
# Convenience method to publish events
105+
#
106+
# Supports three usage patterns:
107+
#
108+
# 1. Structured parameters (recommended):
109+
# JetstreamBridge.publish(resource_type: 'user', event_type: 'created', payload: { id: 1, name: 'Ada' })
110+
#
111+
# 2. Simplified hash (infers resource_type from event_type):
112+
# JetstreamBridge.publish(event_type: 'user.created', payload: { id: 1, name: 'Ada' })
113+
#
114+
# 3. Complete envelope (advanced):
115+
# JetstreamBridge.publish({ event_type: 'created', resource_type: 'user', payload: {...}, event_id: '...' })
116+
#
117+
# @param event_or_hash [Hash, nil] Event hash or first positional argument
118+
# @param resource_type [String, nil] Resource type (e.g., 'user', 'order')
119+
# @param event_type [String, nil] Event type (e.g., 'created', 'updated', 'user.created')
120+
# @param payload [Hash, nil] Event payload data
121+
# @param subject [String, nil] Optional subject override
122+
# @param options [Hash] Additional options (event_id, occurred_at, trace_id)
123+
# @return [Boolean] true if published successfully
124+
def publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **)
125+
publisher = Publisher.new
126+
publisher.publish(event_or_hash, resource_type: resource_type, event_type: event_type, payload: payload,
127+
subject: subject, **)
128+
end
129+
130+
# Convenience method to start consuming messages
131+
#
132+
# Supports two usage patterns:
133+
#
134+
# 1. With a block (recommended):
135+
# consumer = JetstreamBridge.subscribe do |event, context|
136+
# puts "Received: #{event['event_type']}"
137+
# end
138+
# consumer.run!
139+
#
140+
# 2. With auto-run (returns Thread):
141+
# thread = JetstreamBridge.subscribe(run: true) do |event, context|
142+
# puts "Received: #{event['event_type']}"
143+
# end
144+
# thread.join # Wait for consumer to finish
145+
#
146+
# 3. With a handler object:
147+
# handler = ->(event, context) { puts event['event_type'] }
148+
# consumer = JetstreamBridge.subscribe(handler)
149+
# consumer.run!
150+
#
151+
# @param handler [Proc, #call, nil] Message handler (optional if block given)
152+
# @param run [Boolean] If true, automatically runs consumer in a background thread
153+
# @param durable_name [String, nil] Optional durable consumer name override
154+
# @param batch_size [Integer, nil] Optional batch size override
155+
# @yield [event, context] Yields event and context to block if no handler provided
156+
# @return [Consumer, Thread] Consumer instance or Thread if run: true
157+
def subscribe(handler = nil, run: false, durable_name: nil, batch_size: nil, &block)
158+
handler ||= block
159+
raise ArgumentError, 'Handler or block required' unless handler
160+
161+
consumer = Consumer.new(handler, durable_name: durable_name, batch_size: batch_size)
162+
163+
if run
164+
thread = Thread.new { consumer.run! }
165+
thread.abort_on_exception = true
166+
thread
167+
else
168+
consumer
169+
end
170+
end
171+
104172
private
105173

106174
def fetch_stream_info

lib/jetstream_bridge/consumer/consumer.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ class Consumer
1919
IDLE_SLEEP_SECS = 0.05
2020
MAX_IDLE_BACKOFF_SECS = 1.0
2121

22-
def initialize(durable_name: nil, batch_size: nil, &block)
23-
raise ArgumentError, 'handler block required' unless block_given?
22+
attr_reader :durable, :batch_size
23+
24+
def initialize(handler = nil, durable_name: nil, batch_size: nil, &block)
25+
@handler = handler || block
26+
raise ArgumentError, 'handler or block required' unless @handler
2427

25-
@handler = block
2628
@batch_size = Integer(batch_size || DEFAULT_BATCH_SIZE)
2729
@durable = durable_name || JetstreamBridge.config.durable_name
2830
@idle_backoff = IDLE_SLEEP_SECS

lib/jetstream_bridge/core/connection.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,10 @@ def connect!
8181
# Public API for checking connection status
8282
# @return [Boolean] true if NATS client is connected and JetStream is healthy
8383
def connected?
84-
@nc&.connected? && @jts && jetstream_healthy?
84+
return false unless @nc&.connected?
85+
return false unless @jts
86+
87+
jetstream_healthy?
8588
end
8689

8790
# Public API for getting connection timestamp
File renamed without changes.
File renamed without changes.

lib/jetstream_bridge/publisher/publisher.rb

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,92 @@ def initialize(retry_strategy: nil)
1717
@retry_strategy = retry_strategy || PublisherRetryStrategy.new
1818
end
1919

20+
# Publishes an event to NATS JetStream.
21+
#
22+
# Supports two usage patterns:
23+
#
24+
# 1. Structured parameters (recommended):
25+
# publish(resource_type: 'user', event_type: 'created', payload: { id: 1, name: 'Ada' })
26+
#
27+
# 2. Hash/envelope (advanced):
28+
# publish({ event_type: 'user.created', payload: {...}, event_id: '...', ... }, subject: 'custom.subject')
29+
#
30+
# @param event_or_hash [Hash] Either structured params or a complete event envelope
31+
# @param resource_type [String, nil] Resource type (e.g., 'user', 'order')
32+
# @param event_type [String, nil] Event type (e.g., 'created', 'updated')
33+
# @param payload [Hash, nil] Event payload data
34+
# @param subject [String, nil] Optional subject override
2035
# @return [Boolean]
21-
def publish(resource_type:, event_type:, payload:, **options)
36+
def publish(event_or_hash = nil, resource_type: nil, event_type: nil, payload: nil, subject: nil, **options)
2237
ensure_destination!
23-
envelope = build_envelope(resource_type, event_type, payload, options)
24-
subject = JetstreamBridge.config.source_subject
2538

39+
params = { event_or_hash: event_or_hash, resource_type: resource_type, event_type: event_type,
40+
payload: payload, subject: subject, options: options }
41+
envelope, resolved_subject = route_publish_params(params)
42+
43+
do_publish(resolved_subject, envelope)
44+
rescue ArgumentError
45+
# Re-raise validation errors
46+
raise
47+
rescue StandardError => e
48+
# Only catch publishing errors
49+
log_error(false, e)
50+
end
51+
52+
def do_publish(subject, envelope)
2653
if JetstreamBridge.config.use_outbox
2754
publish_via_outbox(subject, envelope)
2855
else
2956
with_retries { publish_to_nats(subject, envelope) }
3057
end
31-
rescue StandardError => e
32-
log_error(false, e)
3358
end
3459

3560
private
3661

62+
# Routes publish parameters to appropriate envelope builder
63+
# @return [Array<Hash, String>] tuple of [envelope, subject]
64+
def route_publish_params(params)
65+
if structured_params?(params)
66+
build_from_structured_params(params)
67+
elsif keyword_or_hash_params?(params)
68+
build_from_keyword_or_hash(params)
69+
else
70+
raise ArgumentError, 'Either provide (resource_type:, event_type:, payload:) or an event hash'
71+
end
72+
end
73+
74+
def structured_params?(params)
75+
params[:resource_type] && params[:event_type] && params[:payload]
76+
end
77+
78+
def keyword_or_hash_params?(params)
79+
params[:event_type] || params[:payload] || params[:event_or_hash].is_a?(Hash)
80+
end
81+
82+
def build_from_structured_params(params)
83+
envelope = build_envelope(params[:resource_type], params[:event_type], params[:payload], params[:options])
84+
resolved_subject = params[:subject] || JetstreamBridge.config.source_subject
85+
[envelope, resolved_subject]
86+
end
87+
88+
def build_from_keyword_or_hash(params)
89+
envelope = if params[:event_or_hash].is_a?(Hash)
90+
normalize_envelope(params[:event_or_hash], params[:options])
91+
else
92+
build_from_keywords(params[:event_type], params[:payload], params[:options])
93+
end
94+
95+
resolved_subject = params[:subject] || params[:options][:subject] || JetstreamBridge.config.source_subject
96+
[envelope, resolved_subject]
97+
end
98+
99+
def build_from_keywords(event_type, payload, options)
100+
raise ArgumentError, 'event_type is required' unless event_type
101+
raise ArgumentError, 'payload is required' unless payload
102+
103+
normalize_envelope({ 'event_type' => event_type, 'payload' => payload }, options)
104+
end
105+
37106
def ensure_destination!
38107
return unless JetstreamBridge.config.destination_app.to_s.empty?
39108

@@ -123,5 +192,36 @@ def build_envelope(resource_type, event_type, payload, options = {})
123192
'payload' => payload
124193
}
125194
end
195+
196+
# Normalize a hash to match envelope structure, allowing partial envelopes
197+
def normalize_envelope(hash, options = {})
198+
hash = hash.transform_keys(&:to_s)
199+
200+
# Allow shorthand: if only 'event_type' and 'payload', infer resource_type
201+
if hash['event_type'] && hash['payload'] && !hash['resource_type']
202+
# Try to infer from dot notation (e.g., 'user.created' -> 'user')
203+
parts = hash['event_type'].split('.')
204+
hash['resource_type'] = parts[0] if parts.size > 1
205+
end
206+
207+
{
208+
'event_id' => hash['event_id'] || options[:event_id] || SecureRandom.uuid,
209+
'schema_version' => hash['schema_version'] || 1,
210+
'event_type' => hash['event_type'] || raise(ArgumentError, 'event_type is required'),
211+
'producer' => hash['producer'] || JetstreamBridge.config.app_name,
212+
'resource_id' => hash['resource_id'] || extract_resource_id(hash['payload']),
213+
'occurred_at' => hash['occurred_at'] || (options[:occurred_at] || Time.now.utc).iso8601,
214+
'trace_id' => hash['trace_id'] || options[:trace_id] || SecureRandom.hex(8),
215+
'resource_type' => hash['resource_type'] || 'event',
216+
'payload' => hash['payload'] || raise(ArgumentError, 'payload is required')
217+
}
218+
end
219+
220+
def extract_resource_id(payload)
221+
return '' unless payload
222+
223+
payload = payload.transform_keys(&:to_s) if payload.respond_to?(:transform_keys)
224+
(payload['id'] || payload[:id] || payload['resource_id'] || payload[:resource_id]).to_s
225+
end
126226
end
127227
end

spec/consumer/consumer_spec.rb

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,32 @@
2222
after { JetstreamBridge.reset! }
2323

2424
describe 'initialization' do
25-
it 'ensures and subscribes the consumer' do
25+
it 'ensures and subscribes the consumer with block' do
2626
described_class.new { |*| nil }
2727
expect(JetstreamBridge::SubscriptionManager)
2828
.to have_received(:new)
2929
.with(jts, JetstreamBridge.config.durable_name, JetstreamBridge.config)
3030
expect(sub_mgr).to have_received(:ensure_consumer!)
3131
expect(sub_mgr).to have_received(:subscribe!)
3232
end
33+
34+
it 'accepts handler as first argument' do
35+
handler = ->(event, context) { nil }
36+
consumer = described_class.new(handler)
37+
expect(consumer).to be_a(described_class)
38+
end
39+
40+
it 'raises error when neither handler nor block provided' do
41+
expect do
42+
described_class.new
43+
end.to raise_error(ArgumentError, /handler or block required/)
44+
end
45+
46+
it 'accepts custom durable_name and batch_size' do
47+
consumer = described_class.new(durable_name: 'custom-durable', batch_size: 50) { |*| nil }
48+
expect(consumer.durable).to eq('custom-durable')
49+
expect(consumer.batch_size).to eq(50)
50+
end
3351
end
3452

3553
describe '#process_batch' do

0 commit comments

Comments
 (0)