Class: Combi::QueueService
- Inherits:
-
Object
- Object
- Combi::QueueService
- Defined in:
- lib/combi/queue_service.rb
Constant Summary collapse
- RPC_DEFAULT_TIMEOUT =
1
- RPC_WAIT_PERIOD =
0.01
Instance Attribute Summary collapse
-
#rpc_callback ⇒ Object
Returns the value of attribute rpc_callback.
Instance Method Summary collapse
- #acknowledge(delivery_info) ⇒ Object
- #call(kind, message, options = {}) ⇒ Object
- #connect(config, &after_connect) ⇒ Object
- #create_rpc_queue ⇒ Object
- #disconnect ⇒ Object
-
#initialize(config, options) ⇒ QueueService
constructor
A new instance of QueueService.
- #log(message) ⇒ Object
- #publish(*args, &block) ⇒ Object
- #queue(name, options = {}, &block) ⇒ Object
- #ready(&block) ⇒ Object
- #respond(response, delivery_info) ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(config, options) ⇒ QueueService
Returns a new instance of QueueService.
12 13 14 15 16 17 |
# File 'lib/combi/queue_service.rb', line 12
#
# Stores the connection config and the service options, and creates the
# deferrable that #ready callbacks are attached to. No connection is opened
# here; that happens in #start.
#
# @param config [Object] connection settings, later handed to #connect by #start
# @param options [Hash] service options; #start checks options[:rpc] == :enabled
def initialize(config, options)
  @config = config
  @options = options
  @rpc_queue = nil
  @ready_defer = EventMachine::DefaultDeferrable.new
end
Instance Attribute Details
#rpc_callback ⇒ Object
Returns the value of attribute rpc_callback.
10 11 12 |
# File 'lib/combi/queue_service.rb', line 10
#
# Reader for the callable that #create_rpc_queue invokes with each RPC reply.
#
# @return [Object, nil] the value of @rpc_callback
def rpc_callback
  @rpc_callback
end
Instance Method Details
#acknowledge(delivery_info) ⇒ Object
63 64 65 |
# File 'lib/combi/queue_service.rb', line 63
#
# Acknowledges a single delivery on the channel.
#
# @param delivery_info [#delivery_tag] metadata of the delivery to acknowledge
def acknowledge(delivery_info)
  # Second argument is passed as false; presumably the "multiple" flag,
  # i.e. only this one delivery is acked -- confirm against the AMQP gem.
  @channel.acknowledge(delivery_info.delivery_tag, false)
end
#call(kind, message, options = {}) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/combi/queue_service.rb', line 87
#
# Publishes an RPC request.
#
# Missing :timeout, :routing_key and :reply_to entries are filled in on the
# given options hash itself (the caller's hash is mutated) before it is
# passed to #publish as the publish options.
#
# @param kind [Object] request kind, sent as request[:kind]
# @param message [Object] request payload, sent as request[:payload]
# @param options [Hash] publish options (:timeout, :routing_key, :reply_to, ...)
# @raise [RuntimeError] when there is no named RPC queue and options[:reply_to] is not given
def call(kind, message, options = {})
  log "sending request #{kind} #{message.inspect} with options #{options.inspect}"
  raise "RPC is not enabled or reply_to is not included" if (@rpc_queue.nil? || @rpc_queue.name.nil?) && options[:reply_to].nil?
  options[:timeout] ||= RPC_DEFAULT_TIMEOUT
  options[:routing_key] ||= 'rcalls_queue'
  options[:reply_to] ||= @rpc_queue.name
  request = {
    kind: kind,
    payload: message,
    options: {}
  }
  publish(request, options)
end
#connect(config, &after_connect) ⇒ Object
39 40 41 42 43 44 45 46 |
# File 'lib/combi/queue_service.rb', line 39
#
# Opens the AMQP connection and, inside the block given to AMQP.connect,
# sets up the channel and the exchange before invoking the given block.
#
# @param config [Object] passed straight to AMQP.connect
# @yield called with no arguments after @channel and @exchange are assigned
def connect(config, &after_connect)
  @amqp_conn = AMQP.connect(config) do |connection, open_ok|
    @channel = AMQP::Channel.new @amqp_conn
    @channel.auto_recovery = true
    # Direct exchange with an empty name (presumably the broker's default exchange).
    @exchange = @channel.direct ''
    after_connect.call
  end
end
#create_rpc_queue ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/combi/queue_service.rb', line 72
#
# (Re)creates the exclusive, auto-delete queue on which RPC replies arrive.
#
# Any previous RPC queue is unsubscribed first. In the block given to #queue,
# the new queue is subscribed: each delivery is wrapped into a hash with
# 'correlation_id' and 'response' keys and handed to #rpc_callback (when one
# is set). The ready deferrable is then marked as succeeded.
def create_rpc_queue
  @rpc_queue.unsubscribe unless @rpc_queue.nil?
  @rpc_queue = queue('', exclusive: true, auto_delete: true) do |rpc_queue|
    log "\tRPC QUEUE: #{@rpc_queue.name}"
    rpc_queue.subscribe do |metadata, response|
      message = {
        'correlation_id' => metadata.correlation_id,
        'response' => response
      }
      rpc_callback.call(message) unless rpc_callback.nil?
    end
    @ready_defer.succeed
  end
end
#disconnect ⇒ Object
48 49 50 51 52 |
# File 'lib/combi/queue_service.rb', line 48
#
# Closes the AMQP connection; the block passed to #close prints a
# confirmation line to standard output.
def disconnect
  @amqp_conn.close do
    puts "disconnected from RABBIT"
  end
end
#log(message) ⇒ Object
23 24 25 26 |
# File 'lib/combi/queue_service.rb', line 23
#
# Prints a debug line prefixed with this object's id and class name.
# Output is produced only when ENV['DEBUG'] is exactly 'true'; once that has
# been observed the result is cached in @debug_mode.
#
# @param message [Object] text interpolated into the printed line
def log(message)
  return unless @debug_mode ||= ENV['DEBUG'] == 'true'
  puts "#{object_id} #{self.class.name} #{message}"
end
#publish(*args, &block) ⇒ Object
54 55 56 57 |
# File 'lib/combi/queue_service.rb', line 54
#
# Publishes through @exchange. The first argument is converted with #to_json
# unless it is already a String; all arguments and the block are then
# forwarded to @exchange.publish.
#
# @param args [Array] payload followed by any publish options
def publish(*args, &block)
  args[0] = args[0].to_json unless args[0].is_a? String
  @exchange.publish *args, &block
end
#queue(name, options = {}, &block) ⇒ Object
59 60 61 |
# File 'lib/combi/queue_service.rb', line 59
#
# Thin delegate to @channel.queue.
#
# @param name [String] queue name
# @param options [Hash] queue options, forwarded unchanged
# @return [Object] whatever @channel.queue returns
def queue(name, options = {}, &block)
  @channel.queue(name, options, &block)
end
#ready(&block) ⇒ Object
19 20 21 |
# File 'lib/combi/queue_service.rb', line 19
#
# Registers the given block as a callback on the ready deferrable, which
# #start / #create_rpc_queue mark as succeeded.
def ready(&block)
  @ready_defer.callback &block
end
#respond(response, delivery_info) ⇒ Object
67 68 69 70 |
# File 'lib/combi/queue_service.rb', line 67
#
# Publishes a reply to the queue named by the request's reply_to, carrying
# the request's correlation_id.
#
# @param response [Object, #call] the reply; when it responds to #call, the
#   result of calling it is published instead
# @param delivery_info [#reply_to, #correlation_id] metadata of the request
def respond(response, delivery_info)
  response = response.call if response.respond_to? :call
  publish response, routing_key: delivery_info.reply_to, correlation_id: delivery_info.correlation_id
end
#start ⇒ Object
28 29 30 31 32 33 34 35 36 37 |
# File 'lib/combi/queue_service.rb', line 28
#
# Connects using the stored config. In the block given to #connect: when
# @options[:rpc] is :enabled the RPC queue is created; otherwise "ready" is
# printed and the ready deferrable is marked as succeeded right away.
def start
  connect @config do
    if @options[:rpc] == :enabled
      create_rpc_queue
    else
      puts "ready"
      @ready_defer.succeed
    end
  end
end