Class: Fleck::Client

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/fleck/client.rb

Defined Under Namespace

Classes: Request, Response

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log_error, #logger

Constructor Details

#initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1) ⇒ Client

Returns a new instance of Client.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fleck/client.rb', line 22

def initialize(connection, queue_name = "", exchange_type: :direct, exchange_name: "", multiple_responses: false, concurrency: 1)
  @connection         = connection
  @queue_name         = queue_name
  @multiple_responses = multiple_responses
  @default_timeout    = multiple_responses ? 60 : nil
  @concurrency        = [concurrency.to_i, 1].max
  @requests           = ThreadSafe::Hash.new
  @subscriptions      = ThreadSafe::Array.new
  @terminated         = false
  @mutex              = Mutex.new
  @local_ip           = @connection.transport.socket.local_address.ip_address
  @remote_ip          = @connection.transport.socket.remote_address.ip_address

  @channel     = @connection.create_channel
  @exchange    = Bunny::Exchange.new(@channel, :direct, 'fleck')
  @publisher   = Bunny::Exchange.new(@connection.create_channel, exchange_type, exchange_name)
  @reply_queue = @channel.queue("", exclusive: true, auto_delete: true)
  @reply_queue.bind(@exchange, routing_key: @reply_queue.name)

  handle_returned_messages!
  @concurrency.times { handle_responses! }

  logger.debug("Client initialized!")

  Fleck::Client << self

  # at_exit do
  #   terminate
  # end
end

Instance Attribute Details

#local_ipObject (readonly)

Returns the value of attribute local_ip.



20
21
22
# File 'lib/fleck/client.rb', line 20

def local_ip
  @local_ip
end

#remote_ipObject (readonly)

Returns the value of attribute remote_ip.



20
21
22
# File 'lib/fleck/client.rb', line 20

def remote_ip
  @remote_ip
end

#terminatedObject (readonly)

Returns the value of attribute terminated.



20
21
22
# File 'lib/fleck/client.rb', line 20

def terminated
  @terminated
end

Class Method Details

.<<(new_instance) ⇒ Object



8
9
10
# File 'lib/fleck/client.rb', line 8

def self.<<(new_instance)
  @instances << new_instance
end

.remove_instance(instance) ⇒ Object



12
13
14
# File 'lib/fleck/client.rb', line 12

def self.remove_instance(instance)
  @instances.delete(instance)
end

.terminate_allObject



16
17
18
# File 'lib/fleck/client.rb', line 16

def self.terminate_all
  @instances.map(&:terminate)
end

Instance Method Details

#publish(data, options) ⇒ Object



78
79
80
81
# File 'lib/fleck/client.rb', line 78

def publish(data, options)
  return if @terminated
  @mutex.synchronize { @publisher.publish(data, options) }
end

#remove_request(request_id) ⇒ Object



84
85
86
# File 'lib/fleck/client.rb', line 84

def remove_request(request_id)
  @requests.delete request_id
end

#request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fleck/client.rb', line 53

def request(action: nil, version: nil, headers: {}, params: {}, async: @multiple_responses || false, timeout: @default_timeout, queue: @queue_name, rmq_options: {}, &block)

  if @terminated
    return Fleck::Client::Response.new(Oj.dump({status: 503, errors: ['Service Unavailable'], body: nil} , mode: :compat))
  end

  request = Fleck::Client::Request.new(
    self, queue, @reply_queue.name,
    action:             action,
    version:            version,
    headers:            headers,
    params:             params,
    timeout:            timeout,
    multiple_responses: @multiple_responses,
    rmq_options:        rmq_options,
    &block
  )

  @requests[request.id] = request
  request.send!(async)

  return request.response
end

#terminateObject



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fleck/client.rb', line 89

def terminate
  @terminated = true
  logger.info "Unsubscribing from #{@reply_queue.name}"
  # @subscriptions.map(&:cancel) # stop receiving new messages
  @channel&.close unless @channel&.closed?
  logger.info "Canceling pending requests"
  # cancel pending requests
  while item = @requests.shift do
    begin
      item[1].cancel!
    rescue => e
      logger.error e.inspect + "\n" + e.backtrace.join("\n")
    end
  end

  Fleck::Client.remove_instance(self)
end