Skip to content

Ruby examples are ready #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 18, 2011
28 changes: 28 additions & 0 deletions ruby/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Ruby code for RabbitMQ tutorials

Here you can find Ruby code examples from [RabbitMQ tutorials](http://www.rabbitmq.com/getstarted.html).

## Requirements

To run this code you need [amqp gem](http://bit.ly/itcpVv) version 0.8 (currently available as a prerelease version). This code won't work with earlier versions! You can install it via RubyGems thusly:

gem install amqp --pre

## Code

[Tutorial one: "Hello World!"](http://www.rabbitmq.com/tutorial-one-ruby.html):

ruby send.rb
ruby receive.rb

[Tutorial two: Work Queues](http://www.rabbitmq.com/tutorial-two-ruby.html):

ruby new_task.rb
ruby worker.rb

[Tutorial three: Publish/Subscribe](http://www.rabbitmq.com/tutorial-three-ruby.html)

ruby receive_logs.rb
ruby emit_log.rb

To learn more, visit [Ruby AMQP gem documentation](http://bit.ly/mDm1JE) site.
19 changes: 19 additions & 0 deletions ruby/emit_log.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.fanout("logs")
message = ARGV.empty? ? "info: Hello World!" : ARGV.join(" ")

exchange.publish(message)
puts " [x] Sent #{message}"

EM.add_timer(0.5) do
connection.close do
EM.stop { exit }
end
end
end
20 changes: 20 additions & 0 deletions ruby/emit_log_direct.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.direct("direct_logs")
severity = ARGV.shift || "info"
message = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

exchange.publish(message, :routing_key => severity)
puts " [x] Sent #{severity}:#{message}"

EM.add_timer(0.5) do
connection.close do
EM.stop { exit }
end
end
end
20 changes: 20 additions & 0 deletions ruby/emit_log_topic.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.topic("topic_logs")
severity = ARGV.shift || "anonymous.info"
message = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

exchange.publish(message, :routing_key => severity)
puts " [x] Sent #{severity}:#{message}"

EM.add_timer(0.5) do
connection.close do
EM.stop { exit }
end
end
end
19 changes: 19 additions & 0 deletions ruby/new_task.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
queue = channel.queue("task_queue", :durable => true)
message = ARGV.empty? ? "Hello World!" : ARGV.join(" ")

AMQP::Exchange.default.publish(message, :routing_key => queue.name, :persistent => true)
puts " [x] Sent #{message}"

EM.add_timer(0.5) do
connection.close do
EM.stop { exit }
end
end
end
21 changes: 21 additions & 0 deletions ruby/receive.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
queue = channel.queue("hello")

Signal.trap("INT") do
connection.close do
EM.stop { exit }
end
end

puts " [*] Waiting for messages. To exit press CTRL+C"

queue.subscribe do |body|
puts " [x] Received #{body}"
end
end
24 changes: 24 additions & 0 deletions ruby/receive_logs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.fanout("logs")
queue = channel.queue("", :exclusive => true)

queue.bind(exchange)

Signal.trap("INT") do
connection.close do
EM.stop { exit }
end
end

puts " [*] Waiting for logs. To exit press CTRL+C"

queue.subscribe do |body|
puts " [x] #{body}"
end
end
30 changes: 30 additions & 0 deletions ruby/receive_logs_direct.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.direct("direct_logs")
queue = channel.queue("", :exclusive => true)

if ARGV.empty?
abort "Usage: #{$0} [info] [warning] [error]"
end

ARGV.each do |severity|
queue.bind(exchange, :routing_key => severity)
end

Signal.trap("INT") do
connection.close do
EM.stop { exit }
end
end

puts " [*] Waiting for logs. To exit press CTRL+C"

queue.subscribe do |header, body|
puts " [x] #{header.routing_key}:#{body}"
end
end
30 changes: 30 additions & 0 deletions ruby/receive_logs_topic.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
exchange = channel.topic("topic_logs")
queue = channel.queue("", :exclusive => true)

if ARGV.empty?
abort "Usage: #{$0} [binding key]"
end

ARGV.each do |binding_key|
queue.bind(exchange, :routing_key => binding_key)
end

Signal.trap("INT") do
connection.close do
EM.stop { exit }
end
end

puts " [*] Waiting for logs. To exit press CTRL+C"

queue.subscribe do |header, body|
puts " [x] #{header.routing_key}:#{body}"
end
end
66 changes: 66 additions & 0 deletions ruby/rpc_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env ruby
# encoding: utf-8

# Note: This is just proof of concept. For
# real-world usage, you are strongly advised
# to use https://github.com/ruby-amqp/rpc
# or some other RPC library.

require "amqp"

class FibonacciRpcClient
def initialize
subscribe_to_callback_queue
end

def connection
@connection ||= AMQP.connect(:host => "localhost")
end

def channel
@channel ||= AMQP::Channel.new(self.connection)
end

def callback_queue
@callback_queue ||= self.channel.queue("", :exclusive => true)
end

def requests
@requests ||= Hash.new
end

def call(n, &block)
corr_id = rand(10_000_000).to_s
self.requests[corr_id] = nil
self.callback_queue.append_callback(:declare) do
AMQP::Exchange.default.publish(n.to_s, :routing_key => "rpc_queue", :reply_to => self.callback_queue.name, :correlation_id => corr_id)

EM.add_periodic_timer(0.1) do
# p self.requests
if result = self.requests[corr_id]
block.call(result.to_i)
EM.stop
end
end
end
end

private
def subscribe_to_callback_queue
self.callback_queue.subscribe do |header, body|
corr_id = header.correlation_id
unless self.requests[corr_id]
self.requests[corr_id] = body
end
end
end
end

EM.run do
fibonacci_rpc = FibonacciRpcClient.new()

puts " [x] Requesting fib(30)"
fibonacci_rpc.call(30) do |response|
puts " [.] Got #{response}"
end
end
28 changes: 28 additions & 0 deletions ruby/rpc_server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

def fib(n)
return n if n == 0 || n == 1
return fib(n - 1) + fib(n - 2)
end

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
queue = channel.queue("rpc_queue")

channel.prefetch(1)

queue.subscribe(:ack => true) do |header, body|
n = body.to_i

puts " [.] fib(#{n})"
response = fib(n)

AMQP::Exchange.default.publish(response.to_s, :routing_key => header.reply_to, :correlation_id => header.correlation_id)
header.ack
end

puts " [x] Awaiting RPC requests"
end
18 changes: 18 additions & 0 deletions ruby/send.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
queue = channel.queue("hello")

channel.default_exchange.publish("Hello World!", :routing_key => queue.name)
puts " [x] Sent 'Hello World!'"

EM.add_timer(0.5) do
connection.close do
EM.stop { exit }
end
end
end
26 changes: 26 additions & 0 deletions ruby/worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env ruby
# encoding: utf-8

require "amqp"

AMQP.start(:host => "localhost") do |connection|
channel = AMQP::Channel.new(connection)
queue = channel.queue("task_queue", :durable => true)

Signal.trap("INT") do
connection.close do
EM.stop { exit }
end
end

puts " [*] Waiting for messages. To exit press CTRL+C"

channel.prefetch(1)
queue.subscribe(:ack => true) do |header, body|
puts " [x] Received #{body}"
EM.add_timer(body.count(".")) do
puts " [x] Done"
header.ack
end
end
end