Class: FluidFeatures::AppReporter

Inherits:
Object
  • Object
show all
Defined in:
lib/fluidfeatures/app/reporter.rb

Constant Summary collapse

MAX_BUCKETS =

Throw oldest buckets away or offload to persistent storage when this limit reached.

10
MAX_BUCKET_SIZE =

Max number of transactions we queue in a bucket.

100
WAIT_BETWEEN_QUEUE_EMTPY_CHECKS =

While queue is empty we will check size every 0.5 secs

0.5
WAIT_BETWEEN_SEND_SUCCESS_NONE_WAITING =

Soft max of 1 req/sec

1
WAIT_BETWEEN_SEND_SUCCESS_NEXT_WAITING =

Hard max of 10 req/sec

0.1
WAIT_BETWEEN_SEND_FAILURES =

If we are failing to communicate with the FluidFeautres API then wait for this long between requests.

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app) ⇒ AppReporter

seconds



28
29
30
31
32
33
34
35
36
37
# File 'lib/fluidfeatures/app/reporter.rb', line 28

def initialize(app)
  raise "app invalid : #{app}" unless app.is_a? ::FluidFeatures::App
  @sending = false
  configure(app)
  at_exit do
    if @buckets_storage and @buckets
      @buckets_storage.append(@buckets)
    end
  end
end

Instance Attribute Details

#appObject

Returns the value of attribute app.



7
8
9
# File 'lib/fluidfeatures/app/reporter.rb', line 7

def app
  @app
end

Instance Method Details

#bucket_countObject



239
240
241
242
243
244
245
# File 'lib/fluidfeatures/app/reporter.rb', line 239

def bucket_count
  num_buckets = 0
  buckets_lock_synchronize do
    num_buckets = @buckets.size
  end
  num_buckets
end

#buckets_lock_synchronizeObject



329
330
331
332
333
# File 'lib/fluidfeatures/app/reporter.rb', line 329

def buckets_lock_synchronize
  @buckets_lock.synchronize do
    yield
  end
end

#buckets_storageObject



52
53
54
# File 'lib/fluidfeatures/app/reporter.rb', line 52

def buckets_storage
  @buckets_storage ||= FluidFeatures::Persistence::Buckets.create(FluidFeatures.config["cache"])
end

#configure(app) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluidfeatures/app/reporter.rb', line 60

def configure(app)
  @app = app

  @buckets = buckets_storage.fetch(MAX_BUCKETS)

  @buckets_lock = ::Mutex.new

  #maybe could get rid of @current_bucket concept
  @current_bucket = nil
  @current_bucket_lock = ::Mutex.new
  @current_bucket = last_or_new_bucket

  @unknown_features = features_storage.list_unknown
  @unknown_features_lock = ::Mutex.new
end

#features_storageObject



56
57
58
# File 'lib/fluidfeatures/app/reporter.rb', line 56

def features_storage
  @features_storage ||= FluidFeatures::Persistence::Features.create(FluidFeatures.config["cache"])
end

#last_or_new_bucketObject



76
77
78
# File 'lib/fluidfeatures/app/reporter.rb', line 76

def last_or_new_bucket
  @buckets.empty? || @buckets.last.size >= MAX_BUCKET_SIZE ? new_bucket : @buckets.last
end

#new_bucketObject



248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/fluidfeatures/app/reporter.rb', line 248

def new_bucket
  bucket = []
  buckets_lock_synchronize do
    @buckets << bucket
    if @buckets.size > MAX_BUCKETS
      #offload to storage
      unless buckets_storage.append_one(@buckets.shift)
        app.logger.warn "[FF] Discarded transactions due to reporter backlog. These will not be reported to FluidFeatures."
      end
    end
  end
  bucket
end

#queue_transaction_payload(transaction_payload) ⇒ Object



299
300
301
302
303
304
305
306
# File 'lib/fluidfeatures/app/reporter.rb', line 299

def queue_transaction_payload(transaction_payload)
  @current_bucket_lock.synchronize do
    if @current_bucket.size >= MAX_BUCKET_SIZE
      @current_bucket = new_bucket
    end
    @current_bucket << transaction_payload
  end
end

#queue_unknown_features(unknown_features) ⇒ Object



309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
# File 'lib/fluidfeatures/app/reporter.rb', line 309

def queue_unknown_features(unknown_features)
  raise "unknown_features should be a Hash" unless unknown_features.is_a? Hash
  unknown_features.each_pair do |feature_name, versions|
    raise "unknown_features values should be a Hash. versions=#{versions}" unless versions.is_a? Hash
  end
  @unknown_features_lock.synchronize do
    unknown_features.each_pair do |feature_name, versions|
      unless @unknown_features.has_key? feature_name
        @unknown_features[feature_name] = {}
      end
      versions.each_pair do |version_name, default_enabled|
        unless @unknown_features[feature_name].has_key? version_name
          @unknown_features[feature_name][version_name] = default_enabled
        end
      end
    end
  end
end

#remove_bucketObject



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/fluidfeatures/app/reporter.rb', line 263

def remove_bucket
  removed_bucket = nil
  buckets_lock_synchronize do
    #try to get buckets from storage first
    if @buckets.empty? && !buckets_storage.empty?
      @buckets = buckets_storage.fetch(MAX_BUCKETS)
    end

    if @buckets.size > 0
      removed_bucket = @buckets.shift
    end
    if @buckets.size == 0
      @current_bucket_lock.synchronize do
        @current_bucket = []
        @buckets << @current_bucket
      end
    end
  end
  removed_bucket
end

#report_transaction(transaction) ⇒ Object

Pass FluidFeatures::AppUserTransaction for reporting back to the FluidFeatures service.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/fluidfeatures/app/reporter.rb', line 82

def report_transaction(transaction)

  user = transaction.user

  payload = {
    :url => transaction.url,
    :user => {
      :id => user.unique_id
    },
    :hits => {
      :feature => transaction.features_hit,
      :goal    => transaction.goals_hit
    },
    # stats
    :stats => {
      :duration => transaction.duration
    }
  }

  payload_user = payload[:user] ||= {}
  payload_user[:name] = user.display_name if user.display_name
  payload_user[:anonymous] = user.anonymous if user.anonymous
  payload_user[:unique] = user.unique_attrs if user.unique_attrs
  payload_user[:cohorts] = user.cohort_attrs if user.cohort_attrs

  queue_transaction_payload(payload)

  if transaction.unknown_features.size > 0
    queue_unknown_features(transaction.unknown_features)
    features_storage.replace_unknown(@unknown_features)
  end

  start_sending unless @sending
end

#run_loopObject



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluidfeatures/app/reporter.rb', line 117

def run_loop

  return unless @sending
  return if @loop_thread and @loop_thread.alive?

  @loop_thread = Thread.new do
    while @sending
      run_loop_iteration(
        WAIT_BETWEEN_QUEUE_EMTPY_CHECKS,
        WAIT_BETWEEN_SEND_SUCCESS_NONE_WAITING,
        WAIT_BETWEEN_SEND_SUCCESS_NEXT_WAITING,
        WAIT_BETWEEN_SEND_FAILURES
      )
    end
  end
end

#run_loop_iteration(wait_between_queue_emtpy_checks, wait_between_send_success_none_waiting, wait_between_send_success_next_waiting, wait_between_send_failures) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/fluidfeatures/app/reporter.rb', line 134

def run_loop_iteration(
  wait_between_queue_emtpy_checks,
  wait_between_send_success_none_waiting,
  wait_between_send_success_next_waiting,
  wait_between_send_failures)
  begin

    unless transactions_queued?
      sleep wait_between_queue_emtpy_checks
      return
    end

    success = send_transactions

    if success
      # Unless we have a full bucket waiting do not make
      # more than N requests per second.
      if bucket_count <= 1
        sleep wait_between_send_success_none_waiting
      else
        sleep wait_between_send_success_next_waiting
      end
    else  
      # If service is down, then slow our requests
      # within this thread
      sleep wait_between_send_failures
    end

  rescue Exception => err
    # catch errors, so that we do not affect the rest of the application
    app.logger.error "[FF] send_transactions failed : #{err.message}\n#{err.backtrace.join("\n")}"
    # hold off for a little while and try again
    sleep wait_between_send_failures
  end
end

#send_transactionsObject



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/fluidfeatures/app/reporter.rb', line 188

def send_transactions
  bucket = remove_bucket

  # Take existing unknown features and reset
  unknown_features = nil
  @unknown_features_lock.synchronize do
    unknown_features = @unknown_features
    @unknown_features = {}
  end

  remaining_buckets_stats = nil
  buckets_lock_synchronize do
    remaining_buckets_stats = @buckets.map { |b| b.size }
  end

  api_request_log = app.client.siphon_api_request_log

  payload = {
    :client_uuid => app.client.uuid,
    :transactions => bucket,
    :stats => {
      :waiting_buckets => remaining_buckets_stats
    },
    :unknown_features => unknown_features,
    :api_request_log => api_request_log
  }

  if remaining_buckets_stats.size > 0
    payload[:stats][:waiting_buckets] = remaining_buckets_stats
  end

  # attempt to send to fluidfeatures service
  success = app.post("/report/transactions", payload)

  # handle failure to send data
  unless success
    # return bucket into bucket queue until the next attempt at sending
    if not unremove_bucket(bucket)
      app.logger.warn "[FF] Discarded transactions due to reporter backlog. These will not be reported to FluidFeatures."
    end
    # return unknown features to queue until the next attempt at sending
    queue_unknown_features(unknown_features)
  else
    features_storage.replace_unknown({})
  end

  # return whether we were able to send or not
  success
end

#start_sendingObject



39
40
41
42
43
# File 'lib/fluidfeatures/app/reporter.rb', line 39

def start_sending
  return if @sending
  @sending = true
  run_loop
end

#stop_sending(wait = false) ⇒ Object



45
46
47
48
49
50
# File 'lib/fluidfeatures/app/reporter.rb', line 45

def stop_sending(wait=false)
  @sending = false
  if wait
    @loop_thread.join if @loop_thread and @loop_thread.alive?
  end
end

#transactions_queued?Boolean

Returns:

  • (Boolean)


171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/fluidfeatures/app/reporter.rb', line 171

def transactions_queued?
  have_transactions = false
  buckets_lock_synchronize do
    if @buckets.size == 1
      @current_bucket_lock.synchronize do
        if @current_bucket.size > 0
          have_transactions = true
        end
      end
    elsif @buckets.size > 1 and @buckets[0].size > 0
      have_transactions = true
    end
  end
  have_transactions
end

#unremove_bucket(bucket) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/fluidfeatures/app/reporter.rb', line 285

def unremove_bucket(bucket)
  success = false
  buckets_lock_synchronize do
    if @buckets.size <= MAX_BUCKETS
      @buckets.unshift bucket
      success = true
    else
      success = buckets_storage.append_one(bucket)
    end
  end
  success
end