Class: LaunchDarkly::StreamProcessor
- Inherits: Object
- Inheritance hierarchy: Object → LaunchDarkly::StreamProcessor
- Defined in:
- lib/ldclient-rb/stream.rb
Instance Method Summary collapse
- #get_feature(key) ⇒ Object
-
#initialize(api_key, config) ⇒ StreamProcessor
constructor
A new instance of StreamProcessor.
- #initialized? ⇒ Boolean
- #should_fallback_update ⇒ Object
- #start ⇒ Object
- #started? ⇒ Boolean
Constructor Details
#initialize(api_key, config) ⇒ StreamProcessor
Returns a new instance of StreamProcessor.
68 69 70 71 72 73 74 |
# File 'lib/ldclient-rb/stream.rb', line 68
#
# Builds a stream processor that has not been started yet.
#
# @param api_key [String] API key kept for later use when connecting
# @param config [Object] client configuration; only +feature_store+ is read here
def initialize(api_key, config)
  @api_key = api_key
  @config = config
  # Use the configured store when one is supplied, otherwise fall back to an
  # in-memory store. `||` reads the accessor once instead of twice.
  @store = config.feature_store || InMemoryFeatureStore.new
  # Starts out nil; read back by #should_fallback_update.
  @disconnected = Concurrent::AtomicReference.new(nil)
  # Starts out false; exposed through #started?.
  @started = Concurrent::AtomicBoolean.new(false)
end
Instance Method Details
#get_feature(key) ⇒ Object
84 85 86 87 88 89 |
# File 'lib/ldclient-rb/stream.rb', line 84
#
# Looks up a feature in the backing store.
#
# An uninitialized store is signalled with +throw :uninitialized+ (a throw,
# not an exception), so callers need a matching +catch(:uninitialized)+.
#
# @param key [Object] feature key, passed unchanged to the store's +get+
# @return [Object] whatever the store's +get+ returns for +key+
def get_feature(key)
  throw :uninitialized unless initialized?

  @store.get(key)
end
#initialized? ⇒ Boolean
76 77 78 |
# File 'lib/ldclient-rb/stream.rb', line 76
#
# Reports whether the backing feature store has been initialized.
#
# @return [Boolean] the store's own +initialized?+ result, passed through
def initialized?
  @store.initialized?
end
#should_fallback_update ⇒ Object
156 157 158 159 |
# File 'lib/ldclient-rb/stream.rb', line 156
#
# Tells whether the recorded disconnect time is more than 120 seconds old.
#
# @return [Boolean] false when no disconnect is recorded (the reference holds
#   nil) or when it is at most 120 seconds old; true otherwise
def should_fallback_update
  # NOTE(review): presumably a Time stored by the disconnect handler — confirm.
  disconnected_at = @disconnected.get
  !disconnected_at.nil? && disconnected_at < (Time.now - 120)
end
#start ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/ldclient-rb/stream.rb', line 104
#
# Opens the streaming connection and wires its events to the feature store.
#
# Does nothing when the reactor could not be started or when the processor
# has already been started.
#
# @return [Object, nil] nil on either early exit, otherwise whatever
#   +EM.defer+ returns
def start
  # Try to start the reactor. If it's not started, we shouldn't start
  # the stream processor
  return unless start_reactor

  # If someone else booted the stream processor connection, just return
  return unless @started.make_true

  # If we're the first and only thread to set started, boot
  # the stream processor connection
  EM.defer do
    headers = {
      'Accept' => 'text/event-stream',
      'Authorization' => 'api_key ' + @api_key,
      'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION
    }
    source = EM::EventSource.new(@config.stream_uri + "/features", {}, headers)

    # PUT: the payload replaces the whole feature set.
    source.on PUT do |message|
      features = JSON.parse(message, :symbolize_names => true)
      @store.init(features)
      set_connected
    end

    # PATCH: insert or update one feature.
    # NOTE(review): [1..-1] drops the first character of the path,
    # presumably a leading "/" — confirm against the stream payload format.
    source.on PATCH do |message|
      json = JSON.parse(message, :symbolize_names => true)
      @store.upsert(json[:path][1..-1], json[:data])
      set_connected
    end

    # DELETE: remove one feature, passing the version from the payload.
    source.on DELETE do |message|
      json = JSON.parse(message, :symbolize_names => true)
      @store.delete(json[:path][1..-1], json[:version])
      set_connected
    end

    source.error do |error|
      @config.logger.error("[LDClient] Error subscribing to stream API: #{error}")
      set_disconnected
    end

    # NOTE(review): 0 presumably disables the inactivity timeout — confirm
    # against the event source library's documentation.
    source.inactivity_timeout = 0
    source.start
  end
end
#started? ⇒ Boolean
80 81 82 |
# File 'lib/ldclient-rb/stream.rb', line 80
#
# Reports the current value of the started flag.
#
# @return [Boolean] the flag's +value+, passed through
def started?
  @started.value
end