Module: Quandl::Command::Task::Threading
- Extended by:
- ActiveSupport::Concern
- Included in:
- Quandl::Command::Task
- Defined in:
- lib/quandl/command/task/threading.rb
Defined Under Namespace
Modules: ClassMethods
Instance Method Summary collapse
- #await_thread_pool_lock!(key) ⇒ Object
- #background_job(*args, &block) ⇒ Object
- #exiting? ⇒ Boolean
- #mutex ⇒ Object
- #obtain_thread_pool_lock(key) ⇒ Object
- #release_thread_pool_lock(key) ⇒ Object
- #shutdown_thread_pool_on_sigint ⇒ Object
- #thread_pool ⇒ Object
- #thread_pool_locked?(key) ⇒ Boolean
- #thread_pool_locks ⇒ Object
- #threads ⇒ Object
- #threads? ⇒ Boolean
Instance Method Details
#await_thread_pool_lock!(key) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/quandl/command/task/threading.rb', line 59 def await_thread_pool_lock!(key) got_lock = false # try to get a lock mutex.synchronize do unless thread_pool_locked?(key) obtain_thread_pool_lock(key) got_lock = true end end # try again unless lock was obtained if !got_lock sleep(0.05) await_thread_pool_lock!(key) end end |
#background_job(*args, &block) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/quandl/command/task/threading.rb', line 34 def background_job(*args, &block) # dont execute jobs when shutdown signaled return false if exiting? # options opts = args..symbolize_keys! key = opts[:lock] # without threads process in foreground return block.call unless threads? # process with pool job = thread_pool.process do # wait for lock await_thread_pool_lock!(key) if key.present? # if this key is locked block.call # unlock release_thread_pool_lock(key) if key.present? end # onwards job end |
#exiting? ⇒ Boolean
103 104 105 |
# File 'lib/quandl/command/task/threading.rb', line 103 def exiting? threads? && thread_pool.shutdown? end |
#mutex ⇒ Object
55 56 57 |
# File 'lib/quandl/command/task/threading.rb', line 55 def mutex @mutex ||= Mutex.new end |
#obtain_thread_pool_lock(key) ⇒ Object
79 80 81 |
# File 'lib/quandl/command/task/threading.rb', line 79 def obtain_thread_pool_lock(key) thread_pool_locks[key] = true end |
#release_thread_pool_lock(key) ⇒ Object
83 84 85 |
# File 'lib/quandl/command/task/threading.rb', line 83 def release_thread_pool_lock(key) thread_pool_locks.delete(key) end |
#shutdown_thread_pool_on_sigint ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/quandl/command/task/threading.rb', line 20 def shutdown_thread_pool_on_sigint debug("starting up with #{threads} threads") return unless threads? trap('SIGINT') do debug "exit signal received" unless thread_pool.shutdown? debug "waiting for executing jobs to finish" thread_pool.shutdown end debug "exiting now" exit end end |
#thread_pool ⇒ Object
75 76 77 |
# File 'lib/quandl/command/task/threading.rb', line 75 def thread_pool @thread_pool ||= Thread.pool( threads ) end |
#thread_pool_locked?(key) ⇒ Boolean
87 88 89 |
# File 'lib/quandl/command/task/threading.rb', line 87 def thread_pool_locked?(key) thread_pool_locks[key] == true end |
#thread_pool_locks ⇒ Object
91 92 93 |
# File 'lib/quandl/command/task/threading.rb', line 91 def thread_pool_locks @thread_pool_locks ||= {} end |
#threads ⇒ Object
99 100 101 |
# File 'lib/quandl/command/task/threading.rb', line 99 def threads @threads ||= .threads.present? ? .threads.to_i : 10 end |
#threads? ⇒ Boolean
95 96 97 |
# File 'lib/quandl/command/task/threading.rb', line 95 def threads? threads > 1 end |