Module: Fluent::PluginHelper::Thread

Included in:
ChildProcess, EventLoop
Defined in:
lib/fluent/plugin_helper/thread.rb

Constant Summary collapse

THREAD_DEFAULT_WAIT_SECONDS =
1

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#_threads ⇒ Object (readonly)

Thread handling per plugin lifecycle phase — stop: mark callback threads as stopped; shutdown: [-]; close: collect stopped threads; terminate: kill all threads.



27
28
29
# File 'lib/fluent/plugin_helper/thread.rb', line 27

# Read-only access to the registry of threads created by this helper.
# thread_create stores each thread in this hash under the thread's object_id.
def _threads
  @_threads
end

Instance Method Details

#after_shutdown ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/fluent/plugin_helper/thread.rb', line 127

# Lifecycle hook: once the parent's after_shutdown has run, wakes every
# registered thread that is currently sleeping.
#
# @return [Array<Thread>] the threads that were found sleeping
def after_shutdown
  super
  # Snapshot the sleepers while holding the registry lock; wake them outside it.
  sleepers = @_threads_mutex.synchronize do
    @_threads.values.select { |t| t.alive? && t.status == "sleep" }
  end
  # A thread may have finished since the snapshot, so re-check before waking.
  sleepers.each { |t| t.wakeup if t.alive? }
end

#close ⇒ Object



140
141
142
143
144
145
146
147
148
149
# File 'lib/fluent/plugin_helper/thread.rb', line 140

# Lifecycle hook: gives each registered thread up to @_thread_wait_seconds to
# finish and drops the finished (or already missing) ones from the registry,
# then delegates to the parent's close.
#
# @return [Object] whatever the parent's close returns
def close
  registered_ids = @_threads_mutex.synchronize { @_threads.keys }
  registered_ids.each do |id|
    t = @_threads[id]
    # Thread#join returns nil on timeout: keep such a thread registered.
    next if t && !t.join(@_thread_wait_seconds)

    @_threads_mutex.synchronize { @_threads.delete(id) }
  end

  super
end

#initialize ⇒ Object



104
105
106
107
108
109
# File 'lib/fluent/plugin_helper/thread.rb', line 104

# Sets up the per-instance thread bookkeeping after the parent initializer.
def initialize
  super
  # Registry of helper threads: thread.object_id => Thread (see thread_create).
  @_threads = {}
  # Lock taken around reads and writes of @_threads.
  @_threads_mutex = Mutex.new
  # Timeout handed to Thread#join in close.
  @_thread_wait_seconds = THREAD_DEFAULT_WAIT_SECONDS
end

#stop ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/fluent/plugin_helper/thread.rb', line 111

# Lifecycle hook: after the parent's stop, clears the running flag of every
# registered thread and wakes those that are sleeping so they can notice it.
#
# @return [Array<Thread>] the threads that were found sleeping
def stop
  super
  sleepers = @_threads_mutex.synchronize do
    @_threads.each_value.with_object([]) do |t, found|
      # thread_current_running? reads this flag from inside the callback.
      t[:_fluentd_plugin_helper_thread_running] = false
      found << t if t.alive? && t.status == "sleep"
    end
  end
  # Wake outside the lock; a thread may have exited since the snapshot.
  sleepers.each { |t| t.wakeup if t.alive? }
end

#terminate ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fluent/plugin_helper/thread.rb', line 151

# Lifecycle hook: after the parent's terminate, kills every thread that is
# still registered, waits for each of them to finish, empties the registry
# and resets @_thread_wait_seconds to nil.
#
# The Thread objects are snapshotted together with their keys while holding
# the registry lock. A thread removes its own entry from @_threads when it
# exits (see the ensure clause in thread_create), so looking an id up again
# after the snapshot can yield nil; calling join on that nil would raise
# NoMethodError.
#
# @return [nil]
def terminate
  super
  @_threads_mutex.synchronize{ @_threads.values }.each do |thread|
    next unless thread # nothing to kill (and nothing worth logging) for an empty slot
    log.warn "killing existing thread", thread: thread
    thread.kill
  end
  @_threads_mutex.synchronize{ @_threads.to_a }.each do |obj_id, thread|
    thread.join if thread
    # Deleting an id the thread already removed by itself is a harmless no-op.
    @_threads_mutex.synchronize{ @_threads.delete(obj_id) }
  end
  @_thread_wait_seconds = nil
end

#thread_create(title) ⇒ Object

Ruby 2.2.3 and earlier (including all 2.1.x releases) have a threading bug (“Stack consistency error”) that is triggered by passing a splatted argument to `yield`; see bugs.ruby-lang.org/issues/11027.

Once support for Ruby 2.1 (and older 2.2.x) has expired, arguments can be passed through to the thread:

def thread_create(title, *args)
  Thread.new(*args) do |*t_args|
    yield *t_args

Raises:

  • (ArgumentError)


53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin_helper/thread.rb', line 53

# Starts a thread that runs the given block and registers it in @_threads
# under the thread's object_id.
#
# @param title [Symbol] name stored in the thread-local
#   :_fluentd_plugin_helper_thread_title
# @yield the callback to run inside the new thread (called without arguments)
# @return [::Thread] the created thread
# @raise [ArgumentError] if title is not a Symbol or no block is given
def thread_create(title)
  raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
  raise ArgumentError, "BUG: callback not specified" unless block_given?
  # Handshake lock: held by the creator until the new thread has been
  # registered further down.
  m = Mutex.new
  m.lock
  thread = ::Thread.new do
    m.lock # blocks until the creator has stored this thread in @_threads
    m.unlock
    # Set to true once the callback has returned or raised; if the ensure
    # clause is reached while this is still false, the thread is ending some
    # other way (the warning below names "killed" as one such reason).
    thread_exit = false
    # Thread-locals read by thread_exist?, thread_started?, thread_running?
    # and thread_current_running?.
    ::Thread.current[:_fluentd_plugin_helper_thread_title] = title
    ::Thread.current[:_fluentd_plugin_helper_thread_started] = true
    ::Thread.current[:_fluentd_plugin_helper_thread_running] = true
    begin
      yield
      thread_exit = true
    rescue Exception => e
      # Catches every exception class on purpose: it is only logged here and
      # then re-raised unchanged.
      log.warn "thread exited by unexpected error", plugin: self.class, title: title, error: e
      thread_exit = true
      raise
    ensure
      if ::Thread.current.alive? && !thread_exit
        log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
      end
      # Unregister this thread, then clear its running flag.
      @_threads_mutex.synchronize do
        @_threads.delete(::Thread.current.object_id)
      end
      ::Thread.current[:_fluentd_plugin_helper_thread_running] = false
    end
  end
  # Per Ruby's Thread#abort_on_exception=, an exception that escapes this
  # thread (including the one re-raised above) is raised in the main thread.
  thread.abort_on_exception = true
  @_threads_mutex.synchronize do
    @_threads[thread.object_id] = thread
  end
  m.unlock # registration done: let the new thread proceed
  thread
end

#thread_current_running? ⇒ Boolean



29
30
31
32
# File 'lib/fluent/plugin_helper/thread.rb', line 29

# Lets code running inside a thread_create callback check whether its own
# thread is still flagged as running.
#
# @return [Object, false] the current thread's running flag, or false when
#   the flag is unset or falsy
def thread_current_running?
  flag = ::Thread.current[:_fluentd_plugin_helper_thread_running]
  return false unless flag

  flag
end

#thread_exist?(title) ⇒ Boolean



90
91
92
# File 'lib/fluent/plugin_helper/thread.rb', line 90

# Reports whether a thread with the given title is currently registered.
#
# @param title [Symbol] title that was passed to thread_create
# @return [Boolean]
def thread_exist?(title)
  @_threads.values.any? { |t| title == t[:_fluentd_plugin_helper_thread_title] }
end

#thread_running?(title) ⇒ Boolean



99
100
101
102
# File 'lib/fluent/plugin_helper/thread.rb', line 99

# Looks up the first registered thread with the given title and returns its
# running flag.
#
# @param title [Symbol] title that was passed to thread_create
# @return [Object, nil] the thread's :_fluentd_plugin_helper_thread_running
#   value, or nil when no thread with that title is registered
def thread_running?(title)
  match = @_threads.values.find { |t| title == t[:_fluentd_plugin_helper_thread_title] }
  return nil unless match

  match[:_fluentd_plugin_helper_thread_running]
end

#thread_started?(title) ⇒ Boolean



94
95
96
97
# File 'lib/fluent/plugin_helper/thread.rb', line 94

# Looks up the first registered thread with the given title and returns its
# started flag.
#
# @param title [Symbol] title that was passed to thread_create
# @return [Object, nil] the thread's :_fluentd_plugin_helper_thread_started
#   value, or nil when no thread with that title is registered
def thread_started?(title)
  match = @_threads.values.find { |t| title == t[:_fluentd_plugin_helper_thread_title] }
  return nil unless match

  match[:_fluentd_plugin_helper_thread_started]
end

#thread_wait_until_start ⇒ Object



34
35
36
37
38
# File 'lib/fluent/plugin_helper/thread.rb', line 34

# Blocks the caller until every registered thread has set its started flag,
# polling every 0.1 seconds. Returns immediately when no thread is registered.
#
# @return [nil]
def thread_wait_until_start
  loop do
    everyone_started = @_threads_mutex.synchronize do
      @_threads.values.all? { |t| t[:_fluentd_plugin_helper_thread_started] }
    end
    break if everyone_started

    sleep 0.1
  end
end

#thread_wait_until_stop ⇒ Object



40
41
42
43
44
# File 'lib/fluent/plugin_helper/thread.rb', line 40

# Blocks the caller until no registered thread has its running flag set,
# polling every 0.1 seconds. Returns immediately when no thread is registered.
#
# The flag has to be read from each thread (t[...]): negating the bare Array
# literal `[:_fluentd_plugin_helper_thread_running]` is always false, which
# would keep this loop spinning for as long as any thread is registered,
# regardless of its flag.
#
# @return [nil]
def thread_wait_until_stop
  until @_threads_mutex.synchronize{ @_threads.values.none?{|t| t[:_fluentd_plugin_helper_thread_running] } }
    sleep 0.1
  end
end