Class: Fleck::Client

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/fleck/client.rb

Defined Under Namespace

Classes: Request, Response

Instance Method Summary collapse

Methods included from Loggable

#logger

Constructor Details

#initialize(connection, queue_name) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/fleck/client.rb', line 6

# Builds a client that publishes requests for +queue_name+ and listens for
# the replies on its own exclusive, server-named reply queue.
#
# A channel is opened on +connection+, the channel's default exchange is
# kept for publishing, and a subscription is installed on the reply queue.
# Each incoming reply is matched to a pending request through the
# +:correlation_id+ entry of the message metadata.
#
# @param connection [#create_channel] broker connection
#   (presumably a Bunny session — confirm against the caller)
# @param queue_name [String] name handed to every Request built by #request
def initialize(connection, queue_name)
  @connection  = connection
  @queue_name  = queue_name
  @channel     = @connection.create_channel
  @exchange    = @channel.default_exchange
  @reply_queue = @channel.queue("", exclusive: true)
  @requests    = ThreadSafe::Hash.new

  # The subscribe block receives three values; the middle one (message
  # metadata) carries the correlation id needed to look up the request.
  @reply_queue.subscribe do |_delivery_info, metadata, payload|
    begin
      logger.debug "Response received: #{payload}"
      # NOTE(review): assumes Request publishes with correlation_id equal to
      # request.id, the key used by #request — confirm in Request#send!.
      correlation_id = metadata[:correlation_id]
      request = @requests[correlation_id]
      if request
        request.response = Fleck::Client::Response.new(payload)
        request.complete!
        @requests.delete correlation_id
      end
    rescue => e
      # Log and swallow: an error while handling one reply must not escape
      # the subscription block.
      logger.error e.inspect + "\n" + e.backtrace.join("\n")
    end
  end

  logger.debug("Client initialized!")

  # Complete any still-pending requests when the process exits.
  at_exit do
    terminate
  end
end

Instance Method Details

#request(headers = {}, payload = {}, async = false, &block) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/fleck/client.rb', line 35

# Builds a Fleck::Client::Request for this client's queue, registers it
# under its id so the reply subscription can find it, and sends it.
#
# @param headers [Hash] forwarded to the Request
# @param payload [Hash] forwarded to the Request
# @param async [Boolean] forwarded to Request#send!
# @param block [Proc] optional block forwarded to the Request
# @return [Object] whatever the request's +response+ is once +send!+ returns
def request(headers = {}, payload = {}, async = false, &block)
  pending = Fleck::Client::Request.new(
    @exchange,
    @queue_name,
    @reply_queue.name,
    headers,
    payload,
    &block
  )

  # Register before sending so a reply can always be matched to the request.
  @requests[pending.id] = pending
  pending.send!(async)

  pending.response
end

#terminate ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/fleck/client.rb', line 43

# Marks every request still held in @requests as complete.
#
# A failure while completing one request is logged and does not stop the
# remaining requests from being completed.
def terminate
  @requests.each_value do |pending|
    begin
      pending.complete!
    rescue => error
      logger.error "#{error.inspect}\n#{error.backtrace.join("\n")}"
    end
  end
end