Class: Fleck::Client

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/fleck/client.rb

Defined Under Namespace

Classes: Request, Response

Instance Method Summary collapse

Methods included from Loggable

#logger

Constructor Details

#initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/fleck/client.rb', line 6

# Builds a client on top of an existing broker connection.
#
# Opens a channel on +connection+, creates the publishing exchange and a
# reply queue (empty name, exclusive, auto-delete), installs the handler for
# returned messages, starts +concurrency+ response handlers and registers an
# +at_exit+ hook that calls #terminate.
#
# @param connection [Object] connection object responding to +create_channel+
# @param queue_name [String] default target queue used by #request
# @param exchange_type [Symbol] type of the exchange used for publishing
# @param exchange_name [String] name of the exchange used for publishing
# @param multiple_responses [Boolean] when truthy the default request timeout
#   is 60 and requests default to async; otherwise there is no default timeout
# @param concurrency [#to_i] number of response handlers to start; values
#   below 1 are raised to 1
def initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1)
  handler_count = concurrency.to_i

  # Plain configuration and bookkeeping state.
  @connection         = connection
  @queue_name         = queue_name
  @multiple_responses = multiple_responses
  @default_timeout    = (60 if multiple_responses)
  @concurrency        = handler_count < 1 ? 1 : handler_count
  @requests           = ThreadSafe::Hash.new
  @subscriptions      = ThreadSafe::Array.new
  @terminated         = false
  @mutex              = Mutex.new

  # Broker-side objects; each one depends on the channel created first.
  @channel     = @connection.create_channel
  @exchange    = @channel.default_exchange
  @publisher   = Bunny::Exchange.new(@channel, exchange_type, exchange_name)
  @reply_queue = @channel.queue("", exclusive: true, auto_delete: true)

  # Returned-message handling is installed before the response handlers.
  handle_returned_messages!
  1.upto(@concurrency) { handle_responses! }

  logger.debug("Client initialized!")

  # Make sure the client is shut down when the process exits.
  at_exit { terminate }
end

Instance Method Details

#publish(data, options) ⇒ Object



57
58
59
60
# File 'lib/fleck/client.rb', line 57

# Publishes +data+ through the client's publishing exchange.
#
# Nothing is published once the client has been terminated. The actual
# publish call runs while holding the client's mutex.
#
# @param data [Object] payload handed to the exchange
# @param options [Hash] options handed to the exchange unchanged
# @return [Object, nil] whatever the exchange's +publish+ returns, or +nil+
#   when the client is terminated
def publish(data, options)
  unless @terminated
    @mutex.synchronize do
      @publisher.publish(data, options)
    end
  end
end

#remove_request(request_id) ⇒ Object



63
64
65
# File 'lib/fleck/client.rb', line 63

# Drops a request from the table of pending requests.
#
# @param request_id [Object] key under which the request was stored
# @return [Object, nil] the removed entry, or +nil+ if the key was not present
def remove_request(request_id)
  @requests.delete(request_id)
end

#request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fleck/client.rb', line 32

# Builds a request, registers it as pending, sends it and returns its response.
#
# When the client has already been terminated no request is created; a
# response built from a 503 "Service Unavailable" payload is returned instead.
#
# @param action [Object, nil] forwarded to the request
# @param version [Object, nil] forwarded to the request
# @param headers [Hash] forwarded to the request
# @param params [Hash] forwarded to the request
# @param async [Boolean] passed to the request's +send!+; defaults to the
#   client's +multiple_responses+ setting (or +false+)
# @param timeout [Object, nil] forwarded to the request; defaults to the
#   client's default timeout
# @param queue [String] target queue; defaults to the client's queue name
# @param rmq_options [Hash] forwarded to the request
# @yield optional block handed to the request unchanged
# @return [Object] the value of the request's +response+, or the 503
#   response when the client is terminated
def request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block)
  if @terminated
    unavailable = { status: 503, errors: ['Service Unavailable'], body: nil }
    return Fleck::Client::Response.new(Oj.dump(unavailable, mode: :compat))
  end

  request_options = {
    action:             action,
    version:            version,
    headers:            headers,
    params:             params,
    timeout:            timeout,
    multiple_responses: @multiple_responses,
    rmq_options:        rmq_options
  }
  pending = Fleck::Client::Request.new(self, queue, @reply_queue.name, **request_options, &block)

  # Register before sending so the request can be found by its id.
  @requests[pending.id] = pending
  pending.send!(async)

  pending.response
end

#terminate ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fleck/client.rb', line 68

# Shuts the client down.
#
# Marks the client as terminated, cancels every stored subscription and then
# drains the pending-request table, calling +cancel!+ on each request. An
# error raised while cancelling one request is logged and does not stop the
# remaining requests from being cancelled.
#
# @return [nil]
def terminate
  @terminated = true

  logger.info "Unsubscribing from #{@reply_queue.name}"
  @subscriptions.each(&:cancel) # stop receiving new messages

  logger.info "Canceling pending requests"
  # Each shifted entry is a [request_id, request] pair; the loop ends once
  # the table has nothing left to shift.
  while (entry = @requests.shift)
    begin
      entry[1].cancel!
    rescue => error
      logger.error "#{error.inspect}\n#{error.backtrace.join("\n")}"
    end
  end
end