Class: Async::Container::Threaded::Child

Inherits:
Channel
  • Object
show all
Defined in:
lib/async/container/threaded.rb

Overview

Represents a running child thread from the point of view of the parent container.

Defined Under Namespace

Classes: Exit, Instance, Status

Instance Attribute Summary

Attributes inherited from Channel

#in, #out

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Channel

#close_read, #close_write, #receive

Constructor Details

#initialize(name: nil, **options) ⇒ Child

Initialize the thread.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/async/container/threaded.rb', line 127

def initialize(name: nil, **options)
  super(**options)
  
  @status = nil
  
  @thread = yield(self)
  @thread.report_on_exception = false
  @thread.name = name
  
  @waiter = ::Thread.new do
    begin
      @thread.join
    rescue Exit => exit
      finished(exit.error)
    rescue Interrupt
      # Graceful shutdown.
      finished
    rescue Exception => error
      finished(error)
    else
      finished
    end
  end
end

Class Method Details

.fork(**options) ⇒ Object

Start a new child thread and execute the provided block in it.



113
114
115
116
117
118
119
120
121
122
# File 'lib/async/container/threaded.rb', line 113

def self.fork(**options)
  self.new(**options) do |thread|
    ::Thread.new do
      # This could be a configuration option (see forked implementation too):
      ::Thread.handle_interrupt(SignalException => :immediate) do
        yield Instance.for(thread)
      end
    end
  end
end

Instance Method Details

#as_jsonObject

Convert the child process to a hash, suitable for serialization.



155
156
157
158
159
160
# File 'lib/async/container/threaded.rb', line 155

def as_json(...)
  {
    name: @thread.name,
    status: @status&.as_json,
  }
end

#closeObject

Invoke #terminate! and then #wait for the child thread to exit.



188
189
190
191
192
193
# File 'lib/async/container/threaded.rb', line 188

def close
  self.terminate!
  self.wait
ensure
  super
end

#interrupt!Object

Raise Interrupt in the child thread.



196
197
198
# File 'lib/async/container/threaded.rb', line 196

def interrupt!
  @thread.raise(Interrupt)
end

#kill!Object

Invoke Thread#kill on the child thread.



206
207
208
209
210
211
# File 'lib/async/container/threaded.rb', line 206

def kill!
  # Killing a thread does not raise an exception in the thread, so we need to handle the status here:
  @status = Status.new(:killed)
  
  @thread.kill
end

#nameObject

Get the name of the thread.



177
178
179
# File 'lib/async/container/threaded.rb', line 177

def name
  @thread.name
end

#name=(value) ⇒ Object

Set the name of the thread.



171
172
173
# File 'lib/async/container/threaded.rb', line 171

def name= value
  @thread.name = value
end

#restart!Object

Raise Restart in the child thread.



214
215
216
# File 'lib/async/container/threaded.rb', line 214

def restart!
  @thread.raise(Restart)
end

#terminate!Object

Raise Async::Container::Terminate in the child thread.



201
202
203
# File 'lib/async/container/threaded.rb', line 201

def terminate!
  @thread.raise(Terminate)
end

#to_jsonObject

Convert the request to JSON.



165
166
167
# File 'lib/async/container/threaded.rb', line 165

def to_json(...)
  as_json.to_json(...)
end

#to_sObject

A human readable representation of the thread.



183
184
185
# File 'lib/async/container/threaded.rb', line 183

def to_s
  "\#<#{self.class} #{@thread.name}>"
end

#wait(timeout = 0.1) ⇒ Object

Wait for the thread to exit and return he exit status.



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/async/container/threaded.rb', line 223

def wait(timeout = 0.1)
  if @waiter
    Console.debug(self, "Waiting for thread to exit...", child: {thread_id: @thread.object_id}, timeout: timeout)
    
    unless @waiter.join(timeout)
      Console.warn(self, "Thread is blocking, sending kill signal...", child: {thread_id: @thread.object_id}, caller: caller_locations, timeout: timeout)
      self.kill!
      @waiter.join
    end
    
    @waiter = nil
  end
  
  Console.debug(self, "Thread exited.", child: {thread_id: @thread.object_id, status: @status})
  
  return @status
end