Class: FFWD::TCP::FlushingConnect
- Inherits: Object
- Includes: Reporter
- Defined in: lib/ffwd/protocol/tcp/flushing_connect.rb
Overview
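A TCP connection implementation that buffers events and metrics and sends them in batches by calling 'send_all' on the connection. A batch is sent when the flush period elapses, or earlier when a buffer fills to the forced flush factor of its limit.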
Constant Summary
- DEFAULT_FORCED_FLUSH_FACTOR = 0.8
  Fraction of the maximum number of events/metrics that triggers a flush.
- DEFAULT_EVENT_LIMIT = 1000
  Default for buffered connections: maximum number of events to buffer.
- DEFAULT_METRIC_LIMIT = 10000
  Maximum number of metrics to buffer.
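To make the defaults concrete, here is a small sketch of the buffer sizes at which a forced flush would occur. It assumes the threshold is simply limit multiplied by factor, which is what the constant descriptions suggest but which this page does not show directly. The helper name forced_flush_threshold is hypothetical and not part of FFWD.

```ruby
# Values copied from the constant summary above.
forced_flush_factor = 0.8
event_limit = 1000
metric_limit = 10000

# Hypothetical helper (not part of FFWD): buffer size that would trigger
# a forced flush, ASSUMING threshold = limit * factor.
def forced_flush_threshold(limit, factor)
  (limit * factor).floor
end

event_threshold = forced_flush_threshold(event_limit, forced_flush_factor)
metric_threshold = forced_flush_threshold(metric_limit, forced_flush_factor)

puts "forced flush at #{event_threshold} events"
puts "forced flush at #{metric_threshold} metrics"
```

Under that assumption, the default configuration flushes early at 800 buffered events or 8000 buffered metrics, leaving the remaining 20% of each buffer as headroom before items would have to be dropped.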
Instance Attribute Summary
- #log ⇒ Object (readonly)
  Returns the value of attribute log.
Class Method Summary
- .prepare(opts) ⇒ Object
Instance Method Summary
- #flush! ⇒ Object
- #initialize(core, log, connection, config) ⇒ FlushingConnect (constructor)
  A new instance of FlushingConnect.
- #reporter_meta ⇒ Object
Methods included from Reporter
included, #increment, map_meta, #report!, #reporter_data
Constructor Details
#initialize(core, log, connection, config) ⇒ FlushingConnect
Returns a new instance of FlushingConnect.
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 52

def initialize(core, log, connection, config)
  @log = log
  @c = connection

  flush_period = config[:flush_period]
  ignored = config[:ignored]
  forced_flush_factor = config[:forced_flush_factor]
  event_limit = config[:event_limit]
  metric_limit = config[:metric_limit]

  @event_buffer = []
  @metric_buffer = []
  @timer = nil

  @subs = []

  core.starting do
    @c.connect

    @timer = EM::PeriodicTimer.new(flush_period){flush!}

    unless ignored.include? :events
      event_consumer = setup_consumer(
        @event_buffer, event_limit, forced_flush_factor, :dropped_events)
      @subs << core.output.event_subscribe(&event_consumer)
    end

    unless ignored.include? :metrics
      metric_consumer = setup_consumer(
        @metric_buffer, metric_limit, forced_flush_factor, :dropped_metrics)
      @subs << core.output.metric_subscribe(&metric_consumer)
    end
  end

  core.stopping do
    @c.disconnect

    if @timer
      @timer.cancel
      @timer = nil
    end

    @subs.each(&:unsubscribe).clear
  end
end
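The constructor builds each subscriber with setup_consumer, a helper that this page does not list. The following is only a sketch of what such a consumer might look like, assuming it drops items once the buffer reaches its limit and forces a flush once the buffer reaches limit * forced_flush_factor. The names build_consumer, dropped and flush are hypothetical and stand in for the real helper, the Reporter counter and flush!.

```ruby
# Hypothetical stand-in for the unlisted setup_consumer helper.
# Returns a proc that can be passed as a subscription block.
def build_consumer(buffer, limit, forced_flush_factor, dropped, flush)
  forced_flush_limit = limit * forced_flush_factor

  proc do |item|
    if buffer.size >= limit
      # Buffer is full: count the item as dropped instead of storing it.
      dropped[:count] += 1
      next
    end

    buffer << item

    # Flush early once the buffer reaches the forced flush threshold.
    flush.call if buffer.size >= forced_flush_limit
  end
end

# Case 1: a working flush empties the buffer at 4 of 5 items.
buffer = []
flushed = []
dropped = { count: 0 }
flush = -> { flushed << buffer.dup; buffer.clear }
consumer = build_consumer(buffer, 5, 0.8, dropped, flush)
6.times { |i| consumer.call(i) }

# Case 2: a flush that never empties the buffer leads to drops.
stuck_buffer = []
stuck_dropped = { count: 0 }
stuck = build_consumer(stuck_buffer, 2, 0.8, stuck_dropped, -> {})
3.times { |i| stuck.call(i) }

puts "flushed batches: #{flushed.inspect}, still buffered: #{buffer.inspect}"
puts "dropped when stuck: #{stuck_dropped[:count]}"
```

In this sketch the forced flush keeps the buffer from ever reaching its hard limit as long as flushing succeeds, so drops only occur when the buffer cannot be emptied.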
Instance Attribute Details
#log ⇒ Object (readonly)
Returns the value of attribute log.
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 39

def log
  @log
end
Class Method Details
.prepare(opts) ⇒ Object
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 45

def self.prepare opts
  opts[:forced_flush_factor] ||= DEFAULT_FORCED_FLUSH_FACTOR
  opts[:event_limit] ||= DEFAULT_EVENT_LIMIT
  opts[:metric_limit] ||= DEFAULT_METRIC_LIMIT
  opts
end
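As a usage illustration, the sketch below reproduces the same defaulting logic in a standalone method so it can run without the ffwd gem. The DEFAULTS hash and the top-level prepare method are illustrative stand-ins, not FFWD's API. Only the three keys and their default values come from the listing above.

```ruby
# Standalone copy of the defaulting logic, for illustration only.
DEFAULTS = {
  forced_flush_factor: 0.8,
  event_limit: 1000,
  metric_limit: 10000,
}.freeze

def prepare(opts)
  # ||= fills in a default when the key is missing, nil or false.
  DEFAULTS.each { |key, value| opts[key] ||= value }
  opts
end

given = { event_limit: 50, flush_period: 10 }
opts = prepare(given)

puts opts.inspect
```

Two details follow from the listing: the method mutates and returns the hash it was given, and it sets no default for :flush_period or :ignored, so the constructor presumably relies on the caller to supply those.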
Instance Method Details
#flush! ⇒ Object
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 97

def flush!
  if @event_buffer.empty? and @metric_buffer.empty?
    return
  end

  unless @c.writable?
    increment :dropped_events, @event_buffer.size
    increment :dropped_metrics, @metric_buffer.size
    return
  end

  @c.send_all @event_buffer, @metric_buffer

  increment :sent_events, @event_buffer.size
  increment :sent_metrics, @metric_buffer.size
rescue => e
  log.error "Failed to flush buffers", e

  log.error "The following data could not be flushed:"

  @event_buffer.each_with_index do |event, i|
    log.error "##{i}: #{event.to_h}"
  end

  @metric_buffer.each_with_index do |metric, i|
    log.error "##{i}: #{metric.to_h}"
  end

  increment :failed_events, @event_buffer.size
  increment :failed_metrics, @metric_buffer.size
ensure
  @event_buffer.clear
  @metric_buffer.clear
end
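The three outcomes of flush! (sent, dropped, failed) can be exercised with a stubbed connection. The sketch below mirrors the control flow of the listing above, minus logging. StubConnection, flush_buffers, run_case and the counters hash are hypothetical stand-ins for the real connection object and the Reporter counters.

```ruby
# Hypothetical connection stub; not part of FFWD.
class StubConnection
  def initialize(writable: true, fail_on_send: false)
    @writable = writable
    @fail_on_send = fail_on_send
  end

  def writable?
    @writable
  end

  def send_all(events, metrics)
    raise IOError, "simulated send failure" if @fail_on_send
  end
end

# Mirrors the control flow of flush!, with counters in a plain hash.
def flush_buffers(conn, events, metrics, counters)
  return if events.empty? && metrics.empty?

  unless conn.writable?
    counters[:dropped_events] += events.size
    counters[:dropped_metrics] += metrics.size
    return
  end

  conn.send_all(events, metrics)
  counters[:sent_events] += events.size
  counters[:sent_metrics] += metrics.size
rescue StandardError
  counters[:failed_events] += events.size
  counters[:failed_metrics] += metrics.size
ensure
  # Buffers are emptied on every path, including drops and failures.
  events.clear
  metrics.clear
end

def run_case(conn)
  counters = Hash.new(0)
  events = [:e1, :e2]
  metrics = [:m1]
  flush_buffers(conn, events, metrics, counters)
  [counters, events, metrics]
end

sent, = run_case(StubConnection.new)
dropped, = run_case(StubConnection.new(writable: false))
failed, events_after, metrics_after = run_case(StubConnection.new(fail_on_send: true))

puts sent.inspect
puts dropped.inspect
puts failed.inspect
```

Because the ensure clause clears both buffers unconditionally, data that could not be written is discarded rather than retried on the next flush. The counters are the only record of it, apart from the error log in the failure case.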
#reporter_meta ⇒ Object
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 41

def reporter_meta
  @c.reporter_meta
end