Class: FluidFeatures::AppReporter

Inherits:
Object
  • Object
show all
Defined in:
lib/fluidfeatures/app/reporter.rb

Constant Summary collapse

MAX_BUCKETS =

Throw away the oldest buckets when this limit is reached.

10
MAX_BUCKET_SIZE =

Max number of transactions we queue in a bucket.

100
WAIT_BETWEEN_QUEUE_EMTPY_CHECKS =

While queue is empty we will check size every 0.5 secs

0.5
WAIT_BETWEEN_SEND_SUCCESS_NONE_WAITING =

Soft max of 1 req/sec

1
WAIT_BETWEEN_SEND_SUCCESS_NEXT_WAITING =

Hard max of 10 req/sec

0.1
WAIT_BETWEEN_SEND_FAILURES =

If we are failing to communicate with the FluidFeatures API, then wait this long between requests.

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app) ⇒ AppReporter

seconds



28
29
30
31
32
# File 'lib/fluidfeatures/app/reporter.rb', line 28

# Validates +app+, sets up the reporter state and starts the reporting loop.
#
# @param app [FluidFeatures::App] application this reporter belongs to
# @raise [RuntimeError] when +app+ is not a FluidFeatures::App
def initialize(app)
  unless app.is_a?(::FluidFeatures::App)
    raise "app invalid : #{app}"
  end

  configure(app)
  run_loop
end

Instance Attribute Details

#appObject

Returns the value of attribute app.



7
8
9
# File 'lib/fluidfeatures/app/reporter.rb', line 7

# Returns the value of the @app instance variable.
def app
  @app
end

Instance Method Details

#bucket_countObject



186
187
188
189
190
191
192
# File 'lib/fluidfeatures/app/reporter.rb', line 186

# Number of buckets currently in the queue, read while holding the
# buckets lock.
#
# @return [Integer]
def bucket_count
  @buckets_lock.synchronize { @buckets.size }
end

#configure(app) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/fluidfeatures/app/reporter.rb', line 34

# (Re)initialises all reporter state for +app+: the bucket queue, the
# bucket currently being filled, and the pending unknown-features map,
# each paired with its own mutex.
#
# @param app [Object] stored in @app
def configure(app)
  @app = app

  # Queue of buckets; new_bucket below appends to it, so it has to exist
  # (together with its lock) before new_bucket is called.
  @buckets_lock = ::Mutex.new
  @buckets = []

  # Bucket that incoming transaction payloads are appended to.
  @current_bucket_lock = ::Mutex.new
  @current_bucket = nil
  @current_bucket = new_bucket

  # Unknown features collected since the last reset.
  @unknown_features = {}
  @unknown_features_lock = ::Mutex.new
end

#new_bucketObject



195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/fluidfeatures/app/reporter.rb', line 195

# Appends a fresh, empty bucket to the queue and returns it. If the queue
# then holds more than MAX_BUCKETS buckets, the oldest one is dropped and
# a warning with its size is logged.
#
# @return [Array] the newly queued bucket
def new_bucket
  fresh = []
  evicted = @buckets_lock.synchronize do
    @buckets.push(fresh)
    # oldest bucket sits at the front of the queue
    @buckets.shift if @buckets.size > MAX_BUCKETS
  end
  # log outside the lock
  if evicted
    app.logger.warn "[FF] Discarded #{evicted.size} transactions due to reporter backlog. These will not be reported to FluidFeatures."
  end
  fresh
end

#queue_transaction_payload(transaction_payload) ⇒ Object



240
241
242
243
244
245
246
247
# File 'lib/fluidfeatures/app/reporter.rb', line 240

# Appends +transaction_payload+ to the current bucket, first starting a new
# bucket when the current one already holds MAX_BUCKET_SIZE payloads.
#
# Locks are taken in the order @buckets_lock -> @current_bucket_lock, which
# is the order remove_bucket and transactions_queued? use. Taking
# @current_bucket_lock first and then @buckets_lock (as happens when
# new_bucket is called while holding @current_bucket_lock) can deadlock
# against those methods. Mutex is not reentrant, so the bucket rotation is
# done inline here instead of calling new_bucket under @buckets_lock.
#
# @param transaction_payload [Object] payload to queue
# @return [Array] the bucket the payload was appended to
def queue_transaction_payload(transaction_payload)
  discarded_bucket = nil
  bucket = @buckets_lock.synchronize do
    @current_bucket_lock.synchronize do
      if @current_bucket.size >= MAX_BUCKET_SIZE
        @current_bucket = []
        @buckets << @current_bucket
        # throw away the oldest bucket once the queue exceeds its limit
        discarded_bucket = @buckets.shift if @buckets.size > MAX_BUCKETS
      end
      @current_bucket << transaction_payload
    end
  end
  # log after releasing both locks
  if discarded_bucket
    app.logger.warn "[FF] Discarded #{discarded_bucket.size} transactions due to reporter backlog. These will not be reported to FluidFeatures."
  end
  bucket
end

#queue_unknown_features(unknown_features) ⇒ Object



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/fluidfeatures/app/reporter.rb', line 250

# Merges +unknown_features+ (feature name => { version name => default
# enabled }) into the pending unknown-features map. Entries already
# recorded for a feature/version pair are left untouched.
#
# @param unknown_features [Hash{Object => Hash}]
# @raise [RuntimeError] if the argument or any of its values is not a Hash
# @return [Hash] the +unknown_features+ argument
def queue_unknown_features(unknown_features)
  raise "unknown_features should be a Hash" unless unknown_features.is_a?(Hash)
  # validate every value up front so nothing is merged on bad input
  unknown_features.each_value do |versions|
    next if versions.is_a?(Hash)
    raise "unknown_features values should be a Hash. versions=#{versions}"
  end

  @unknown_features_lock.synchronize do
    unknown_features.each_pair do |feature_name, versions|
      recorded = @unknown_features.fetch(feature_name) do
        @unknown_features[feature_name] = {}
      end
      # on a key clash keep the value that was recorded first
      recorded.merge!(versions) { |_version_name, existing, _incoming| existing }
    end
  end
end

#remove_bucketObject



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/fluidfeatures/app/reporter.rb', line 211

# Takes the oldest bucket off the front of the queue. If that leaves the
# queue empty, a fresh empty bucket is installed as the current bucket and
# queued, so the queue never stays empty.
#
# @return [Array, nil] the removed bucket, or nil if the queue was empty
def remove_bucket
  @buckets_lock.synchronize do
    oldest = @buckets.shift # nil when there was nothing queued
    if @buckets.empty?
      @current_bucket_lock.synchronize do
        @current_bucket = []
        @buckets.push(@current_bucket)
      end
    end
    oldest
  end
end

#report_transaction(transaction) ⇒ Object

Pass FluidFeatures::AppUserTransaction for reporting back to the FluidFeatures service.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fluidfeatures/app/reporter.rb', line 50

# Builds a report payload from +transaction+ and queues it; any unknown
# features recorded on the transaction are queued as well.
#
# The user section always carries :id; :name, :anonymous, :unique and
# :cohorts are only added when the corresponding user attribute is truthy.
#
# @param transaction [Object] read via #user, #url, #features_hit,
#   #goals_hit, #duration and #unknown_features
def report_transaction(transaction)
  user = transaction.user

  payload = {
    :url   => transaction.url,
    :user  => { :id => user.unique_id },
    :hits  => {
      :feature => transaction.features_hit,
      :goal    => transaction.goals_hit
    },
    :stats => { :duration => transaction.duration }
  }

  # optional user attributes
  user_details = payload[:user]

  display_name = user.display_name
  user_details[:name] = display_name if display_name

  anonymous = user.anonymous
  user_details[:anonymous] = anonymous if anonymous

  unique_attrs = user.unique_attrs
  user_details[:unique] = unique_attrs if unique_attrs

  cohort_attrs = user.cohort_attrs
  user_details[:cohorts] = cohort_attrs if cohort_attrs

  queue_transaction_payload(payload)

  unknown = transaction.unknown_features
  queue_unknown_features(unknown) if unknown.size > 0
end

#run_loopObject



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/fluidfeatures/app/reporter.rb', line 83

# Starts the background thread that delivers queued transactions.
#
# Each iteration either:
# * sleeps WAIT_BETWEEN_QUEUE_EMTPY_CHECKS when nothing is queued,
# * sends one batch and sleeps WAIT_BETWEEN_SEND_SUCCESS_NONE_WAITING
#   (at most one bucket left) or WAIT_BETWEEN_SEND_SUCCESS_NEXT_WAITING
#   (more buckets waiting) on success, or
# * sleeps WAIT_BETWEEN_SEND_FAILURES when the send failed or raised.
#
# @return [Thread] the reporter thread
def run_loop
  Thread.new do
    while true
      begin
        if transactions_queued?
          if send_transactions
            # Unless we have a full bucket waiting do not make
            # more than N requests per second.
            pause = if bucket_count <= 1
              WAIT_BETWEEN_SEND_SUCCESS_NONE_WAITING
            else
              WAIT_BETWEEN_SEND_SUCCESS_NEXT_WAITING
            end
            sleep pause
          else
            # If service is down, then slow our requests
            # within this thread
            sleep WAIT_BETWEEN_SEND_FAILURES
          end
        else
          sleep WAIT_BETWEEN_QUEUE_EMTPY_CHECKS
        end
      rescue Exception => err
        # Deliberately broad: nothing raised while reporting may take down
        # this thread or affect the rest of the application.
        app.logger.error "[FF] send_transactions failed : #{err.message}\n#{err.backtrace.join("\n")}"
        # hold off for a little while and try again
        sleep WAIT_BETWEEN_SEND_FAILURES
      end
    end
  end
end

#send_transactionsObject



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/fluidfeatures/app/reporter.rb', line 137

# Removes the oldest bucket and the pending unknown features and posts
# them, together with queue statistics and the client's API request log,
# to "/report/transactions".
#
# When the post does not succeed, the bucket is pushed back to the front
# of the queue (a warning is logged if that is refused) and the unknown
# features are re-queued.
#
# @return [Object] the result of app.post (truthy on success)
def send_transactions
  bucket = remove_bucket

  # Take existing unknown features and reset
  unknown_features = @unknown_features_lock.synchronize do
    pending = @unknown_features
    @unknown_features = {}
    pending
  end

  # sizes of the buckets still waiting after this one
  remaining_buckets_stats = @buckets_lock.synchronize do
    @buckets.map { |waiting| waiting.size }
  end

  api_request_log = app.client.siphon_api_request_log

  payload = {
    :client_uuid => app.client.uuid,
    :transactions => bucket,
    :stats => { :waiting_buckets => remaining_buckets_stats },
    :unknown_features => unknown_features,
    :api_request_log => api_request_log
  }

  # attempt to send to fluidfeatures service
  success = app.post("/report/transactions", payload)
  return success if success

  # return bucket into bucket queue until the next attempt at sending
  unless unremove_bucket(bucket)
    app.logger.warn "[FF] Discarded transactions due to reporter backlog. These will not be reported to FluidFeatures."
  end
  # return unknown features to queue until the next attempt at sending
  queue_unknown_features(unknown_features)

  success
end

#transactions_queued?Boolean



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/fluidfeatures/app/reporter.rb', line 120

# Tells whether there is anything to send.
#
# With exactly one bucket queued, the current bucket is inspected (under
# its own lock); with more than one, the oldest bucket is inspected.
#
# @return [Boolean]
def transactions_queued?
  @buckets_lock.synchronize do
    queued = @buckets.size
    if queued == 1
      @current_bucket_lock.synchronize { @current_bucket.size > 0 }
    else
      queued > 1 && @buckets[0].size > 0
    end
  end
end

#unremove_bucket(bucket) ⇒ Object



228
229
230
231
232
233
234
235
236
237
# File 'lib/fluidfeatures/app/reporter.rb', line 228

# Puts +bucket+ back at the front of the queue, e.g. after a failed send,
# as long as that does not push the queue past MAX_BUCKETS.
#
# The check is a strict "<": with "<=" a queue that is already full would
# still accept the bucket and grow to MAX_BUCKETS + 1, so the limit was
# never enforced here.
#
# @param bucket [Array] bucket previously taken off the queue
# @return [Boolean] true if the bucket was re-queued, false if the queue
#   is full and the bucket was not put back
def unremove_bucket(bucket)
  @buckets_lock.synchronize do
    if @buckets.size < MAX_BUCKETS
      @buckets.unshift(bucket)
      true
    else
      false
    end
  end
end