Skip to content

Commit

Permalink
Allow to set connection options on a per server basis (#107)
Browse files Browse the repository at this point in the history
Adds a set of tests that verifies that per server
connection options are used also for http api requests.

* Overhaul API requests in queue_properties

This introduced bigger changes in the queue_properties implementation.
We change the way the requests are build in order to be able to use per server
settings reliably.

Most notably we always derive the api port from the server's port, by prepending
a 1.

Breaking Change:
================

Configuration.api_port has been removed without replacement.
Version has been bumped accordingly.

---------

Co-authored-by: danylo.goncharov <[email protected]>
Co-authored-by: Daniel Goncharov <[email protected]>
Co-authored-by: Pia Alina Bregulla <[email protected]>
  • Loading branch information
4 people authored Aug 15, 2024
1 parent 5a7e300 commit 613108f
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 63 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: build

on:
push:
branches: [ master ]
branches: [ master, v4.x ]
pull_request:
branches: [ master ]
branches: [ master, v4.x ]

jobs:
test:
Expand Down Expand Up @@ -33,7 +33,7 @@ jobs:
run: sudo apt-get install redis

- name: Start required services
run: docker-compose up -d
run: docker compose up -d

- name: Install gems
run: bundle install && bundle exec appraisal install
Expand All @@ -53,5 +53,5 @@ jobs:
BUNDLE_GEMFILE: gemfiles/redis_${{ matrix.redis-version }}.gemfile

- name: Stop services
run: docker-compose down
run: docker compose down
if: always()
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ gemfiles/*
/master-client.txt
/master-server.txt
/redis-master-rcc
/gems
/bundler
1 change: 0 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ source 'https://rubygems.org'
gemspec

gem "hiredis-client"

# gem 'bunny', '=0.7.10', :path => "#{ENV['HOME']}/src/bunny"

# Use patched appraisal gem until it is fixed upstream.
Expand Down
5 changes: 5 additions & 0 deletions lib/beetle/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def current_port
@server =~ /:(\d+)$/ ? $1.to_i : 5672
end


def connection_options_for_server(server)
@client.config.connection_options_for_server(server)
end

def set_current_server(s)
@server = s
end
Expand Down
33 changes: 29 additions & 4 deletions lib/beetle/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ class Configuration

# list of amqp servers to use (defaults to <tt>"localhost:5672"</tt>)
attr_accessor :servers

# list of additional amqp servers to use for subscribers (defaults to <tt>""</tt>)
attr_accessor :additional_subscription_servers

# a hash mapping a server name to a hash of connection options for that server or additional subscription server
attr_accessor :server_connection_options

# the virtual host to use on the AMQP servers (defaults to <tt>"/"</tt>)
attr_accessor :vhost
# the AMQP user to use when connecting to the AMQP servers (defaults to <tt>"guest"</tt>)
Expand Down Expand Up @@ -118,9 +123,6 @@ class Configuration
# Write timeout for http requests to RabbitMQ HTTP API
attr_accessor :rabbitmq_api_write_timeout

# Returns the port on which the Rabbit API is hosted
attr_accessor :api_port

# the socket timeout in seconds for message publishing (defaults to <tt>0</tt>).
# consider this a highly experimental feature for now.
attr_accessor :publishing_timeout
Expand Down Expand Up @@ -177,11 +179,11 @@ def initialize #:nodoc:
self.redis_configuration_client_ids = ""

self.servers = "localhost:5672"
self.server_connection_options = {}
self.additional_subscription_servers = ""
self.vhost = "/"
self.user = "guest"
self.password = "guest"
self.api_port = 15672
self.frame_max = 131072
self.channel_max = 2047
self.prefetch_count = 1
Expand Down Expand Up @@ -237,7 +239,30 @@ def redis_options
}
end

# Returns a hash of connection options for the given server.
# If no server specific options are set, it constructs defaults which
# use the global user, password and vhost settings.
def connection_options_for_server(server)
overrides = server_connection_options[server] || {}

default_server_connection_options(server).merge(overrides)
end

private

def default_server_connection_options(server)
host, port = server.split(':')
port ||= 5672

{
host: host,
port: port.to_i,
user: user,
pass: password,
vhost: vhost,
}
end

def load_config
raw = ERB.new(IO.read(config_file)).result
hash = if config_file =~ /\.json$/
Expand Down
17 changes: 10 additions & 7 deletions lib/beetle/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,21 @@ def bunny?
end

def new_bunny
options = connection_options_for_server(@server)

b = Bunny.new(
:host => current_host,
:port => current_port,
:logging => !!@options[:logging],
:user => @client.config.user,
:pass => @client.config.password,
:vhost => @client.config.vhost,
:host => options[:host],
:port => options[:port],
:user => options[:user],
:pass => options[:pass],
:vhost => options[:vhost],
:ssl => options[:ssl],
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:socket_timeout => @client.config.publishing_timeout,
:connect_timeout => @client.config.publisher_connect_timeout,
:spec => '09')
:spec => '09',
:logging => !!@options[:logging])
b.start
b
end
Expand Down
62 changes: 26 additions & 36 deletions lib/beetle/queue_properties.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ def set_queue_policy!(server, queue_name, options={})

# no need to worry that the server has the port 5672. Net:HTTP will take care of this. See below.
policy_name = "#{queue_name}_policy"
request_url = URI("http://#{server}/api/policies/#{vhost}/#{policy_name}")
get_request = Net::HTTP::Get.new(request_url)
put_request = Net::HTTP::Put.new(request_url)
delete_request = Net::HTTP::Delete.new(request_url)
request_path = "/api/policies/#{vhost}/#{policy_name}"

# set up queue policy
definition = {}
Expand All @@ -70,29 +67,24 @@ def set_queue_policy!(server, queue_name, options={})

is_default_policy = definition == config.broker_default_policy

get_response = run_rabbit_http_request(request_url, get_request) do |http|
http.request(get_request, nil)
end
get_response = run_api_request(server, Net::HTTP::Get, request_path)

case get_response.code
when "200"
response_body = JSON.parse(get_response.body) rescue {}
same_policy = put_request_body.all? { |k,v| response_body[k] == v }
if same_policy
if is_default_policy
run_rabbit_http_request(request_url, delete_request) do |http|
http.request(get_request, nil)
end
run_api_request(server, Net::HTTP::Delete, request_path)
end

return :ok
end
when "404"
return :ok if is_default_policy
end

put_response = run_rabbit_http_request(request_url, put_request) do |http|
http.request(put_request, put_request_body.to_json)
end
put_response = run_api_request(server, Net::HTTP::Put, request_path, put_request_body.to_json)

unless %w(200 201 204).include?(put_response.code)
log_error("Failed to create policy for queue #{queue_name}", put_response)
Expand Down Expand Up @@ -125,12 +117,7 @@ def remove_obsolete_bindings(server, queue_name, bindings)
end

def retrieve_bindings(server, queue_name)
request_url = URI("http://#{server}/api/queues/#{vhost}/#{queue_name}/bindings")
request = Net::HTTP::Get.new(request_url)

response = run_rabbit_http_request(request_url, request) do |http|
http.request(request)
end
response = run_api_request(server, Net::HTTP::Get, "/api/queues/#{vhost}/#{queue_name}/bindings")

unless response.code == "200"
log_error("Failed to retrieve bindings for queue #{queue_name}", response)
Expand All @@ -141,34 +128,37 @@ def retrieve_bindings(server, queue_name)
end

def remove_binding(server, queue_name, exchange, properties_key)
request_url = URI("http://#{server}/api/bindings/#{vhost}/e/#{exchange}/q/#{queue_name}/#{properties_key}")
request = Net::HTTP::Delete.new(request_url)

response = run_rabbit_http_request(request_url, request) do |http|
http.request(request)
end
response = run_api_request(server, Net::HTTP::Delete, "/api/bindings/#{vhost}/e/#{exchange}/q/#{queue_name}/#{properties_key}")

unless %w(200 201 204).include?(response.code)
log_error("Failed to remove obsolete binding for queue #{queue_name}", response)
raise FailedRabbitRequest.new("Could not retrieve queue bindings")
end
end

def run_rabbit_http_request(uri, request, &block)
request.basic_auth(config.user, config.password)
case request.class::METHOD
when 'GET'
request["Accept"] = "application/json"
when 'PUT'
request["Content-Type"] = "application/json"
def run_api_request(server, request_const, path, *request_args)
connection_options = config.connection_options_for_server(server)

derived_api_port = "1#{connection_options[:port]}".to_i
request_url = URI("http://#{connection_options[:host]}:#{derived_api_port}#{path}")

request = request_const.new(request_url).tap do |req|
req.basic_auth(connection_options[:user], connection_options[:pass])
case request_const::METHOD
when 'GET'
req["Accept"] = "application/json"
when 'PUT'
req["Content-Type"] = "application/json"
end
end
http = Net::HTTP.new(uri.hostname, config.api_port)

http = Net::HTTP.new(connection_options[:host], derived_api_port)
http.use_ssl = !!connection_options[:ssl]
http.read_timeout = config.rabbitmq_api_read_timeout
http.write_timeout = config.rabbitmq_api_write_timeout if http.respond_to?(:write_timeout=)
# don't do this in production:
# http.set_debug_output(logger.instance_eval{ @logdev.dev })

http.start do |instance|
block.call(instance) if block_given?
instance.request(request, *request_args)
end
end

Expand Down
11 changes: 9 additions & 2 deletions lib/beetle/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def initialize(client, options = {}) #:nodoc:
def listen_queues(queues) #:nodoc:
@listened_queues = queues
@exchanges_for_queues = exchanges_for_queues(queues)

EM.run do
each_server_sorted_randomly do
connect_server connection_settings
Expand Down Expand Up @@ -229,9 +230,15 @@ def bind_queue!(queue, exchange_name, binding_options)
end

def connection_settings
options = connection_options_for_server(@server)
{
:host => current_host, :port => current_port, :logging => false,
:user => @client.config.user, :pass => @client.config.password, :vhost => @client.config.vhost,
:host => options[:host],
:port => options[:port],
:user => options[:user],
:pass => options[:pass],
:vhost => options[:vhost],
:ssl => options[:ssl],
:logging => false,
:on_tcp_connection_failure => on_tcp_connection_failure,
:on_possible_authentication_failure => on_possible_authentication_failure,
}
Expand Down
2 changes: 1 addition & 1 deletion lib/beetle/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Beetle
VERSION = "3.5.7"
VERSION = "4.0.0"
end
1 change: 0 additions & 1 deletion test/beetle/base_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,5 @@ def setup
@client.expects(:update_queue_properties!).with(options.merge(:server => "localhost:5672"))
@bs.__send__(:publish_policy_options, options)
end

end
end
47 changes: 47 additions & 0 deletions test/beetle/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,51 @@ class ConfigurationTest < Minitest::Test
assert_equal "10.0.0.1:3001", config.additional_subscription_servers
end
end

class ConnectionOptionsForServerTest < Minitest::Test

test "returns the options for the server provided" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = {host: 'localhost', port: 5672, user: "john", pass: "doe", vhost: "test", ssl: 0}

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "john", options[:user]
assert_equal "doe", options[:pass]
assert_equal "test", options[:vhost]
assert_equal 0, options[:ssl]
end
end

test "returns default options if no specific options are set for the server" do
config = Configuration.new
config.servers = 'localhost:5672'

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "guest", options[:user]
assert_equal "guest", options[:pass]
assert_equal "/", options[:vhost]
assert_nil options[:ssl]
end
end

test "allows to set specific options while retaining defaults for the rest" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = { pass: "another_pass", ssl: 1 }

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "guest", options[:user]
assert_equal "another_pass", options[:pass]
assert_equal "/", options[:vhost]
assert_equal 1, options[:ssl]
end
end
end
end
29 changes: 29 additions & 0 deletions test/beetle/publisher_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def setup
:user => "guest",
:pass => "guest",
:vhost => "/",
:ssl => nil,
:socket_timeout => 0,
:connect_timeout => 5,
:frame_max => 131072,
Expand All @@ -34,6 +35,34 @@ def setup
assert_equal m, @pub.send(:new_bunny)
end

test "new bunnies should be created using custom connection options and they should be started" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = { user: "john", pass: "doe", vhost: "test", ssl: 0 }
client = Client.new(config)
pub = Publisher.new(client)

bunny_mock = mock("dummy_bunny")
expected_bunny_options = {
host: "localhost",
port: 5672,
user: "john",
pass: "doe",
vhost: "test",
ssl: 0,
socket_timeout: 0,
connect_timeout: 5,
frame_max: 131072,
channel_max: 2047,
spec: '09',
logging: false
}

Bunny.expects(:new).with(expected_bunny_options).returns(bunny_mock)
bunny_mock.expects(:start)
assert_equal bunny_mock, pub.send(:new_bunny)
end

test "initially there should be no bunnies" do
assert_equal({}, @pub.instance_variable_get("@bunnies"))
end
Expand Down
Loading

0 comments on commit 613108f

Please sign in to comment.