Class: FFWD::ProducingClient
- Inherits:
-
Object
- Object
- FFWD::ProducingClient
- Defined in:
- lib/ffwd/producing_client.rb
Overview
A client implementation that delegates all work to other threads.
Defined Under Namespace
Classes: Producer
Instance Method Summary collapse
- #flush! ⇒ Object
-
#initialize(channel, producer, flush_period, event_limit, metric_limit) ⇒ ProducingClient
constructor
A new instance of ProducingClient.
- #report! ⇒ Object
- #reporter_meta ⇒ Object
-
#safer_flush! ⇒ Object
Apply some heuristics to determine if we can ‘ignore’ the current flush to prevent loss of data.
Methods included from Logging
Methods included from Reporter
included, #increment, map_meta, #reporter_data
Constructor Details
#initialize(channel, producer, flush_period, event_limit, metric_limit) ⇒ ProducingClient
Returns a new instance of ProducingClient.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/ffwd/producing_client.rb', line 61 def initialize channel, producer, flush_period, event_limit, metric_limit @flush_period = flush_period @event_limit = event_limit @metric_limit = metric_limit if @flush_period <= 0 raise "Invalid flush period: #{flush_period}" end @producer = producer @producer_is_reporter = FFWD.is_reporter? producer @events = [] @metrics = [] # Pending request. @request = nil @timer = nil @subs = [] channel.starting do @timer = EM::PeriodicTimer.new(@flush_period){safer_flush!} @subs << channel.event_subscribe do |e| if @events.size >= @event_limit increment :dropped_events next end @events << e end @subs << channel.metric_subscribe do |m| if @metrics.size >= @metric_limit increment :dropped_metrics next end @metrics << m end @producer.setup end channel.stopping do if @timer @timer.cancel @timer = nil end flush! @subs.each(&:unsubscribe).clear @metrics.clear @events.clear @producer.teardown end end |
Instance Method Details
#flush! ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/ffwd/producing_client.rb', line 142 def flush! if @request or not @request = @producer.produce(@events, @metrics) increment :dropped_events, @events.size increment :dropped_metrics, @metrics.size return end # store buffer sizes for use in callbacks. events_size = @events.size metrics_size = @metrics.size @request.callback do increment :sent_events, events_size increment :sent_metrics, metrics_size @request = nil end @request.errback do |e| log.error "Failed to produce", e increment :failed_events, events_size increment :failed_metrics, metrics_size @request = nil end rescue => e increment :failed_events, @events.size increment :failed_metrics, @metrics.size log.error "Failed to produce", e ensure @events.clear @metrics.clear end |
#report! ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/ffwd/producing_client.rb', line 49 def report! super do |m| yield m end return unless @producer_is_reporter @producer.report! do |m| yield m end end |
#reporter_meta ⇒ Object
44 45 46 47 |
# File 'lib/ffwd/producing_client.rb', line 44 def @reporter_meta ||= @producer..merge( :type => "producing_client_out") end |
#safer_flush! ⇒ Object
Apply some heuristics to determine if we can ‘ignore’ the current flush to prevent loss of data.
Checks that if a request is pending; we have not breached the limit of allowed events.
128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/ffwd/producing_client.rb', line 128 def safer_flush! if @request increment :slow_requests ignore_flush = ( @events.size < @event_limit or @metrics.size < @metric_limit) return if ignore_flush end flush! end |