Skip to content
This repository was archived by the owner on Mar 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions ext/hermann/hermann_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,6 @@ void consumer_init_kafka(HermannInstanceConfig* config) {
rd_kafka_set_logger(config->rk, logger);
rd_kafka_set_log_level(config->rk, LOG_DEBUG);

/* TODO: offset calculation */
config->start_offset = RD_KAFKA_OFFSET_END;

/* Add brokers */
if (rd_kafka_brokers_add(config->rk, config->brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
Expand Down Expand Up @@ -794,11 +791,13 @@ static VALUE consumer_allocate(VALUE klass) {
* @param topic VALUE a Ruby string
* @param brokers VALUE a Ruby string containing list of host:port
* @param partition VALUE a Ruby number
* @param offset VALUE a Ruby number
*/
static VALUE consumer_initialize(VALUE self,
VALUE topic,
VALUE brokers,
VALUE partition) {
VALUE partition,
VALUE offset) {

HermannInstanceConfig* consumerConfig;
char* topicPtr;
Expand All @@ -819,6 +818,17 @@ static VALUE consumer_initialize(VALUE self,
consumerConfig->exit_eof = 0;
consumerConfig->quiet = 0;

if ( FIXNUM_P(offset) ) {
consumerConfig->start_offset = FIX2LONG(offset);
} else if ( SYMBOL_P(offset) ) {
if ( offset == ID2SYM(rb_intern("start")) )
consumerConfig->start_offset = RD_KAFKA_OFFSET_BEGINNING;
else if ( offset == ID2SYM(rb_intern("end")) )
consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
} else {
consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
}

return self;
}

Expand Down Expand Up @@ -1007,7 +1017,7 @@ void Init_hermann_lib() {
rb_define_alloc_func(c_consumer, consumer_allocate);

/* Initialize */
rb_define_method(c_consumer, "initialize", consumer_initialize, 3);
rb_define_method(c_consumer, "initialize", consumer_initialize, 4);
rb_define_method(c_consumer, "initialize_copy", consumer_init_copy, 1);

/* Consumer has method 'consume' */
Expand Down
1 change: 1 addition & 0 deletions ext/hermann/hermann_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef struct HermannInstanceConfig {
int isConnected;

int isErrored;

char *error;
} HermannInstanceConfig;

Expand Down
12 changes: 11 additions & 1 deletion lib/hermann/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ class Consumer
#
def initialize(topic, opts = {})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will you update the RDoc to include the :offset option documentation including mention of :start and :end

@topic = topic

offset = opts.delete(:offset)
raise "Bad offset: #{offset}" unless valid_offset?(offset)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind typing this exception and defining it in lib/hermann/errors.rb?


if Hermann.jruby?
zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id)

@internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, group_id, topic, opts)
else
brokers, partition = require_values_at(opts, :brokers, :partition)

@internal = Hermann::Lib::Consumer.new(topic, brokers, partition)
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition, offset)
end
end

Expand All @@ -49,6 +53,12 @@ def shutdown
end
end

private

def valid_offset?(offset)
offset.nil? || offset.is_a?(Fixnum) || offset == :start || offset == :end
end

def require_values_at(opts, *args)
args.map do |a|
raise "Please provide :#{a} option!" unless opts[a]
Expand Down
25 changes: 25 additions & 0 deletions scripts/consume_msgs_loop_localhost_mri.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
require 'rubygems'

$LOAD_PATH << File.dirname(__FILE__) + "/../lib"
$LOAD_PATH << File.dirname(__FILE__) + "/../ext"
require 'hermann'
require 'hermann/consumer'

t1 = 0
threads = []
100.times do |i|
threads << Thread.new do
puts "booting #{i}"
c = Hermann::Consumer.new( "maxwell", brokers: "localhost:9092", partition: i, offset: :start)
c.consume() do
|msg| puts("Received: #{msg}")
if(t1 == 0)
t1 = Time.now
end
t2 = Time.now
elapsed = t2 - t1
puts("Total elapsed time: #{elapsed} seconds")
end
end
end
threads.each(&:join)
8 changes: 7 additions & 1 deletion spec/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
let(:topic) { 'rspec' }
let(:brokers) { 'localhost:1337' }
let(:partition) { 1 }
let(:opts) { { :brokers => brokers, :partition => partition } }
let(:offset) { nil }
let(:opts) { { :brokers => brokers, :partition => partition, :offset => offset } }


context "on C ruby", :platform => :mri do
Expand All @@ -36,6 +37,11 @@
let(:topic) { '' }
it_behaves_like 'an error condition'
end

context 'with a bad offset' do
let(:offset) { :foo }
it_behaves_like 'an error condition'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this test needs to be updated per Travis failures

end
end

describe '#shutdown' do
Expand Down
2 changes: 1 addition & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
c.formatter = :documentation

shared_context 'integration test context', :type => :integration do
let(:topic) { $integrationconf['kafka']['topic'] }
let(:topic) { "hermann_testing_" + rand(100_000_000).to_s }
let(:brokers) { $integrationconf['kafka']['brokers'] }
let(:zookeepers) { $integrationconf['zookeepers'] }
end
Expand Down