Class: Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/worker.rb

Defined Under Namespace

Classes: Ctx, Defer

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}, &block) ⇒ Worker



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/worker.rb', line 30

# Builds a worker: stores the options and job block, creates the input,
# output and deferred-call queues plus a fresh Worker::Ctx, then calls #run!
# to start the background thread.
#
# @param opts [Hash] kept in @opts; #perform reads :retry, :backoff,
#   :backoff_max and :raise from it
# @param block [Proc] job body; #run! evaluates it via instance_exec on the
#   Worker::Ctx instance
def initialize(opts = {}, &block)
  @opts = opts
  @block = block
  @retries = 0

  @in = Queue.new
  @out = Queue.new
  @defers = Queue.new
  @ctx = Worker::Ctx.new

  # Must come last: the thread started by #run! reads the state set above.
  run!
end

Instance Method Details

#join ⇒ Object



79
80
81
82
83
84
# File 'lib/worker.rb', line 79

# Blocks the caller until the @defers queue is empty, polling every 0.1s.
# Entries are added and removed by #perform_async.
#
# @return [nil]
def join
  sleep 0.1 until @defers.size == 0
end

#perform(*args) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/worker.rb', line 42

# Runs one job synchronously: pushes +args+ onto the input queue, blocks on
# the output queue for the outcome, and retries failures as configured in
# @opts.
#
# Options read from @opts:
#   :retry       - number of retries allowed after a failure (default 0)
#   :backoff     - base delay in seconds; retry n sleeps n * backoff (default 0.1)
#   :backoff_max - optional cap on the computed delay
#   :raise       - when exactly false, return nil instead of raising once the
#                  retries are used up
#
# @param args [Array] arguments queued for the job
# @return [Object, nil] the value popped from the output queue, or nil when
#   the job kept failing and :raise is false
# @raise [Exception] the last failure once retries are exhausted; SystemExit,
#   SignalException (including Interrupt) and NoMemoryError are re-raised
#   immediately, regardless of :retry and :raise
def perform(*args)
  @in.push args

  ret = @out.pop
  raise ret if ret.is_a? Exception

  # A success closes the retry cycle; without this reset the next call would
  # start with a partially used retry budget.
  @retries = 0
  ret
rescue Exception => ex
  # Exception (not just StandardError) is rescued on purpose: whatever was
  # popped from @out is re-raised above and may be of any class. Process-control
  # exceptions, however, must never be retried or swallowed.
  fatal = case ex
          when SystemExit, SignalException, NoMemoryError then true
          else false
          end
  retries_max = @opts[:retry] || 0

  # >= rather than == so a negative :retry (or an overshot counter) cannot
  # turn this into an endless retry loop.
  if fatal || @retries >= retries_max
    @retries = 0
    return if !fatal && @opts[:raise] == false

    raise ex
  end

  @retries += 1
  backoff = @opts[:backoff] || 0.1
  backoff_max = @opts[:backoff_max]
  sleeping = @retries * backoff
  sleeping = backoff_max if backoff_max && sleeping > backoff_max

  sleep sleeping
  retry
end

#perform_async(*args) ⇒ Object



69
70
71
72
73
74
75
76
77
# File 'lib/worker.rb', line 69

# Wraps a #perform call in a Defer and tracks it in @defers so #join can wait
# for outstanding calls.
#
# NOTE(review): when and on which thread the block runs is decided by Defer,
# which is defined elsewhere; confirm before relying on ordering.
#
# @param args [Array] arguments forwarded to #perform
# @return [Defer] the deferred call; its block evaluates to #perform's result
def perform_async(*args)
  defer = Defer.new do
    perform(*args)
  ensure
    # Release one slot even when #perform raises; otherwise @defers never
    # drains and #join would poll forever.
    @defers.pop
  end
  @defers.push defer
  defer
end

#run! ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/worker.rb', line 86

# Starts the background thread that serves jobs: each iteration pops an
# argument list from @in, evaluates @block against @ctx with those arguments,
# and pushes the outcome to @out. Any exception raised in an iteration is
# pushed to @out instead of a result, and the loop continues.
#
# @return [Worker] self
def run!
  @thread = Thread.new do
    loop do
      begin
        outcome = @ctx.instance_exec(*@in.pop, &@block)
        @out.push(outcome)
      rescue Exception => failure
        # Hand the failure to the caller side rather than letting the thread die.
        @out.push(failure)
      end
    end
  end

  self
end

#stop! ⇒ Object



98
99
100
101
# File 'lib/worker.rb', line 98

# Kills the background thread if one has been started; does nothing when
# @thread is nil.
#
# @return [Worker] self
def stop!
  running = @thread
  running.kill unless running.nil?

  self
end