Class: Fleck::Client
Defined Under Namespace
Instance Method Summary collapse
-
#initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1) ⇒ Client
constructor
A new instance of Client.
- #publish(data, options) ⇒ Object
- #remove_request(request_id) ⇒ Object
- #request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block) ⇒ Object
- #terminate ⇒ Object
Methods included from Loggable
Constructor Details
#initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1) ⇒ Client
Returns a new instance of Client.
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/fleck/client.rb', line 6 def initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1) @connection = connection @queue_name = queue_name @multiple_responses = multiple_responses @default_timeout = multiple_responses ? 60 : nil @concurrency = [concurrency.to_i, 1].max @requests = ThreadSafe::Hash.new @subscriptions = ThreadSafe::Array.new @terminated = false @mutex = Mutex.new @channel = @connection.create_channel @exchange = @channel.default_exchange @publisher = Bunny::Exchange.new(@channel, exchange_type, exchange_name) @reply_queue = @channel.queue("", exclusive: true, auto_delete: true) @concurrency.times { handle_responses! } logger.debug("Client initialized!") at_exit do terminate end end |
Instance Method Details
#publish(data, options) ⇒ Object
57 58 59 60 |
# File 'lib/fleck/client.rb', line 57 def publish(data, ) return if @terminated @mutex.synchronize { @publisher.publish(data, ) } end |
#remove_request(request_id) ⇒ Object
63 64 65 |
# File 'lib/fleck/client.rb', line 63 def remove_request(request_id) @requests.delete request_id end |
#request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/fleck/client.rb', line 32 def request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block) if @terminated return Fleck::Client::Response.new(Oj.dump({status: 503, errors: ['Service Unavailable'], body: nil} , mode: :compat)) end request = Fleck::Client::Request.new( self, queue, @reply_queue.name, action: action, version: version, headers: headers, params: params, timeout: timeout, multiple_responses: @multiple_responses, rmq_options: , &block ) @requests[request.id] = request request.send!(async) return request.response end |
#terminate ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fleck/client.rb', line 68 def terminate @terminated = true logger.info "Unsubscribing from #{@reply_queue.name}" @subscriptions.map(&:cancel) # stop receiving new messages logger.info "Canceling pending requests" # cancel pending requests while item = @requests.shift do begin item[1].cancel! rescue => e logger.error e.inspect + "\n" + e.backtrace.join("\n") end end end |