Module: LaGear::Bus

Defined in:
lib/la_gear/bus.rb

Defined Under Namespace

Classes: DelayablePublisher, NamespaceUtility

Class Method Summary

Class Method Details

.init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
# File 'lib/la_gear/bus.rb', line 3

# Creates a ConnectionPool whose members are built with
# ::LaGear::Publisher.new, stores it in the $publisher global, and then
# checks one member out to confirm it is a LaGear::Publisher.
#
# @param size value handed to ConnectionPool as +size+; defaults to
#   ::Sidekiq.options[:concurrency]
# @param timeout value handed to ConnectionPool as +timeout+; defaults to 3
# @raise [RuntimeError] with message 'Bus is lost!' when the checked-out
#   object is not a LaGear::Publisher
def init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3)
  pool_settings = { size: size, timeout: timeout }
  $publisher = ConnectionPool.new(**pool_settings) { ::LaGear::Publisher.new }

  # Sanity check: borrow one member straight away so a bad pool is noticed
  # here rather than on first use.
  $publisher.with do |bus|
    next if bus.is_a?(LaGear::Publisher)

    raise 'Bus is lost!'
  end
end

.publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) ⇒ Object



16
17
18
19
# File 'lib/la_gear/bus.rb', line 16

# Publishes +msg+ through the object returned by
# DelayablePublisher.sidekiq_delay.
#
# The routing key is first passed through
# NamespaceUtility.adjust_routing_key together with +la_gear_opts+.
#
# @param routing_key key to adjust and publish under
# @param msg message forwarded unchanged to +publish+
# @param la_gear_opts options given to NamespaceUtility.adjust_routing_key
# @param bunny_opts options forwarded as the last argument of +publish+
# @param sidekiq_opts options given to DelayablePublisher.sidekiq_delay
# @return whatever the delayed +publish+ call returns
def publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  delayed = DelayablePublisher.sidekiq_delay(sidekiq_opts)
  delayed.publish(adjusted_key, msg, bunny_opts)
end

.publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) ⇒ Object



28
29
30
31
# File 'lib/la_gear/bus.rb', line 28

# Publishes +msg+ through the object returned by
# DelayablePublisher.sidekiq_delay_until(timestamp, sidekiq_opts).
#
# The routing key is first passed through
# NamespaceUtility.adjust_routing_key together with +la_gear_opts+.
#
# @param timestamp first argument given to sidekiq_delay_until
# @param routing_key key to adjust and publish under
# @param msg message forwarded unchanged to +publish+
# @param la_gear_opts options given to NamespaceUtility.adjust_routing_key
# @param bunny_opts options forwarded as the last argument of +publish+
# @param sidekiq_opts second argument given to sidekiq_delay_until
# @return whatever the delayed +publish+ call returns
def publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  scheduled = DelayablePublisher.sidekiq_delay_until(timestamp, sidekiq_opts)
  scheduled.publish(adjusted_key, msg, bunny_opts)
end

.publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) ⇒ Object



22
23
24
25
# File 'lib/la_gear/bus.rb', line 22

# Publishes +msg+ through the object returned by
# DelayablePublisher.sidekiq_delay_for(interval, sidekiq_opts).
#
# The routing key is first passed through
# NamespaceUtility.adjust_routing_key together with +la_gear_opts+.
#
# @param interval first argument given to sidekiq_delay_for
# @param routing_key key to adjust and publish under
# @param msg message forwarded unchanged to +publish+
# @param la_gear_opts options given to NamespaceUtility.adjust_routing_key
# @param bunny_opts options forwarded as the last argument of +publish+
# @param sidekiq_opts second argument given to sidekiq_delay_for
# @return whatever the delayed +publish+ call returns
def publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  deferred = DelayablePublisher.sidekiq_delay_for(interval, sidekiq_opts)
  deferred.publish(adjusted_key, msg, bunny_opts)
end

.publish_local(routing_key, msg, la_gear_opts = {}) ⇒ Object



34
35
36
37
# File 'lib/la_gear/bus.rb', line 34

# Calls +perform_async+ on the object returned by
# NamespaceUtility.local_worker for the adjusted routing key.
#
# The values of +msg+ (in the order +msg.values+ returns them) are splatted
# as the arguments of +perform_async+.
#
# @param routing_key key passed through NamespaceUtility.adjust_routing_key
# @param msg object responding to +values+ (presumably a Hash — confirm
#   against callers)
# @param la_gear_opts options given to NamespaceUtility.adjust_routing_key
# @return whatever +perform_async+ returns
def publish_local(routing_key, msg, la_gear_opts = {})
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  worker = NamespaceUtility.local_worker(adjusted_key)
  job_args = msg.values
  worker.perform_async(*job_args)
end

.publish_local_in(routing_key, msg, la_gear_opts = {}, interval) ⇒ Object



40
41
42
43
# File 'lib/la_gear/bus.rb', line 40

# Calls +perform_in+ on the object returned by NamespaceUtility.local_worker
# for the adjusted routing key.
#
# +interval+ is passed first, followed by the splatted values of +msg+.
# Note the parameter order: +interval+ is the LAST parameter and is required,
# while the +la_gear_opts+ before it is optional, so a three-argument call
# binds its third argument to +interval+.
#
# @param routing_key key passed through NamespaceUtility.adjust_routing_key
# @param msg object responding to +values+ (presumably a Hash — confirm
#   against callers)
# @param la_gear_opts options given to NamespaceUtility.adjust_routing_key
# @param interval first argument given to +perform_in+
# @return whatever +perform_in+ returns
def publish_local_in(routing_key, msg, la_gear_opts = {}, interval)
  adjusted_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  worker = NamespaceUtility.local_worker(adjusted_key)
  job_args = msg.values
  worker.perform_in(interval, *job_args)
end