Skip to content
This repository was archived by the owner on Mar 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions ext/hermann/hermann_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,6 @@ void consumer_init_kafka(HermannInstanceConfig* config) {
rd_kafka_set_logger(config->rk, logger);
rd_kafka_set_log_level(config->rk, LOG_DEBUG);

/* TODO: offset calculation */
config->start_offset = RD_KAFKA_OFFSET_END;

/* Add brokers */
if (rd_kafka_brokers_add(config->rk, config->brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
Expand Down Expand Up @@ -794,11 +791,13 @@ static VALUE consumer_allocate(VALUE klass) {
* @param topic VALUE a Ruby string
* @param brokers VALUE a Ruby string containing list of host:port
* @param partition VALUE a Ruby number
* @param offset VALUE a Ruby number
*/
static VALUE consumer_initialize(VALUE self,
VALUE topic,
VALUE brokers,
VALUE partition) {
VALUE partition,
VALUE offset) {

HermannInstanceConfig* consumerConfig;
char* topicPtr;
Expand All @@ -819,6 +818,17 @@ static VALUE consumer_initialize(VALUE self,
consumerConfig->exit_eof = 0;
consumerConfig->quiet = 0;

if ( FIXNUM_P(offset) ) {
consumerConfig->start_offset = FIX2LONG(offset);
} else if ( SYMBOL_P(offset) ) {
if ( offset == ID2SYM(rb_intern("start")) )
consumerConfig->start_offset = RD_KAFKA_OFFSET_BEGINNING;
else if ( offset == ID2SYM(rb_intern("end")) )
consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
} else {
consumerConfig->start_offset = RD_KAFKA_OFFSET_END;
}

return self;
}

Expand Down Expand Up @@ -1007,7 +1017,7 @@ void Init_hermann_lib() {
rb_define_alloc_func(c_consumer, consumer_allocate);

/* Initialize */
rb_define_method(c_consumer, "initialize", consumer_initialize, 3);
rb_define_method(c_consumer, "initialize", consumer_initialize, 4);
rb_define_method(c_consumer, "initialize_copy", consumer_init_copy, 1);

/* Consumer has method 'consume' */
Expand Down
1 change: 1 addition & 0 deletions ext/hermann/hermann_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef struct HermannInstanceConfig {
int isConnected;

int isErrored;

char *error;
} HermannInstanceConfig;

Expand Down
18 changes: 15 additions & 3 deletions lib/hermann/consumer.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'hermann'
require 'hermann/errors'

if Hermann.jruby?
require 'hermann/provider/java_simple_consumer'
Expand All @@ -17,21 +18,26 @@ class Consumer
#
# @params [String] kafka topic
# @params [Hash] options for Consumer
# @option opts [String] :brokers (for MRI) Comma separated list of brokers
# @option opts [Integer] :partition (for MRI) The kafka partition
# @option opts [String] :brokers (for MRI) Comma separated list of brokers
# @option opts [Integer] :partition (for MRI) The kafka partition
# @option opts [Symbol|Fixnum] :offset (for MRI) Starting consumer offset. either :start, :end, or Fixnum
# @option opts [Integer] :zookeepers (for jruby) list of zookeeper servers
# @option opts [Integer] :group_id (for jruby) client group_id
#
def initialize(topic, opts = {})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will you update the RDoc to include the :offset option documentation including mention of :start and :end

@topic = topic

offset = opts.delete(:offset)
raise Hermann::Errors::InvalidOffsetError.new("Bad offset: #{offset}") unless valid_offset?(offset)

if Hermann.jruby?
zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id)

@internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, group_id, topic, opts)
else
brokers, partition = require_values_at(opts, :brokers, :partition)

@internal = Hermann::Lib::Consumer.new(topic, brokers, partition)
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition, offset)
end
end

Expand All @@ -49,6 +55,12 @@ def shutdown
end
end

private

def valid_offset?(offset)
offset.nil? || offset.is_a?(Fixnum) || offset == :start || offset == :end
end

def require_values_at(opts, *args)
args.map do |a|
raise "Please provide :#{a} option!" unless opts[a]
Expand Down
3 changes: 3 additions & 0 deletions lib/hermann/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class ConfigurationError < GeneralError; end

# cannot discover brokers from zookeeper
class NoBrokersError < GeneralError; end

# offsets can only be two symbols or a fixnum
class InvalidOffsetError < GeneralError; end
end
end

25 changes: 25 additions & 0 deletions scripts/consume_msgs_loop_localhost_mri.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
require 'rubygems'

$LOAD_PATH << File.dirname(__FILE__) + "/../lib"
$LOAD_PATH << File.dirname(__FILE__) + "/../ext"
require 'hermann'
require 'hermann/consumer'

t1 = 0
threads = []
100.times do |i|
threads << Thread.new do
puts "booting #{i}"
c = Hermann::Consumer.new( "maxwell", brokers: "localhost:9092", partition: i, offset: :start)
c.consume() do
|msg| puts("Received: #{msg}")
if(t1 == 0)
t1 = Time.now
end
t2 = Time.now
elapsed = t2 - t1
puts("Total elapsed time: #{elapsed} seconds")
end
end
end
threads.each(&:join)
11 changes: 10 additions & 1 deletion spec/consumer_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'spec_helper'
require 'hermann/consumer'
require 'hermann/errors'

# XXX: Hermann::Consumer isn't really supported anywhere, MRI included right
# now
Expand All @@ -9,7 +10,8 @@
let(:topic) { 'rspec' }
let(:brokers) { 'localhost:1337' }
let(:partition) { 1 }
let(:opts) { { :brokers => brokers, :partition => partition } }
let(:offset) { nil }
let(:opts) { { :brokers => brokers, :partition => partition, :offset => offset } }


context "on C ruby", :platform => :mri do
Expand All @@ -36,6 +38,13 @@
let(:topic) { '' }
it_behaves_like 'an error condition'
end

context 'with a bad offset' do
let(:offset) { :foo }
it "raises an InvalidOffset error" do
expect { subject }.to raise_error(Hermann::Errors::InvalidOffsetError)
end
end
end

describe '#shutdown' do
Expand Down
2 changes: 1 addition & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
c.formatter = :documentation

shared_context 'integration test context', :type => :integration do
let(:topic) { $integrationconf['kafka']['topic'] }
let(:topic) { "hermann_testing_" + rand(100_000_000).to_s }
let(:brokers) { $integrationconf['kafka']['brokers'] }
let(:zookeepers) { $integrationconf['zookeepers'] }
end
Expand Down