Class: ActiveConcurrency::Base::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/active_concurrency/base/worker.rb

Direct Known Subclasses

Processes::Worker, Threads::Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name: nil) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
# File 'lib/active_concurrency/base/worker.rb', line 12

# Builds a worker with a generated name and an empty job queue.
#
# The name has the form "<prefix>_worker_<suffix>". `prefix` is resolved on the
# receiver and is not defined in this section of the file.
#
# @param name [Object, nil] suffix for the worker name; a random UUID from
#   SecureRandom is used when it is nil (or false)
def initialize(name: nil)
  label = prefix
  suffix = name || SecureRandom.uuid
  @name = "#{label}_worker_#{suffix}"
  @queue = Queue.new
end

Instance Attribute Details

#mutex ⇒ Object

Returns the value of attribute mutex.



10
11
12
# File 'lib/active_concurrency/base/worker.rb', line 10

# Reader for the @mutex instance variable.
#
# Nothing in this section of the file assigns @mutex, so the value may be nil;
# #lock explicitly checks for that case.
#
# @return [Object, nil] whatever is currently stored in @mutex
def mutex
  @mutex
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



9
10
11
# File 'lib/active_concurrency/base/worker.rb', line 9

# Reader for the worker's name.
#
# @return [String] the "<prefix>_worker_<name or UUID>" string assigned to
#   @name in #initialize
def name
  @name
end

Instance Method Details

#clear ⇒ Object



17
18
19
# File 'lib/active_concurrency/base/worker.rb', line 17

# Discards every entry currently waiting in the worker's queue.
#
# @return [Object] the result of calling #clear on @queue
def clear
  @queue.clear
end

#close ⇒ Object



21
22
23
# File 'lib/active_concurrency/base/worker.rb', line 21

# Closes the worker's queue by delegating to @queue.close.
#
# With the Queue created in #initialize, a closed queue rejects further
# pushes, so #schedule raises ClosedQueueError after this call.
#
# @return [Object] the result of calling #close on @queue
def close
  @queue.close
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/active_concurrency/base/worker.rb', line 25

# Reports whether the worker's queue has been closed (see #close).
#
# @return [Boolean] the result of @queue.closed?
def closed?
  @queue.closed?
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/active_concurrency/base/worker.rb', line 29

# Reports whether the worker's queue currently holds no entries.
#
# @return [Boolean] the result of @queue.empty?
def empty?
  @queue.empty?
end

#exit ⇒ Object



33
34
35
# File 'lib/active_concurrency/base/worker.rb', line 33

# Enqueues a job whose only action is `throw :exit`.
#
# Nothing is thrown here; the throw happens when the queued job is eventually
# called. Note that this instance method shadows Kernel#exit on the worker.
#
# @return [Object] whatever #schedule returns
def exit
  stop_job = proc { throw :exit }
  schedule(&stop_job)
end

#lock ⇒ Object



37
38
39
40
41
# File 'lib/active_concurrency/base/worker.rb', line 37

# Acquires the worker's mutex when there is one to take.
#
# Locking is skipped when `process?` is truthy, when no mutex is set, or when
# the mutex is already locked; `process?` is defined outside this section.
#
# @return [true, Object] true when locking was skipped, otherwise the result
#   of mutex.lock
def lock
  needs_lock = !process? && !mutex.nil? && !mutex.locked?
  needs_lock ? mutex.lock : true
end

#schedule(*args, &block) ⇒ Object



43
44
45
# File 'lib/active_concurrency/base/worker.rb', line 43

# Enqueues a job for this worker.
#
# The job is stored as a two-element array: the block first, then the array of
# positional arguments. When no block is given the first element is nil.
#
# @param args [Array] positional arguments stored alongside the block
# @param block [Proc, nil] the work to enqueue
# @return [Object] the result of pushing onto @queue
def schedule(*args, &block)
  job = [block, args]
  @queue.push(job)
end

#shutdown ⇒ Object

Lint directive in the source, not a method description: rubocop:disable Lint/UnreachableCode



48
49
50
51
52
53
54
# File 'lib/active_concurrency/base/worker.rb', line 48

# Runs the worker's shutdown sequence, strictly in this order: #exit, #lock,
# #join, #close, #exit!.
#
# The bare `exit` below dispatches to this class's #exit instance method, which
# only enqueues a job that throws :exit, so the statements after it do run.
#
# NOTE(review): #join and #exit! are not defined in this section of the file —
# presumably supplied by the Threads/Processes subclasses; confirm before
# changing the order of these calls.
def shutdown
  exit    # enqueue the :exit job
  lock    # take the mutex when applicable (see #lock)
  join
  close   # close the queue
  exit!
end

#size ⇒ Object

Lint directive in the source, not a description of #size: rubocop:enable Lint/UnreachableCode



57
58
59
# File 'lib/active_concurrency/base/worker.rb', line 57

# Number of entries currently waiting in the worker's queue.
#
# @return [Integer] the current length of @queue
def size
  @queue.length
end