Class: Sbmt::Outbox::V2::Processor

Inherits:
BoxProcessor show all
Defined in:
lib/sbmt/outbox/v2/processor.rb

Constant Summary collapse

REDIS_BRPOP_MIN_DELAY =
0.1

Instance Attribute Summary collapse

Attributes inherited from BoxProcessor

#started, #threads_count, #worker_name

Instance Method Summary collapse

Methods inherited from BoxProcessor

#alive?, #ready?, #safe_process_task, #start, #stop, #throttle

Constructor Details

#initialize(boxes, threads_count: nil, lock_timeout: nil, cache_ttl: nil, cutoff_timeout: nil, brpop_delay: nil, redis: nil) ⇒ Processor

Returns a new instance of Processor.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/sbmt/outbox/v2/processor.rb', line 17

def initialize(
  boxes,
  threads_count: nil,
  lock_timeout: nil,
  cache_ttl: nil,
  cutoff_timeout: nil,
  brpop_delay: nil,
  redis: nil
)
  @lock_timeout = lock_timeout || processor_config.general_timeout
  @cache_ttl = cache_ttl || @lock_timeout * 10
  @cutoff_timeout = cutoff_timeout || processor_config.cutoff_timeout
  @brpop_delay = brpop_delay || redis_brpop_delay(boxes.count, processor_config.brpop_delay)
  @redis = redis

  super(boxes: boxes, threads_count: threads_count || processor_config.threads_count, name: "processor", redis: redis)
end

Instance Attribute Details

#brpop_delayObject (readonly)

Returns the value of attribute brpop_delay.



13
14
15
# File 'lib/sbmt/outbox/v2/processor.rb', line 13

def brpop_delay
  @brpop_delay
end

#cache_ttlObject (readonly)

Returns the value of attribute cache_ttl.



13
14
15
# File 'lib/sbmt/outbox/v2/processor.rb', line 13

def cache_ttl
  @cache_ttl
end

#cutoff_timeoutObject (readonly)

Returns the value of attribute cutoff_timeout.



13
14
15
# File 'lib/sbmt/outbox/v2/processor.rb', line 13

def cutoff_timeout
  @cutoff_timeout
end

#lock_timeoutObject (readonly)

Returns the value of attribute lock_timeout.



13
14
15
# File 'lib/sbmt/outbox/v2/processor.rb', line 13

def lock_timeout
  @lock_timeout
end

Instance Method Details

#process_task(_worker_number, task) ⇒ Object



35
36
37
38
# File 'lib/sbmt/outbox/v2/processor.rb', line 35

def process_task(_worker_number, task)
  middlewares = Middleware::Builder.new(batch_process_middlewares)
  middlewares.call(task) { process(task) }
end