Class: Combi::QueueService

Inherits:
Object
  • Object
show all
Defined in:
lib/combi/queue_service.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, options) ⇒ QueueService

Returns a new instance of QueueService.



8
9
10
11
12
13
14
# File 'lib/combi/queue_service.rb', line 8

# Builds a queue service that is not connected yet.
#
# @param config [Object] connection settings; stored in @config and later
#   indexed with symbol keys by #start
# @param options [Object] service options; stored in @options (#start reads
#   its :rpc entry)
def initialize(config, options)
  # Callbacks remembered so they can be re-registered on a fresh deferrable.
  @ready_callbacks = []
  @ready_defer = EventMachine::DefaultDeferrable.new
  # No RPC reply queue exists until #create_rpc_queue runs.
  @rpc_queue = nil
  @options = options
  @config = config
end

Instance Attribute Details

#rpc_callback ⇒ Object

Returns the value of attribute rpc_callback.



6
7
8
# File 'lib/combi/queue_service.rb', line 6

# Reader for the RPC response handler.
#
# @return [Object, nil] the current value of @rpc_callback; #create_rpc_queue
#   invokes it via `call` for each received response when it is not nil
def rpc_callback
  @rpc_callback
end

Instance Method Details

#acknowledge(delivery_info) ⇒ Object



86
87
88
# File 'lib/combi/queue_service.rb', line 86

# Acknowledges the delivery described by delivery_info on the channel.
#
# @param delivery_info [#delivery_tag] metadata of the delivery to acknowledge
# @return [Object] whatever the channel's acknowledge call returns
def acknowledge(delivery_info)
  tag = delivery_info.delivery_tag
  # NOTE(review): the literal false is presumably the AMQP "multiple" flag
  # (acknowledge only this delivery) -- confirm against the amqp gem.
  @channel.acknowledge(tag, false)
end

#connect(config, &after_connect) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/combi/queue_service.rb', line 43

# Opens the AMQP connection and prepares the channel and default exchange.
#
# Once the connection is open, a channel with auto recovery and the direct
# exchange named '' are stored in @channel / @exchange, the optional
# after_connect block is run, and handlers that call #reconnect are installed
# on the connection.
#
# @param config [Object] connection settings handed to AMQP.connect unchanged
# @yield called with no arguments once @channel and @exchange are set
# @return [Object] the value returned by AMQP.connect (also kept in @amqp_conn)
def connect(config, &after_connect)
  Combi.logger.info {"trying to connect to queue server"}
  @amqp_conn = AMQP.connect(config) do |connection, _open_ok|
    # Bind the channel to the connection that just opened rather than to
    # @amqp_conn, which is only assigned once AMQP.connect has returned and
    # may already refer to a newer connection while a reconnect is in flight.
    @channel = AMQP::Channel.new connection
    @channel.auto_recovery = true
    @exchange = @channel.direct ''
    # The block is optional; without it the connection is simply set up.
    after_connect.call if after_connect
    connection.on_error do |_conn, connection_close|
      Combi.logger.info {"[amqp connection.close] Reply code = #{connection_close.reply_code}, reply text = #{connection_close.reply_text}"}
      # NOTE(review): 320 is presumably the AMQP "connection forced" reply
      # code -- confirm; only that code triggers a reconnect here.
      if connection_close.reply_code == 320
        Combi.logger.info {"[amqp connection.close] Setting up a periodic reconnection timer..."}
        reconnect
      end
    end
    connection.on_tcp_connection_loss do |_conn, _settings|
      Combi.logger.error {"Connection failed, resetting for reconnect"}
      reconnect
    end
  end
end

#create_rpc_queue ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/combi/queue_service.rb', line 95

# (Re)creates the exclusive, auto-deleted reply queue used for RPC responses.
#
# A previously created RPC queue is unsubscribed first. Every response received
# on the new queue is parsed with Yajl (symbolized keys) and handed to
# rpc_callback, when one is set, as { correlation_id:, response: }. The ready
# deferrable is marked successful after the subscription has been set up.
#
# @return [Object] the value returned by #queue (also stored in @rpc_queue)
def create_rpc_queue
  @rpc_queue.unsubscribe unless @rpc_queue.nil?
  @rpc_queue = queue('', exclusive: true, auto_delete: true) do |rpc_queue|
    # Log through the yielded queue: @rpc_queue is only assigned once #queue
    # has returned.
    Combi.logger.debug {"\tRPC QUEUE: #{rpc_queue.name}"}
    rpc_queue.subscribe do |metadata, response|
      parsed_response = Yajl::Parser.parse response, symbolize_keys: true
      message = {
        correlation_id: metadata.correlation_id,
        response: parsed_response
      }
      # Responses are dropped silently when no handler is installed.
      rpc_callback.call(message) unless rpc_callback.nil?
    end
    @ready_defer.succeed
  end
end

#disconnect ⇒ Object



72
73
74
# File 'lib/combi/queue_service.rb', line 72

# Closes the AMQP connection if one has been stored by #connect.
#
# @return [Object, nil] the result of closing the connection, or nil when no
#   connection has been stored yet
def disconnect
  # Same nil guard as #status, so disconnecting before connecting is a no-op
  # instead of a NoMethodError.
  @amqp_conn.close if @amqp_conn
end

#next_ready_only(&block) ⇒ Object



21
22
23
# File 'lib/combi/queue_service.rb', line 21

# Registers a callback on the current ready deferrable only.
#
# Unlike #ready, the block is not added to @ready_callbacks, so #reconnect
# does not re-register it on the replacement deferrable.
#
# @yield block handed to the deferrable's callback registration
# @return [Object] whatever the deferrable's callback call returns
def next_ready_only(&block)
  @ready_defer.callback(&block)
end

#publish(*args, &block) ⇒ Object



76
77
78
79
80
# File 'lib/combi/queue_service.rb', line 76

# Publishes through the exchange stored in @exchange.
#
# @param args [Array] arguments forwarded unchanged to the exchange's publish
# @yield optional block, run when the exchange invokes its publish block
# @return [Object] whatever the exchange's publish call returns
def publish(*args, &block)
  # Always hand the exchange a block; it only delegates when the caller gave one.
  @exchange.publish(*args) { block.call if block }
end

#publish_request(kind, message, options = {}) ⇒ Object



111
112
113
114
115
116
117
118
119
120
# File 'lib/combi/queue_service.rb', line 111

# Encodes a request and publishes it with per-message options.
#
# When options contains :correlation_id, :reply_to is set to the name of the
# RPC queue so a response can be routed back. :expiration is always set from
# options[:timeout] (falling back to RPC_DEFAULT_TIMEOUT) multiplied by 1000.
# NOTE(review): presumably timeout is in seconds and expiration in
# milliseconds -- confirm against the broker settings.
#
# @param kind [Object] request kind, encoded under the "kind" key
# @param message [Object] request payload, encoded under the "payload" key
# @param options [Hash] publish options; the caller's hash is left untouched
# @return [Object] whatever #publish returns
def publish_request(kind, message, options = {})
  # Work on a copy so :reply_to / :expiration do not leak into the caller's hash.
  publish_options = options.dup
  if publish_options.key? :correlation_id
    # wants a response
    publish_options[:reply_to] = @rpc_queue.name
  end
  timeout = publish_options[:timeout] || RPC_DEFAULT_TIMEOUT
  publish_options[:expiration] = (timeout * 1000).to_i
  Combi.logger.debug {"sending request #{kind} #{message.inspect[0..500]} with options #{publish_options.inspect}"}
  # The encoded body always carries an empty options hash; the publish
  # options travel as message properties instead.
  request = Yajl::Encoder.encode kind: kind, payload: message, options: {}
  publish request, publish_options
end

#queue(name, options = {}, &block) ⇒ Object



82
83
84
# File 'lib/combi/queue_service.rb', line 82

# Delegates to the queue method of the channel stored in @channel.
#
# @param name [Object] queue name, passed through unchanged
# @param options [Hash] passed through unchanged (defaults to {})
# @yield block forwarded to the channel's queue call
# @return [Object] whatever the channel's queue call returns
def queue(name, options = {}, &block)
  @channel.queue(name, options, &block)
end

#ready(&block) ⇒ Object



16
17
18
19
# File 'lib/combi/queue_service.rb', line 16

# Registers a callback on the ready deferrable and remembers it.
#
# The block is also appended to @ready_callbacks, which #reconnect replays
# onto the replacement deferrable.
#
# @yield block handed to the deferrable's callback registration
# @return [Object] whatever the deferrable's callback call returns
def ready(&block)
  @ready_callbacks.push(block)
  @ready_defer.callback(&block)
end

#reconnect ⇒ Object



64
65
66
67
68
69
70
# File 'lib/combi/queue_service.rb', line 64

# Resets the ready state and starts the connection sequence again.
#
# A fresh deferrable replaces @ready_defer, every callback remembered in
# @ready_callbacks is registered on it, and #start is invoked.
#
# @return [Object] whatever #start returns
def reconnect
  @ready_defer = EventMachine::DefaultDeferrable.new
  @ready_callbacks.each { |handler| @ready_defer.callback(&handler) }
  start
end

#respond(response, delivery_info) ⇒ Object



90
91
92
93
# File 'lib/combi/queue_service.rb', line 90

# Encodes a response and publishes it back to the requester.
#
# @param response [Object] value encoded with Yajl and used as the message body
# @param delivery_info [#reply_to, #correlation_id] metadata of the request;
#   reply_to becomes the routing key and correlation_id is passed along
# @return [Object] whatever #publish returns
def respond(response, delivery_info)
  payload = Yajl::Encoder.encode(response)
  reply_options = {
    routing_key: delivery_info.reply_to,
    correlation_id: delivery_info.correlation_id
  }
  publish(payload, reply_options)
end

#start ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/combi/queue_service.rb', line 29

# Connects using @config and signals readiness once the connection is usable.
#
# @config[:reconnect_period] defaults to 4. A single proc that schedules
# another #start after `reconnect_period * rand` is stored in @config under
# :on_tcp_connection_failure and :on_possible_authentication_failure. After
# connecting, the RPC queue is created when @options[:rpc] is :enabled;
# otherwise the ready deferrable is marked successful right away.
#
# @return [Object] whatever #connect returns
def start
  @config[:reconnect_period] ||= 4
  # Kept as a plain proc (not a lambda) so it tolerates being called with
  # arguments by whoever fires these hooks.
  retry_later = proc do
    EM.add_timer(@config[:reconnect_period] * rand) { start }
  end
  [:on_tcp_connection_failure, :on_possible_authentication_failure].each do |hook|
    @config[hook] = retry_later
  end
  connect(@config) do
    rpc_enabled = @options[:rpc] == :enabled
    rpc_enabled ? create_rpc_queue : @ready_defer.succeed
  end
end

#status ⇒ Object



25
26
27
# File 'lib/combi/queue_service.rb', line 25

# Reports the status of the AMQP connection.
#
# @return [Object, nil] the connection's status, or the falsy value of
#   @amqp_conn itself (nil before a connection has been stored)
def status
  connection = @amqp_conn
  return connection unless connection

  connection.status
end