Class: FFWD::TCP::FlushingConnect

Inherits:
Object
  • Object
show all
Includes:
Reporter
Defined in:
lib/ffwd/protocol/tcp/flushing_connect.rb

Overview

A TCP connection implementation that buffers events and metrics in batches over a time window and calls ‘send_all’ on the connection.

Constant Summary collapse

DEFAULT_FORCED_FLUSH_FACTOR =

Fraction of the maximum number of buffered events/metrics that triggers a forced flush (0.8 = 80%).

0.8
DEFAULT_EVENT_LIMIT =

Defaults for buffered connections. Maximum number of events to buffer.

1000
DEFAULT_METRIC_LIMIT =

Maximum number of metrics to buffer.

10000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Reporter

build_meta, included, #increment, map_meta, #report!, #reporter_data

Constructor Details

#initialize(core, log, connection, config) ⇒ FlushingConnect

Returns a new instance of FlushingConnect.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 58

# Builds a buffering wrapper around +connection+.
#
# Nothing is connected or subscribed here; the constructor only records
# state and registers two lifecycle hooks on +core+:
#
# * starting: connect, start the periodic flush timer and subscribe a
#   consumer for every kind of data that is not listed in config[:ignored].
# * stopping: disconnect, cancel the timer and drop all subscriptions.
#
# core       - object providing +starting+, +stopping+ and +output+.
# log        - logger, exposed through #log.
# connection - object providing +connect+ and +disconnect+.
# config     - options read via []: :flush_period, :ignored,
#              :forced_flush_factor, :event_limit, :metric_limit.
def initialize(core, log, connection, config)
  @log = log
  @c = connection

  # Mutable state shared between the lifecycle hooks and flush!.
  @event_buffer = []
  @metric_buffer = []
  @subs = []
  @timer = nil

  # Configuration is read once, up front, and captured by the hooks below.
  period = config[:flush_period]
  skipped = config[:ignored]
  flush_factor = config[:forced_flush_factor]
  max_events = config[:event_limit]
  max_metrics = config[:metric_limit]

  core.starting do
    @c.connect

    @timer = EM::PeriodicTimer.new(period) { flush! }

    unless skipped.include?(:events)
      on_event = setup_consumer(
        @event_buffer, max_events, flush_factor, :dropped_events)
      @subs << core.output.event_subscribe(&on_event)
    end

    unless skipped.include?(:metrics)
      on_metric = setup_consumer(
        @metric_buffer, max_metrics, flush_factor, :dropped_metrics)
      @subs << core.output.metric_subscribe(&on_metric)
    end
  end

  core.stopping do
    @c.disconnect

    # The timer only exists once the starting hook has run.
    @timer.cancel if @timer
    @timer = nil

    @subs.each { |sub| sub.unsubscribe }
    @subs.clear
  end
end

Instance Attribute Details

#log ⇒ Object (readonly)

Returns the value of attribute log.



45
46
47
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 45

# Reader for the logger that was handed to the constructor and stored in
# @log. flush! uses it to report buffers that could not be sent.
def log
  @log
end

Class Method Details

.prepare(opts) ⇒ Object



51
52
53
54
55
56
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 51

# Fills in the buffering defaults on +opts+.
#
# Only options that are missing (or otherwise falsy) are assigned; values
# already present are left untouched. +opts+ is modified in place and also
# returned.
def self.prepare(opts)
  fallbacks = {
    :forced_flush_factor => DEFAULT_FORCED_FLUSH_FACTOR,
    :event_limit => DEFAULT_EVENT_LIMIT,
    :metric_limit => DEFAULT_METRIC_LIMIT
  }

  fallbacks.each do |key, fallback|
    opts[key] ||= fallback
  end

  opts
end

Instance Method Details

#flush! ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 103

# Hands the buffered events and metrics to the connection in one batch.
#
# * Both buffers empty: nothing happens.
# * Connection not writable: the buffered items are counted as dropped.
# * send_all succeeds: the items are counted as sent.
# * Anything raises: the error and every buffered item are logged, and the
#   items are counted as failed.
#
# Both buffers are emptied on every path (see the ensure clause), so data
# that could not be delivered is discarded rather than retried.
def flush!
  return if [@event_buffer, @metric_buffer].all?(&:empty?)

  unless @c.writable?
    increment :dropped_events, @event_buffer.size
    increment :dropped_metrics, @metric_buffer.size
    return
  end

  @c.send_all @event_buffer, @metric_buffer
  increment :sent_events, @event_buffer.size
  increment :sent_metrics, @metric_buffer.size
rescue => error
  log.error "Failed to flush buffers", error

  log.error "The following data could not be flushed:"

  # Dump events first, then metrics; the index restarts for each buffer.
  [@event_buffer, @metric_buffer].each do |buffer|
    buffer.each_with_index do |entry, position|
      log.error "##{position}: #{entry.to_h}"
    end
  end

  increment :failed_events, @event_buffer.size
  increment :failed_metrics, @metric_buffer.size
ensure
  @event_buffer.clear
  @metric_buffer.clear
end

#reporter_meta ⇒ Object



47
48
49
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 47

# Delegates to the wrapped connection (@c) and returns whatever its
# reporter_meta returns.
# NOTE(review): presumably consumed by the included Reporter module to tag
# the counters incremented in flush! — confirm against Reporter.
def reporter_meta
  @c.reporter_meta
end