Class: Isono::Util::DeferedMsg

Inherits:
Queue
  • Object
show all
Defined in:
lib/isono/util.rb

Overview

A utility class for exchanging a success/error message between two different threads.

  dm = DeferedMsg.new
  EM.schedule {
    if condition
      dm.success
    else
      dm.error(RuntimeError.new("fail"))
    end
  }

  dm.wait rescue abort("got failure")

  # will raise RuntimeError in case of error().
  dm.wait unless EM.reactor_thread?

Defined Under Namespace

Classes: TimeoutError

Instance Method Summary collapse

Constructor Details

#initialize(timeout = 60*15, th = Thread.current) ⇒ DeferedMsg

Returns a new instance of DeferedMsg.



169
170
171
172
173
174
175
176
# File 'lib/isono/util.rb', line 169

# Creates the message queue and arms a timeout timer.
#
# When the timer fires, the hook registered through on_timeout (if any) is
# invoked first, then a TimeoutError is delivered through #error so that a
# blocked #wait raises it.
#
# @param timeout value handed to EventMachine.add_timer (defaults to 60*15)
# @param th [Thread] thread recorded as the one expected to call #wait
def initialize(timeout=60*15, th=Thread.current)
  super()
  @thread_wait = th
  @timer_sig = EventMachine.add_timer(timeout) do
    hook = @on_timeout_hook
    hook.call if hook
    error(TimeoutError.new)
  end
end

Instance Method Details

#cancel ⇒ Object



193
194
195
196
# File 'lib/isono/util.rb', line 193

# Cancels the timeout timer and forgets the on_timeout hook.
#
# Cancelling is best effort: any StandardError raised while cancelling the
# timer is swallowed so the hook is always cleared.
#
# @return [nil]
def cancel
  begin
    EventMachine.cancel_timer(@timer_sig)
  rescue StandardError
    # Deliberately ignored; cancellation must never fail the caller.
  end
  @on_timeout_hook = nil
end

#error(ex) ⇒ Object

Raises:

  • (TypeError)


183
184
185
186
187
# File 'lib/isono/util.rb', line 183

# Reports a failure by queueing the given exception; a later #wait will
# raise it.
#
# @param ex [Exception] the failure to deliver
# @raise [TypeError] if ex is not an Exception
# @return [Thread] the calling thread, which is also remembered in
#   @thread_called
def error(ex)
  unless ex.is_a?(Exception)
    raise TypeError
  end

  enq(ex)
  @thread_called = Thread.current
end

#on_timeout(&blk) ⇒ Object



189
190
191
# File 'lib/isono/util.rb', line 189

# Registers the block to be called when the timeout timer fires, replacing
# any previously registered one. Calling without a block stores nil.
#
# @return [Proc, nil] the stored block
def on_timeout(&hook)
  @on_timeout_hook = hook
end

#success(returnval = true) ⇒ Object



178
179
180
181
# File 'lib/isono/util.rb', line 178

# Reports success by queueing the result value; a later #wait returns it.
#
# @param returnval value to hand to the waiter (defaults to true)
# @return [Thread] the calling thread, which is also remembered in
#   @thread_called
def success(returnval=true)
  enq(returnval)
  @thread_called = Thread.current
end

#wait ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/isono/util.rb', line 198

# Blocks until a result has been posted through #success or #error, then
# returns the value or raises the posted exception.
#
# The dequeued message is put back on the queue so that further #wait calls
# observe the same outcome. Whatever happens, the recorded waiting thread is
# cleared and the timeout timer is cancelled on the way out.
#
# @return the value passed to #success
# @raise [RuntimeError] if the queue is empty and the result was last posted
#   from the recorded waiting thread or from the current thread
# @raise [Exception] the exception passed to #error
def wait
  posted_by_waiter = @thread_called == @thread_wait || @thread_called == Thread.current
  raise "do success() or error() prior to calling wait()" if posted_by_waiter && empty?

  message = deq
  # Put the message back so another wait() sees the same outcome.
  enq(message)
  raise message if message.is_a?(Exception)

  message
ensure
  @thread_wait = nil
  cancel
end