Class: Hutch::Worker

Inherits:
Object
Defined in:
lib/hutch/patch/worker.rb

Overview

Monkey patch worker. Hutch relies on Bunny's underlying ConsumerWorkPool for concurrent task processing, but that pool is too generic: we need to process the Messages delivered by RabbitMQ and attach extra handling information while a task executes. So tasks are no longer processed by ConsumerWorkPool directly; instead, ConsumerWorkPool performs a single task submission, handing the block to be executed over to our own worker pool for final execution. Because a task in a RabbitMQ queue is only marked complete by a manual Ack, and each Channel has a Prefetch limit, these two features can be combined to buffer tasks in an in-process Queue: as long as tasks remain unacknowledged, Prefetch caps the total number of tasks buffered on the current node, and each Ack explicitly tells RabbitMQ that the task is finished.
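The interaction of prefetch, manual ack, and the local buffer can be sketched with a plain Ruby Queue. This is a minimal single-threaded illustration with made-up names, not Hutch's or Bunny's API:

```ruby
# Illustrative sketch: RabbitMQ stops delivering once `PREFETCH` messages
# are outstanding (unacked); the worker buffers deliveries in a local
# Queue and acks only after a message has actually been handled.
PREFETCH = 3
buffer   = Queue.new
unacked  = 0

deliver = lambda do |msg|
  next false if unacked >= PREFETCH # broker withholds further deliveries
  unacked += 1                      # message is now outstanding
  buffer.push(msg)                  # cached in the in-process queue
  true
end

acked = []
10.times { |i| deliver.call("msg-#{i}") } # only PREFETCH are accepted

until buffer.empty?
  msg = buffer.pop(true) # non-blocking pop from the local buffer
  acked << msg           # "handle" the message
  unacked -= 1           # manual ack frees a prefetch slot
end

puts acked.size # => 3
```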

Instance Method Summary

Constructor Details

#initialize(broker, consumers, setup_procs) ⇒ Worker

Returns a new instance of Worker.



# File 'lib/hutch/patch/worker.rb', line 12

def initialize(broker, consumers, setup_procs)
  raise "Hutch::Schedule requires a positive channel_prefetch" if Hutch::Config.get(:channel_prefetch) < 1
  @broker          = broker
  self.consumers   = consumers
  self.setup_procs = setup_procs
  
  @message_worker = Concurrent::FixedThreadPool.new(Hutch::Config.get(:worker_pool_size))
  @timer_worker   = Concurrent::TimerTask.execute(execution_interval: Hutch::Config.get(:poller_interval)) do
    # all checkers run in the same thread
    heartbeat_connection
    flush_to_retry
    retry_buffer_queue
  end
  
  # The buffer queue size tracks the channel prefetch: all consumers share one buffer queue, and once a
  # consumer's unacked messages reach the prefetch size, RabbitMQ stops pushing messages to that consumer.
  # Because the buffer queue is shared by all consumers, its maximum size is [prefetch * consumer count]:
  # with a prefetch of 20 and 30 consumers, the maximum queue size is 20 * 30 = 600.
  @buffer_queue    = ::Queue.new
  @batch_size      = Hutch::Config.get(:poller_batch_size)
  @connected       = Hutch.connected?
  @last_flush_time = Time.now.utc
end
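The keys read in the constructor can be set through Hutch's configuration. A hypothetical setup, where the values are purely illustrative (only `channel_prefetch >= 1` is actually required by this patch):

```ruby
# Assumed configuration sketch: these are the keys the patched worker
# reads; the concrete values are examples, not recommendations.
Hutch::Config.set(:channel_prefetch, 20)              # must be >= 1 for this patch
Hutch::Config.set(:worker_pool_size, 8)               # size of the FixedThreadPool
Hutch::Config.set(:poller_interval, 1)                # TimerTask period (seconds)
Hutch::Config.set(:poller_batch_size, 100)            # messages drained per timer tick
Hutch::Config.set(:worker_buffer_flush_interval, 60)  # seconds between flush_to_retry runs
```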

Instance Method Details

#consumer_msg(consumer, delivery_info, properties, payload) ⇒ Object

Convert the raw arguments into a Hutch::Message so the handling code from #handle_message can be reused.



# File 'lib/hutch/patch/worker.rb', line 80

def consumer_msg(consumer, delivery_info, properties, payload)
  serializer = consumer.get_serializer || Hutch::Config[:serializer]
  logger.debug {
    spec = serializer.binary? ? "#{payload.bytesize} bytes" : "#{payload}"
    "message(#{properties.message_id || '-'}): " +
      "routing key: #{delivery_info.routing_key}, " +
      "consumer: #{consumer}, " +
      "payload: #{spec}"
  }
  
  ConsumerMsg.new(consumer, Hutch::Message.new(delivery_info, properties, payload, serializer))
end

#flush_to_retryObject

Messages with a long rate interval should not sit in the local buffer the whole time, so, based on the interval value, long-period messages are re-enqueued to RabbitMQ and wait there instead. They must not be requeued directly, however, as that would cause constant round-trips with RabbitMQ; the wait must instead be stretched progressively, according to the period and the number of executions, up to a final maximum wait time.

There are several requirements:

- 


# File 'lib/hutch/patch/worker.rb', line 135

def flush_to_retry
  now = Time.now.utc
  if now - @last_flush_time >= Hutch::Config.get(:worker_buffer_flush_interval)
    @buffer_queue.size.times do
      cmsg = peak
      break if cmsg.blank?
      # push it back into the buffer if it was not handled
      @buffer_queue.push(cmsg) unless cmsg.enqueue_in_or_not
    end
    @last_flush_time = now
    logger.debug "flush_to_retry #{Time.now.utc - now}"
  end
end
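The progressively longer wait that `enqueue_in_or_not` implies can be modeled as a capped exponential backoff. The formula and cap below are assumptions for illustration, not Hutch::Schedule's actual schedule:

```ruby
# Hypothetical backoff: the wait grows with the execution count and is
# capped, so a long-interval message re-enters RabbitMQ with ever longer
# delays instead of being requeued immediately.
MAX_WAIT = 10 * 60 # cap in seconds (assumed value)

def backoff_seconds(base_interval, executions)
  [base_interval * (2**executions), MAX_WAIT].min
end

puts backoff_seconds(15, 0)  # => 15
puts backoff_seconds(15, 3)  # => 120
puts backoff_seconds(15, 10) # => 600 (capped)
```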

#handle_cmsg(consumer, delivery_info, properties, payload, message) ⇒ Object



# File 'lib/hutch/patch/worker.rb', line 93

def handle_cmsg(consumer, delivery_info, properties, payload, message)
  consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info }
  with_tracing(consumer_instance).handle(message)
  @broker.ack(delivery_info.delivery_tag)
rescue => ex
  acknowledge_error(delivery_info, properties, @broker, ex)
  handle_error(properties, payload, consumer, ex)
end

#handle_cmsg_with_limits(cmsg) ⇒ Object

cmsg: ConsumerMsg



# File 'lib/hutch/patch/worker.rb', line 60

def handle_cmsg_with_limits(cmsg)
  # Normal rate-limit handling: if the limit has been exceeded, the message goes into the buffer
  consumer = cmsg.consumer
  @message_worker.post do
    if consumer.ratelimit_exceeded?(cmsg.message)
      @buffer_queue.push(cmsg)
    else
      # If Hutch is disconnected, skip the work and let the message time out in RabbitMQ,
      # which will push it again. `next` is used rather than `return`: this block runs later
      # on a pool thread, after the enclosing method has already returned.
      next unless @connected
      consumer.ratelimit_add(cmsg.message)
      handle_cmsg(*cmsg.handle_cmsg_args)
    end
  end
end
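The `ratelimit_exceeded?` / `ratelimit_add` pair used above can be modeled by a minimal fixed-window counter. This is an illustrative stand-in; the real limiter belongs to the consumer:

```ruby
# Minimal fixed-window rate limiter sketch: allow `limit` messages per
# `window` seconds; anything beyond that would go to the buffer queue.
class WindowLimiter
  def initialize(limit:, window:)
    @limit, @window = limit, window
    @count, @window_start = 0, Time.now
  end

  def exceeded?
    reset_if_expired
    @count >= @limit
  end

  def add
    reset_if_expired
    @count += 1
  end

  private

  def reset_if_expired
    return if Time.now - @window_start < @window
    @count, @window_start = 0, Time.now
  end
end

limiter = WindowLimiter.new(limit: 2, window: 60)
results = 4.times.map do
  if limiter.exceeded?
    :buffered            # over the limit: would be pushed to @buffer_queue
  else
    limiter.add
    :handled             # under the limit: would be handled immediately
  end
end
puts results.inspect # => [:handled, :handled, :buffered, :buffered]
```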

#handle_message_with_limits(consumer, delivery_info, properties, payload) ⇒ Object



# File 'lib/hutch/patch/worker.rb', line 75

def handle_message_with_limits(consumer, delivery_info, properties, payload)
  handle_cmsg_with_limits(consumer_msg(consumer, delivery_info, properties, payload))
end

#heartbeat_connectionObject

Heartbeat check of the Hutch connection.



# File 'lib/hutch/patch/worker.rb', line 104

def heartbeat_connection
  @connected = Hutch.connected?
end

#peakObject

Non-blocking pop of one message; returns nil when the queue is empty, re-raises any other ThreadError.



# File 'lib/hutch/patch/worker.rb', line 150

def peak
  @buffer_queue.pop(true)
rescue ThreadError => e
  raise unless e.message == "queue empty"
  nil
end
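`Queue#pop(true)` raises `ThreadError` when the queue is empty; the rescue above maps that case to `nil`. The idiom in isolation:

```ruby
q = Queue.new
q.push(:job)

# Non-blocking pop: returns the item while one is available, nil once empty.
def peek_nonblock(queue)
  queue.pop(true)
rescue ThreadError
  nil
end

puts peek_nonblock(q).inspect # => :job
puts peek_nonblock(q).inspect # => nil
```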

#retry_buffer_queueObject

Periodically move tasks out of the buffer queue for execution; messages with a short interval are executed right away.



# File 'lib/hutch/patch/worker.rb', line 109

def retry_buffer_queue
  begin_size = @buffer_queue.size
  now        = Time.now.utc
  stat       = {}
  @batch_size.times do
    cmsg = peak
    break if cmsg.blank?
    handle_cmsg_with_limits(cmsg)
    
    next unless logger.level == Logger::DEBUG
    if stat.key?(cmsg.message.body[:b])
      stat[cmsg.message.body[:b]] += 1
    else
      stat[cmsg.message.body[:b]] = 1
    end
  end
  logger.debug "retry_buffer_queue #{Time.now.utc - now}, size from #{begin_size} to #{@buffer_queue.size}, stat: #{stat}"
end

#setup_queue(consumer) ⇒ Object

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.



# File 'lib/hutch/patch/worker.rb', line 47

def setup_queue(consumer)
  logger.info "setting up queue: #{consumer.get_queue_name}"
  
  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)
  
  queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args|
    delivery_info, properties, payload = Hutch::Adapter.decode_message(*args)
    handle_message_with_limits(consumer, delivery_info, properties, payload)
  end
end

#stopObject

Stop a running worker by killing all subscriber threads and shutting down the two thread pools.



# File 'lib/hutch/patch/worker.rb', line 39

def stop
  @timer_worker.shutdown
  @broker.stop
  @message_worker.shutdown
end