Class: Fleck::Client

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/fleck/client.rb

Defined Under Namespace

Classes: Request, Response

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log_error, #logger

Constructor Details

#initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1) ⇒ Client

Returns a new instance of Client.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/fleck/client.rb', line 8

def initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1)
  @connection         = connection
  @queue_name         = queue_name
  @multiple_responses = multiple_responses
  @default_timeout    = multiple_responses ? 60 : nil
  @concurrency        = [concurrency.to_i, 1].max
  @requests           = ThreadSafe::Hash.new
  @subscriptions      = ThreadSafe::Array.new
  @terminated         = false
  @mutex              = Mutex.new
  @local_ip           = @connection.transport.socket.local_address.ip_address
  @remote_ip          = @connection.transport.socket.remote_address.ip_address

  @channel     = @connection.create_channel
  @exchange    = Bunny::Exchange.new(@channel, :direct, 'fleck')
  @publisher   = Bunny::Exchange.new(@connection.create_channel, exchange_type, exchange_name)
  @reply_queue = @channel.queue("", exclusive: true, auto_delete: true)
  @reply_queue.bind(@exchange, routing_key: @reply_queue.name)

  handle_returned_messages!
  @concurrency.times { handle_responses! }

  logger.debug("Client initialized!")

  at_exit do
    terminate
  end
end

Instance Attribute Details

#local_ipObject (readonly)

Returns the value of attribute local_ip.



6
7
8
# File 'lib/fleck/client.rb', line 6

def local_ip
  @local_ip
end

#remote_ipObject (readonly)

Returns the value of attribute remote_ip.



6
7
8
# File 'lib/fleck/client.rb', line 6

def remote_ip
  @remote_ip
end

Instance Method Details

#publish(data, options) ⇒ Object



62
63
64
65
# File 'lib/fleck/client.rb', line 62

def publish(data, options)
  return if @terminated
  @mutex.synchronize { @publisher.publish(data, options) }
end

#remove_request(request_id) ⇒ Object



68
69
70
# File 'lib/fleck/client.rb', line 68

def remove_request(request_id)
  @requests.delete request_id
end

#request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fleck/client.rb', line 37

def request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block)

  if @terminated
    return Fleck::Client::Response.new(Oj.dump({status: 503, errors: ['Service Unavailable'], body: nil} , mode: :compat))
  end

  request = Fleck::Client::Request.new(
    self, queue, @reply_queue.name,
    action:             action,
    version:            version,
    headers:            headers,
    params:             params,
    timeout:            timeout,
    multiple_responses: @multiple_responses,
    rmq_options:        rmq_options,
    &block
  )

  @requests[request.id] = request
  request.send!(async)

  return request.response
end

#terminateObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/fleck/client.rb', line 73

def terminate
  @terminated = true
  logger.info "Unsubscribing from #{@reply_queue.name}"
  @subscriptions.map(&:cancel) # stop receiving new messages
  logger.info "Canceling pending requests"
  # cancel pending requests
  while item = @requests.shift do
    begin
      item[1].cancel!
    rescue => e
      logger.error e.inspect + "\n" + e.backtrace.join("\n")
    end
  end
end