Class: Hutch::Worker

Inherits: Object
Defined in:
lib/hutch/patch/worker.rb

Overview

Monkey patch for Hutch's worker. Hutch borrows bunny's underlying ConsumerWorkPool for concurrent task handling, but that pool is too generic: we need to process the Messages delivered by RabbitMQ and attach extra handling information while a task runs. So instead of letting ConsumerWorkPool process tasks itself, Bunny::ConsumerWorkPool only performs the task submission, handing each block to our own worker pool for final execution. Because a task in a RabbitMQ queue is only marked complete after a manual Ack, and each Channel has a Prefetch limit, we can combine these two features and buffer tasks in an in-process Queue: as long as tasks remain unacked, Prefetch caps the total number of tasks buffered on the current node, and each Ack explicitly tells RabbitMQ that the task is done.

Instance Method Summary

Constructor Details

#initialize(broker, consumers, setup_procs) ⇒ Worker

Returns a new instance of Worker.



# File 'lib/hutch/patch/worker.rb', line 12

def initialize(broker, consumers, setup_procs)
  raise "Hutch::Schedule requires a positive channel_prefetch" if Hutch::Config.get(:channel_prefetch) < 1
  @broker          = broker
  self.consumers   = consumers
  self.setup_procs = setup_procs
  
  @message_worker = Concurrent::FixedThreadPool.new(Hutch::Config.get(:worker_pool_size))
  @timer_worker   = Concurrent::TimerTask.execute(execution_interval: Hutch::Config.get(:poller_interval)) do
    # run all checkers in the same thread
    heartbeat_connection
    flush_to_retry
    retry_buffer_queue
  end
  
  # The buffer queue size can match the channel prefetch: every consumer shares one
  # buffer queue, and once a consumer's unacked messages reach the prefetch size,
  # RabbitMQ stops pushing messages to that consumer.
  # Because the buffer queue is shared by all consumers, the maximum queue size is
  # [prefetch * consumer count]; e.g. with a prefetch of 20 and 30 consumers, the
  # max queue size is 20 * 30 = 600.
  @buffer_queue    = ::Queue.new
  @batch_size      = Hutch::Config.get(:poller_batch_size)
  @connected       = Hutch.connected?
  @last_flush_time = Time.now.utc
end
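The upper bound described in the constructor's comment can be checked with simple arithmetic. A sketch (the method name is illustrative, not part of Hutch):

```ruby
# Upper bound on the shared buffer queue: each consumer can hold at most
# `prefetch` unacked messages, and all consumers share one in-process queue.
def max_buffer_size(prefetch, consumer_count)
  prefetch * consumer_count
end

max_buffer_size(20, 30) # => 600
```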

Instance Method Details

#consumer_msg(consumer, delivery_info, properties, payload) ⇒ Object

Converts the raw arguments into a Message so the code from #handle_message can be reused.



# File 'lib/hutch/patch/worker.rb', line 80

def consumer_msg(consumer, delivery_info, properties, payload)
  serializer = consumer.get_serializer || Hutch::Config[:serializer]
  logger.debug {
    spec = serializer.binary? ? "#{payload.bytesize} bytes" : "#{payload}"
    "message(#{properties.message_id || '-'}): " +
      "routing key: #{delivery_info.routing_key}, " +
      "consumer: #{consumer}, " +
      "payload: #{spec}"
  }
  
  ConsumerMsg.new(consumer, Hutch::Message.new(delivery_info, properties, payload, serializer))
end

#flush_to_retryObject

Messages with long rate intervals should not stay in the buffer the whole time, so long-period messages are re-enqueued to RabbitMQ (based on the interval value) and wait there instead. They must not simply be requeued, though, as that would cause constant round-trips with RabbitMQ. Instead, the wait should be stretched step by step according to the interval and the number of executions, up to a maximum wait time.

Requirements:

- Must be called before retry_buffer_queue.
- The entire method call must complete within 1 second.
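The implementation of `enqueue_in_or_not` is not shown here; one plausible way to stretch the wait with the number of executions is a capped exponential backoff. A sketch under that assumption (names and cap are illustrative):

```ruby
# Capped exponential backoff: the delay doubles with each execution of the
# message, up to max_delay seconds. This is an assumption about how
# enqueue_in_or_not might compute the requeue delay, not Hutch's actual code.
def backoff_delay(interval, executions, max_delay: 3600)
  [interval * (2**executions), max_delay].min
end

backoff_delay(5, 0)  # => 5
backoff_delay(5, 4)  # => 80
backoff_delay(5, 12) # => 3600 (capped)
```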


# File 'lib/hutch/patch/worker.rb', line 135

def flush_to_retry
  now = Time.now.utc
  if now - @last_flush_time >= Hutch::Config.get(:worker_buffer_flush_interval)
    @buffer_queue.size.times do
      cmsg = peak
      break if cmsg.blank?
      # if it was not handled, push it back onto the buffer
      @buffer_queue.push(cmsg) unless cmsg.enqueue_in_or_not
    end
    @last_flush_time = now
    logger.debug "flush_to_retry #{Time.now.utc - now}"
  end
end

#handle_cmsg(consumer, delivery_info, properties, payload, message) ⇒ Object



# File 'lib/hutch/patch/worker.rb', line 93

def handle_cmsg(consumer, delivery_info, properties, payload, message)
  consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info }
  with_tracing(consumer_instance).handle(message)
  @broker.ack(delivery_info.delivery_tag)
rescue => ex
  acknowledge_error(delivery_info, properties, @broker, ex)
  handle_error(properties, payload, consumer, ex)
end

#handle_cmsg_with_limits(cmsg) ⇒ Object

cmsg: ConsumerMsg



# File 'lib/hutch/patch/worker.rb', line 60

def handle_cmsg_with_limits(cmsg)
  # Normal ratelimit handling for the task: if the limit is exceeded, the message goes into the buffer
  consumer = cmsg.consumer
  @message_worker.post do
    if consumer.ratelimit_exceeded?(cmsg.message)
      @buffer_queue.push(cmsg)
    else
      # if Hutch is disconnected, skip the work and let the message time out in RabbitMQ so it is pushed again
      return unless @connected
      consumer.ratelimit_add(cmsg.message)
      handle_cmsg(*cmsg.handle_cmsg_args)
    end
  end
end
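The "process or buffer" decision above can be illustrated without RabbitMQ or thread pools. A minimal sketch in plain Ruby (all names here are illustrative, not Hutch's actual API):

```ruby
# Minimal illustration of the pattern in handle_cmsg_with_limits: messages
# over the rate limit are parked in a local queue instead of being handled,
# so a later tick (cf. retry_buffer_queue) can retry them.
class LimitedProcessor
  attr_reader :processed

  def initialize(limit)
    @limit     = limit     # messages allowed in the current window
    @used      = 0
    @buffer    = Queue.new # parked messages, retried later
    @processed = []
  end

  def handle(msg)
    if @used >= @limit
      @buffer.push(msg)    # over the limit: buffer instead of processing
    else
      @used += 1
      @processed << msg    # under the limit: process immediately
    end
  end

  def buffered
    @buffer.size
  end
end

lp = LimitedProcessor.new(2)
%w[a b c d].each { |m| lp.handle(m) }
lp.processed # => ["a", "b"]
lp.buffered  # => 2
```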

#handle_message_with_limits(consumer, delivery_info, properties, payload) ⇒ Object



# File 'lib/hutch/patch/worker.rb', line 75

def handle_message_with_limits(consumer, delivery_info, properties, payload)
  handle_cmsg_with_limits(consumer_msg(consumer, delivery_info, properties, payload))
end

#heartbeat_connectionObject

Heartbeat check of the Hutch connection.



# File 'lib/hutch/patch/worker.rb', line 104

def heartbeat_connection
  @connected = Hutch.connected?
end

#peakObject

Non-blocking pop of a message; returns nil if the queue is empty and raises on any other error.



# File 'lib/hutch/patch/worker.rb', line 150

def peak
  @buffer_queue.pop(true)
rescue ThreadError => e
  # re-raise anything other than the empty-queue signal
  raise e unless e.message == "queue empty"
  nil
end
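`peak` relies on the stdlib behavior of `Thread::Queue#pop` with `non_block = true`, which raises a ThreadError when the queue is empty. A short demonstration:

```ruby
# Queue#pop(true) is non-blocking: it raises ThreadError ("queue empty")
# on an empty queue instead of waiting. peak converts that into nil.
q = Queue.new
msg = begin
  q.pop(true)
rescue ThreadError => e
  e.message # => "queue empty"
end

q.push(:job)
q.pop(true) # => :job
```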

#retry_buffer_queueObject

Every so often, move tasks from the buffer queue back into execution; tasks with short intervals are executed immediately.



# File 'lib/hutch/patch/worker.rb', line 109

def retry_buffer_queue
  begin_size = @buffer_queue.size
  now        = Time.now.utc
  stat       = {}
  @batch_size.times do
    cmsg = peak
    break if cmsg.blank?
    handle_cmsg_with_limits(cmsg)
    
    next unless logger.level == Logger::DEBUG
    if stat.key?(cmsg.message.body[:b])
      stat[cmsg.message.body[:b]] += 1
    else
      stat[cmsg.message.body[:b]] = 1
    end
  end
  logger.debug "retry_buffer_queue #{Time.now.utc - now}, size from #{begin_size} to #{@buffer_queue.size}, stat: #{stat}"
end
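The bounded loop above (`@batch_size.times` plus the non-blocking `peak`) amounts to draining at most a batch of items per tick, so one pass can never monopolize the timer thread. A standalone sketch of that drain (names are illustrative):

```ruby
# Pop at most batch_size items from a queue without blocking; stop early
# when the queue runs dry, mirroring the break-on-empty in retry_buffer_queue.
def drain(queue, batch_size)
  out = []
  batch_size.times do
    begin
      out << queue.pop(true)
    rescue ThreadError
      break # queue empty: stop this tick's pass
    end
  end
  out
end

q = Queue.new
5.times { |i| q.push(i) }
drain(q, 3) # => [0, 1, 2]
q.size      # => 2
```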

#setup_queue(consumer) ⇒ Object

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.



# File 'lib/hutch/patch/worker.rb', line 47

def setup_queue(consumer)
  logger.info "setting up queue: #{consumer.get_queue_name}"
  
  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)
  
  queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args|
    delivery_info, properties, payload = Hutch::Adapter.decode_message(*args)
    handle_message_with_limits(consumer, delivery_info, properties, payload)
  end
end

#stopObject

Stop a running worker by killing all subscriber threads and shutting down both thread pools.



# File 'lib/hutch/patch/worker.rb', line 39

def stop
  @timer_worker.shutdown
  @broker.stop
  @message_worker.shutdown
end