Class: Hutch::ConsumerMsg
- Inherits:
-
Object
- Object
- Hutch::ConsumerMsg
- Defined in:
- lib/hutch/patch/worker.rb
Overview
Consumer Message wrap Hutch::Message and Consumer
Instance Attribute Summary collapse
-
#consumer ⇒ Object
readonly
Returns the value of attribute consumer.
-
#message ⇒ Object
readonly
Returns the value of attribute message.
Instance Method Summary collapse
-
#enqueue_in_or_not ⇒ Object
if delays > 10s then let the message to rabbitmq to delay and enqueue again instead of rabbitmq reqneue.
- #handle_cmsg_args ⇒ Object
-
#initialize(consumer, message) ⇒ ConsumerMsg
constructor
A new instance of ConsumerMsg.
- #interval ⇒ Object
- #logger ⇒ Object
Constructor Details
#initialize(consumer, message) ⇒ ConsumerMsg
Returns a new instance of ConsumerMsg.
165 166 167 168 |
# File 'lib/hutch/patch/worker.rb', line 165 def initialize(consumer, ) @consumer = consumer @message = end |
Instance Attribute Details
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
159 160 161 |
# File 'lib/hutch/patch/worker.rb', line 159 def consumer @consumer end |
#message ⇒ Object (readonly)
Returns the value of attribute message.
159 160 161 |
# File 'lib/hutch/patch/worker.rb', line 159 def @message end |
Instance Method Details
#enqueue_in_or_not ⇒ Object
if delays > 10s then let the message to rabbitmq to delay and enqueue again instead of rabbitmq reqneue
179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/hutch/patch/worker.rb', line 179 def enqueue_in_or_not # interval 小于 5s, 的则不会传, 在自己的 buffer 中等待 return false if interval < Hutch::Config.get(:worker_buffer_flush_interval) # 等待时间过长的消息, 交给远端的 rabbitmq 去进行等待, 不占用 buffer 空间 # 如果数据量特别大, 但 ratelimit 特别严格, 那么也会变为固定周期的积压, 需要增加对执行次数的记录以及延长 # 市场 30s 执行一次的任务, 积累了 200 个, 那么这个积压会越来越多, 直到保持到一个 RabbitMQ 与 hutch 之间的最长等待周期, 会一直空转 # - 要么增加对执行次数的考虑, 拉长延长. 但最终会有一个最长的延长 10800 (3h), 这个问题最终仍然会存在 # - 设置延长多长之后, 就舍弃这个任务, 因为由于 ratelimit 的存在, 但又持续的积压, 不可能处理完这个任务 # 这个方案没有很好的解决方法, 这是一个典型的 "生产速度大于消费速度" 的问题, 如果长时间的 生产 > 消费, 这个问题是无解的 Hutch.broker.ack(.delivery_info.delivery_tag) # TODO: 如果存在 x-death 的 count 需要额外考虑, 解决与 error retry 的 x-death 复用的问题 # 临时给一个随机的 1,2 倍率的延迟, 大概率为 1 倍,小概率为 2 倍 consumer.enqueue_in(interval * [rand(3), 1].max, .body, .properties.to_hash) end |
#handle_cmsg_args ⇒ Object
170 171 172 |
# File 'lib/hutch/patch/worker.rb', line 170 def handle_cmsg_args [consumer, .delivery_info, .properties, .payload, ] end |
#interval ⇒ Object
174 175 176 |
# File 'lib/hutch/patch/worker.rb', line 174 def interval @interval ||= consumer.interval() end |
#logger ⇒ Object
161 162 163 |
# File 'lib/hutch/patch/worker.rb', line 161 def logger Hutch::Logging.logger end |