Class: Hutch::Schedule::Core
- Inherits:
-
Object
- Object
- Hutch::Schedule::Core
- Defined in:
- lib/hutch/schedule/core.rb
Instance Attribute Summary collapse
-
#broker ⇒ Object
readonly
Returns the value of attribute broker.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
Instance Method Summary collapse
-
#connect! ⇒ Object
Becareful with the sequence of initialize.
- #declare_delay_exchange(ch = channel) ⇒ Object
-
#declare_delay_exchange! ⇒ Object
The exchange used by Hutch::Schedule.
- #declare_publisher! ⇒ Object
-
#initialize(broker) ⇒ Core
constructor
A new instance of Core.
-
#publish(*args) ⇒ Object
Schedule`s publisher, publish the message to schedule topic exchange.
- #setup_delay_queue!(suffix) ⇒ Object
-
#setup_delay_queues! ⇒ Object
The queue used by Hutch::Schedule.
Constructor Details
#initialize(broker) ⇒ Core
Returns a new instance of Core.
12 13 14 15 |
# File 'lib/hutch/schedule/core.rb', line 12 def initialize(broker) raise "Broker can`t be nil" if broker.blank? @broker = broker end |
Instance Attribute Details
#broker ⇒ Object (readonly)
Returns the value of attribute broker.
9 10 11 |
# File 'lib/hutch/schedule/core.rb', line 9 def broker @broker end |
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
9 10 11 |
# File 'lib/hutch/schedule/core.rb', line 9 def exchange @exchange end |
Instance Method Details
#connect! ⇒ Object
Becareful with the sequence of initialize
18 19 20 21 22 |
# File 'lib/hutch/schedule/core.rb', line 18 def connect! declare_delay_exchange! declare_publisher! setup_delay_queues! end |
#declare_delay_exchange(ch = channel) ⇒ Object
33 34 35 36 37 38 39 40 41 |
# File 'lib/hutch/schedule/core.rb', line 33 def declare_delay_exchange(ch = channel) exchange_name = "#{Hutch::Config.get(:mq_exchange)}.schedule" = { durable: true }.merge(Hutch::Config.get(:mq_exchange_options)) logger.info "using topic exchange(schedule) '#{exchange_name}'" broker.send(:with_bunny_precondition_handler, 'schedule exchange') do ch.topic(exchange_name, ) end end |
#declare_delay_exchange! ⇒ Object
The exchange used by Hutch::Schedule
29 30 31 |
# File 'lib/hutch/schedule/core.rb', line 29 def declare_delay_exchange! @exchange = declare_delay_exchange end |
#declare_publisher! ⇒ Object
24 25 26 |
# File 'lib/hutch/schedule/core.rb', line 24 def declare_publisher! @publisher = Hutch::Publisher.new(connection, channel, exchange) end |
#publish(*args) ⇒ Object
Schedule`s publisher, publish the message to schedule topic exchange
58 59 60 |
# File 'lib/hutch/schedule/core.rb', line 58 def publish(*args) @publisher.publish(*args) end |
#setup_delay_queue!(suffix) ⇒ Object
48 49 50 51 52 53 54 55 |
# File 'lib/hutch/schedule/core.rb', line 48 def setup_delay_queue!(suffix) # TODO: extract the ttl to config params props = { :'x-message-ttl' => 30.days.in_milliseconds, :'x-dead-letter-exchange' => Hutch::Config.get(:mq_exchange) } queue = broker.queue(Hutch::Schedule.delay_queue_name(suffix), props) # bind routing_key to schedule exchange queue.bind(exchange, routing_key: Hutch::Schedule.delay_routing_key(suffix)) end |
#setup_delay_queues! ⇒ Object
The queue used by Hutch::Schedule
44 45 46 |
# File 'lib/hutch/schedule/core.rb', line 44 def setup_delay_queues! DELAY_QUEUES.map { |suffix| setup_delay_queue!(suffix) } end |