Module: LaGear::Bus
- Defined in: lib/la_gear/bus.rb
Defined Under Namespace
Classes: DelayablePublisher, NamespaceUtility
Class Method Summary
collapse
-
.init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3) ⇒ Object
-
.publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) ⇒ Object
-
.publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) ⇒ Object
-
.publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) ⇒ Object
-
.publish_local(routing_key, msg, la_gear_opts = {}) ⇒ Object
-
.publish_local_in(routing_key, msg, la_gear_opts = {}, interval) ⇒ Object
Class Method Details
.init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3) ⇒ Object
3
4
5
6
7
8
9
10
11
12
13
|
# File 'lib/la_gear/bus.rb', line 3
# Creates a ConnectionPool whose factory block builds LaGear::Publisher
# instances, stores it in the +$publisher+ global, and then checks out one
# connection to confirm the pool really hands back a publisher.
#
# @param size [Integer] pool size; defaults to +::Sidekiq.options[:concurrency]+
# @param timeout [Numeric] forwarded to ConnectionPool as +timeout:+ (default 3)
# @return [Object] whatever +$publisher.with+ returns for a block that yields nil
# @raise [RuntimeError] 'Bus is lost!' if the checked-out object is not a
#   LaGear::Publisher
def init_pool(size = ::Sidekiq.options[:concurrency], timeout = 3)
  $publisher = ConnectionPool.new(size: size, timeout: timeout) { ::LaGear::Publisher.new }

  # Sanity check: borrow one connection right away so a broken pool is
  # detected at initialization rather than on first publish.
  $publisher.with do |connection|
    next if connection.is_a?(LaGear::Publisher)

    raise 'Bus is lost!'
  end
end
|
.publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) ⇒ Object
16
17
18
19
|
# File 'lib/la_gear/bus.rb', line 16
# Publishes +msg+ through DelayablePublisher's +sidekiq_delay+ proxy.
#
# The routing key is first passed through NamespaceUtility.adjust_routing_key
# together with +la_gear_opts+; the adjusted key is what gets published.
#
# @param routing_key [Object] key handed to NamespaceUtility.adjust_routing_key
# @param msg [Object] payload forwarded unchanged to +publish+
# @param la_gear_opts [Hash] options for the routing-key adjustment
# @param bunny_opts [Hash] forwarded as the last argument of +publish+
# @param sidekiq_opts [Hash] forwarded to +sidekiq_delay+
# @return [Object] the result of the proxied +publish+ call
#   (presumably a Sidekiq job id — confirm against DelayablePublisher)
def publish(routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  namespaced_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  delayed_publisher = DelayablePublisher.sidekiq_delay(sidekiq_opts)
  delayed_publisher.publish(namespaced_key, msg, bunny_opts)
end
|
.publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) ⇒ Object
28
29
30
31
|
# File 'lib/la_gear/bus.rb', line 28
# Publishes +msg+ through DelayablePublisher's +sidekiq_delay_until+ proxy,
# which receives +timestamp+ and +sidekiq_opts+.
#
# The routing key is first passed through NamespaceUtility.adjust_routing_key
# together with +la_gear_opts+; the adjusted key is what gets published.
#
# @param timestamp [Object] first argument of +sidekiq_delay_until+
#   (presumably the time at which the publish should run — confirm)
# @param routing_key [Object] key handed to NamespaceUtility.adjust_routing_key
# @param msg [Object] payload forwarded unchanged to +publish+
# @param la_gear_opts [Hash] options for the routing-key adjustment
# @param bunny_opts [Hash] forwarded as the last argument of +publish+
# @param sidekiq_opts [Hash] second argument of +sidekiq_delay_until+
# @return [Object] the result of the proxied +publish+ call
def publish_at(timestamp, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  namespaced_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  scheduled_publisher = DelayablePublisher.sidekiq_delay_until(timestamp, sidekiq_opts)
  scheduled_publisher.publish(namespaced_key, msg, bunny_opts)
end
|
.publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {}) ⇒ Object
22
23
24
25
|
# File 'lib/la_gear/bus.rb', line 22
# Publishes +msg+ through DelayablePublisher's +sidekiq_delay_for+ proxy,
# which receives +interval+ and +sidekiq_opts+.
#
# The routing key is first passed through NamespaceUtility.adjust_routing_key
# together with +la_gear_opts+; the adjusted key is what gets published.
#
# @param interval [Object] first argument of +sidekiq_delay_for+
#   (presumably a delay in seconds — confirm)
# @param routing_key [Object] key handed to NamespaceUtility.adjust_routing_key
# @param msg [Object] payload forwarded unchanged to +publish+
# @param la_gear_opts [Hash] options for the routing-key adjustment
# @param bunny_opts [Hash] forwarded as the last argument of +publish+
# @param sidekiq_opts [Hash] second argument of +sidekiq_delay_for+
# @return [Object] the result of the proxied +publish+ call
def publish_in(interval, routing_key, msg, la_gear_opts = {}, bunny_opts = {}, sidekiq_opts = {})
  namespaced_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  deferred_publisher = DelayablePublisher.sidekiq_delay_for(interval, sidekiq_opts)
  deferred_publisher.publish(namespaced_key, msg, bunny_opts)
end
|
.publish_local(routing_key, msg, la_gear_opts = {}) ⇒ Object
34
35
36
37
|
# File 'lib/la_gear/bus.rb', line 34
# Hands the message to a local worker instead of going through the
# DelayablePublisher: the worker is looked up with
# NamespaceUtility.local_worker for the adjusted routing key, and
# +perform_async+ is called with the values of +msg+ splatted as arguments.
#
# @param routing_key [Object] key handed to NamespaceUtility.adjust_routing_key
# @param msg [#values] payload; only its +values+ are passed on, in order
#   (presumably a Hash — confirm against callers)
# @param la_gear_opts [Hash] options for the routing-key adjustment
# @return [Object] the result of the worker's +perform_async+
def publish_local(routing_key, msg, la_gear_opts = {})
  namespaced_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  worker = NamespaceUtility.local_worker(namespaced_key)
  worker_args = msg.values
  worker.perform_async(*worker_args)
end
|
.publish_local_in(routing_key, msg, la_gear_opts = {}, interval) ⇒ Object
40
41
42
43
|
# File 'lib/la_gear/bus.rb', line 40
# Like a local publish, but calls the worker's +perform_in+ with +interval+
# followed by the values of +msg+ splatted as arguments. The worker is looked
# up with NamespaceUtility.local_worker for the adjusted routing key.
#
# Note the parameter order: +la_gear_opts+ is optional and sits *before* the
# required +interval+, so a three-argument call binds the third argument to
# +interval+ and leaves +la_gear_opts+ at its default.
#
# @param routing_key [Object] key handed to NamespaceUtility.adjust_routing_key
# @param msg [#values] payload; only its +values+ are passed on, in order
#   (presumably a Hash — confirm against callers)
# @param la_gear_opts [Hash] options for the routing-key adjustment
# @param interval [Object] first argument of +perform_in+
#   (presumably a delay in seconds — confirm)
# @return [Object] the result of the worker's +perform_in+
def publish_local_in(routing_key, msg, la_gear_opts = {}, interval)
  namespaced_key = NamespaceUtility.adjust_routing_key(routing_key, la_gear_opts)
  worker = NamespaceUtility.local_worker(namespaced_key)
  worker_args = msg.values
  worker.perform_in(interval, *worker_args)
end
|