Class: Hutch::Worker
- Inherits: Object
  - Object
    - Hutch::Worker
- Defined in: lib/hutch/patch/worker.rb
Overview
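Monkey patch for the worker. Hutch relies on the ConsumerWorkPool of the underlying bunny library for concurrent task processing, but that pool is too generic. We need to process the Message delivered by RabbitMQ and attach extra handling while a task runs. So ConsumerWorkPool no longer processes the task itself: it only performs a task submission, in which Bunny::ConsumerWorkPool hands the block to be executed to our own WorkerPool for the final execution.

Tasks in a RabbitMQ queue are only marked complete after a manual Ack, and each Channel has a Prefetch limit. Combining these two properties, a Queue in the local process can be used to buffer tasks: while tasks remain unexecuted, Prefetch caps the total number of tasks buffered on the current node, and the Ack explicitly tells RabbitMQ that a task is complete.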
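The interplay of Prefetch and manual Ack described above can be sketched with a small simulation. `FakeChannel` is a hypothetical stand-in written for this example; it is not part of Hutch or Bunny and only models the two rules: delivery stops once the number of unacknowledged messages reaches the prefetch value, and an ack frees one slot.

```ruby
# Hypothetical stand-in for a RabbitMQ channel with a prefetch limit.
# Not a Hutch or Bunny class; it only models prefetch and manual ack.
class FakeChannel
  attr_reader :unacked

  def initialize(messages, prefetch)
    @pending  = messages.dup
    @prefetch = prefetch
    @unacked  = []
  end

  # Deliver messages until the unacknowledged count reaches prefetch.
  def deliver_to(buffer)
    while @unacked.size < @prefetch && !@pending.empty?
      msg = @pending.shift
      @unacked << msg
      buffer.push(msg)
    end
  end

  # A manual ack marks the message as done and frees one prefetch slot.
  def ack(msg)
    @unacked.delete(msg)
  end
end

channel = FakeChannel.new(%w[m1 m2 m3 m4 m5], 2)
buffer  = Queue.new # local in-process buffer, as in the worker

channel.deliver_to(buffer)
buffered_before_ack = buffer.size # capped by prefetch

msg = buffer.pop
channel.ack(msg)                  # task finished, tell the broker
channel.deliver_to(buffer)        # one more message can now arrive
buffered_after_ack = buffer.size
```

Under these assumptions the local buffer never holds more than the prefetch value, which is why an unbounded `::Queue` is safe to use as the buffer.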
Instance Method Summary
- #handle_message_with_limits(consumer, delivery_info, properties, payload) ⇒ Object
- #heartbeat_connection ⇒ Object
  Heartbeat check of the Hutch connection.
- #initialize(broker, consumers, setup_procs) ⇒ Worker (constructor)
  A new instance of Worker.
- #peak ⇒ Object
  Non-blocking pop of a message; returns nil if the queue is empty.
- #retry_buffer_queue ⇒ Object
  At a regular interval, moves tasks from the buffer queue on to execution.
- #setup_queue(consumer) ⇒ Object
  Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
- #stop ⇒ Object
  Stop a running worker by killing all subscriber threads.
Constructor Details
#initialize(broker, consumers, setup_procs) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/hutch/patch/worker.rb', line 12

def initialize(broker, consumers, setup_procs)
  @broker = broker
  self.consumers = consumers
  self.setup_procs = setup_procs

  @message_worker = Concurrent::FixedThreadPool.new(Hutch::Config.get(:worker_pool_size))
  @timer_worker = Concurrent::TimerTask.execute(execution_interval: Hutch::Config.get(:poller_interval)) do
    heartbeat_connection
    retry_buffer_queue
  end

  # The queue size may be the same as the channel prefetch, and every Consumer has its own buffer queue with the same prefetch size.
  # When the buffer queue holds a prefetch-size number of messages, RabbitMQ stops pushing messages to this consumer, which is fine.
  # The consumer is throttled by the shared Redis instance.
  @buffer_queue = ::Queue.new
  @batch_size = Hutch::Config.get(:poller_batch_size)
  @connected = Hutch.connected?
end
Instance Method Details
#handle_message_with_limits(consumer, delivery_info, properties, payload) ⇒ Object
# File 'lib/hutch/patch/worker.rb', line 53

def handle_message_with_limits(consumer, delivery_info, properties, payload)
  # 1. consumer.limit?
  # 2. yes: wrap the message in a ConsumerMsg and push it to the buffer queue
  # 3. no: post it for handling
  @message_worker.post do
    if consumer.ratelimit_exceeded?
      @buffer_queue.push(ConsumerMsg.new(consumer, delivery_info, properties, payload))
    else
      # If Hutch is disconnected, skip the work and let the message time out in RabbitMQ so that it is pushed again
      return unless @connected
      consumer.ratelimit_add
      handle_message(consumer, delivery_info, properties, payload)
    end
  end
end
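The branch inside the posted block can be illustrated in isolation. The sketch below is an assumption-laden simplification: `CountingLimiter` is a hypothetical in-memory limiter (the source comments say the real consumers are throttled through a shared Redis instance), `dispatch` is a made-up helper name, and the thread pool and connection check are left out so the control flow is easy to follow.

```ruby
# Hypothetical in-memory limiter that allows at most `limit` messages.
# It only mimics the two method names used by the worker.
class CountingLimiter
  def initialize(limit)
    @limit = limit
    @used  = 0
  end

  def ratelimit_exceeded?
    @used >= @limit
  end

  def ratelimit_add
    @used += 1
  end
end

# Made-up helper: same decision as the worker, without threads.
def dispatch(limiter, buffer, handled, payload)
  if limiter.ratelimit_exceeded?
    buffer.push(payload) # park the message for a later retry
  else
    limiter.ratelimit_add
    handled << payload   # stand-in for the real message handler
  end
end

limiter = CountingLimiter.new(2)
buffer  = Queue.new
handled = []

%w[a b c d].each { |payload| dispatch(limiter, buffer, handled, payload) }
```

With a limit of two, the first two payloads are handled at once and the remaining two wait in the buffer until a later retry finds the limiter open again.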
#heartbeat_connection ⇒ Object
Heartbeat check of the Hutch connection.

# File 'lib/hutch/patch/worker.rb', line 70

def heartbeat_connection
  @connected = Hutch.connected?
end
#peak ⇒ Object
Non-blocking pop of a message; returns nil if the queue is empty. Other errors raise an exception.

# File 'lib/hutch/patch/worker.rb', line 84

def peak
  @buffer_queue.pop(true)
rescue ThreadError => e
  nil if e.to_s == "queue empty"
end
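As a minimal sketch of the non-blocking pop used here: Ruby's `Queue#pop` takes a `non_block` argument, and with `true` it raises `ThreadError` on an empty queue instead of waiting. `try_pop` is a hypothetical free-function version of `#peak`, written only for illustration.

```ruby
# Hypothetical helper with the same shape as #peak.
def try_pop(queue)
  queue.pop(true) # non_block = true: do not wait for a producer
rescue ThreadError
  nil             # raised when the queue is empty
end

q = Queue.new
q.push(:job)

first  = try_pop(q) # the queued item
second = try_pop(q) # queue is now empty
```

Returning nil for the empty case lets the caller treat "nothing to do" as an ordinary value rather than an exception.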
#retry_buffer_queue ⇒ Object
At a regular interval, moves tasks from the buffer queue on to execution.

# File 'lib/hutch/patch/worker.rb', line 75

def retry_buffer_queue
  @batch_size.times do
    cmsg = peak
    return if cmsg.blank?
    handle_message_with_limits(cmsg.consumer, cmsg.delivery_info, cmsg.properties, cmsg.payload)
  end
end
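The batching behaviour can be sketched on its own: each timer tick takes at most a fixed number of items from the buffer and stops early once the buffer is empty. `drain_batch` is a hypothetical helper, not a method of the worker, and it checks `nil?` rather than `blank?` so that it runs without ActiveSupport.

```ruby
# Hypothetical helper: take up to batch_size items without blocking.
def drain_batch(buffer, batch_size)
  moved = []
  batch_size.times do
    item = begin
      buffer.pop(true) # non-blocking pop
    rescue ThreadError
      nil              # buffer is empty
    end
    break if item.nil? # nothing left, end this tick early
    moved << item
  end
  moved
end

buffer = Queue.new
5.times { |i| buffer.push(i) }

first_tick  = drain_batch(buffer, 3) # a full batch
second_tick = drain_batch(buffer, 3) # the remainder
```

Capping each tick at a batch size keeps one timer run short, so the heartbeat check that shares the same timer is not delayed by a large backlog.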
#setup_queue(consumer) ⇒ Object
Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
# File 'lib/hutch/patch/worker.rb', line 41

def setup_queue(consumer)
  logger.info "setting up queue: #{consumer.get_queue_name}"

  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)

  queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args|
    delivery_info, properties, payload = Hutch::Adapter.decode_message(*args)
    handle_message_with_limits(consumer, delivery_info, properties, payload)
  end
end
#stop ⇒ Object
Stop a running worker by killing all subscriber threads. Also shuts down the two thread pools.

# File 'lib/hutch/patch/worker.rb', line 33

def stop
  @timer_worker.shutdown
  @message_worker.shutdown
  @broker.stop
end