diff --git a/ext/hermann/hermann_lib.c b/ext/hermann/hermann_lib.c index f906ec6..41bb781 100644 --- a/ext/hermann/hermann_lib.c +++ b/ext/hermann/hermann_lib.c @@ -322,9 +322,6 @@ void consumer_init_kafka(HermannInstanceConfig* config) { rd_kafka_set_logger(config->rk, logger); rd_kafka_set_log_level(config->rk, LOG_DEBUG); - /* TODO: offset calculation */ - config->start_offset = RD_KAFKA_OFFSET_END; - /* Add brokers */ if (rd_kafka_brokers_add(config->rk, config->brokers) == 0) { fprintf(stderr, "%% No valid brokers specified\n"); @@ -794,11 +791,13 @@ static VALUE consumer_allocate(VALUE klass) { * @param topic VALUE a Ruby string * @param brokers VALUE a Ruby string containing list of host:port * @param partition VALUE a Ruby number + * @param offset VALUE a Ruby number */ static VALUE consumer_initialize(VALUE self, VALUE topic, VALUE brokers, - VALUE partition) { + VALUE partition, + VALUE offset) { HermannInstanceConfig* consumerConfig; char* topicPtr; @@ -819,6 +818,17 @@ static VALUE consumer_initialize(VALUE self, consumerConfig->exit_eof = 0; consumerConfig->quiet = 0; + if ( FIXNUM_P(offset) ) { + consumerConfig->start_offset = FIX2LONG(offset); + } else if ( SYMBOL_P(offset) ) { + if ( offset == ID2SYM(rb_intern("start")) ) + consumerConfig->start_offset = RD_KAFKA_OFFSET_BEGINNING; + else if ( offset == ID2SYM(rb_intern("end")) ) + consumerConfig->start_offset = RD_KAFKA_OFFSET_END; + } else { + consumerConfig->start_offset = RD_KAFKA_OFFSET_END; + } + return self; } @@ -1007,7 +1017,7 @@ void Init_hermann_lib() { rb_define_alloc_func(c_consumer, consumer_allocate); /* Initialize */ - rb_define_method(c_consumer, "initialize", consumer_initialize, 3); + rb_define_method(c_consumer, "initialize", consumer_initialize, 4); rb_define_method(c_consumer, "initialize_copy", consumer_init_copy, 1); /* Consumer has method 'consume' */ diff --git a/ext/hermann/hermann_lib.h b/ext/hermann/hermann_lib.h index 0a66836..e7130c7 100644 --- a/ext/hermann/hermann_lib.h +++ b/ext/hermann/hermann_lib.h @@ -92,6 +92,7 @@ typedef struct HermannInstanceConfig { int isConnected; int isErrored; + char *error; } HermannInstanceConfig; diff --git a/lib/hermann/consumer.rb b/lib/hermann/consumer.rb index 3283eca..4f2fdef 100644 --- a/lib/hermann/consumer.rb +++ b/lib/hermann/consumer.rb @@ -1,4 +1,5 @@ require 'hermann' +require 'hermann/errors' if Hermann.jruby? require 'hermann/provider/java_simple_consumer' @@ -17,13 +18,18 @@ class Consumer # # @params [String] kafka topic # @params [Hash] options for Consumer - # @option opts [String] :brokers (for MRI) Comma separated list of brokers - # @option opts [Integer] :partition (for MRI) The kafka partition + # @option opts [String] :brokers (for MRI) Comma separated list of brokers + # @option opts [Integer] :partition (for MRI) The kafka partition + # @option opts [Symbol|Fixnum] :offset (for MRI) Starting consumer offset. either :start, :end, or Fixnum # @option opts [Integer] :zookeepers (for jruby) list of zookeeper servers # @option opts [Integer] :group_id (for jruby) client group_id # def initialize(topic, opts = {}) @topic = topic + + offset = opts.delete(:offset) + raise Hermann::Errors::InvalidOffsetError.new("Bad offset: #{offset}") unless valid_offset?(offset) + if Hermann.jruby? zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id) @@ -31,7 +37,7 @@ def initialize(topic, opts = {}) else brokers, partition = require_values_at(opts, :brokers, :partition) - @internal = Hermann::Lib::Consumer.new(topic, brokers, partition) + @internal = Hermann::Lib::Consumer.new(topic, brokers, partition, offset) end end @@ -49,6 +55,12 @@ def shutdown end end + private + + def valid_offset?(offset) + offset.nil? || offset.is_a?(Fixnum) || offset == :start || offset == :end + end + def require_values_at(opts, *args) args.map do |a| raise "Please provide :#{a} option!" unless opts[a] diff --git a/lib/hermann/errors.rb b/lib/hermann/errors.rb index 7a49014..6cba1c3 100644 --- a/lib/hermann/errors.rb +++ b/lib/hermann/errors.rb @@ -24,6 +24,9 @@ class ConfigurationError < GeneralError; end # cannot discover brokers from zookeeper class NoBrokersError < GeneralError; end + + # offsets can only be two symbols or a fixnum + class InvalidOffsetError < GeneralError; end end end diff --git a/scripts/consume_msgs_loop_localhost_mri.rb b/scripts/consume_msgs_loop_localhost_mri.rb new file mode 100644 index 0000000..6af5341 --- /dev/null +++ b/scripts/consume_msgs_loop_localhost_mri.rb @@ -0,0 +1,25 @@ +require 'rubygems' + +$LOAD_PATH << File.dirname(__FILE__) + "/../lib" +$LOAD_PATH << File.dirname(__FILE__) + "/../ext" +require 'hermann' +require 'hermann/consumer' + +t1 = 0 +threads = [] +100.times do |i| + threads << Thread.new do + puts "booting #{i}" + c = Hermann::Consumer.new( "maxwell", brokers: "localhost:9092", partition: i, offset: :start) + c.consume() do + |msg| puts("Received: #{msg}") + if(t1 == 0) + t1 = Time.now + end + t2 = Time.now + elapsed = t2 - t1 + puts("Total elapsed time: #{elapsed} seconds") + end + end +end +threads.each(&:join) diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index ee522b1..e57723f 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -1,5 +1,6 @@ require 'spec_helper' require 'hermann/consumer' +require 'hermann/errors' # XXX: Hermann::Consumer isn't really supported anywhere, MRI included right # now @@ -9,7 +10,8 @@ let(:topic) { 'rspec' } let(:brokers) { 'localhost:1337' } let(:partition) { 1 } - let(:opts) { { :brokers => brokers, :partition => partition } } + let(:offset) { nil } + let(:opts) { { :brokers => brokers, :partition => partition, :offset => offset } } context "on C ruby", :platform => :mri do @@ -36,6 +38,13 @@ let(:topic) { '' } it_behaves_like 'an error condition' end + + context 'with a bad offset' do + let(:offset) { :foo } + it "raises an InvalidOffset error" do + expect { subject }.to raise_error(Hermann::Errors::InvalidOffsetError) + end + end end describe '#shutdown' do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 33d6ab7..e509ee0 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -13,7 +13,7 @@ c.formatter = :documentation shared_context 'integration test context', :type => :integration do - let(:topic) { $integrationconf['kafka']['topic'] } + let(:topic) { "hermann_testing_" + rand(100_000_000).to_s } let(:brokers) { $integrationconf['kafka']['brokers'] } let(:zookeepers) { $integrationconf['zookeepers'] } end