Class: Combi::QueueService

Inherits:
Object
  • Object
show all
Defined in:
lib/combi/queue_service.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, options) ⇒ QueueService

Returns a new instance of QueueService.



9
10
11
12
13
14
# File 'lib/combi/queue_service.rb', line 9

# Records the constructor arguments and prepares the readiness deferrable.
#
# Nothing is connected here; the method only assigns instance variables.
#
# @param config [Object] kept in @config
# @param options [Object] kept in @options
def initialize(config, options)
  @config, @options = config, options
  @rpc_queue = nil # no RPC reply queue exists until one is created
  @ready_defer = EventMachine::DefaultDeferrable.new
end

Instance Attribute Details

#rpc_callback ⇒ Object

Returns the value of attribute rpc_callback.



7
8
9
# File 'lib/combi/queue_service.rb', line 7

# Reader for the rpc_callback attribute.
#
# @return [Object, nil] the current value of @rpc_callback
def rpc_callback
  @rpc_callback
end

Instance Method Details

#acknowledge(delivery_info) ⇒ Object



59
60
61
# File 'lib/combi/queue_service.rb', line 59

# Acknowledges the delivery described by +delivery_info+ on @channel.
#
# @param delivery_info [#delivery_tag] metadata of the delivery to acknowledge
# @return [Object] whatever @channel.acknowledge returns
def acknowledge(delivery_info)
  tag = delivery_info.delivery_tag
  # NOTE(review): the second argument is presumably AMQP's "multiple" flag;
  # confirm against the channel API.
  multiple = false
  @channel.acknowledge(tag, multiple)
end

#call(kind, message, options = {}) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/combi/queue_service.rb', line 83

# Publishes an RPC request as JSON through #publish.
#
# The JSON body carries +kind+, +message+ (as "payload") and an empty
# "options" object. Publish options are derived from +options+ with defaults
# filled in for :timeout, :routing_key and :reply_to. The defaults are applied
# to a copy, so the hash supplied by the caller is never modified.
#
# @param kind [Object] request kind, serialised under the "kind" key
# @param message [Object] request payload, serialised under the "payload" key
# @param options [Hash] publish options; :timeout defaults to
#   RPC_DEFAULT_TIMEOUT, :routing_key to 'rcalls_queue' and :reply_to to the
#   name of @rpc_queue
# @return [Object] whatever #publish returns
# @raise [RuntimeError] when no :reply_to is given and there is no named
#   @rpc_queue to fall back on
def call(kind, message, options = {})
  log "sending request #{kind} #{message.inspect[0..500]} with options #{options.inspect}"

  rpc_queue_name = @rpc_queue.nil? ? nil : @rpc_queue.name
  reply_to = options[:reply_to] || rpc_queue_name
  raise "RPC is not enabled or reply_to is not included" if reply_to.nil?

  # Work on a copy: filling defaults in place would leak them into the
  # caller's hash (and fail outright on a frozen one).
  publish_options = options.dup
  publish_options[:timeout] ||= RPC_DEFAULT_TIMEOUT
  publish_options[:routing_key] ||= 'rcalls_queue'
  publish_options[:reply_to] = reply_to

  request = {
    kind: kind,
    payload: message,
    options: {}
  }
  publish(request.to_json, publish_options)
end

#connect(config, &after_connect) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/combi/queue_service.rb', line 36

# Opens the AMQP connection and, inside the connection callback, creates the
# channel (with auto recovery enabled) and a direct exchange named ''.
#
# @param config [Object] connection settings handed to AMQP.connect unchanged
# @yield called with no arguments after @channel and @exchange are assigned;
#   the block is optional
# @return [Object] the value returned by AMQP.connect (also kept in @amqp_conn)
def connect(config, &after_connect)
  @amqp_conn = AMQP.connect(config) do |connection, _open_ok|
    # Use the connection yielded to this callback so channel setup does not
    # depend on @amqp_conn having been assigned before the callback runs.
    @channel = AMQP::Channel.new(connection)
    @channel.auto_recovery = true
    @exchange = @channel.direct('')
    after_connect.call if after_connect
  end
end

#create_rpc_queue ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/combi/queue_service.rb', line 68

# Creates a server-named, exclusive, auto-delete queue for RPC replies and
# forwards every message received on it to #rpc_callback.
#
# A previously created reply queue, if any, is unsubscribed first. Inside the
# block passed to #queue the new queue is subscribed and @ready_defer is
# marked as succeeded.
#
# The callback receives a Hash with the string keys 'correlation_id' (taken
# from the message metadata) and 'response' (the raw message body).
#
# @return [Object] the value returned by #queue (also kept in @rpc_queue)
def create_rpc_queue
  @rpc_queue.unsubscribe unless @rpc_queue.nil?
  @rpc_queue = queue('', exclusive: true, auto_delete: true) do |rpc_queue|
    # Log the queue yielded here: @rpc_queue may still be nil (or the old
    # queue) while this block is running.
    log "\tRPC QUEUE: #{rpc_queue.name}"
    rpc_queue.subscribe do |metadata, response|
      message = {
        'correlation_id' => metadata.correlation_id,
        'response' => response
      }
      callback = rpc_callback
      callback.call(message) unless callback.nil?
    end
    @ready_defer.succeed
  end
end

#disconnect ⇒ Object



45
46
47
# File 'lib/combi/queue_service.rb', line 45

# Closes the connection held in @amqp_conn.
#
# @return [Object] whatever the connection's +close+ returns
def disconnect
  connection = @amqp_conn
  connection.close
end

#log(message) ⇒ Object



20
21
22
23
# File 'lib/combi/queue_service.rb', line 20

# Prints a timestamped debug line to standard output when debugging is on.
#
# Debugging is on when ENV['DEBUG'] equals 'true'. A positive result is
# remembered in @debug_mode; a negative one is re-checked on every call.
#
# @param message [Object] text appended after the timestamp and class name
# @return [nil]
def log(message)
  @debug_mode ||= ENV['DEBUG'] == 'true'
  puts "#{Time.now.to_f} #{self.class.name} #{message}" if @debug_mode
end

#publish(*args, &block) ⇒ Object



49
50
51
52
53
# File 'lib/combi/queue_service.rb', line 49

# Forwards all arguments to @exchange.publish.
#
# The exchange is always handed a block; that block runs the caller's block
# when one was supplied and does nothing otherwise.
#
# @param args [Array] passed through to @exchange.publish unchanged
# @yield optional; invoked when the exchange runs its block
# @return [Object] whatever @exchange.publish returns
def publish(*args, &block)
  @exchange.publish(*args) do
    block.call unless block.nil?
  end
end

#queue(name, options = {}, &block) ⇒ Object



55
56
57
# File 'lib/combi/queue_service.rb', line 55

# Delegates to @channel.queue with the same arguments and block.
#
# @param name [Object] queue name, passed through unchanged
# @param options [Hash] queue options, passed through unchanged
# @yield forwarded to @channel.queue
# @return [Object] whatever @channel.queue returns
def queue(name, options = {}, &block)
  channel = @channel
  channel.queue(name, options, &block)
end

#ready(&block) ⇒ Object



16
17
18
# File 'lib/combi/queue_service.rb', line 16

# Registers +block+ as a callback on @ready_defer.
#
# @yield forwarded to @ready_defer.callback
# @return [Object] whatever @ready_defer.callback returns
def ready(&block)
  @ready_defer.callback(&block)
end

#respond(response, delivery_info) ⇒ Object



63
64
65
66
# File 'lib/combi/queue_service.rb', line 63

# Publishes a reply for a received request.
#
# When +response+ responds to +call+ it is invoked and its result is sent;
# otherwise +response+ itself is sent. The reply is routed to
# delivery_info.reply_to and tagged with delivery_info.correlation_id.
#
# @param response [Object, #call] the reply, or a callable producing it
# @param delivery_info [#reply_to, #correlation_id] metadata of the request
# @return [Object] whatever #publish returns
def respond(response, delivery_info)
  payload = response.respond_to?(:call) ? response.call : response
  publish(payload,
          routing_key: delivery_info.reply_to,
          correlation_id: delivery_info.correlation_id)
end

#start ⇒ Object



25
26
27
28
29
30
31
32
33
34
# File 'lib/combi/queue_service.rb', line 25

# Connects using @config and, from the connection callback, either creates
# the RPC reply queue or marks the service as ready straight away.
#
# When @options[:rpc] is :enabled, #create_rpc_queue is called (it is then
# responsible for signalling readiness). Otherwise @ready_defer is marked as
# succeeded immediately.
#
# @return [Object] whatever #connect returns
def start
  connect @config do
    if @options[:rpc] == :enabled
      create_rpc_queue
    else
      # Go through #log like the rest of the class instead of printing to
      # standard output unconditionally.
      log "ready"
      @ready_defer.succeed
    end
  end
end