Class: Hutch::Worker

Inherits:
Object
Defined in:
lib/hutch/patch/worker.rb

Overview

Monkey patch worker. Hutch relies on Bunny's underlying ConsumerWorkPool for its concurrent task processing, but that pool is too generic for our needs: we want to process the messages delivered by RabbitMQ and attach extra handling information while each task runs. So the ConsumerWorkPool no longer processes the task itself; it only performs a single submission step, handing the block to be executed over to our own worker pool for final execution. Because a task in a RabbitMQ queue is only marked as finished once it is manually acked, and each channel has a prefetch limit, we can combine these two properties and buffer tasks in an in-process Queue: as long as messages remain unacked, prefetch caps the total number of tasks buffered on the current node, and the ack explicitly tells RabbitMQ that a given task is done.
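
The local buffer described above is bounded by the channel prefetch; in stock Hutch this is the channel_prefetch setting. An illustrative example (the value 20 is an assumption, not a recommendation from this patch):

# Prefetch caps the number of unacked messages buffered per consumer.
Hutch::Config.set(:channel_prefetch, 20)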

Instance Method Summary

Constructor Details

#initialize(broker, consumers, setup_procs) ⇒ Worker

Returns a new instance of Worker.



# File 'lib/hutch/patch/worker.rb', line 12

def initialize(broker, consumers, setup_procs)
  @broker          = broker
  self.consumers   = consumers
  self.setup_procs = setup_procs
  
  @message_worker = Concurrent::FixedThreadPool.new(Hutch::Config.get(:worker_pool_size))
  @timer_worker   = Concurrent::TimerTask.execute(execution_interval: Hutch::Config.get(:poller_interval)) do
    heartbeat_connection
    retry_buffer_queue
  end
  
  # The buffer queue size may match the channel's prefetch count, and every consumer has its own
  # buffer queue of that size. Once the buffer holds `prefetch` messages, RabbitMQ stops pushing
  # messages to this consumer, which is fine: consumers are throttled via the shared Redis instance.
  @buffer_queue = ::Queue.new
  @batch_size   = Hutch::Config.get(:poller_batch_size)
  @connected    = Hutch.connected?
end
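
The constructor reads three settings from Hutch::Config. A minimal sketch of setting them, assuming these custom keys are registered elsewhere in the patch (the values are illustrative only):

require 'hutch'

# Illustrative values; these keys are custom settings used by this patch.
Hutch::Config.set(:worker_pool_size, 8)    # threads in the message worker pool
Hutch::Config.set(:poller_interval, 1)     # seconds between timer task runs
Hutch::Config.set(:poller_batch_size, 20)  # messages drained from the buffer per run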

Instance Method Details

#handle_message_with_limits(consumer, delivery_info, properties, payload) ⇒ Object



# File 'lib/hutch/patch/worker.rb', line 53

def handle_message_with_limits(consumer, delivery_info, properties, payload)
  # 1. Is the consumer over its rate limit?
  # 2. Yes: wrap the message in a ConsumerMsg and push it onto the buffer queue.
  # 3. No: post it for handling.
  @message_worker.post do
    if consumer.ratelimit_exceeded?
      @buffer_queue.push(ConsumerMsg.new(consumer, delivery_info, properties, payload))
    else
      # If Hutch is disconnected, skip the work and let the message time out in RabbitMQ
      # so it will be redelivered later. (`next`, not `return`: this runs inside a posted block.)
      next unless @connected
      consumer.ratelimit_add
      handle_message(consumer, delivery_info, properties, payload)
    end
  end
end
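
The ratelimit_exceeded? / ratelimit_add calls are expected to be provided by the consumer itself; this patch only invokes them. A minimal sketch of backing them with a shared Redis counter, assuming Hutch consumer classes extend such a module (the Redis connection, key scheme, and the 100-per-second limit are assumptions, not part of this patch):

require 'redis'

# Hypothetical rate-limit helpers for a Hutch consumer class.
module SimpleRatelimit
  REDIS = Redis.new # the shared Redis instance mentioned in #initialize
  LIMIT = 100       # illustrative: max messages per second per queue

  def ratelimit_key
    "ratelimit:#{get_queue_name}:#{Time.now.to_i}"
  end

  def ratelimit_exceeded?
    REDIS.get(ratelimit_key).to_i >= LIMIT
  end

  def ratelimit_add
    REDIS.incr(ratelimit_key)
    REDIS.expire(ratelimit_key, 2) # keep the per-second counter briefly, then discard
  end
end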

#heartbeat_connection ⇒ Object

Heartbeat check of the Hutch connection.



# File 'lib/hutch/patch/worker.rb', line 70

def heartbeat_connection
  @connected = Hutch.connected?
end

#peak ⇒ Object

Non-blocking pop of a message; returns nil if the queue is empty and re-raises any other error.



# File 'lib/hutch/patch/worker.rb', line 84

def peak
  @buffer_queue.pop(true)
rescue ThreadError => e
  # Only swallow the "queue empty" error; anything else is re-raised.
  raise unless e.to_s == "queue empty"
  nil
end

#retry_buffer_queue ⇒ Object

Periodically move tasks from the buffer queue back into execution.



# File 'lib/hutch/patch/worker.rb', line 75

def retry_buffer_queue
  @batch_size.times do
    cmsg = peak
    return if cmsg.blank?
    handle_message_with_limits(cmsg.consumer, cmsg.delivery_info, cmsg.properties, cmsg.payload)
  end
end

#setup_queue(consumer) ⇒ Object

Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.



# File 'lib/hutch/patch/worker.rb', line 41

def setup_queue(consumer)
  logger.info "setting up queue: #{consumer.get_queue_name}"
  
  queue = @broker.queue(consumer.get_queue_name, consumer.get_arguments)
  @broker.bind_queue(queue, consumer.routing_keys)
  
  queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args|
    delivery_info, properties, payload = Hutch::Adapter.decode_message(*args)
    handle_message_with_limits(consumer, delivery_info, properties, payload)
  end
end
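
For context, the queue name, routing keys and arguments used above come from the stock Hutch consumer DSL. A hypothetical consumer that this worker would set up (all names and values below are illustrative):

require 'hutch'

class OrderCreatedConsumer
  include Hutch::Consumer
  consume 'order.created'            # routing key(s) bound to the queue
  queue_name 'orders_created'        # becomes consumer.get_queue_name
  arguments 'x-max-priority' => 5    # becomes consumer.get_arguments

  def process(message)
    # message.body is the decoded payload
  end
end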

#stop ⇒ Object

Stop a running worker by killing all subscriber threads and shutting down the two thread pools.



# File 'lib/hutch/patch/worker.rb', line 33

def stop
  @timer_worker.shutdown
  @message_worker.shutdown
  @broker.stop
end