Class: LaunchDarkly::EventProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/ldclient-rb/events.rb

Instance Method Summary

Constructor Details

#initialize(sdk_key, config, client = nil) ⇒ EventProcessor

Returns a new instance of EventProcessor.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/ldclient-rb/events.rb', line 57

# Builds the processor: creates the message queue, starts two periodic timer
# tasks that feed it, and hands the queue to a newly constructed
# EventDispatcher.
#
# @param sdk_key passed through unchanged to EventDispatcher
# @param config must respond to +flush_interval+ and +user_keys_flush_interval+;
#   also passed through to EventDispatcher
# @param client optional; passed through unchanged to EventDispatcher
def initialize(sdk_key, config, client = nil)
  @queue = Queue.new
  # Flipped by #stop so that shutdown work is attempted only once.
  @stopped = Concurrent::AtomicBoolean.new(false)

  # Every flush_interval, ask for a flush by enqueuing a FlushMessage.
  @flush_task = Concurrent::TimerTask.new(execution_interval: config.flush_interval) { @queue.push(FlushMessage.new) }
  @flush_task.execute

  # Every user_keys_flush_interval, enqueue a FlushUsersMessage.
  @users_flush_task = Concurrent::TimerTask.new(execution_interval: config.user_keys_flush_interval) { @queue.push(FlushUsersMessage.new) }
  @users_flush_task.execute

  # The dispatcher is not retained here; it is only given the shared queue.
  # NOTE(review): presumably it starts its own consumer of @queue — confirm.
  EventDispatcher.new(@queue, sdk_key, config, client)
end

Instance Method Details

#add_event(event) ⇒ Object



72
73
74
75
# File 'lib/ldclient-rb/events.rb', line 72

# Stamps the event with the current wall-clock time and enqueues it.
#
# The given hash is modified in place: its +:creationDate+ key is set to the
# current time as integer milliseconds since the Unix epoch.
#
# @param event [Hash] the event to record; mutated by this call
# @return the queue the message was pushed onto
def add_event(event)
  now_millis = (Time.now.to_f * 1000).to_i
  event[:creationDate] = now_millis
  @queue.push(EventMessage.new(event))
end

#flush ⇒ Object



77
78
79
80
# File 'lib/ldclient-rb/events.rb', line 77

# Requests a flush by enqueuing a FlushMessage. This method only enqueues;
# the flush itself happens asynchronously.
#
# @return the queue the message was pushed onto
def flush
  @queue.push(FlushMessage.new)
end

#stop ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/ldclient-rb/events.rb', line 82

# Final shutdown, performed synchronously and including one last flush.
#
# Does nothing (and returns nil) unless +@stopped.make_true+ returns a truthy
# value, so the shutdown sequence is not repeated once the flag is set.
#
# @return the result of waiting on the stop message, or nil when skipped
def stop
  return unless @stopped.make_true

  # Shut down both periodic timers (flush first, then user-keys flush).
  [@flush_task, @users_flush_task].each(&:shutdown)

  # Queue a last flush ahead of the stop request, then block until the
  # stop message reports completion.
  @queue.push(FlushMessage.new)
  shutdown_request = StopMessage.new
  @queue.push(shutdown_request)
  shutdown_request.wait_for_completion
end

#wait_until_inactive ⇒ Object

Exposed only for testing.



95
96
97
98
99
# File 'lib/ldclient-rb/events.rb', line 95

# Exposed only for testing. Enqueues a TestSyncMessage and blocks until that
# message reports completion.
#
# @return whatever the message's +wait_for_completion+ returns
def wait_until_inactive
  marker = TestSyncMessage.new.tap { |m| @queue.push(m) }
  marker.wait_for_completion
end