Class: MessageBus::TimerThread

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/timer_thread.rb

Defined Under Namespace

Classes: Cancelable, CancelableEvery

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TimerThread

Returns a new instance of TimerThread.



33
34
35
36
37
38
39
40
# File 'lib/message_bus/timer_thread.rb', line 33

# Creates a timer with an empty job list and starts its worker thread.
#
# Every piece of instance state, including the default error handler, is
# assigned before the worker thread is spawned, so the thread never sees a
# partially initialised timer.
#
# The default error handler writes the exception's class, message and
# backtrace to STDERR. Use #on_error to install a different handler.
def initialize
  @stopped = false
  @jobs = []
  @mutex = Mutex.new
  @next = nil
  @on_error = lambda do |e|
    # An exception object that was never raised has a nil backtrace;
    # Array() turns that into an empty list instead of failing on nil.join.
    trace = Array(e.backtrace).join("\n")
    STDERR.puts "Exception while processing Timer:\n #{e.class}: #{e.message}\n #{trace}"
  end
  @thread = Thread.new { do_work }
end

Instance Attribute Details

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs.



4
5
6
# File 'lib/message_bus/timer_thread.rb', line 4

# Returns the timer's job list (the +@jobs+ instance variable).
#
# The array itself is returned, not a copy. #queue inserts entries of the
# form +[run_at, block]+ into it while holding the timer's mutex; this
# reader takes no lock.
#
# @return [Array] the live job array
def jobs
  @jobs
end

Instance Method Details

#every(delay, &block) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/message_bus/timer_thread.rb', line 54

# Runs +block+ repeatedly, queuing each run +delay+ seconds ahead.
#
# Every run is an ordinary #queue job. Once the block finishes, whether it
# returned or raised, the next run is queued and its handle is stored in
# the returned object's +current+. An exception raised by the block is not
# rescued here; it propagates to whoever invoked the job.
#
# @param delay [Numeric] delay in seconds handed to #queue before each run
# @return [CancelableEvery] holder whose +current+ is the handle of the
#   most recently queued run
def every(delay, &block)
  handle = CancelableEvery.new
  tick = nil
  # Queues the next run and remembers its handle on the returned object.
  schedule = lambda { handle.current = queue(delay, &tick) }
  tick = proc do
    begin
      block.call
    ensure
      schedule.call
    end
  end
  schedule.call
  handle
end

#on_error(&block) ⇒ Object



99
100
101
# File 'lib/message_bus/timer_thread.rb', line 99

# Replaces the timer's error handler with the given block.
#
# The block is stored in +@on_error+. The default handler installed by
# #initialize takes a single argument, the exception.
# NOTE(review): calling this without a block stores nil in +@on_error+ —
# confirm the worker loop tolerates a nil handler.
#
# @return [Proc, nil] the block that was stored
def on_error(&block)
  @on_error = block
end

#queue(delay = 0, &block) ⇒ Object

Queues a block to run after a certain delay (in seconds).



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/message_bus/timer_thread.rb', line 68

# Queues a block to run after a certain delay (in seconds).
#
# The run time is computed from the monotonic clock, and the job is stored
# as a +[run_at, block]+ pair in +@jobs+, which is kept ordered by run time.
# When the new job lands at the head of the list, +@next+ is set to its
# run time.
#
# @param delay [Numeric] seconds from now at which the block becomes due
# @return [Cancelable] handle wrapping the stored job pair
def queue(delay = 0, &block)
  run_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + delay
  entry = [run_at, block]

  @mutex.synchronize do
    # Slot in directly after the last job that is due strictly earlier;
    # with no such job the new entry becomes the head of the list.
    earlier = @jobs.rindex { |due, _| due < run_at }
    slot = earlier ? earlier + 1 : 0
    @jobs.insert(slot, entry)
    @next = run_at if slot.zero?
  end

  # Start a replacement worker if the current one has died. The liveness
  # check is repeated under the mutex before a new thread is created.
  unless @thread.alive?
    @mutex.synchronize do
      next if @thread.alive?

      @thread = Thread.new { do_work }
    end
  end

  # NOTE(review): presumably lets a sleeping worker re-check its schedule —
  # confirm against do_work.
  @thread.wakeup if @thread.status == "sleep"

  Cancelable.new(entry)
end

#stop ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/message_bus/timer_thread.rb', line 42

# Flags the timer as stopped and waits for the worker thread to finish.
#
# After setting +@stopped+, the method loops: under the mutex it checks
# whether +@thread+ exists and is alive and, if so, wakes it; it then
# yields with +sleep 0+. The loop ends once the check finds no live worker.
# NOTE(review): +@stopped+ is presumably polled by the worker loop —
# confirm in do_work.
#
# @return [nil]
def stop
  @stopped = true
  loop do
    alive = @mutex.synchronize do
      worker_alive = @thread && @thread.alive?
      @thread.wakeup if worker_alive
      worker_alive
    end
    sleep 0
    break unless alive
  end
end