Class: MessageBus::TimerThread

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/timer_thread.rb

Defined Under Namespace

Classes: Cancelable, CancelableEvery

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TimerThread

Returns a new instance of TimerThread.



24
25
26
27
28
29
30
31
# File 'lib/message_bus/timer_thread.rb', line 24

# Builds the timer's initial state and starts its worker thread.
#
# State set up here:
# * @stopped  - false until #stop flips it
# * @jobs     - empty list; #queue inserts [run_at, block] pairs into it
# * @mutex    - guards @jobs and replacement of @thread
# * @next     - run time of the earliest job, nil while nothing is queued
# * @on_error - default handler: prints the exception's backtrace to STDERR
#
# The worker thread is created LAST on purpose: the thread body calls
# do_work on this same object, so every instance variable (including the
# error handler) must already be assigned before the thread can run.
def initialize
  @stopped = false
  @jobs = []
  @mutex = Mutex.new
  @next = nil
  @on_error = lambda{|e| STDERR.puts "Exception while processing Timer:\n #{e.backtrace.join("\n")}"}
  @thread = Thread.new{do_work}
end

Instance Attribute Details

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs.



3
4
5
# File 'lib/message_bus/timer_thread.rb', line 3

# Read-only accessor for the list of scheduled jobs.
#
# Returns the very array stored in @jobs (not a copy). #queue inserts
# [run_at, block] pairs into it while holding @mutex; this reader takes no
# lock, so callers that iterate or mutate the result are not synchronized
# with the timer.
#
# @return [Array] the value of @jobs
def jobs
  @jobs
end

Instance Method Details

#every(delay, &block) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/message_bus/timer_thread.rb', line 45

# Runs the given block repeatedly, +delay+ seconds apart.
#
# Works by queueing a self-rescheduling proc: each time it fires it calls
# the block and then queues itself again. Re-queueing happens in an
# +ensure+, so a block that raises is still scheduled for its next run
# (the exception itself continues to propagate to whoever invoked the proc).
# Because the next run is queued after the block finishes, the delay is
# counted from the end of one run, not from its start.
#
# @param delay [Numeric] seconds passed to #queue for every run
# @return [CancelableEvery] handle whose +current+ is updated to the value
#   returned by the most recent #queue call
def every(delay, &block)
  handle = CancelableEvery.new

  # Kept as a proc (not a lambda) and declared before use so the body can
  # refer to itself when re-queueing.
  repeat = proc do
    begin
      block.call
    ensure
      handle.current = queue(delay, &repeat)
    end
  end

  handle.current = queue(delay, &repeat)
  handle
end

#on_error(&block) ⇒ Object



87
88
89
# File 'lib/message_bus/timer_thread.rb', line 87

# Replaces the error handler stored in @on_error with the given block.
#
# The constructor installs a default handler that takes one argument (the
# exception) and prints its backtrace to STDERR; this method overwrites it.
# Calling it without a block stores nil.
# NOTE(review): the code that invokes @on_error is not visible here —
# presumably the worker loop calls it with the rescued exception; confirm.
#
# @return [Proc, nil] the block that was stored
def on_error(&block)
  @on_error = block
end

#queue(delay = 0, &block) ⇒ Object

Queues a block to run after a given delay (in seconds).



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/message_bus/timer_thread.rb', line 59

# Schedules +block+ to run +delay+ seconds from now.
#
# The job is stored as a [run_at, block] pair in @jobs, which is kept
# ordered by run time. A new job goes right after the last job that is due
# strictly earlier, so it lands in front of any existing job with the same
# run time. When the new job becomes the head of the list, @next is updated
# to its run time.
#
# After inserting, the worker thread is recreated if it has died, and woken
# up if it is currently sleeping so that it can notice the new job.
#
# @param delay [Numeric] seconds from now (defaults to 0)
# @return [Cancelable] wrapper around the stored job pair
def queue(delay = 0, &block)
  run_at = Time.new.to_f + delay
  entry = [run_at, block]

  @mutex.synchronize do
    # Index of the last job due strictly before the new one (nil if none).
    last_earlier = @jobs.rindex { |scheduled_at, _| scheduled_at < run_at }
    slot = last_earlier ? last_earlier + 1 : 0

    @jobs.insert(slot, entry)
    @next = run_at if slot == 0
  end

  # Checked once without the lock (cheap common case) and again under it,
  # so the thread is only replaced while holding @mutex.
  unless @thread.alive?
    @mutex.synchronize do
      @thread = Thread.new { do_work } unless @thread.alive?
    end
  end

  @thread.wakeup if @thread.status == "sleep".freeze

  Cancelable.new(entry)
end

#stop ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/message_bus/timer_thread.rb', line 33

# Asks the worker thread to finish and blocks until it has.
#
# Sets @stopped, then repeatedly wakes the worker (under @mutex) until it is
# no longer alive, yielding the CPU between attempts with `sleep 0`. The
# wakeup is retried in a loop because a single wakeup can be missed if it
# arrives just before the worker goes to sleep.
# NOTE(review): this relies on the worker loop (not visible here) checking
# @stopped after each wakeup — confirm.
#
# @return [nil]
def stop
  @stopped = true
  running = true
  while running
    @mutex.synchronize do
      running = @thread && @thread.alive?
      begin
        @thread.wakeup if running
      rescue ThreadError
        # The worker exited between the alive? check and the wakeup call;
        # Thread#wakeup raises on a dead thread. That is the outcome we are
        # waiting for, so treat it as stopped rather than failing.
        running = false
      end
    end
    sleep 0
  end
end