Class: Qyu::Workers::Base

Inherits:
Object
  • Object
show all
Includes:
Concerns::Callback, Concerns::FailureQueue, Concerns::PayloadValidator, Concerns::Timeout
Defined in:
lib/qyu/workers/base.rb

Overview

Qyu::Workers::Base: a worker sits on a queue, waiting for tasks to arrive.

Qyu::Workers::Base#work(queue_name)

Worker lifecycle:

- Start an infinite loop:
      while (true)
- Fetch a message (Task) from its queue:
      t = Task.fetch(queue_name)
- Check the completion:
      t.acknowledge_message if t.completed?
- Lock it:
      t.lock! && t.mark_working
- Do the work:
      yield(t)
- Create the next steps/tasks:
      t.job.create_next_tasks(t, t.job.payload (...))
- Finish:
      t.unlock! && t.mark_finished && t.acknowledge_message

Direct Known Subclasses

Split, Sync

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Concerns::Timeout

#timeout

Methods included from Concerns::PayloadValidator

#validate_payload!, #validates

Methods included from Concerns::FailureQueue

#failure_queue

Methods included from Concerns::Callback

#callback, #run_callbacks

Constructor Details

#initialize(&block) ⇒ Base


32
33
34
35
36
# File 'lib/qyu/workers/base.rb', line 32

# Sets up a new worker: assigns it an id obtained from Qyu::Utils.uuid, starts
# the processed-task counter at zero and, when a block is supplied, evaluates
# that block in the context of the new instance (so it can configure the
# worker through the instance's own methods).
#
# @yield optional configuration block, run with +self+ set to the worker
def initialize(&block)
  @id = Qyu::Utils.uuid
  @processed_tasks = 0
  return unless block

  instance_exec(&block)
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id


29
30
31
# File 'lib/qyu/workers/base.rb', line 29

# Reader for the worker's identifier.
#
# @return [Object] the current value of @id (assigned from Qyu::Utils.uuid
#   in #initialize)
def id
  @id
end

#processed_tasksObject

Returns the value of attribute processed_tasks


30
31
32
# File 'lib/qyu/workers/base.rb', line 30

# Reader for the worker's processed-task counter.
#
# @return [Object] the current value of @processed_tasks (starts at 0 in
#   #initialize)
def processed_tasks
  @processed_tasks
end

Instance Method Details

#work(queue_name, blocking: true) ⇒ Object


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/qyu/workers/base.rb', line 38

# Runs the worker loop against +queue_name+, yielding each locked task to the
# given block.
#
# Every pass is wrapped in the :execute callbacks. A pass fetches a task,
# validates its payload and then either discards it (when the task reports
# itself acknowledgeable) or locks it, marks it as working and yields it to
# the block under the configured @timeout. When the block returns normally
# the task is concluded; when it raises a StandardError the task is handed
# to fail_task together with the error.
#
# A Qyu::Errors::CouldNotFetchTask is retried up to three times per pass,
# sleeping 3, 2 and then 1 second(s) between attempts; once the budget is
# spent the message is acknowledged as having no task in the store. The
# budget is reset on every pass so that earlier fetch failures do not
# disable retries for the rest of the worker's life.
#
# @param queue_name [String] queue to consume (type presumed; the value is
#   only interpolated into log lines and passed to fetch_task)
# @param blocking [Boolean] keep looping while true; run one pass when false
# @yieldparam fetched_task [Object] the task that was fetched and locked
# @return [nil]
def work(queue_name, blocking: true)
  log(:info, "worker started for queue '#{queue_name}'")
  repeat = true

  while repeat
    # Fresh retry budget for each pass. `retry` below re-runs only the
    # begin-block, so the counter still bounds the retries within a pass.
    remaining_fetch_retries = 3

    run_callbacks(:execute) do
      begin
        fetched_task = fetch_task(queue_name)
        validate_payload!(fetched_task)
        log(:info, "worker processed #{processed_tasks} tasks from queue `#{queue_name}`")
        if fetched_task.acknowledgeable?
          discard_completed_task(fetched_task)
        elsif fetched_task.lock!
          fetched_task.mark_working
          begin
            Timeout.timeout(@timeout) do
              yield(fetched_task)
            end
            conclude_task(fetched_task)
          rescue => ex
            # Any StandardError raised while working the task fails that
            # task only; the worker loop itself keeps running.
            fail_task(fetched_task, ex)
          end
        end
      rescue Qyu::Errors::UnsyncError
        # NOTE(review): swallowed without logging; the pass simply ends and
        # the loop moves on. Presumably intentional - confirm.
      rescue Qyu::Errors::CouldNotFetchTask => ex
        if remaining_fetch_retries <= 0
          acknowledge_message_with_task_id_not_found_in_store(ex)
        else
          # Wait 3s, then 2s, then 1s before fetching again.
          sleep(remaining_fetch_retries)
          remaining_fetch_retries -= 1
          retry
        end
      rescue Qyu::Errors::PayloadValidationError => ex
        log("invalid payload: #{ex.class}: #{ex.message}")
        fetched_task.mark_invalid_payload
      rescue => ex
        log("worker error: #{ex.class}: #{ex.message}")
        log("backtrace: #{ex.backtrace.join("\n")}")
      end
    end

    repeat = blocking
    run_garbage_collector
  end
end