23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/tobox/plugins/stats.rb', line 23
# Starts the background stats worker.
#
# Does nothing when the worker is already running, or when the configuration
# registers no +:stats+ lifecycle handlers. Otherwise it opens a separate
# Sequel connection (created with +max_connections: 1+), prepares the
# datasets read during stats collection, and spawns the "outbox-stats"
# thread. That thread sleeps for +stats_interval_seconds+, emits the stats,
# and repeats until +@running+ is cleared.
#
# Errors raised while emitting stats are forwarded to the +:error_worker+
# lifecycle handlers instead of terminating the thread.
#
# @return [true, nil] +true+ when the worker was started, +nil+ when skipped
def start
  return if @running

  config = @config
  plugin_names = config.plugins.map(&:name)
  interval = config.stats_interval_seconds

  @stats_handlers = Array(config.lifecycle_events[:stats])
  # No handlers registered: skip opening a connection and spawning a thread.
  return if @stats_handlers.empty?

  @error_handlers = Array(config.lifecycle_events[:error_worker])
  @max_attempts = config[:max_attempts]
  @created_at_column = config[:created_at_column]

  @db = Sequel.connect(config.database.opts.merge(max_connections: 1))
  @db.loggers = config.database.loggers
  Array(config.lifecycle_events[:database_connect]).each { |cb| cb.call(@db) }

  @outbox_ds = @db[config[:table]]
  # The inbox dataset is only set up when the inbox plugin is loaded.
  @inbox_ds = @db[config[:inbox_table]] if plugin_names.include?("Tobox::Plugins::Inbox")

  # Only built when a created_at column is configured.
  if @created_at_column
    @oldest_event_age_ds = @outbox_ds.where(last_error: nil, run_at: nil).order(Sequel.asc(:id))
  end

  logger = config.default_logger

  # The bound method is handed to emit_event_stats; the singleton alias lets
  # it be invoked as +stats.collect+ in addition to +stats.call+.
  stats = method(:collect_event_stats)
  stats.instance_eval do
    alias collect call
  end

  # Raise the flag BEFORE the thread exists. The loop checks it at the end of
  # every iteration, so with a very small interval the worker could otherwise
  # observe it unset and exit right after its first run.
  @running = true

  @th = Thread.start do
    Thread.current.name = "outbox-stats"

    loop do
      logger.debug { "stats worker: sleep for #{interval}s..." }
      sleep interval

      begin
        emit_event_stats(stats)
      rescue StandardError => e
        # StandardError (not just RuntimeError): database and handler errors
        # are reported to the error handlers and the worker keeps running.
        @error_handlers.each { |hd| hd.call(e) }
      end

      break unless @running
    end
  end

  # Keep the historical return value of a successful start.
  true
end
|