Class: SplitIoClient::Engine::Synchronizer

Inherits:
Object
  • Object
show all
Includes:
Cache::Fetchers, Cache::Senders
Defined in:
lib/splitclient-rb/engine/synchronizer.rb

Constant Summary collapse

ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES =
10

Instance Method Summary collapse

Constructor Details

#initialize(repositories, config, params) ⇒ Synchronizer

Returns a new instance of Synchronizer.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 11

def initialize(
  repositories,
  config,
  params
)
  @splits_repository = repositories[:splits]
  @segments_repository = repositories[:segments]
  @rule_based_segments_repository = repositories[:rule_based_segments]
  @impressions_repository = repositories[:impressions]
  @events_repository = repositories[:events]
  @config = config
  @split_fetcher = params[:split_fetcher]
  @segment_fetcher = params[:segment_fetcher]
  @impressions_api = params[:impressions_api]
  @impression_counter = params[:imp_counter]
  @telemetry_synchronizer = params[:telemetry_synchronizer]
  @impressions_sender_adapter = params[:impressions_sender_adapter]
  @unique_keys_tracker = params[:unique_keys_tracker]

  @splits_sync_backoff = Engine::BackOff.new(10, 0, 60)
  @segments_sync_backoff = Engine::BackOff.new(10, 0, 60)
end

Instance Method Details

#fetch_segment(name, target_change_number) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 105

def fetch_segment(name, target_change_number)
  return if target_change_number <= @segments_repository.get_change_number(name).to_i

  fetch_options = { cache_control_headers: true, till: nil }
  result = attempt_segment_sync(name,
                                target_change_number,
                                fetch_options,
                                @config.on_demand_fetch_max_retries,
                                @config.on_demand_fetch_retry_delay_seconds,
                                false)

  attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]
  if result[:success]
    @config.logger.debug("Segment #{name} refresh completed in #{attempts} attempts.") if @config.debug_enabled

    return
  end

  fetch_options = { cache_control_headers: true, till: target_change_number }
  result = attempt_segment_sync(name,
                                target_change_number,
                                fetch_options,
                                ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
                                nil,
                                true)

  attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]
  if result[:success]
    @config.logger.debug("Segment #{name} refresh completed bypassing the CDN in #{attempts} attempts.") if @config.debug_enabled
  else
    @config.logger.debug("No changes fetched for segment #{name} after #{attempts} attempts with CDN bypassed.") if @config.debug_enabled
  end
rescue StandardError => e
  @config.log_found_exception(__method__.to_s, e)
end

#fetch_splits(target_change_number, rbs_target_change_number) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 67

def fetch_splits(target_change_number, rbs_target_change_number)
  return if check_exit_conditions(target_change_number, rbs_target_change_number)

  fetch_options = { cache_control_headers: true, till: nil }

  result = attempt_splits_sync(target_change_number, rbs_target_change_number,
                               fetch_options,
                               @config.on_demand_fetch_max_retries,
                               @config.on_demand_fetch_retry_delay_seconds,
                               false)

  attempts = @config.on_demand_fetch_max_retries - result[:remaining_attempts]
  if result[:success]
    @segment_fetcher.fetch_segments_if_not_exists(result[:segment_names], true) unless result[:segment_names].empty?
    @config.logger.debug("Refresh completed in #{attempts} attempts.") if @config.debug_enabled

    return
  end

  if target_change_number != 0
    fetch_options[:till] = target_change_number 
  else
    fetch_options[:till] = rbs_target_change_number 
  end

  result = attempt_splits_sync(target_change_number, rbs_target_change_number,
                               fetch_options,
                               ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES,
                               nil,
                               true)

  attempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - result[:remaining_attempts]

  process_result(result, attempts)
rescue StandardError => e
  @config.log_found_exception(__method__.to_s, e)
end

#start_periodic_data_recordingObject



46
47
48
49
50
51
52
53
54
55
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 46

def start_periodic_data_recording
  unless @config.consumer?
    impressions_sender
    events_sender
    start_telemetry_sync_task
  end

  impressions_count_sender
  start_unique_keys_tracker_task
end

#start_periodic_fetchObject



57
58
59
60
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 57

def start_periodic_fetch
  @split_fetcher.call
  @segment_fetcher.call
end

#stop_periodic_fetchObject



62
63
64
65
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 62

def stop_periodic_fetch
  @split_fetcher.stop_splits_thread
  @segment_fetcher.stop_segments_thread
end

#sync_all(asynchronous = true) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
# File 'lib/splitclient-rb/engine/synchronizer.rb', line 34

def sync_all(asynchronous = true)
  unless asynchronous
    return sync_splits_and_segments
  end

  @config.threads[:sync_all_thread] = Thread.new do
    sync_splits_and_segments
  end

  true
end