Class: Hutch::Worker
- Inherits: Object
  - Object
  - Hutch::Worker
- Defined in: lib/hutch/patch/worker.rb
Overview
Monkey patch for Hutch's worker. Hutch relies on Bunny's underlying ConsumerWorkPool to run concurrent tasks, but that pool is too generic for our needs: we have to process the Messages delivered by RabbitMQ and attach extra handling information while a task executes. So tasks are no longer run by the ConsumerWorkPool itself; instead, the ConsumerWorkPool performs only a single submission, handing the block to be executed over to our own worker pool for final execution. Because a task in a RabbitMQ queue is only marked complete by a manual Ack, and each Channel has a prefetch limit, combining these two features lets us buffer tasks in an in-process Queue: while tasks remain unexecuted, prefetch bounds the total number of tasks buffered on the current node, and each Ack explicitly tells RabbitMQ that its task is done.
Instance Method Summary

- #consumer_msg(consumer, delivery_info, properties, payload) ⇒ Object
  Change the arguments into a message to reuse the code from #handle_message.
- #flush_to_retry ⇒ Object
  Messages with a long rate interval should not sit in the buffer forever; based on the interval value, long-period messages are re-enqueued to RabbitMQ to wait there. They must not be requeued directly, though, as that would cause frequent round trips with RabbitMQ.
- #handle_cmsg(consumer, delivery_info, properties, payload, message) ⇒ Object
- #handle_cmsg_with_limits(cmsg) ⇒ Object
  cmsg: ConsumerMsg.
- #handle_message_with_limits(consumer, delivery_info, properties, payload) ⇒ Object
- #heartbeat_connection ⇒ Object
  Heartbeat check of the Hutch connection.
- #initialize(broker, consumers, setup_procs) ⇒ Worker (constructor)
  A new instance of Worker.
- #peak ⇒ Object
  Non-blocking pop of a message; returns nil if the queue is empty.
- #retry_buffer_queue ⇒ Object
  Periodically move tasks from the buffer queue back into execution; messages with a short interval are executed immediately.
- #setup_queue(consumer) ⇒ Object
  Bind a consumer's routing keys to its queue, and set up a subscription to receive messages sent to the queue.
- #stop ⇒ Object
  Stop a running worker by killing all subscriber threads.
Constructor Details
#initialize(broker, consumers, setup_procs) ⇒ Worker
Returns a new instance of Worker.
# File 'lib/hutch/patch/worker.rb', line 12

def initialize(broker, consumers, setup_procs)
  raise "use Hutch::Schedule must set an positive channel_prefetch" if Hutch::Config.get(:channel_prefetch) < 1
  @broker = broker
  self.consumers = consumers
  self.setup_procs = setup_procs

  @message_worker = Concurrent::FixedThreadPool.new(Hutch::Config.get(:worker_pool_size))
  @timer_worker = Concurrent::TimerTask.execute(execution_interval: Hutch::Config.get(:poller_interval)) do
    # all checks run in the same thread
    heartbeat_connection
    flush_to_retry
    retry_buffer_queue
  end

  # The queue size may be the same as channel[prefetch], and every consumer shares one buffer queue with the
  # same prefetch size; when the current consumer's unacked messages reach the prefetch size, RabbitMQ stops
  # pushing messages to this consumer.
  # Because the buffer queue is shared by all consumers, the max queue size is [prefetch * consumer count]:
  # if prefetch is 20 and there are 30 consumers, the max queue size is 20 * 30 = 600.
  @buffer_queue = ::Queue.new
  @batch_size = Hutch::Config.get(:poller_batch_size)
  @connected = Hutch.connected?
  @last_flush_time = Time.now.utc
end
Instance Method Details
#consumer_msg(consumer, delivery_info, properties, payload) ⇒ Object
Change the arguments into a message to reuse the code from #handle_message.
# File 'lib/hutch/patch/worker.rb', line 80

def consumer_msg(consumer, delivery_info, properties, payload)
  serializer = consumer.get_serializer || Hutch::Config[:serializer]
  logger.debug {
    spec = serializer.binary? ? "#{payload.bytesize} bytes" : "#{payload}"
    "message(#{properties.message_id || '-'}): " +
      "routing key: #{delivery_info.routing_key}, " +
      "consumer: #{consumer}, " +
      "payload: #{spec}"
  }
  ConsumerMsg.new(consumer, Hutch::Message.new(delivery_info, properties, payload, serializer))
end
#flush_to_retry ⇒ Object
Messages with a long rate interval should not sit in the buffer forever; based on the interval value, long-period messages are re-enqueued to RabbitMQ to wait there. They must not be requeued directly, though, as that would cause frequent round trips with RabbitMQ. Instead, a message's wait should be stretched progressively according to its period and execution count, up to a final maximum wait.
There are a few requirements:
- must be called before retry_buffer_queue
- the whole method call must complete within 1s
# File 'lib/hutch/patch/worker.rb', line 135

def flush_to_retry
  now = Time.now.utc
  if now - @last_flush_time >= Hutch::Config.get(:worker_buffer_flush_interval)
    @buffer_queue.size.times do
      cmsg = peak
      break if cmsg.blank?
      # if it was not re-enqueued, put it back into the buffer
      @buffer_queue.push(cmsg) unless cmsg.enqueue_in_or_not
    end
    @last_flush_time = now
    logger.debug "flush_to_retry #{Time.now.utc - now}"
  end
end
#handle_cmsg(consumer, delivery_info, properties, payload, message) ⇒ Object
# File 'lib/hutch/patch/worker.rb', line 93

def handle_cmsg(consumer, delivery_info, properties, payload, message)
  consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info }
  with_tracing(consumer_instance).handle(message)
  @broker.ack(delivery_info.delivery_tag)
rescue => ex
  acknowledge_error(delivery_info, properties, @broker, ex)
  handle_error(properties, payload, consumer, ex)
end
#handle_cmsg_with_limits(cmsg) ⇒ Object
cmsg: ConsumerMsg
# File 'lib/hutch/patch/worker.rb', line 60

def handle_cmsg_with_limits(cmsg)
  # normal ratelimit handling: if the limit is exceeded, the message goes into the buffer
  consumer = cmsg.consumer
  @message_worker.post do
    if consumer.ratelimit_exceeded?(cmsg.message)
      @buffer_queue.push(cmsg)
    else
      # if Hutch is disconnected, skip the work and let the message time out in RabbitMQ,
      # waiting to be pushed again
      return unless @connected
      consumer.ratelimit_add(cmsg.message)
      handle_cmsg(*cmsg.handle_cmsg_args)
    end
  end
end
#handle_message_with_limits(consumer, delivery_info, properties, payload) ⇒ Object
# File 'lib/hutch/patch/worker.rb', line 75

def handle_message_with_limits(consumer, delivery_info, properties, payload)
  handle_cmsg_with_limits(consumer_msg(consumer, delivery_info, properties, payload))
end
#heartbeat_connection ⇒ Object
Heartbeat check of the Hutch connection.
# File 'lib/hutch/patch/worker.rb', line 104

def heartbeat_connection
  @connected = Hutch.connected?
end
#peak ⇒ Object
Non-blocking pop of a message; returns nil if the queue is empty, and lets any other error raise.
# File 'lib/hutch/patch/worker.rb', line 150

def peak
  @buffer_queue.pop(true)
rescue ThreadError => e
  nil if e.to_s == "queue empty"
end
#retry_buffer_queue ⇒ Object
Periodically move tasks from the buffer queue back into execution; messages with a short interval are executed immediately.
# File 'lib/hutch/patch/worker.rb', line 109

def retry_buffer_queue
  begin_size = @buffer_queue.size
  now = Time.now.utc
  stat = {}
  @batch_size.times do
    cmsg = peak
    break if cmsg.blank?
    handle_cmsg_with_limits(cmsg)
    next unless logger.level == Logger::DEBUG
    if stat.key?(cmsg.message.body[:b])
      stat[cmsg.message.body[:b]] += 1
    else
      stat[cmsg.message.body[:b]] = 1
    end
  end
  logger.debug "retry_buffer_queue #{Time.now.utc - now}, size from #{begin_size} to #{@buffer_queue.size}, stat: #{stat}"
end
#setup_queue(consumer) ⇒ Object
Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
# File 'lib/hutch/patch/worker.rb', line 47

def setup_queue(consumer)
  logger.info "setting up queue: #{consumer.get_queue_name}"
  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)

  queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args|
    delivery_info, properties, payload = Hutch::Adapter.decode_message(*args)
    handle_message_with_limits(consumer, delivery_info, properties, payload)
  end
end
#stop ⇒ Object
Stop a running worker by killing all subscriber threads and shutting down the two thread pools.
# File 'lib/hutch/patch/worker.rb', line 39

def stop
  @timer_worker.shutdown
  @broker.stop
  @message_worker.shutdown
end