Class: QueueBus::Local

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_bus/local.rb

Overview

only process local queues

Class Method Summary collapse

Class Method Details

.publish(attributes = {}) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/queue_bus/local.rb', line 6

def publish(attributes = {})
  if ::QueueBus.local_mode == :suppress
    ::QueueBus.log_worker("Suppressed: #{attributes.inspect}")
    return  # not doing anything
  end

  # To json and back to simlulate enqueueing
  json = ::QueueBus::Util.encode(attributes)
  attributes = ::QueueBus::Util.decode(json)

  ::QueueBus.log_worker("Local running: #{attributes.inspect}")

  # looking for subscriptions, not queues
  subscription_matches(attributes).each do |sub|
    bus_attr = {  "bus_driven_at" => Time.now.to_i,
                  "bus_rider_queue" => sub.queue_name,
                  "bus_rider_app_key" => sub.app_key,
                  "bus_rider_sub_key" => sub.key,
                  "bus_rider_class_name" => sub.class_name}
    to_publish = bus_attr.merge(attributes || {})
    if ::QueueBus.local_mode == :standalone
      ::QueueBus.enqueue_to(sub.queue_name, sub.class_name, bus_attr.merge(attributes || {}))
    else  # defaults to inline mode
      sub.execute!(to_publish)
    end
  end
end

.subscription_matches(attributes) ⇒ Object

looking directly at subscriptions loaded into dispatcher so we don’t need redis server up



36
37
38
39
40
41
42
# File 'lib/queue_bus/local.rb', line 36

def subscription_matches(attributes)
  out = []
  ::QueueBus.dispatchers.each do |dispatcher|
    out.concat(dispatcher.subscription_matches(attributes))
  end
  out
end