Class: Qyu::Workers::Base

Inherits:
Object
  • Object
show all
Includes:
Concerns::Callback, Concerns::FailureQueue, Concerns::PayloadValidator
Defined in:
lib/qyu/workers/base.rb

Direct Known Subclasses

Sync

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Concerns::FailureQueue

#failure_queue

Methods included from Concerns::PayloadValidator

#validate_payload!, #validates

Methods included from Concerns::Callback

#callback, #run_callbacks

Constructor Details

#initialize(&block) ⇒ Base

Returns a new instance of Base.



13
14
15
16
17
# File 'lib/qyu/workers/base.rb', line 13

# Builds a worker with a freshly generated id and a zeroed task counter.
#
# When a block is supplied it is evaluated in the context of the new
# instance (via +instance_exec+), after both instance variables are set.
#
# @yield optional configuration block, run with +self+ set to the new worker
def initialize(&block)
  @id = Qyu::Utils.uuid
  @processed_tasks = 0
  return unless block

  instance_exec(&block)
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



10
11
12
# File 'lib/qyu/workers/base.rb', line 10

# Reader for the worker's identifier.
#
# @return [Object] the current value of +@id+ (assigned from
#   +Qyu::Utils.uuid+ in the constructor)
def id
  @id
end

#processed_tasks ⇒ Object

Returns the value of attribute processed_tasks.



11
12
13
# File 'lib/qyu/workers/base.rb', line 11

# Reader for the worker's task counter.
#
# The constructor initialises it to 0.
# NOTE(review): where it is incremented is not visible in this listing.
#
# @return [Object] the current value of +@processed_tasks+
def processed_tasks
  @processed_tasks
end

Instance Method Details

#work(queue_name, blocking: true) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/qyu/workers/base.rb', line 19

# Runs the worker loop for +queue_name+, yielding each task it manages to lock.
#
# Every pass is wrapped in the +:execute+ callbacks and does the following:
#
# 1. fetches a task with +fetch_task+ and validates it with +validate_payload!+;
# 2. if the task is +acknowledgeable?+, hands it to +discard_completed_task+;
# 3. otherwise, if +lock!+ succeeds, marks the task as working, yields it to
#    the caller's block and then calls +conclude_task+; any StandardError
#    raised by the block or by +conclude_task+ is passed to +fail_task+.
#
# Errors raised by the pass itself are handled here and never escape:
# +UnsyncError+ is ignored, +CouldNotFetchTask+ is retried with a sleep until
# the retry budget is spent, +PayloadValidationError+ marks the task as having
# an invalid payload, and anything else is logged at error level.
#
# After each pass +run_garbage_collector+ is called.
#
# @param queue_name name of the queue, passed to +fetch_task+
# @param blocking [Boolean] when truthy the loop keeps running; when falsy
#   exactly one pass is executed
# @yield [task] the fetched task, after it was locked and marked as working
# @return [nil]
def work(queue_name, blocking: true)
  log(:info, "Worker started for queue '#{queue_name}'")
  repeat = true

  # NOTE(review): this budget is shared by every pass of the loop and is never
  # replenished, so it is spent for the lifetime of this call once three fetch
  # failures have occurred. Confirm whether it should be reset per pass.
  remaining_fetch_retries = 3

  while repeat
    run_callbacks(:execute) do
      begin
        fetched_task = fetch_task(queue_name)
        validate_payload!(fetched_task)
        log(:info, "Worker processed #{processed_tasks} tasks from queue `#{queue_name}`")
        if fetched_task.acknowledgeable?
          discard_completed_task(fetched_task)
        elsif fetched_task.lock!
          fetched_task.mark_working
          begin
            yield(fetched_task)
            conclude_task(fetched_task)
          rescue => ex
            fail_task(fetched_task, ex)
          end
        end
      rescue Qyu::Errors::UnsyncError
        # Swallowed on purpose: the pass simply ends without touching the task.
        # NOTE(review): the reason is not visible here; confirm with the
        # code that raises UnsyncError.
      rescue Qyu::Errors::CouldNotFetchTask => ex
        if remaining_fetch_retries <= 0
          acknowledge_message_with_task_id_not_found_in_store(ex)
        else
          # Sleeps 3s, then 2s, then 1s as the budget shrinks.
          sleep(remaining_fetch_retries)
          remaining_fetch_retries -= 1
          retry
        end
      rescue Qyu::Errors::PayloadValidationError
        # fetched_task is always assigned here: validation runs after the fetch.
        fetched_task.mark_invalid_payload
      rescue => ex
        # Pass the level explicitly, like every other log call in this method;
        # the one-argument form put the message in the level position.
        log(:error, "Worker error: #{ex.class}: #{ex.message}")
        # Exception#backtrace may be nil, so coerce before joining.
        log(:error, "Backtrace: #{Array(ex.backtrace).join("\n")}")
      end
    end

    repeat = blocking
    run_garbage_collector
  end
end