Class: Combi::Queue

Inherits:
Bus
  • Object
show all
Defined in:
lib/combi/buses/queue.rb

Constant Summary

Constants inherited from Bus

Bus::RPC_DEFAULT_TIMEOUT

Instance Attribute Summary collapse

Attributes inherited from Bus

#routes

Instance Method Summary collapse

Methods inherited from Bus

#add_service, #enable, #log, #post_initialize, #restart!

Constructor Details

#initialize(options) ⇒ Queue

Returns a new instance of Queue.



9
10
11
12
13
14
15
16
# File 'lib/combi/buses/queue.rb', line 9

def initialize(options)
  super
  @response_store = Combi::ResponseStore.new
  @queue_service = Combi::QueueService.new(options[:amqp_config], rpc: :enabled)
  queue_service.rpc_callback = lambda do |message|
    @response_store.handle_rpc_response(message)
  end
end

Instance Attribute Details

#queue_serviceObject (readonly)

Returns the value of attribute queue_service.



7
8
9
# File 'lib/combi/buses/queue.rb', line 7

def queue_service
  @queue_service
end

Instance Method Details

#add_routes_for(service_name, service_instance) ⇒ Object



28
29
30
31
# File 'lib/combi/buses/queue.rb', line 28

def add_routes_for(service_name, service_instance)
  create_queue_for_service(service_name)
  super
end

#create_queue_for_service(service_name) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/combi/buses/queue.rb', line 33

def create_queue_for_service(service_name)
  log "creating queue #{service_name}"
  queue_options = {}
  subscription_options = {}
  subscription_options[:ack] = true
  queue_service.ready do
    queue_service.queue(service_name.to_s, queue_options) do |queue|
      log "subscribing to queue #{service_name.to_s}"
      queue.subscribe(subscription_options) do |delivery_info, payload|
        process_queue_message service_name, payload, delivery_info
        queue_service.acknowledge delivery_info
      end
    end
  end
end

#process_queue_message(service_name, request, delivery_info) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/combi/buses/queue.rb', line 69

def process_queue_message(service_name, request, delivery_info)
  message = Yajl::Parser.parse request, symbolize_keys: true
  kind = message[:kind]
  payload = message[:payload]
  options = message[:options]
  response = invoke_service(service_name, kind, payload)
  if delivery_info.reply_to
    response.callback do |service_response|
      queue_service.respond service_response, delivery_info
    end
    response.errback do |service_response|
      failure_response = { error: service_response }
      queue_service.respond failure_response, delivery_info
    end
  end
end

#request(name, kind, message, timeout: RPC_DEFAULT_TIMEOUT, fast: false) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/combi/buses/queue.rb', line 49

def request(name, kind, message, timeout: RPC_DEFAULT_TIMEOUT, fast: false)
  log "Preparing request: #{name}.#{kind} #{message.inspect[0..500]}\t|| timeout: #{timeout} fast: #{fast}"
  options = {
    timeout: timeout,
    routing_key: name.to_s
  }
  if fast
    waiter = nil
  else
    correlation_id = Combi::Correlation.generate
    options[:correlation_id] = correlation_id
    waiter = @response_store.wait_for correlation_id, timeout
  end
  queue_service.next_ready_only do
    log "Making request: #{name}.#{kind} #{message.inspect[0..500]}\t|| #{options.inspect[0..500]}"
    queue_service.publish_request(kind, message, options)
  end
  waiter
end

#start!Object



18
19
20
# File 'lib/combi/buses/queue.rb', line 18

def start!
  queue_service.start
end

#stop!Object



22
23
24
25
26
# File 'lib/combi/buses/queue.rb', line 22

def stop!
  queue_service.ready do
    @queue_service.disconnect
  end
end