Class: Sbmt::Outbox::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/sbmt/outbox/worker.rb

Defined Under Namespace

Classes: Job

Instance Method Summary collapse

Constructor Details

#initialize(boxes:, concurrency: 10) ⇒ Worker

Returns a new instance of Worker.



33
34
35
36
37
38
39
40
# File 'lib/sbmt/outbox/worker.rb', line 33

def initialize(boxes:, concurrency: 10)
  self.queue = Queue.new
  build_jobs(boxes).each { |job| queue << job }
  self.thread_pool = ThreadPool.new { queue.pop }
  self.concurrency = [concurrency, queue.size].min
  self.thread_workers = {}
  init_redis
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


84
85
86
87
88
89
90
91
# File 'lib/sbmt/outbox/worker.rb', line 84

def alive?
  return false unless started

  deadline = Time.current - general_timeout
  thread_workers.all? do |_worker_number, time|
    deadline < time
  end
end

#ready?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/sbmt/outbox/worker.rb', line 80

def ready?
  started && thread_workers.any?
end

#startObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/sbmt/outbox/worker.rb', line 42

def start
  raise "Outbox is already started" if started
  self.started = true
  self.thread_workers = {}

  thread_pool.start(concurrency: concurrency) do |worker_number, job|
    touch_thread_worker!
    result = ThreadPool::PROCESSED
    logger.with_tags(**job.log_tags.merge(worker: worker_number)) do
      lock_manager.lock("#{job.resource_path}:lock", general_timeout * 1000) do |locked|
        labels = job.yabeda_labels.merge(worker_number: worker_number)

        if locked
          job_execution_runtime.measure(labels) do
            ::Rails.application.executor.wrap do
              safe_process_job(job, worker_number, labels)
            end
          end
        else
          result = ThreadPool::SKIPPED
          logger.log_info("Skip processing already locked #{job.resource_key}")
        end

        job_counter.increment(labels.merge(state: locked ? "processed" : "skipped"), by: 1)
      end
    end

    result
  ensure
    queue << job
  end
rescue => e
  Outbox.error_tracker.error(e)
  raise
ensure
  self.started = false
end