Class: LaunchDarkly::EventProcessor

Inherits:
Object
  • Object
show all
Includes:
EventProcessorMethods
Defined in:
lib/ldclient-rb/events.rb

Instance Method Summary collapse

Constructor Details

#initialize(sdk_key, config, client = nil, diagnostic_accumulator = nil, test_properties = nil) ⇒ EventProcessor

Returns a new instance of EventProcessor.

Raises:

  • (ArgumentError)


117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/ldclient-rb/events.rb', line 117

# Builds the processor: creates the bounded inbox, starts the periodic timer
# tasks that post messages to it, and hands the inbox to a new EventDispatcher.
#
# @param sdk_key [String] must not be nil
# @param config [Object] read for logger, capacity, flush_interval,
#   context_keys_flush_interval, diagnostic_recording_interval and events_uri
# @param client [Object, nil] HTTP client given to the default event sender; when nil,
#   one is created with Util.new_http_client(config.events_uri, config). Not used
#   when test_properties supplies :event_sender.
# @param diagnostic_accumulator [Object, nil] when non-nil, a diagnostic timer task
#   is started as well
# @param test_properties [Hash, nil] optional overrides; the keys read here are
#   :diagnostic_recording_interval, :event_sender and :timestamp_fn
# @raise [ArgumentError] if sdk_key is nil
def initialize(sdk_key, config, client = nil, diagnostic_accumulator = nil, test_properties = nil)
  raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil?  # see LDClient constructor comment on sdk_key

  @logger = config.logger

  # The inbox never holds fewer than 100 slots, whatever the configured capacity.
  inbox_size = config.capacity
  inbox_size = 100 if inbox_size < 100
  @inbox = SizedQueue.new(inbox_size)

  # Creates and starts a timer that posts a fresh message of the given class on every tick.
  start_timer = lambda do |interval, message_class|
    timer = Concurrent::TimerTask.new(execution_interval: interval) { post_to_inbox(message_class.new) }
    timer.execute
    timer
  end

  @flush_task = start_timer.call(config.flush_interval, FlushMessage)
  @contexts_flush_task = start_timer.call(config.context_keys_flush_interval, FlushContextsMessage)

  # Diagnostic events are only scheduled when an accumulator was supplied.
  @diagnostic_event_task =
    if diagnostic_accumulator.nil?
      nil
    else
      interval =
        if test_properties && test_properties.has_key?(:diagnostic_recording_interval)
          test_properties[:diagnostic_recording_interval]
        else
          config.diagnostic_recording_interval
        end
      start_timer.call(interval, DiagnosticEventMessage)
    end

  @stopped = Concurrent::AtomicBoolean.new(false)
  @inbox_full = Concurrent::AtomicBoolean.new(false)

  overrides = test_properties || {}
  # The default sender (and its HTTP client) is only built when no override is given.
  event_sender = overrides[:event_sender] ||
    Impl::EventSender.new(sdk_key, config, client || Util.new_http_client(config.events_uri, config))
  @timestamp_fn = overrides[:timestamp_fn] || proc { Impl::Util.current_time_millis }

  # The dispatcher is not stored; presumably it consumes @inbox on its own — confirm in EventDispatcher.
  EventDispatcher.new(@inbox, sdk_key, config, diagnostic_accumulator, event_sender)
end

Instance Method Details

#flush ⇒ Object



181
182
183
184
# File 'lib/ldclient-rb/events.rb', line 181

# Requests a flush by posting a FlushMessage through post_to_inbox.
# The flush itself happens asynchronously; this method only queues the request.
#
# @return [Object] whatever post_to_inbox returns
def flush
  flush_request = FlushMessage.new
  post_to_inbox(flush_request)
end

#record_custom_event(context, key, data = nil, metric_value = nil) ⇒ Object



173
174
175
# File 'lib/ldclient-rb/events.rb', line 173

# Builds a LaunchDarkly::Impl::CustomEvent stamped with the current `timestamp`
# and posts it through post_to_inbox.
#
# @param context [Object] passed through to the event unchanged
# @param key [Object] passed through to the event unchanged
# @param data [Object, nil] optional payload passed through to the event
# @param metric_value [Object, nil] optional value passed through to the event
# @return [Object] whatever post_to_inbox returns
def record_custom_event(context, key, data = nil, metric_value = nil)
  custom_event = LaunchDarkly::Impl::CustomEvent.new(
    timestamp,
    context,
    key,
    data,
    metric_value
  )
  post_to_inbox(custom_event)
end

#record_eval_event(context, key, version = nil, variation = nil, value = nil, reason = nil, default = nil, track_events = false, debug_until = nil, prereq_of = nil, sampling_ratio = nil, exclude_from_summaries = false) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/ldclient-rb/events.rb', line 151

# Builds a LaunchDarkly::Impl::EvalEvent stamped with the current `timestamp`
# and posts it through post_to_inbox. Every argument is forwarded to the event
# constructor unchanged, in the order declared here.
#
# @param context [Object]
# @param key [Object]
# @param version [Object, nil]
# @param variation [Object, nil]
# @param value [Object, nil]
# @param reason [Object, nil]
# @param default [Object, nil]
# @param track_events [Boolean]
# @param debug_until [Object, nil]
# @param prereq_of [Object, nil]
# @param sampling_ratio [Object, nil]
# @param exclude_from_summaries [Boolean]
# @return [Object] whatever post_to_inbox returns
def record_eval_event(context, key, version = nil, variation = nil, value = nil, reason = nil,
                      default = nil, track_events = false, debug_until = nil, prereq_of = nil,
                      sampling_ratio = nil, exclude_from_summaries = false)
  eval_event = LaunchDarkly::Impl::EvalEvent.new(
    timestamp,
    context,
    key,
    version,
    variation,
    value,
    reason,
    default,
    track_events,
    debug_until,
    prereq_of,
    sampling_ratio,
    exclude_from_summaries
  )
  post_to_inbox(eval_event)
end

#record_identify_event(context) ⇒ Object



169
170
171
# File 'lib/ldclient-rb/events.rb', line 169

# Builds a LaunchDarkly::Impl::IdentifyEvent stamped with the current `timestamp`
# and posts it through post_to_inbox.
#
# @param context [Object] passed through to the event unchanged
# @return [Object] whatever post_to_inbox returns
def record_identify_event(context)
  identify_event = LaunchDarkly::Impl::IdentifyEvent.new(timestamp, context)
  post_to_inbox(identify_event)
end

#record_migration_op_event(event) ⇒ Object



177
178
179
# File 'lib/ldclient-rb/events.rb', line 177

# Posts an already-built event through post_to_inbox as-is.
# Unlike the other record_* methods shown here, this one neither constructs an
# event object nor applies a timestamp; the caller supplies the finished event.
#
# @param event [Object] the event to queue (presumably a migration op event — type not visible here)
# @return [Object] whatever post_to_inbox returns
def record_migration_op_event(event)
  post_to_inbox(event)
end

#stop ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/ldclient-rb/events.rb', line 186

# Shuts the processor down synchronously, including a final flush.
# Only the call that flips @stopped to true does any work; when make_true
# returns a falsy value (presumably because stop already ran) this returns nil.
#
# @return [Object, nil] the result of StopMessage#wait_for_completion, or nil on a repeat call
def stop
  return unless @stopped.make_true

  # Stop the periodic timers first; the diagnostic timer only exists when
  # a diagnostic accumulator was supplied at construction.
  @flush_task.shutdown
  @contexts_flush_task.shutdown
  @diagnostic_event_task&.shutdown

  # These go straight onto the inbox rather than through post_to_inbox, because
  # we *do* want to wait if the inbox is full: an orderly shutdown cannot happen
  # unless both messages are received.
  @inbox << FlushMessage.new
  shutdown_request = StopMessage.new
  @inbox << shutdown_request
  shutdown_request.wait_for_completion
end

#wait_until_inactive ⇒ Object

Exposed only for testing.



202
203
204
205
206
# File 'lib/ldclient-rb/events.rb', line 202

# Exposed only for testing. Pushes a TestSyncMessage directly onto the inbox
# (not through post_to_inbox) and then blocks on its wait_for_completion.
# Presumably that returns once the consumer has handled everything queued ahead
# of the marker — confirm against the dispatcher.
#
# @return [Object] the result of TestSyncMessage#wait_for_completion
def wait_until_inactive
  TestSyncMessage.new.tap { |sync_marker| @inbox << sync_marker }.wait_for_completion
end