Class: Hutch::ConsumerMsg

Inherits:
Object
  • Object
show all
Defined in:
lib/hutch/patch/worker.rb

Overview

Consumer Message wrap Hutch::Message and Consumer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(consumer, message) ⇒ ConsumerMsg

Returns a new instance of ConsumerMsg.



165
166
167
168
# File 'lib/hutch/patch/worker.rb', line 165

def initialize(consumer, message)
  @consumer = consumer
  @message  = message
end

Instance Attribute Details

#consumerObject (readonly)

Returns the value of attribute consumer.



159
160
161
# File 'lib/hutch/patch/worker.rb', line 159

def consumer
  @consumer
end

#messageObject (readonly)

Returns the value of attribute message.



159
160
161
# File 'lib/hutch/patch/worker.rb', line 159

def message
  @message
end

Instance Method Details

#enqueue_in_or_notObject

if delays > 10s then let the message to rabbitmq to delay and enqueue again instead of rabbitmq reqneue



179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/hutch/patch/worker.rb', line 179

def enqueue_in_or_not
  # interval 小于 5s, 的则不会传, 在自己的 buffer 中等待
  return false if interval < Hutch::Config.get(:worker_buffer_flush_interval)
  # 等待时间过长的消息, 交给远端的 rabbitmq 去进行等待, 不占用 buffer 空间
  # 如果数据量特别大, 但 ratelimit 特别严格, 那么也会变为固定周期的积压, 需要增加对执行次数的记录以及延长
  # 市场 30s 执行一次的任务, 积累了 200 个, 那么这个积压会越来越多, 直到保持到一个 RabbitMQ 与 hutch 之间的最长等待周期, 会一直空转
  #  - 要么增加对执行次数的考虑, 拉长延长. 但最终会有一个最长的延长 10800 (3h), 这个问题最终仍然会存在
  #  - 设置延长多长之后, 就舍弃这个任务, 因为由于 ratelimit 的存在, 但又持续的积压, 不可能处理完这个任务
  # 这个方案没有很好的解决方法, 这是一个典型的 "生产速度大于消费速度" 的问题, 如果长时间的 生产 > 消费, 这个问题是无解的
  Hutch.broker.ack(message.delivery_info.delivery_tag)
  # TODO: 如果存在 x-death 的 count 需要额外考虑, 解决与 error retry 的 x-death 复用的问题
  # 临时给一个随机的 1,2 倍率的延迟, 大概率为 1 倍,小概率为 2 倍
  consumer.enqueue_in(interval * [rand(3), 1].max, message.body, message.properties.to_hash)
end

#handle_cmsg_argsObject



170
171
172
# File 'lib/hutch/patch/worker.rb', line 170

def handle_cmsg_args
  [consumer, message.delivery_info, message.properties, message.payload, message]
end

#intervalObject



174
175
176
# File 'lib/hutch/patch/worker.rb', line 174

def interval
  @interval ||= consumer.interval(message)
end

#loggerObject



161
162
163
# File 'lib/hutch/patch/worker.rb', line 161

def logger
  Hutch::Logging.logger
end