Class: Combi::QueueService
- Inherits:
-
Object
- Object
- Combi::QueueService
- Defined in:
- lib/combi/queue_service.rb
Instance Attribute Summary collapse
-
#rpc_callback ⇒ Object
Returns the value of attribute rpc_callback.
Instance Method Summary collapse
- #acknowledge(delivery_info) ⇒ Object
- #connect(config, &after_connect) ⇒ Object
- #create_rpc_queue ⇒ Object
- #disconnect ⇒ Object
-
#initialize(config, options) ⇒ QueueService
constructor
A new instance of QueueService.
- #next_ready_only(&block) ⇒ Object
- #publish(*args, &block) ⇒ Object
- #publish_request(kind, message, options = {}) ⇒ Object
- #queue(name, options = {}, &block) ⇒ Object
- #ready(&block) ⇒ Object
- #reconnect ⇒ Object
- #respond(response, delivery_info) ⇒ Object
- #start ⇒ Object
- #status ⇒ Object
Constructor Details
#initialize(config, options) ⇒ QueueService
Returns a new instance of QueueService.
8 9 10 11 12 13 14 |
# File 'lib/combi/queue_service.rb', line 8 def initialize(config, ) @config = config @options = @rpc_queue = nil @ready_defer = EventMachine::DefaultDeferrable.new @ready_callbacks = [] end |
Instance Attribute Details
#rpc_callback ⇒ Object
Returns the value of attribute rpc_callback.
6 7 8 |
# File 'lib/combi/queue_service.rb', line 6 def rpc_callback @rpc_callback end |
Instance Method Details
#acknowledge(delivery_info) ⇒ Object
86 87 88 |
# File 'lib/combi/queue_service.rb', line 86 def acknowledge(delivery_info) @channel.acknowledge(delivery_info.delivery_tag, false) end |
#connect(config, &after_connect) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/combi/queue_service.rb', line 43 def connect(config, &after_connect) Combi.logger.info {"trying to connect to queue server"} @amqp_conn = AMQP.connect(config) do |connection, open_ok| @channel = AMQP::Channel.new @amqp_conn @channel.auto_recovery = true @exchange = @channel.direct '' after_connect.call connection.on_error do |conn, connection_close| Combi.logger.info {"[amqp connection.close] Reply code = #{connection_close.reply_code}, reply text = #{connection_close.reply_text}"} if connection_close.reply_code == 320 Combi.logger.info {"[amqp connection.close] Setting up a periodic reconnection timer..."} reconnect end end connection.on_tcp_connection_loss do |conn, settings| Combi.logger.error {"Connection failed, resetting for reconnect"} reconnect end end end |
#create_rpc_queue ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/combi/queue_service.rb', line 95 def create_rpc_queue @rpc_queue.unsubscribe unless @rpc_queue.nil? @rpc_queue = queue('', exclusive: true, auto_delete: true) do |rpc_queue| Combi.logger.debug {"\tRPC QUEUE: #{@rpc_queue.name}"} rpc_queue.subscribe do |, response| parsed_response = Yajl::Parser.parse response, symbolize_keys: true = { correlation_id: .correlation_id, response: parsed_response } rpc_callback.call() unless rpc_callback.nil? end @ready_defer.succeed end end |
#disconnect ⇒ Object
72 73 74 |
# File 'lib/combi/queue_service.rb', line 72 def disconnect @amqp_conn.close end |
#next_ready_only(&block) ⇒ Object
21 22 23 |
# File 'lib/combi/queue_service.rb', line 21 def next_ready_only(&block) @ready_defer.callback &block end |
#publish(*args, &block) ⇒ Object
76 77 78 79 80 |
# File 'lib/combi/queue_service.rb', line 76 def publish(*args, &block) @exchange.publish *args do block.call if block_given? end end |
#publish_request(kind, message, options = {}) ⇒ Object
111 112 113 114 115 116 117 118 119 120 |
# File 'lib/combi/queue_service.rb', line 111 def publish_request(kind, , = {}) if .has_key? :correlation_id # wants a response [:reply_to] = @rpc_queue.name end [:expiration] = (([:timeout] || RPC_DEFAULT_TIMEOUT) * 1000).to_i Combi.logger.debug {"sending request #{kind} #{.inspect[0..500]} with options #{.inspect}"} request = Yajl::Encoder.encode kind: kind, payload: , options: {} publish request, end |
#queue(name, options = {}, &block) ⇒ Object
82 83 84 |
# File 'lib/combi/queue_service.rb', line 82 def queue(name, = {}, &block) @channel.queue(name, , &block) end |
#ready(&block) ⇒ Object
16 17 18 19 |
# File 'lib/combi/queue_service.rb', line 16 def ready(&block) @ready_callbacks << block @ready_defer.callback &block end |
#reconnect ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/combi/queue_service.rb', line 64 def reconnect @ready_defer = EventMachine::DefaultDeferrable.new @ready_callbacks.each do |callback| @ready_defer.callback &callback end start end |
#respond(response, delivery_info) ⇒ Object
90 91 92 93 |
# File 'lib/combi/queue_service.rb', line 90 def respond(response, delivery_info) serialized = Yajl::Encoder.encode response publish serialized, routing_key: delivery_info.reply_to, correlation_id: delivery_info.correlation_id end |
#start ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/combi/queue_service.rb', line 29 def start @config[:reconnect_period] ||= 4 reconnection_proc = Proc.new { EM.add_timer(@config[:reconnect_period] * rand) { start } } @config[:on_tcp_connection_failure] = reconnection_proc @config[:on_possible_authentication_failure] = reconnection_proc connect @config do if @options[:rpc] == :enabled create_rpc_queue else @ready_defer.succeed end end end |
#status ⇒ Object
25 26 27 |
# File 'lib/combi/queue_service.rb', line 25 def status @amqp_conn && @amqp_conn.status end |