Class: RedisClient::Cluster::ConcurrentWorker::Group

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_client/cluster/concurrent_worker.rb

Defined Under Namespace

Classes: Task

Instance Method Summary collapse

Constructor Details

#initialize(worker:, queue:, size:) ⇒ Group

Returns a new instance of Group.



33
34
35
36
37
38
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 33

def initialize(worker:, queue:, size:)
  @worker = worker
  @queue = queue
  @size = size
  @count = 0
end

Instance Method Details

#closeObject



60
61
62
63
64
65
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 60

def close
  @queue.clear
  @queue.close if @queue.respond_to?(:close)
  @count = 0
  nil
end

#eachObject



49
50
51
52
53
54
55
56
57
58
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 49

def each
  raise InvalidNumberOfTasks, "expected: #{@size}, actual: #{@count}" if @count != @size

  @size.times do
    task = @queue.pop
    yield(task.id, task.result)
  end

  nil
end

#inspectObject



67
68
69
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 67

def inspect
  "#<#{self.class.name} size: #{@count}, max: #{@size}, worker: #{@worker.class.name}>"
end

#push(id, *args, **kwargs, &block) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/redis_client/cluster/concurrent_worker.rb', line 40

def push(id, *args, **kwargs, &block)
  raise InvalidNumberOfTasks, "max size reached: #{@count}" if @count == @size

  task = Task.new(id: id, queue: @queue, args: args, kwargs: kwargs, block: block)
  @worker.push(task)
  @count += 1
  nil
end