Module: Quandl::Command::Task::Threading

Extended by:
ActiveSupport::Concern
Included in:
Quandl::Command::Task
Defined in:
lib/quandl/command/task/threading.rb

Defined Under Namespace

Modules: ClassMethods

Instance Method Summary

Instance Method Details

#await_thread_pool_lock!(key) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/quandl/command/task/threading.rb', line 59

# Blocks the calling thread until the pool lock for +key+ has been obtained.
#
# The "is it free? then take it" step runs inside mutex.synchronize so the
# check and the claim happen together. While the key is held elsewhere the
# caller sleeps 0.05s between attempts.
#
# Polls in a loop instead of calling itself again, so a long wait does not
# grow the call stack (the recursive form eventually raised SystemStackError).
#
# @param key [Object] identifier of the lock to wait for
# @return [nil]
def await_thread_pool_lock!(key)
  got_lock = false
  until got_lock
    # check and claim in one critical section
    mutex.synchronize do
      unless thread_pool_locked?(key)
        obtain_thread_pool_lock(key)
        got_lock = true
      end
    end
    # lock is held elsewhere; back off before the next attempt
    sleep(0.05) unless got_lock
  end
  nil
end

#background_job(*args, &block) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/quandl/command/task/threading.rb', line 34

# Runs +block+ as a job, on the thread pool when threading is enabled.
#
# A trailing options hash may carry a :lock key. When it is present the job
# waits for that lock before running and releases it afterwards, so jobs
# sharing the same key do not run at the same time.
#
# @param args [Array] arguments; a trailing Hash is extracted as options
# @option args [Object] :lock key of the lock to hold while the block runs
# @return [false] when exiting? is true (the block is not run)
# @return [Object] the block's return value when threads? is false
# @return [Object] whatever thread_pool.process returns otherwise
def background_job(*args, &block)
  # dont execute jobs when shutdown signaled
  return false if exiting?
  # options
  opts = args.extract_options!.symbolize_keys!
  key = opts[:lock]
  # without threads process in foreground
  return block.call unless threads?
  needs_lock = key.present?
  # process with pool
  thread_pool.process do
    # wait for lock; kept outside begin/ensure so a failure while waiting
    # never releases a lock this job does not hold
    await_thread_pool_lock!(key) if needs_lock
    begin
      block.call
    ensure
      # always unlock, even when the block raises, otherwise every later
      # job waiting on this key would poll indefinitely
      release_thread_pool_lock(key) if needs_lock
    end
  end
end

#exiting? ⇒ Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/quandl/command/task/threading.rb', line 103

# Whether the thread pool has been shut down.
#
# @return [Boolean] the falsy result of threads? when threading is disabled,
#   otherwise thread_pool.shutdown?
def exiting?
  threaded = threads?
  # without threading the pool is never consulted
  return threaded unless threaded
  thread_pool.shutdown?
end

#mutex ⇒ Object



55
56
57
# File 'lib/quandl/command/task/threading.rb', line 55

# Memoized Mutex for this object; created on first call.
#
# NOTE(review): the lazy creation itself is not synchronized.
#
# @return [Mutex]
def mutex
  return @mutex if @mutex
  @mutex = Mutex.new
end

#obtain_thread_pool_lock(key) ⇒ Object



79
80
81
# File 'lib/quandl/command/task/threading.rb', line 79

# Marks +key+ as locked in thread_pool_locks.
#
# Performs no check of its own; await_thread_pool_lock! calls it from
# inside mutex.synchronize after confirming the key is free.
#
# @param key [Object] identifier of the lock
# @return [true]
def obtain_thread_pool_lock(key)
  locks = thread_pool_locks
  locks[key] = true
end

#release_thread_pool_lock(key) ⇒ Object



83
84
85
# File 'lib/quandl/command/task/threading.rb', line 83

# Removes +key+ from thread_pool_locks so that it is no longer locked.
#
# @param key [Object] identifier of the lock
# @return [Object, nil] the value that was stored for +key+, or nil when
#   the key was not present
def release_thread_pool_lock(key)
  locks = thread_pool_locks
  locks.delete(key)
end

#shutdown_thread_pool_on_sigint ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/quandl/command/task/threading.rb', line 20

# Installs a SIGINT handler that shuts the thread pool down and then exits.
#
# Always logs the configured thread count first. The handler is only
# installed when threads? is true.
#
# @return [Object, nil] nil when threading is disabled, otherwise whatever
#   trap returns
def shutdown_thread_pool_on_sigint
  debug("starting up with #{threads} threads")
  if threads?
    trap('SIGINT') do
      debug("exit signal received")
      # only shut the pool down once
      already_down = thread_pool.shutdown?
      unless already_down
        debug("waiting for executing jobs to finish")
        thread_pool.shutdown
      end
      debug("exiting now")
      exit
    end
  end
end

#thread_pool ⇒ Object



75
76
77
# File 'lib/quandl/command/task/threading.rb', line 75

# Memoized thread pool, built with Thread.pool and sized by threads.
#
# @return [Object] the result of Thread.pool(threads)
def thread_pool
  return @thread_pool if @thread_pool
  @thread_pool = Thread.pool(threads)
end

#thread_pool_locked?(key) ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
# File 'lib/quandl/command/task/threading.rb', line 87

# Whether +key+ is currently locked.
#
# @param key [Object] identifier of the lock
# @return [Boolean] true only when thread_pool_locks holds exactly true for
#   +key+; a missing key or any other value counts as unlocked
def thread_pool_locked?(key)
  held = thread_pool_locks[key]
  held == true
end

#thread_pool_locks ⇒ Object



91
92
93
# File 'lib/quandl/command/task/threading.rb', line 91

# Memoized Hash of the lock keys currently held; starts out empty.
#
# @return [Hash]
def thread_pool_locks
  return @thread_pool_locks if @thread_pool_locks
  @thread_pool_locks = {}
end

#threads ⇒ Object



99
100
101
# File 'lib/quandl/command/task/threading.rb', line 99

# Memoized thread count.
#
# Uses options.threads converted with to_i when that option is present,
# and falls back to 10 otherwise.
#
# @return [Integer]
def threads
  return @threads if @threads
  @threads = if options.threads.present?
    options.threads.to_i
  else
    10
  end
end

#threads? ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/quandl/command/task/threading.rb', line 95

# Whether more than one thread is configured.
#
# @return [Boolean] true when threads is greater than 1
def threads?
  pool_size = threads
  pool_size > 1
end