Class: Threads

Inherits:
Object
  • Object
show all
Defined in:
lib/threads.rb

Overview

Threads.

This class may help you test your code for thread-safety by running it multiple times in a few parallel threads:

require 'threads'
Threads.new(5).assert do |i|
  puts "Hello from the thread no.#{i}"
end

Here, the message will be printed to the console five times, once in every thread.

Author

Yegor Bugayenko ([email protected])

Copyright

Copyright © 2018-2025 Yegor Bugayenko

License

MIT

Instance Method Summary collapse

Constructor Details

#initialize(total = Concurrent.processor_count * 8, log: $stdout, task_timeout: 10, shutdown_timeout: 30) ⇒ Threads

Constructor.

Parameters:

  • total (Number) (defaults to: Concurrent.processor_count * 8)

    How many threads to run

  • log (Logger) (defaults to: $stdout)

    Where to print output

  • task_timeout (Number) (defaults to: 10)

    How many seconds to wait for all tasks to finish

  • shutdown_timeout (Number) (defaults to: 30)

    How many seconds to wait for the thread pool to shut down



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/threads.rb', line 32

def initialize(total = Concurrent.processor_count * 8, log: $stdout, task_timeout: 10, shutdown_timeout: 30)
  raise "Total can't be nil" if total.nil?
  raise "Total can't be negative or zero: #{total}" unless total.positive?
  @total = total
  raise "Log can't be nil" if log.nil?
  @log = log
  validate('task_timeout', task_timeout)
  @task_timeout = task_timeout
  validate('shutdown_timeout', shutdown_timeout)
  @shutdown_timeout = shutdown_timeout
end

Instance Method Details

#assert(reps = @total, task_timeout: @task_timeout, shutdown_timeout: @shutdown_timeout) ⇒ Object

Run them all and assert that all of them finished successfully.

For example, test a counter for thread safety:

counter = 0
mutex = Mutex.new
Threads.new(10).assert do |t|
  mutex.synchronize { counter += 1 }
end
puts "Final counter value: #{counter}"

You can also access the repetition index:

Threads.new(4).assert(100) do |t, r|
  puts "Thread #{t}, repetition #{r}"
end

Parameters:

  • reps (Number) (defaults to: @total)

    How many times to repeat the testing cycle

  • task_timeout (Number) (defaults to: @task_timeout)

    How many seconds to wait for all tasks to finish

  • shutdown_timeout (Number) (defaults to: @shutdown_timeout)

    How many seconds to wait for the thread pool to shut down

Returns:

  • nil



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/threads.rb', line 73

def assert(reps = @total, task_timeout: @task_timeout, shutdown_timeout: @shutdown_timeout)
  raise "Repetition counter #{reps} can't be smaller than #{@total}" if reps < @total
  validate('task_timeout', task_timeout)
  validate('shutdown_timeout', shutdown_timeout)
  done = Concurrent::AtomicFixnum.new
  rep = Concurrent::AtomicFixnum.new
  pool = Concurrent::FixedThreadPool.new(@total)
  latch = Concurrent::CountDownLatch.new(1)
  finish = Concurrent::CountDownLatch.new(@total)
  @total.times do |t|
    pool.post do
      Thread.current.name = "assert-thread-#{t}"
      latch.wait(10)
      begin
        loop do
          r = rep.increment
          break if r > reps
          begin
            yield(t, r - 1)
          rescue StandardError => e
            print(Backtrace.new(e))
            raise e
          end
        end
        done.increment
      ensure
        finish.count_down
      end
    end
  end
  latch.count_down
  finish.wait(task_timeout)
  pool.shutdown
  raise "Can't stop the pool" unless pool.wait_for_termination(shutdown_timeout)
  return if done.value == @total
  raise "Only #{done.value} out of #{@total} threads completed successfully"
end

#validate(label, timeout) ⇒ Object

Validate a timeout value.

Parameters:

  • label (String)

    Label to use in the error message

  • timeout (Number)

    Timeout value to validate



47
48
49
50
# File 'lib/threads.rb', line 47

def validate(label, timeout)
  raise "#{label} can't be nil" if timeout.nil?
  raise "#{label} can't be negative or zero: #{timeout}" unless timeout.positive?
end