Class: Eventr::SupervisedObject

Inherits:
Object
  • Object
show all
Defined in:
lib/eventr/actors.rb

Direct Known Subclasses

Consumer, Publisher

Instance Method Summary collapse

Instance Method Details

#application ⇒ Object



34
# File 'lib/eventr/actors.rb', line 34

# Looks up the thread stored under +:application+ in the thread registry.
#
# @return [Thread, nil] the application thread, or nil when the slot is unset
def application
  threads[:application]
end

#on_exception ⇒ Object



37
38
39
# File 'lib/eventr/actors.rb', line 37

# Returns the exception handler previously stored by +on_exception=+.
#
# start_application_thread invokes it with the raised exception, but only
# when the stored value responds to +call+.
#
# @return [Object, nil] the stored handler, or nil when none has been set
def on_exception
  @on_exception
end

#on_exception=(&block) ⇒ Object



41
42
43
# File 'lib/eventr/actors.rb', line 41

# Stores the handler that start_application_thread calls with the exception
# when the application's main raises a StandardError.
#
# The handler may be given either as a regular argument, which makes the
# ordinary assignment form work (<tt>obj.on_exception = ->(e) { ... }</tt>),
# or as a block (<tt>obj.send(:on_exception=) { |e| ... }</tt>). When both are
# supplied the positional argument wins. Assigning nil clears the handler.
#
# @param handler [#call, nil] callable to invoke with the exception
# @yieldparam e [StandardError] the exception raised by the application thread
# @return [Object, nil] the stored handler
def on_exception=(handler = nil, &block)
  @on_exception = handler || block
end

#sleep_time_from_backoff ⇒ Object



45
46
47
48
# File 'lib/eventr/actors.rb', line 45

# Converts the current thread's +:backoff+ counter into a sleep duration by
# taking the Fibonacci number at that index (0, 1, 1, 2, 3, 5, 8, ...).
# A missing counter is treated as 0.
#
# @return [Integer] the Fibonacci number for the current backoff level
def sleep_time_from_backoff
  level = Thread.current[:backoff] || 0

  # Advance the (previous, current) pair once per step from 0 up to level;
  # the leading element then holds the Fibonacci number for that level.
  lead = 1
  trail = 0
  0.upto(level) { lead, trail = trail, lead + trail }
  lead
end

#start ⇒ Object



28
29
30
31
# File 'lib/eventr/actors.rb', line 28

# Starts both threads for this object: the application thread first, then
# the supervisor thread.
#
# @return [Object] whatever start_supervisor_thread returns
def start
  start_application_thread
  start_supervisor_thread
end

#start_application_thread ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/eventr/actors.rb', line 50

# Starts the application thread, unless one is already registered under
# +threads[:application]+, and returns it.
#
# The thread runs +main+. When +main+ raises a StandardError the exception is
# passed to the +on_exception+ handler (if it responds to +call+), written
# to stderr with its backtrace, and re-raised so the thread terminates with it.
# Whenever the thread finishes, the supervisor thread is woken so it can
# notice the dead application thread.
#
# @return [Thread] the registered application thread
def start_application_thread
  threads[:application] ||= Thread.new {
    begin
      main()
    rescue StandardError => e
      on_exception.call(e) if on_exception.respond_to? :call
      warn "#{e.class.name}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
      raise e
    ensure
      # Wake the supervisor to help us recover. It may not be registered yet
      # (start creates this thread first) or may already be dead (after stop);
      # an unguarded wakeup would then raise here and replace the original
      # exception with NoMethodError/ThreadError.
      supervisor_thread = threads[:supervisor]
      begin
        supervisor_thread.wakeup if supervisor_thread && supervisor_thread.alive?
      rescue ThreadError
        # The supervisor died between the alive? check and the wakeup;
        # there is nothing left to wake.
      end
    end
  }
end

#start_supervisor_thread ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/eventr/actors.rb', line 64

# Starts the supervisor thread, unless one is already registered under
# +threads[:supervisor]+, and returns it.
#
# The supervisor loops, restarting the application thread whenever it is
# missing or dead. It polls once per second and, after up to five polls (or
# right after a restart), parks itself with Thread.stop until it is woken
# again.
#
# If the supervisor loop itself raises a StandardError, the error is written
# to stderr and the loop is retried after a sleep taken from
# sleep_time_from_backoff. Once the backoff counter exceeds 15, SupervisorDown
# is raised in the application thread and the original error is re-raised,
# ending the supervisor.
#
# @return [Thread] the registered supervisor thread
def start_supervisor_thread
  threads[:supervisor] ||= Thread.new {
    Thread.current[:backoff] = 1

    begin
      runs = 5
      loop {
        unless (application && application.alive?)
          puts "#{self.class.name}::Supervisor: cleaning up app thread and restarting it."
          threads[:application] = nil
          start_application_thread

          # stop when we've successfully cleaned something up
          runs = 0

          # and make sure to reset backoff
          Thread.current[:backoff] = 1
        end

        # check for required cleanup 5 times over as many seconds
        if (runs -= 1) <= 0
          Thread.stop
          runs = 5
        end

        sleep 1
      }

    rescue StandardError => e
      warn "#{e.class.name}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"

      if Thread.current[:backoff] <= 15
        Thread.current[:backoff] += 1
        sleep_time = sleep_time_from_backoff
        warn "sleeping for #{sleep_time} before restarting supervisor"
        # Was `sleep sleep_Time`: the misspelt name raised NameError from
        # inside this rescue clause, so the retry below was never reached.
        sleep sleep_time
        retry
      end

      # if the supervisor goes away, take the whole thing down.
      threads[:application].raise SupervisorDown, "supervisor went away due to: #{e.class.name}: #{e.message} -> #{e.backtrace.first}"

      raise e
    end
  }
end

#stop ⇒ Object



24
25
26
# File 'lib/eventr/actors.rb', line 24

# Kills every thread currently held in the thread registry.
#
# @return [Array] the registry's values that were sent +kill+
def stop
  threads.values.each(&:kill)
end

#supervisor ⇒ Object



35
# File 'lib/eventr/actors.rb', line 35

# Looks up the thread stored under +:supervisor+ in the thread registry.
#
# @return [Thread, nil] the supervisor thread, or nil when the slot is unset
def supervisor
  threads[:supervisor]
end

#threads ⇒ Object



33
# File 'lib/eventr/actors.rb', line 33

# Registry of this object's threads, created on first access. Other methods
# on this class store threads under the +:application+ and +:supervisor+ keys.
#
# @return [Hash] the same hash instance on every call
def threads
  @threads = {} unless @threads
  @threads
end