Class: MessageBus::HTTPClient

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/http_client.rb,
lib/message_bus/http_client/channel.rb,
lib/message_bus/http_client/version.rb

Overview

MessageBus client that enables subscription via long polling with support for chunked encoding. Falls back to normal polling if long polling is not available.

Defined Under Namespace

Classes: Channel, InvalidChannel, MissingBlock

Constant Summary collapse

STOPPED =
0
STARTED =
1
VERSION =
'1.0.0.pre1'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(base_url, enable_long_polling: true, enable_chunked_encoding: true, min_poll_interval: 0.1, max_poll_interval: 180, background_callback_interval: 60, headers: {}) ⇒ Object

Returns Instance of MessageBus::HTTPClient.

Parameters:

  • base_url (String)

    Base URL of the message_bus server to connect to

  • enable_long_polling (Boolean) (defaults to: true)

    Enable long polling

  • enable_chunked_encoding (Boolean) (defaults to: true)

    Enable chunk encoding

  • min_poll_interval (Float, Integer) (defaults to: 0.1)

    Min poll interval when long polling in seconds

  • max_poll_interval (Float, Integer) (defaults to: 180)

    Max poll interval when long polling in seconds. When requests fail, the client will backoff and this is the upper limit.

  • background_callback_interval (Float, Integer) (defaults to: 60)

    Interval to poll when when polling in seconds.

  • headers (Hash) (defaults to: {})

    extra HTTP headers to be set on the polling requests.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/message_bus/http_client.rb', line 67

def initialize(base_url, enable_long_polling: true,
                         enable_chunked_encoding: true,
                         min_poll_interval: 0.1,
                         max_poll_interval: 180,
                         background_callback_interval: 60,
                         headers: {})

  @uri = URI(base_url)
  @enable_long_polling = enable_long_polling
  @enable_chunked_encoding = enable_chunked_encoding
  @min_poll_interval = min_poll_interval
  @max_poll_interval = max_poll_interval
  @background_callback_interval = background_callback_interval
  @headers = headers
  @client_id = SecureRandom.hex
  @channels = {}
  @status = STOPPED
  @mutex = Mutex.new
  @stats = Stats.new(0, 0)
end

Instance Attribute Details

#background_callback_intervalFloat

Returns the polling interval in seconds.

Returns:

  • (Float)

    the polling interval in seconds



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/message_bus/http_client.rb', line 31

class HTTPClient
  class InvalidChannel < StandardError; end
  class MissingBlock < StandardError; end

  attr_reader :channels,
              :stats

  attr_accessor :enable_long_polling,
                :status,
                :enable_chunked_encoding,
                :min_poll_interval,
                :max_poll_interval,
                :background_callback_interval

  CHUNK_SEPARATOR = "\r\n|\r\n".freeze
  private_constant :CHUNK_SEPARATOR
  STATUS_CHANNEL = "/__status".freeze
  private_constant :STATUS_CHANNEL

  STOPPED = 0
  STARTED = 1

  Stats = Struct.new(:failed, :success)
  private_constant :Stats

  # @param base_url [String] Base URL of the message_bus server to connect to
  # @param enable_long_polling [Boolean] Enable long polling
  # @param enable_chunked_encoding [Boolean] Enable chunk encoding
  # @param min_poll_interval [Float, Integer] Min poll interval when long polling in seconds
  # @param max_poll_interval [Float, Integer] Max poll interval when long polling in seconds.
  #   When requests fail, the client will backoff and this is the upper limit.
  # @param background_callback_interval [Float, Integer] Interval to poll when
  #   when polling in seconds.
  # @param headers [Hash] extra HTTP headers to be set on the polling requests.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def initialize(base_url, enable_long_polling: true,
                           enable_chunked_encoding: true,
                           min_poll_interval: 0.1,
                           max_poll_interval: 180,
                           background_callback_interval: 60,
                           headers: {})

    @uri = URI(base_url)
    @enable_long_polling = enable_long_polling
    @enable_chunked_encoding = enable_chunked_encoding
    @min_poll_interval = min_poll_interval
    @max_poll_interval = max_poll_interval
    @background_callback_interval = background_callback_interval
    @headers = headers
    @client_id = SecureRandom.hex
    @channels = {}
    @status = STOPPED
    @mutex = Mutex.new
    @stats = Stats.new(0, 0)
  end

  # Starts a background thread that polls the message bus endpoint
  # for the given base_url.
  #
  # Intervals for long polling can be configured via min_poll_interval and
  # max_poll_interval.
  #
  # Intervals for polling can be configured via background_callback_interval.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def start
    @mutex.synchronize do
      return if started?

      @status = STARTED

      thread = Thread.new do
        begin
          while started?
            unless @channels.empty?
              poll
              @stats.success += 1
              @stats.failed = 0
            end

            sleep interval
          end
        rescue StandardError => e
          @stats.failed += 1
          warn("#{e.class} #{e.message}: #{e.backtrace.join("\n")}")
          sleep interval
          retry
        ensure
          stop
        end
      end

      thread.abort_on_exception = true
    end

    self
  end

  # Stops the client from polling the message bus endpoint.
  #
  # @return [Integer] the current status of the client
  def stop
    @status = STOPPED
  end

  # Subscribes to a channel which executes the given callback when a message
  # is published to the channel
  #
  # @example Subscribing to a channel for message
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #
  #   client.subscribe("/test") do |payload, _message_id, _global_id|
  #     puts payload
  #   end
  #
  # A last_message_id may be provided.
  #  * -1 will subscribe to all new messages
  #  * -2 will receive last message + all new messages
  #  * -3 will receive last 2 message + all new messages
  #
  # @example Subscribing to a channel with `last_message_id`
  #   client.subscribe("/test", last_message_id: -2) do |payload|
  #     puts payload
  #   end
  #
  # @param channel [String] channel to listen for messages on
  # @param last_message_id [Integer] last message id to start polling on.
  #
  # @yield [data, message_id, global_id]
  #  callback to be executed whenever a message is received
  #
  # @yieldparam data [Hash] data payload of the message received on the channel
  # @yieldparam message_id [Integer] id of the message in the channel
  # @yieldparam global_id [Integer] id of the message in the global backlog
  # @yieldreturn [void]
  #
  # @return [Integer] the current status of the client
  def subscribe(channel, last_message_id: nil, &callback)
    raise InvalidChannel unless channel.to_s.start_with?("/")
    raise MissingBlock unless block_given?

    last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

    @channels[channel] ||= Channel.new
    channel = @channels[channel]
    channel.last_message_id = last_message_id if last_message_id
    channel.callbacks.push(callback)
    start if stopped?
  end

  # unsubscribes from a channel
  #
  # @example Unsubscribing from a channel
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #   callback = -> { |payload| puts payload }
  #   client.subscribe("/test", &callback)
  #   client.unsubscribe("/test")
  #
  # If a callback is given, only the specific callback will be unsubscribed.
  #
  # @example Unsubscribing a callback from a channel
  #   client.unsubscribe("/test", &callback)
  #
  # When the client does not have any channels left, it will stop polling and
  # waits until a new subscription is started.
  #
  # @param channel [String] channel to unsubscribe
  # @yield [data, global_id, message_id] specific callback to unsubscribe
  #
  # @return [Integer] the current status of the client
  def unsubscribe(channel, &callback)
    if callback
      @channels[channel].callbacks.delete(callback)
      remove_channel(channel) if @channels[channel].callbacks.empty?
    else
      remove_channel(channel)
    end

    stop if @channels.empty?
    @status
  end

  private

  def stopped?
    @status == STOPPED
  end

  def started?
    @status == STARTED
  end

  def remove_channel(channel)
    @channels.delete(channel)
  end

  def interval
    if @enable_long_polling
      if (failed_count = @stats.failed) > 2
        (@min_poll_interval * 2**failed_count).clamp(
          @min_poll_interval, @max_poll_interval
        )
      else
        @min_poll_interval
      end
    else
      @background_callback_interval
    end
  end

  def poll
    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = true if @uri.scheme == 'https'
    request = Net::HTTP::Post.new(request_path, headers)
    request.body = poll_payload

    if @enable_long_polling
      buffer = +""

      http.request(request) do |response|
        response.read_body do |chunk|
          unless chunk.empty?
            buffer << chunk
            process_buffer(buffer)
          end
        end
      end
    else
      response = http.request(request)
      notify_channels(JSON.parse(response.body))
    end
  end

  def is_chunked?
    !headers["Dont-Chunk"]
  end

  def process_buffer(buffer)
    index = buffer.index(CHUNK_SEPARATOR)

    if is_chunked?
      return unless index

      messages = buffer[0..(index - 1)]
      buffer.slice!("#{messages}#{CHUNK_SEPARATOR}")
    else
      messages = buffer[0..-1]
      buffer.slice!(messages)
    end

    notify_channels(JSON.parse(messages))
  end

  def notify_channels(messages)
    messages.each do |message|
      current_channel = message['channel']

      if current_channel == STATUS_CHANNEL
        message["data"].each do |channel_name, last_message_id|
          if (channel = @channels[channel_name])
            channel.last_message_id = last_message_id
          end
        end
      else
        @channels.each do |channel_name, channel|
          next unless channel_name == current_channel

          channel.last_message_id = message['message_id']

          channel.callbacks.each do |callback|
            callback.call(
              message['data'],
              channel.last_message_id,
              message['global_id']
            )
          end
        end
      end
    end
  end

  def poll_payload
    payload = {}

    @channels.each do |channel_name, channel|
      payload[channel_name] = channel.last_message_id
    end

    payload.to_json
  end

  def request_path
    "/message-bus/#{@client_id}/poll"
  end

  def headers
    headers = {}
    headers['Content-Type'] = 'application/json'
    headers['X-Silence-logger'] = 'true'

    if !@enable_long_polling || !@enable_chunked_encoding
      headers['Dont-Chunk'] = 'true'
    end

    headers.merge!(@headers)
  end
end

#channelsHash (readonly)

Returns a map of the channels that the client is subscribed to.

Returns:

  • (Hash)

    a map of the channels that the client is subscribed to



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/message_bus/http_client.rb', line 31

class HTTPClient
  class InvalidChannel < StandardError; end
  class MissingBlock < StandardError; end

  attr_reader :channels,
              :stats

  attr_accessor :enable_long_polling,
                :status,
                :enable_chunked_encoding,
                :min_poll_interval,
                :max_poll_interval,
                :background_callback_interval

  CHUNK_SEPARATOR = "\r\n|\r\n".freeze
  private_constant :CHUNK_SEPARATOR
  STATUS_CHANNEL = "/__status".freeze
  private_constant :STATUS_CHANNEL

  STOPPED = 0
  STARTED = 1

  Stats = Struct.new(:failed, :success)
  private_constant :Stats

  # @param base_url [String] Base URL of the message_bus server to connect to
  # @param enable_long_polling [Boolean] Enable long polling
  # @param enable_chunked_encoding [Boolean] Enable chunk encoding
  # @param min_poll_interval [Float, Integer] Min poll interval when long polling in seconds
  # @param max_poll_interval [Float, Integer] Max poll interval when long polling in seconds.
  #   When requests fail, the client will backoff and this is the upper limit.
  # @param background_callback_interval [Float, Integer] Interval to poll when
  #   when polling in seconds.
  # @param headers [Hash] extra HTTP headers to be set on the polling requests.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def initialize(base_url, enable_long_polling: true,
                           enable_chunked_encoding: true,
                           min_poll_interval: 0.1,
                           max_poll_interval: 180,
                           background_callback_interval: 60,
                           headers: {})

    @uri = URI(base_url)
    @enable_long_polling = enable_long_polling
    @enable_chunked_encoding = enable_chunked_encoding
    @min_poll_interval = min_poll_interval
    @max_poll_interval = max_poll_interval
    @background_callback_interval = background_callback_interval
    @headers = headers
    @client_id = SecureRandom.hex
    @channels = {}
    @status = STOPPED
    @mutex = Mutex.new
    @stats = Stats.new(0, 0)
  end

  # Starts a background thread that polls the message bus endpoint
  # for the given base_url.
  #
  # Intervals for long polling can be configured via min_poll_interval and
  # max_poll_interval.
  #
  # Intervals for polling can be configured via background_callback_interval.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def start
    @mutex.synchronize do
      return if started?

      @status = STARTED

      thread = Thread.new do
        begin
          while started?
            unless @channels.empty?
              poll
              @stats.success += 1
              @stats.failed = 0
            end

            sleep interval
          end
        rescue StandardError => e
          @stats.failed += 1
          warn("#{e.class} #{e.message}: #{e.backtrace.join("\n")}")
          sleep interval
          retry
        ensure
          stop
        end
      end

      thread.abort_on_exception = true
    end

    self
  end

  # Stops the client from polling the message bus endpoint.
  #
  # @return [Integer] the current status of the client
  def stop
    @status = STOPPED
  end

  # Subscribes to a channel which executes the given callback when a message
  # is published to the channel
  #
  # @example Subscribing to a channel for message
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #
  #   client.subscribe("/test") do |payload, _message_id, _global_id|
  #     puts payload
  #   end
  #
  # A last_message_id may be provided.
  #  * -1 will subscribe to all new messages
  #  * -2 will receive last message + all new messages
  #  * -3 will receive last 2 message + all new messages
  #
  # @example Subscribing to a channel with `last_message_id`
  #   client.subscribe("/test", last_message_id: -2) do |payload|
  #     puts payload
  #   end
  #
  # @param channel [String] channel to listen for messages on
  # @param last_message_id [Integer] last message id to start polling on.
  #
  # @yield [data, message_id, global_id]
  #  callback to be executed whenever a message is received
  #
  # @yieldparam data [Hash] data payload of the message received on the channel
  # @yieldparam message_id [Integer] id of the message in the channel
  # @yieldparam global_id [Integer] id of the message in the global backlog
  # @yieldreturn [void]
  #
  # @return [Integer] the current status of the client
  def subscribe(channel, last_message_id: nil, &callback)
    raise InvalidChannel unless channel.to_s.start_with?("/")
    raise MissingBlock unless block_given?

    last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

    @channels[channel] ||= Channel.new
    channel = @channels[channel]
    channel.last_message_id = last_message_id if last_message_id
    channel.callbacks.push(callback)
    start if stopped?
  end

  # unsubscribes from a channel
  #
  # @example Unsubscribing from a channel
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #   callback = -> { |payload| puts payload }
  #   client.subscribe("/test", &callback)
  #   client.unsubscribe("/test")
  #
  # If a callback is given, only the specific callback will be unsubscribed.
  #
  # @example Unsubscribing a callback from a channel
  #   client.unsubscribe("/test", &callback)
  #
  # When the client does not have any channels left, it will stop polling and
  # waits until a new subscription is started.
  #
  # @param channel [String] channel to unsubscribe
  # @yield [data, global_id, message_id] specific callback to unsubscribe
  #
  # @return [Integer] the current status of the client
  def unsubscribe(channel, &callback)
    if callback
      @channels[channel].callbacks.delete(callback)
      remove_channel(channel) if @channels[channel].callbacks.empty?
    else
      remove_channel(channel)
    end

    stop if @channels.empty?
    @status
  end

  private

  def stopped?
    @status == STOPPED
  end

  def started?
    @status == STARTED
  end

  def remove_channel(channel)
    @channels.delete(channel)
  end

  def interval
    if @enable_long_polling
      if (failed_count = @stats.failed) > 2
        (@min_poll_interval * 2**failed_count).clamp(
          @min_poll_interval, @max_poll_interval
        )
      else
        @min_poll_interval
      end
    else
      @background_callback_interval
    end
  end

  def poll
    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = true if @uri.scheme == 'https'
    request = Net::HTTP::Post.new(request_path, headers)
    request.body = poll_payload

    if @enable_long_polling
      buffer = +""

      http.request(request) do |response|
        response.read_body do |chunk|
          unless chunk.empty?
            buffer << chunk
            process_buffer(buffer)
          end
        end
      end
    else
      response = http.request(request)
      notify_channels(JSON.parse(response.body))
    end
  end

  def is_chunked?
    !headers["Dont-Chunk"]
  end

  def process_buffer(buffer)
    index = buffer.index(CHUNK_SEPARATOR)

    if is_chunked?
      return unless index

      messages = buffer[0..(index - 1)]
      buffer.slice!("#{messages}#{CHUNK_SEPARATOR}")
    else
      messages = buffer[0..-1]
      buffer.slice!(messages)
    end

    notify_channels(JSON.parse(messages))
  end

  def notify_channels(messages)
    messages.each do |message|
      current_channel = message['channel']

      if current_channel == STATUS_CHANNEL
        message["data"].each do |channel_name, last_message_id|
          if (channel = @channels[channel_name])
            channel.last_message_id = last_message_id
          end
        end
      else
        @channels.each do |channel_name, channel|
          next unless channel_name == current_channel

          channel.last_message_id = message['message_id']

          channel.callbacks.each do |callback|
            callback.call(
              message['data'],
              channel.last_message_id,
              message['global_id']
            )
          end
        end
      end
    end
  end

  def poll_payload
    payload = {}

    @channels.each do |channel_name, channel|
      payload[channel_name] = channel.last_message_id
    end

    payload.to_json
  end

  def request_path
    "/message-bus/#{@client_id}/poll"
  end

  def headers
    headers = {}
    headers['Content-Type'] = 'application/json'
    headers['X-Silence-logger'] = 'true'

    if !@enable_long_polling || !@enable_chunked_encoding
      headers['Dont-Chunk'] = 'true'
    end

    headers.merge!(@headers)
  end
end

#enable_chunked_encodingBoolean

Returns whether chunked encoding is enabled.

Returns:

  • (Boolean)

    whether chunked encoding is enabled



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/message_bus/http_client.rb', line 31

class HTTPClient
  class InvalidChannel < StandardError; end
  class MissingBlock < StandardError; end

  attr_reader :channels,
              :stats

  attr_accessor :enable_long_polling,
                :status,
                :enable_chunked_encoding,
                :min_poll_interval,
                :max_poll_interval,
                :background_callback_interval

  CHUNK_SEPARATOR = "\r\n|\r\n".freeze
  private_constant :CHUNK_SEPARATOR
  STATUS_CHANNEL = "/__status".freeze
  private_constant :STATUS_CHANNEL

  STOPPED = 0
  STARTED = 1

  Stats = Struct.new(:failed, :success)
  private_constant :Stats

  # @param base_url [String] Base URL of the message_bus server to connect to
  # @param enable_long_polling [Boolean] Enable long polling
  # @param enable_chunked_encoding [Boolean] Enable chunk encoding
  # @param min_poll_interval [Float, Integer] Min poll interval when long polling in seconds
  # @param max_poll_interval [Float, Integer] Max poll interval when long polling in seconds.
  #   When requests fail, the client will backoff and this is the upper limit.
  # @param background_callback_interval [Float, Integer] Interval to poll when
  #   when polling in seconds.
  # @param headers [Hash] extra HTTP headers to be set on the polling requests.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def initialize(base_url, enable_long_polling: true,
                           enable_chunked_encoding: true,
                           min_poll_interval: 0.1,
                           max_poll_interval: 180,
                           background_callback_interval: 60,
                           headers: {})

    @uri = URI(base_url)
    @enable_long_polling = enable_long_polling
    @enable_chunked_encoding = enable_chunked_encoding
    @min_poll_interval = min_poll_interval
    @max_poll_interval = max_poll_interval
    @background_callback_interval = background_callback_interval
    @headers = headers
    @client_id = SecureRandom.hex
    @channels = {}
    @status = STOPPED
    @mutex = Mutex.new
    @stats = Stats.new(0, 0)
  end

  # Starts a background thread that polls the message bus endpoint
  # for the given base_url.
  #
  # Intervals for long polling can be configured via min_poll_interval and
  # max_poll_interval.
  #
  # Intervals for polling can be configured via background_callback_interval.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def start
    @mutex.synchronize do
      return if started?

      @status = STARTED

      thread = Thread.new do
        begin
          while started?
            unless @channels.empty?
              poll
              @stats.success += 1
              @stats.failed = 0
            end

            sleep interval
          end
        rescue StandardError => e
          @stats.failed += 1
          warn("#{e.class} #{e.message}: #{e.backtrace.join("\n")}")
          sleep interval
          retry
        ensure
          stop
        end
      end

      thread.abort_on_exception = true
    end

    self
  end

  # Stops the client from polling the message bus endpoint.
  #
  # @return [Integer] the current status of the client
  def stop
    @status = STOPPED
  end

  # Subscribes to a channel which executes the given callback when a message
  # is published to the channel
  #
  # @example Subscribing to a channel for message
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #
  #   client.subscribe("/test") do |payload, _message_id, _global_id|
  #     puts payload
  #   end
  #
  # A last_message_id may be provided.
  #  * -1 will subscribe to all new messages
  #  * -2 will receive last message + all new messages
  #  * -3 will receive last 2 message + all new messages
  #
  # @example Subscribing to a channel with `last_message_id`
  #   client.subscribe("/test", last_message_id: -2) do |payload|
  #     puts payload
  #   end
  #
  # @param channel [String] channel to listen for messages on
  # @param last_message_id [Integer] last message id to start polling on.
  #
  # @yield [data, message_id, global_id]
  #  callback to be executed whenever a message is received
  #
  # @yieldparam data [Hash] data payload of the message received on the channel
  # @yieldparam message_id [Integer] id of the message in the channel
  # @yieldparam global_id [Integer] id of the message in the global backlog
  # @yieldreturn [void]
  #
  # @return [Integer] the current status of the client
  def subscribe(channel, last_message_id: nil, &callback)
    raise InvalidChannel unless channel.to_s.start_with?("/")
    raise MissingBlock unless block_given?

    last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

    @channels[channel] ||= Channel.new
    channel = @channels[channel]
    channel.last_message_id = last_message_id if last_message_id
    channel.callbacks.push(callback)
    start if stopped?
  end

  # unsubscribes from a channel
  #
  # @example Unsubscribing from a channel
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #   callback = -> { |payload| puts payload }
  #   client.subscribe("/test", &callback)
  #   client.unsubscribe("/test")
  #
  # If a callback is given, only the specific callback will be unsubscribed.
  #
  # @example Unsubscribing a callback from a channel
  #   client.unsubscribe("/test", &callback)
  #
  # When the client does not have any channels left, it will stop polling and
  # waits until a new subscription is started.
  #
  # @param channel [String] channel to unsubscribe
  # @yield [data, global_id, message_id] specific callback to unsubscribe
  #
  # @return [Integer] the current status of the client
  def unsubscribe(channel, &callback)
    if callback
      @channels[channel].callbacks.delete(callback)
      remove_channel(channel) if @channels[channel].callbacks.empty?
    else
      remove_channel(channel)
    end

    stop if @channels.empty?
    @status
  end

  private

  def stopped?
    @status == STOPPED
  end

  def started?
    @status == STARTED
  end

  def remove_channel(channel)
    @channels.delete(channel)
  end

  def interval
    if @enable_long_polling
      if (failed_count = @stats.failed) > 2
        (@min_poll_interval * 2**failed_count).clamp(
          @min_poll_interval, @max_poll_interval
        )
      else
        @min_poll_interval
      end
    else
      @background_callback_interval
    end
  end

  def poll
    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = true if @uri.scheme == 'https'
    request = Net::HTTP::Post.new(request_path, headers)
    request.body = poll_payload

    if @enable_long_polling
      buffer = +""

      http.request(request) do |response|
        response.read_body do |chunk|
          unless chunk.empty?
            buffer << chunk
            process_buffer(buffer)
          end
        end
      end
    else
      response = http.request(request)
      notify_channels(JSON.parse(response.body))
    end
  end

  def is_chunked?
    !headers["Dont-Chunk"]
  end

  def process_buffer(buffer)
    index = buffer.index(CHUNK_SEPARATOR)

    if is_chunked?
      return unless index

      messages = buffer[0..(index - 1)]
      buffer.slice!("#{messages}#{CHUNK_SEPARATOR}")
    else
      messages = buffer[0..-1]
      buffer.slice!(messages)
    end

    notify_channels(JSON.parse(messages))
  end

  def notify_channels(messages)
    messages.each do |message|
      current_channel = message['channel']

      if current_channel == STATUS_CHANNEL
        message["data"].each do |channel_name, last_message_id|
          if (channel = @channels[channel_name])
            channel.last_message_id = last_message_id
          end
        end
      else
        @channels.each do |channel_name, channel|
          next unless channel_name == current_channel

          channel.last_message_id = message['message_id']

          channel.callbacks.each do |callback|
            callback.call(
              message['data'],
              channel.last_message_id,
              message['global_id']
            )
          end
        end
      end
    end
  end

  def poll_payload
    payload = {}

    @channels.each do |channel_name, channel|
      payload[channel_name] = channel.last_message_id
    end

    payload.to_json
  end

  def request_path
    "/message-bus/#{@client_id}/poll"
  end

  def headers
    headers = {}
    headers['Content-Type'] = 'application/json'
    headers['X-Silence-logger'] = 'true'

    if !@enable_long_polling || !@enable_chunked_encoding
      headers['Dont-Chunk'] = 'true'
    end

    headers.merge!(@headers)
  end
end

#enable_long_pollingBoolean

Returns whether long polling is enabled.

Returns:

  • (Boolean)

    whether long polling is enabled



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/message_bus/http_client.rb', line 31

class HTTPClient
  class InvalidChannel < StandardError; end
  class MissingBlock < StandardError; end

  attr_reader :channels,
              :stats

  attr_accessor :enable_long_polling,
                :status,
                :enable_chunked_encoding,
                :min_poll_interval,
                :max_poll_interval,
                :background_callback_interval

  CHUNK_SEPARATOR = "\r\n|\r\n".freeze
  private_constant :CHUNK_SEPARATOR
  STATUS_CHANNEL = "/__status".freeze
  private_constant :STATUS_CHANNEL

  STOPPED = 0
  STARTED = 1

  Stats = Struct.new(:failed, :success)
  private_constant :Stats

  # @param base_url [String] Base URL of the message_bus server to connect to
  # @param enable_long_polling [Boolean] Enable long polling
  # @param enable_chunked_encoding [Boolean] Enable chunk encoding
  # @param min_poll_interval [Float, Integer] Min poll interval when long polling in seconds
  # @param max_poll_interval [Float, Integer] Max poll interval when long polling in seconds.
  #   When requests fail, the client will backoff and this is the upper limit.
  # @param background_callback_interval [Float, Integer] Interval to poll when
  #   when polling in seconds.
  # @param headers [Hash] extra HTTP headers to be set on the polling requests.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def initialize(base_url, enable_long_polling: true,
                           enable_chunked_encoding: true,
                           min_poll_interval: 0.1,
                           max_poll_interval: 180,
                           background_callback_interval: 60,
                           headers: {})

    @uri = URI(base_url)
    @enable_long_polling = enable_long_polling
    @enable_chunked_encoding = enable_chunked_encoding
    @min_poll_interval = min_poll_interval
    @max_poll_interval = max_poll_interval
    @background_callback_interval = background_callback_interval
    @headers = headers
    @client_id = SecureRandom.hex
    @channels = {}
    @status = STOPPED
    @mutex = Mutex.new
    @stats = Stats.new(0, 0)
  end

  # Starts a background thread that polls the message bus endpoint
  # for the given base_url.
  #
  # Intervals for long polling can be configured via min_poll_interval and
  # max_poll_interval.
  #
  # Intervals for polling can be configured via background_callback_interval.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def start
    @mutex.synchronize do
      return if started?

      @status = STARTED

      thread = Thread.new do
        begin
          while started?
            unless @channels.empty?
              poll
              @stats.success += 1
              @stats.failed = 0
            end

            sleep interval
          end
        rescue StandardError => e
          @stats.failed += 1
          warn("#{e.class} #{e.message}: #{e.backtrace.join("\n")}")
          sleep interval
          retry
        ensure
          stop
        end
      end

      thread.abort_on_exception = true
    end

    self
  end

  # Stops the client from polling the message bus endpoint.
  #
  # @return [Integer] the current status of the client
  def stop
    @status = STOPPED
  end

  # Subscribes to a channel which executes the given callback when a message
  # is published to the channel
  #
  # @example Subscribing to a channel for message
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #
  #   client.subscribe("/test") do |payload, _message_id, _global_id|
  #     puts payload
  #   end
  #
  # A last_message_id may be provided.
  #  * -1 will subscribe to all new messages
  #  * -2 will receive last message + all new messages
  #  * -3 will receive last 2 message + all new messages
  #
  # @example Subscribing to a channel with `last_message_id`
  #   client.subscribe("/test", last_message_id: -2) do |payload|
  #     puts payload
  #   end
  #
  # @param channel [String] channel to listen for messages on
  # @param last_message_id [Integer] last message id to start polling on.
  #
  # @yield [data, message_id, global_id]
  #  callback to be executed whenever a message is received
  #
  # @yieldparam data [Hash] data payload of the message received on the channel
  # @yieldparam message_id [Integer] id of the message in the channel
  # @yieldparam global_id [Integer] id of the message in the global backlog
  # @yieldreturn [void]
  #
  # @return [Integer] the current status of the client
  def subscribe(channel, last_message_id: nil, &callback)
    raise InvalidChannel unless channel.to_s.start_with?("/")
    raise MissingBlock unless block_given?

    last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

    @channels[channel] ||= Channel.new
    channel = @channels[channel]
    channel.last_message_id = last_message_id if last_message_id
    channel.callbacks.push(callback)
    start if stopped?
  end

  # unsubscribes from a channel
  #
  # @example Unsubscribing from a channel
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #   callback = -> { |payload| puts payload }
  #   client.subscribe("/test", &callback)
  #   client.unsubscribe("/test")
  #
  # If a callback is given, only the specific callback will be unsubscribed.
  #
  # @example Unsubscribing a callback from a channel
  #   client.unsubscribe("/test", &callback)
  #
  # When the client does not have any channels left, it will stop polling and
  # waits until a new subscription is started.
  #
  # @param channel [String] channel to unsubscribe
  # @yield [data, global_id, message_id] specific callback to unsubscribe
  #
  # @return [Integer] the current status of the client
  def unsubscribe(channel, &callback)
    if callback
      @channels[channel].callbacks.delete(callback)
      remove_channel(channel) if @channels[channel].callbacks.empty?
    else
      remove_channel(channel)
    end

    stop if @channels.empty?
    @status
  end

  private

  def stopped?
    @status == STOPPED
  end

  def started?
    @status == STARTED
  end

  def remove_channel(channel)
    @channels.delete(channel)
  end

  def interval
    if @enable_long_polling
      if (failed_count = @stats.failed) > 2
        (@min_poll_interval * 2**failed_count).clamp(
          @min_poll_interval, @max_poll_interval
        )
      else
        @min_poll_interval
      end
    else
      @background_callback_interval
    end
  end

  def poll
    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = true if @uri.scheme == 'https'
    request = Net::HTTP::Post.new(request_path, headers)
    request.body = poll_payload

    if @enable_long_polling
      buffer = +""

      http.request(request) do |response|
        response.read_body do |chunk|
          unless chunk.empty?
            buffer << chunk
            process_buffer(buffer)
          end
        end
      end
    else
      response = http.request(request)
      notify_channels(JSON.parse(response.body))
    end
  end

  def is_chunked?
    !headers["Dont-Chunk"]
  end

  def process_buffer(buffer)
    index = buffer.index(CHUNK_SEPARATOR)

    if is_chunked?
      return unless index

      messages = buffer[0..(index - 1)]
      buffer.slice!("#{messages}#{CHUNK_SEPARATOR}")
    else
      messages = buffer[0..-1]
      buffer.slice!(messages)
    end

    notify_channels(JSON.parse(messages))
  end

  def notify_channels(messages)
    messages.each do |message|
      current_channel = message['channel']

      if current_channel == STATUS_CHANNEL
        message["data"].each do |channel_name, last_message_id|
          if (channel = @channels[channel_name])
            channel.last_message_id = last_message_id
          end
        end
      else
        @channels.each do |channel_name, channel|
          next unless channel_name == current_channel

          channel.last_message_id = message['message_id']

          channel.callbacks.each do |callback|
            callback.call(
              message['data'],
              channel.last_message_id,
              message['global_id']
            )
          end
        end
      end
    end
  end

  def poll_payload
    payload = {}

    @channels.each do |channel_name, channel|
      payload[channel_name] = channel.last_message_id
    end

    payload.to_json
  end

  def request_path
    "/message-bus/#{@client_id}/poll"
  end

  def headers
    headers = {}
    headers['Content-Type'] = 'application/json'
    headers['X-Silence-logger'] = 'true'

    if !@enable_long_polling || !@enable_chunked_encoding
      headers['Dont-Chunk'] = 'true'
    end

    headers.merge!(@headers)
  end
end

#max_poll_intervalFloat

Returns the max poll interval for long polling in seconds.

Returns:

  • (Float)

    the max poll interval for long polling in seconds



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/message_bus/http_client.rb', line 31

class HTTPClient
  class InvalidChannel < StandardError; end
  class MissingBlock < StandardError; end

  attr_reader :channels,
              :stats

  attr_accessor :enable_long_polling,
                :status,
                :enable_chunked_encoding,
                :min_poll_interval,
                :max_poll_interval,
                :background_callback_interval

  CHUNK_SEPARATOR = "\r\n|\r\n".freeze
  private_constant :CHUNK_SEPARATOR
  STATUS_CHANNEL = "/__status".freeze
  private_constant :STATUS_CHANNEL

  STOPPED = 0
  STARTED = 1

  Stats = Struct.new(:failed, :success)
  private_constant :Stats

  # @param base_url [String] Base URL of the message_bus server to connect to
  # @param enable_long_polling [Boolean] Enable long polling
  # @param enable_chunked_encoding [Boolean] Enable chunk encoding
  # @param min_poll_interval [Float, Integer] Min poll interval when long polling in seconds
  # @param max_poll_interval [Float, Integer] Max poll interval when long polling in seconds.
  #   When requests fail, the client will backoff and this is the upper limit.
  # @param background_callback_interval [Float, Integer] Interval to poll when
  #   when polling in seconds.
  # @param headers [Hash] extra HTTP headers to be set on the polling requests.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def initialize(base_url, enable_long_polling: true,
                           enable_chunked_encoding: true,
                           min_poll_interval: 0.1,
                           max_poll_interval: 180,
                           background_callback_interval: 60,
                           headers: {})

    @uri = URI(base_url)
    @enable_long_polling = enable_long_polling
    @enable_chunked_encoding = enable_chunked_encoding
    @min_poll_interval = min_poll_interval
    @max_poll_interval = max_poll_interval
    @background_callback_interval = background_callback_interval
    @headers = headers
    @client_id = SecureRandom.hex
    @channels = {}
    @status = STOPPED
    @mutex = Mutex.new
    @stats = Stats.new(0, 0)
  end

  # Starts a background thread that polls the message bus endpoint
  # for the given base_url.
  #
  # Intervals for long polling can be configured via min_poll_interval and
  # max_poll_interval.
  #
  # Intervals for polling can be configured via background_callback_interval.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def start
    @mutex.synchronize do
      return if started?

      @status = STARTED

      thread = Thread.new do
        begin
          while started?
            unless @channels.empty?
              poll
              @stats.success += 1
              @stats.failed = 0
            end

            sleep interval
          end
        rescue StandardError => e
          @stats.failed += 1
          warn("#{e.class} #{e.message}: #{e.backtrace.join("\n")}")
          sleep interval
          retry
        ensure
          stop
        end
      end

      thread.abort_on_exception = true
    end

    self
  end

  # Stops the client from polling the message bus endpoint.
  #
  # @return [Integer] the current status of the client
  def stop
    @status = STOPPED
  end

  # Subscribes to a channel which executes the given callback when a message
  # is published to the channel
  #
  # @example Subscribing to a channel for message
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #
  #   client.subscribe("/test") do |payload, _message_id, _global_id|
  #     puts payload
  #   end
  #
  # A last_message_id may be provided.
  #  * -1 will subscribe to all new messages
  #  * -2 will receive last message + all new messages
  #  * -3 will receive last 2 message + all new messages
  #
  # @example Subscribing to a channel with `last_message_id`
  #   client.subscribe("/test", last_message_id: -2) do |payload|
  #     puts payload
  #   end
  #
  # @param channel [String] channel to listen for messages on
  # @param last_message_id [Integer] last message id to start polling on.
  #
  # @yield [data, message_id, global_id]
  #  callback to be executed whenever a message is received
  #
  # @yieldparam data [Hash] data payload of the message received on the channel
  # @yieldparam message_id [Integer] id of the message in the channel
  # @yieldparam global_id [Integer] id of the message in the global backlog
  # @yieldreturn [void]
  #
  # @return [Integer] the current status of the client
  def subscribe(channel, last_message_id: nil, &callback)
    raise InvalidChannel unless channel.to_s.start_with?("/")
    raise MissingBlock unless block_given?

    last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

    @channels[channel] ||= Channel.new
    channel = @channels[channel]
    channel.last_message_id = last_message_id if last_message_id
    channel.callbacks.push(callback)
    start if stopped?
  end

  # unsubscribes from a channel
  #
  # @example Unsubscribing from a channel
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #   callback = -> { |payload| puts payload }
  #   client.subscribe("/test", &callback)
  #   client.unsubscribe("/test")
  #
  # If a callback is given, only the specific callback will be unsubscribed.
  #
  # @example Unsubscribing a callback from a channel
  #   client.unsubscribe("/test", &callback)
  #
  # When the client does not have any channels left, it will stop polling and
  # waits until a new subscription is started.
  #
  # @param channel [String] channel to unsubscribe
  # @yield [data, global_id, message_id] specific callback to unsubscribe
  #
  # @return [Integer] the current status of the client
  def unsubscribe(channel, &callback)
    if callback
      @channels[channel].callbacks.delete(callback)
      remove_channel(channel) if @channels[channel].callbacks.empty?
    else
      remove_channel(channel)
    end

    stop if @channels.empty?
    @status
  end

  private

  def stopped?
    @status == STOPPED
  end

  def started?
    @status == STARTED
  end

  def remove_channel(channel)
    @channels.delete(channel)
  end

  def interval
    if @enable_long_polling
      if (failed_count = @stats.failed) > 2
        (@min_poll_interval * 2**failed_count).clamp(
          @min_poll_interval, @max_poll_interval
        )
      else
        @min_poll_interval
      end
    else
      @background_callback_interval
    end
  end

  def poll
    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = true if @uri.scheme == 'https'
    request = Net::HTTP::Post.new(request_path, headers)
    request.body = poll_payload

    if @enable_long_polling
      buffer = +""

      http.request(request) do |response|
        response.read_body do |chunk|
          unless chunk.empty?
            buffer << chunk
            process_buffer(buffer)
          end
        end
      end
    else
      response = http.request(request)
      notify_channels(JSON.parse(response.body))
    end
  end

  def is_chunked?
    !headers["Dont-Chunk"]
  end

  def process_buffer(buffer)
    index = buffer.index(CHUNK_SEPARATOR)

    if is_chunked?
      return unless index

      messages = buffer[0..(index - 1)]
      buffer.slice!("#{messages}#{CHUNK_SEPARATOR}")
    else
      messages = buffer[0..-1]
      buffer.slice!(messages)
    end

    notify_channels(JSON.parse(messages))
  end

  def notify_channels(messages)
    messages.each do |message|
      current_channel = message['channel']

      if current_channel == STATUS_CHANNEL
        message["data"].each do |channel_name, last_message_id|
          if (channel = @channels[channel_name])
            channel.last_message_id = last_message_id
          end
        end
      else
        @channels.each do |channel_name, channel|
          next unless channel_name == current_channel

          channel.last_message_id = message['message_id']

          channel.callbacks.each do |callback|
            callback.call(
              message['data'],
              channel.last_message_id,
              message['global_id']
            )
          end
        end
      end
    end
  end

  def poll_payload
    payload = {}

    @channels.each do |channel_name, channel|
      payload[channel_name] = channel.last_message_id
    end

    payload.to_json
  end

  def request_path
    "/message-bus/#{@client_id}/poll"
  end

  def headers
    headers = {}
    headers['Content-Type'] = 'application/json'
    headers['X-Silence-logger'] = 'true'

    if !@enable_long_polling || !@enable_chunked_encoding
      headers['Dont-Chunk'] = 'true'
    end

    headers.merge!(@headers)
  end
end

#min_poll_intervalFloat

Returns the min poll interval for long polling in seconds.

Returns:

  • (Float)

    the min poll interval for long polling in seconds



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/message_bus/http_client.rb', line 31

class HTTPClient
  class InvalidChannel < StandardError; end
  class MissingBlock < StandardError; end

  attr_reader :channels,
              :stats

  attr_accessor :enable_long_polling,
                :status,
                :enable_chunked_encoding,
                :min_poll_interval,
                :max_poll_interval,
                :background_callback_interval

  CHUNK_SEPARATOR = "\r\n|\r\n".freeze
  private_constant :CHUNK_SEPARATOR
  STATUS_CHANNEL = "/__status".freeze
  private_constant :STATUS_CHANNEL

  STOPPED = 0
  STARTED = 1

  Stats = Struct.new(:failed, :success)
  private_constant :Stats

  # @param base_url [String] Base URL of the message_bus server to connect to
  # @param enable_long_polling [Boolean] Enable long polling
  # @param enable_chunked_encoding [Boolean] Enable chunk encoding
  # @param min_poll_interval [Float, Integer] Min poll interval when long polling in seconds
  # @param max_poll_interval [Float, Integer] Max poll interval when long polling in seconds.
  #   When requests fail, the client will backoff and this is the upper limit.
  # @param background_callback_interval [Float, Integer] Interval to poll when
  #   when polling in seconds.
  # @param headers [Hash] extra HTTP headers to be set on the polling requests.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def initialize(base_url, enable_long_polling: true,
                           enable_chunked_encoding: true,
                           min_poll_interval: 0.1,
                           max_poll_interval: 180,
                           background_callback_interval: 60,
                           headers: {})

    @uri = URI(base_url)
    @enable_long_polling = enable_long_polling
    @enable_chunked_encoding = enable_chunked_encoding
    @min_poll_interval = min_poll_interval
    @max_poll_interval = max_poll_interval
    @background_callback_interval = background_callback_interval
    @headers = headers
    @client_id = SecureRandom.hex
    @channels = {}
    @status = STOPPED
    @mutex = Mutex.new
    @stats = Stats.new(0, 0)
  end

  # Starts a background thread that polls the message bus endpoint
  # for the given base_url.
  #
  # Intervals for long polling can be configured via min_poll_interval and
  # max_poll_interval.
  #
  # Intervals for polling can be configured via background_callback_interval.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def start
    @mutex.synchronize do
      return if started?

      @status = STARTED

      thread = Thread.new do
        begin
          while started?
            unless @channels.empty?
              poll
              @stats.success += 1
              @stats.failed = 0
            end

            sleep interval
          end
        rescue StandardError => e
          @stats.failed += 1
          warn("#{e.class} #{e.message}: #{e.backtrace.join("\n")}")
          sleep interval
          retry
        ensure
          stop
        end
      end

      thread.abort_on_exception = true
    end

    self
  end

  # Stops the client from polling the message bus endpoint.
  #
  # @return [Integer] the current status of the client
  def stop
    @status = STOPPED
  end

  # Subscribes to a channel which executes the given callback when a message
  # is published to the channel
  #
  # @example Subscribing to a channel for message
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #
  #   client.subscribe("/test") do |payload, _message_id, _global_id|
  #     puts payload
  #   end
  #
  # A last_message_id may be provided.
  #  * -1 will subscribe to all new messages
  #  * -2 will receive last message + all new messages
  #  * -3 will receive last 2 message + all new messages
  #
  # @example Subscribing to a channel with `last_message_id`
  #   client.subscribe("/test", last_message_id: -2) do |payload|
  #     puts payload
  #   end
  #
  # @param channel [String] channel to listen for messages on
  # @param last_message_id [Integer] last message id to start polling on.
  #
  # @yield [data, message_id, global_id]
  #  callback to be executed whenever a message is received
  #
  # @yieldparam data [Hash] data payload of the message received on the channel
  # @yieldparam message_id [Integer] id of the message in the channel
  # @yieldparam global_id [Integer] id of the message in the global backlog
  # @yieldreturn [void]
  #
  # @return [Integer] the current status of the client
  def subscribe(channel, last_message_id: nil, &callback)
    raise InvalidChannel unless channel.to_s.start_with?("/")
    raise MissingBlock unless block_given?

    last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

    @channels[channel] ||= Channel.new
    channel = @channels[channel]
    channel.last_message_id = last_message_id if last_message_id
    channel.callbacks.push(callback)
    start if stopped?
  end

  # unsubscribes from a channel
  #
  # @example Unsubscribing from a channel
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #   callback = -> { |payload| puts payload }
  #   client.subscribe("/test", &callback)
  #   client.unsubscribe("/test")
  #
  # If a callback is given, only the specific callback will be unsubscribed.
  #
  # @example Unsubscribing a callback from a channel
  #   client.unsubscribe("/test", &callback)
  #
  # When the client does not have any channels left, it will stop polling and
  # waits until a new subscription is started.
  #
  # @param channel [String] channel to unsubscribe
  # @yield [data, global_id, message_id] specific callback to unsubscribe
  #
  # @return [Integer] the current status of the client
  def unsubscribe(channel, &callback)
    if callback
      @channels[channel].callbacks.delete(callback)
      remove_channel(channel) if @channels[channel].callbacks.empty?
    else
      remove_channel(channel)
    end

    stop if @channels.empty?
    @status
  end

  private

  def stopped?
    @status == STOPPED
  end

  def started?
    @status == STARTED
  end

  def remove_channel(channel)
    @channels.delete(channel)
  end

  def interval
    if @enable_long_polling
      if (failed_count = @stats.failed) > 2
        (@min_poll_interval * 2**failed_count).clamp(
          @min_poll_interval, @max_poll_interval
        )
      else
        @min_poll_interval
      end
    else
      @background_callback_interval
    end
  end

  def poll
    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = true if @uri.scheme == 'https'
    request = Net::HTTP::Post.new(request_path, headers)
    request.body = poll_payload

    if @enable_long_polling
      buffer = +""

      http.request(request) do |response|
        response.read_body do |chunk|
          unless chunk.empty?
            buffer << chunk
            process_buffer(buffer)
          end
        end
      end
    else
      response = http.request(request)
      notify_channels(JSON.parse(response.body))
    end
  end

  def is_chunked?
    !headers["Dont-Chunk"]
  end

  def process_buffer(buffer)
    index = buffer.index(CHUNK_SEPARATOR)

    if is_chunked?
      return unless index

      messages = buffer[0..(index - 1)]
      buffer.slice!("#{messages}#{CHUNK_SEPARATOR}")
    else
      messages = buffer[0..-1]
      buffer.slice!(messages)
    end

    notify_channels(JSON.parse(messages))
  end

  def notify_channels(messages)
    messages.each do |message|
      current_channel = message['channel']

      if current_channel == STATUS_CHANNEL
        message["data"].each do |channel_name, last_message_id|
          if (channel = @channels[channel_name])
            channel.last_message_id = last_message_id
          end
        end
      else
        @channels.each do |channel_name, channel|
          next unless channel_name == current_channel

          channel.last_message_id = message['message_id']

          channel.callbacks.each do |callback|
            callback.call(
              message['data'],
              channel.last_message_id,
              message['global_id']
            )
          end
        end
      end
    end
  end

  def poll_payload
    payload = {}

    @channels.each do |channel_name, channel|
      payload[channel_name] = channel.last_message_id
    end

    payload.to_json
  end

  def request_path
    "/message-bus/#{@client_id}/poll"
  end

  def headers
    headers = {}
    headers['Content-Type'] = 'application/json'
    headers['X-Silence-logger'] = 'true'

    if !@enable_long_polling || !@enable_chunked_encoding
      headers['Dont-Chunk'] = 'true'
    end

    headers.merge!(@headers)
  end
end

#statsStats (readonly)

Returns a Struct containing the statistics of failed and successful polling requests.

Returns:

  • (Stats)

    a Struct containing the statistics of failed and successful polling requests



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/message_bus/http_client.rb', line 31

class HTTPClient
  class InvalidChannel < StandardError; end
  class MissingBlock < StandardError; end

  attr_reader :channels,
              :stats

  attr_accessor :enable_long_polling,
                :status,
                :enable_chunked_encoding,
                :min_poll_interval,
                :max_poll_interval,
                :background_callback_interval

  CHUNK_SEPARATOR = "\r\n|\r\n".freeze
  private_constant :CHUNK_SEPARATOR
  STATUS_CHANNEL = "/__status".freeze
  private_constant :STATUS_CHANNEL

  STOPPED = 0
  STARTED = 1

  Stats = Struct.new(:failed, :success)
  private_constant :Stats

  # @param base_url [String] Base URL of the message_bus server to connect to
  # @param enable_long_polling [Boolean] Enable long polling
  # @param enable_chunked_encoding [Boolean] Enable chunk encoding
  # @param min_poll_interval [Float, Integer] Min poll interval when long polling in seconds
  # @param max_poll_interval [Float, Integer] Max poll interval when long polling in seconds.
  #   When requests fail, the client will backoff and this is the upper limit.
  # @param background_callback_interval [Float, Integer] Interval to poll when
  #   when polling in seconds.
  # @param headers [Hash] extra HTTP headers to be set on the polling requests.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def initialize(base_url, enable_long_polling: true,
                           enable_chunked_encoding: true,
                           min_poll_interval: 0.1,
                           max_poll_interval: 180,
                           background_callback_interval: 60,
                           headers: {})

    @uri = URI(base_url)
    @enable_long_polling = enable_long_polling
    @enable_chunked_encoding = enable_chunked_encoding
    @min_poll_interval = min_poll_interval
    @max_poll_interval = max_poll_interval
    @background_callback_interval = background_callback_interval
    @headers = headers
    @client_id = SecureRandom.hex
    @channels = {}
    @status = STOPPED
    @mutex = Mutex.new
    @stats = Stats.new(0, 0)
  end

  # Starts a background thread that polls the message bus endpoint
  # for the given base_url.
  #
  # Intervals for long polling can be configured via min_poll_interval and
  # max_poll_interval.
  #
  # Intervals for polling can be configured via background_callback_interval.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def start
    @mutex.synchronize do
      return if started?

      @status = STARTED

      thread = Thread.new do
        begin
          while started?
            unless @channels.empty?
              poll
              @stats.success += 1
              @stats.failed = 0
            end

            sleep interval
          end
        rescue StandardError => e
          @stats.failed += 1
          warn("#{e.class} #{e.message}: #{e.backtrace.join("\n")}")
          sleep interval
          retry
        ensure
          stop
        end
      end

      thread.abort_on_exception = true
    end

    self
  end

  # Stops the client from polling the message bus endpoint.
  #
  # @return [Integer] the current status of the client
  def stop
    @status = STOPPED
  end

  # Subscribes to a channel which executes the given callback when a message
  # is published to the channel
  #
  # @example Subscribing to a channel for message
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #
  #   client.subscribe("/test") do |payload, _message_id, _global_id|
  #     puts payload
  #   end
  #
  # A last_message_id may be provided.
  #  * -1 will subscribe to all new messages
  #  * -2 will receive last message + all new messages
  #  * -3 will receive last 2 message + all new messages
  #
  # @example Subscribing to a channel with `last_message_id`
  #   client.subscribe("/test", last_message_id: -2) do |payload|
  #     puts payload
  #   end
  #
  # @param channel [String] channel to listen for messages on
  # @param last_message_id [Integer] last message id to start polling on.
  #
  # @yield [data, message_id, global_id]
  #  callback to be executed whenever a message is received
  #
  # @yieldparam data [Hash] data payload of the message received on the channel
  # @yieldparam message_id [Integer] id of the message in the channel
  # @yieldparam global_id [Integer] id of the message in the global backlog
  # @yieldreturn [void]
  #
  # @return [Integer] the current status of the client
  def subscribe(channel, last_message_id: nil, &callback)
    raise InvalidChannel unless channel.to_s.start_with?("/")
    raise MissingBlock unless block_given?

    last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

    @channels[channel] ||= Channel.new
    channel = @channels[channel]
    channel.last_message_id = last_message_id if last_message_id
    channel.callbacks.push(callback)
    start if stopped?
  end

  # unsubscribes from a channel
  #
  # @example Unsubscribing from a channel
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #   callback = -> { |payload| puts payload }
  #   client.subscribe("/test", &callback)
  #   client.unsubscribe("/test")
  #
  # If a callback is given, only the specific callback will be unsubscribed.
  #
  # @example Unsubscribing a callback from a channel
  #   client.unsubscribe("/test", &callback)
  #
  # When the client does not have any channels left, it will stop polling and
  # waits until a new subscription is started.
  #
  # @param channel [String] channel to unsubscribe
  # @yield [data, global_id, message_id] specific callback to unsubscribe
  #
  # @return [Integer] the current status of the client
  def unsubscribe(channel, &callback)
    if callback
      @channels[channel].callbacks.delete(callback)
      remove_channel(channel) if @channels[channel].callbacks.empty?
    else
      remove_channel(channel)
    end

    stop if @channels.empty?
    @status
  end

  private

  def stopped?
    @status == STOPPED
  end

  def started?
    @status == STARTED
  end

  def remove_channel(channel)
    @channels.delete(channel)
  end

  def interval
    if @enable_long_polling
      if (failed_count = @stats.failed) > 2
        (@min_poll_interval * 2**failed_count).clamp(
          @min_poll_interval, @max_poll_interval
        )
      else
        @min_poll_interval
      end
    else
      @background_callback_interval
    end
  end

  def poll
    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = true if @uri.scheme == 'https'
    request = Net::HTTP::Post.new(request_path, headers)
    request.body = poll_payload

    if @enable_long_polling
      buffer = +""

      http.request(request) do |response|
        response.read_body do |chunk|
          unless chunk.empty?
            buffer << chunk
            process_buffer(buffer)
          end
        end
      end
    else
      response = http.request(request)
      notify_channels(JSON.parse(response.body))
    end
  end

  def is_chunked?
    !headers["Dont-Chunk"]
  end

  def process_buffer(buffer)
    index = buffer.index(CHUNK_SEPARATOR)

    if is_chunked?
      return unless index

      messages = buffer[0..(index - 1)]
      buffer.slice!("#{messages}#{CHUNK_SEPARATOR}")
    else
      messages = buffer[0..-1]
      buffer.slice!(messages)
    end

    notify_channels(JSON.parse(messages))
  end

  def notify_channels(messages)
    messages.each do |message|
      current_channel = message['channel']

      if current_channel == STATUS_CHANNEL
        message["data"].each do |channel_name, last_message_id|
          if (channel = @channels[channel_name])
            channel.last_message_id = last_message_id
          end
        end
      else
        @channels.each do |channel_name, channel|
          next unless channel_name == current_channel

          channel.last_message_id = message['message_id']

          channel.callbacks.each do |callback|
            callback.call(
              message['data'],
              channel.last_message_id,
              message['global_id']
            )
          end
        end
      end
    end
  end

  def poll_payload
    payload = {}

    @channels.each do |channel_name, channel|
      payload[channel_name] = channel.last_message_id
    end

    payload.to_json
  end

  def request_path
    "/message-bus/#{@client_id}/poll"
  end

  def headers
    headers = {}
    headers['Content-Type'] = 'application/json'
    headers['X-Silence-logger'] = 'true'

    if !@enable_long_polling || !@enable_chunked_encoding
      headers['Dont-Chunk'] = 'true'
    end

    headers.merge!(@headers)
  end
end

#statusHTTPClient::STOPPED, HTTPClient::STARTED

Returns the status of the client.

Returns:



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/message_bus/http_client.rb', line 31

class HTTPClient
  class InvalidChannel < StandardError; end
  class MissingBlock < StandardError; end

  attr_reader :channels,
              :stats

  attr_accessor :enable_long_polling,
                :status,
                :enable_chunked_encoding,
                :min_poll_interval,
                :max_poll_interval,
                :background_callback_interval

  CHUNK_SEPARATOR = "\r\n|\r\n".freeze
  private_constant :CHUNK_SEPARATOR
  STATUS_CHANNEL = "/__status".freeze
  private_constant :STATUS_CHANNEL

  STOPPED = 0
  STARTED = 1

  Stats = Struct.new(:failed, :success)
  private_constant :Stats

  # @param base_url [String] Base URL of the message_bus server to connect to
  # @param enable_long_polling [Boolean] Enable long polling
  # @param enable_chunked_encoding [Boolean] Enable chunk encoding
  # @param min_poll_interval [Float, Integer] Min poll interval when long polling in seconds
  # @param max_poll_interval [Float, Integer] Max poll interval when long polling in seconds.
  #   When requests fail, the client will backoff and this is the upper limit.
  # @param background_callback_interval [Float, Integer] Interval to poll when
  #   when polling in seconds.
  # @param headers [Hash] extra HTTP headers to be set on the polling requests.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def initialize(base_url, enable_long_polling: true,
                           enable_chunked_encoding: true,
                           min_poll_interval: 0.1,
                           max_poll_interval: 180,
                           background_callback_interval: 60,
                           headers: {})

    @uri = URI(base_url)
    @enable_long_polling = enable_long_polling
    @enable_chunked_encoding = enable_chunked_encoding
    @min_poll_interval = min_poll_interval
    @max_poll_interval = max_poll_interval
    @background_callback_interval = background_callback_interval
    @headers = headers
    @client_id = SecureRandom.hex
    @channels = {}
    @status = STOPPED
    @mutex = Mutex.new
    @stats = Stats.new(0, 0)
  end

  # Starts a background thread that polls the message bus endpoint
  # for the given base_url.
  #
  # Intervals for long polling can be configured via min_poll_interval and
  # max_poll_interval.
  #
  # Intervals for polling can be configured via background_callback_interval.
  #
  # @return [Object] Instance of MessageBus::HTTPClient
  def start
    @mutex.synchronize do
      return if started?

      @status = STARTED

      thread = Thread.new do
        begin
          while started?
            unless @channels.empty?
              poll
              @stats.success += 1
              @stats.failed = 0
            end

            sleep interval
          end
        rescue StandardError => e
          @stats.failed += 1
          warn("#{e.class} #{e.message}: #{e.backtrace.join("\n")}")
          sleep interval
          retry
        ensure
          stop
        end
      end

      thread.abort_on_exception = true
    end

    self
  end

  # Stops the client from polling the message bus endpoint.
  #
  # @return [Integer] the current status of the client
  def stop
    @status = STOPPED
  end

  # Subscribes to a channel which executes the given callback when a message
  # is published to the channel
  #
  # @example Subscribing to a channel for message
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #
  #   client.subscribe("/test") do |payload, _message_id, _global_id|
  #     puts payload
  #   end
  #
  # A last_message_id may be provided.
  #  * -1 will subscribe to all new messages
  #  * -2 will receive last message + all new messages
  #  * -3 will receive last 2 message + all new messages
  #
  # @example Subscribing to a channel with `last_message_id`
  #   client.subscribe("/test", last_message_id: -2) do |payload|
  #     puts payload
  #   end
  #
  # @param channel [String] channel to listen for messages on
  # @param last_message_id [Integer] last message id to start polling on.
  #
  # @yield [data, message_id, global_id]
  #  callback to be executed whenever a message is received
  #
  # @yieldparam data [Hash] data payload of the message received on the channel
  # @yieldparam message_id [Integer] id of the message in the channel
  # @yieldparam global_id [Integer] id of the message in the global backlog
  # @yieldreturn [void]
  #
  # @return [Integer] the current status of the client
  def subscribe(channel, last_message_id: nil, &callback)
    raise InvalidChannel unless channel.to_s.start_with?("/")
    raise MissingBlock unless block_given?

    last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

    @channels[channel] ||= Channel.new
    channel = @channels[channel]
    channel.last_message_id = last_message_id if last_message_id
    channel.callbacks.push(callback)
    start if stopped?
  end

  # unsubscribes from a channel
  #
  # @example Unsubscribing from a channel
  #   client = MessageBus::HTTPClient.new('http://some.test.com')
  #   callback = -> { |payload| puts payload }
  #   client.subscribe("/test", &callback)
  #   client.unsubscribe("/test")
  #
  # If a callback is given, only the specific callback will be unsubscribed.
  #
  # @example Unsubscribing a callback from a channel
  #   client.unsubscribe("/test", &callback)
  #
  # When the client does not have any channels left, it will stop polling and
  # waits until a new subscription is started.
  #
  # @param channel [String] channel to unsubscribe
  # @yield [data, global_id, message_id] specific callback to unsubscribe
  #
  # @return [Integer] the current status of the client
  def unsubscribe(channel, &callback)
    if callback
      @channels[channel].callbacks.delete(callback)
      remove_channel(channel) if @channels[channel].callbacks.empty?
    else
      remove_channel(channel)
    end

    stop if @channels.empty?
    @status
  end

  private

  def stopped?
    @status == STOPPED
  end

  def started?
    @status == STARTED
  end

  def remove_channel(channel)
    @channels.delete(channel)
  end

  def interval
    if @enable_long_polling
      if (failed_count = @stats.failed) > 2
        (@min_poll_interval * 2**failed_count).clamp(
          @min_poll_interval, @max_poll_interval
        )
      else
        @min_poll_interval
      end
    else
      @background_callback_interval
    end
  end

  def poll
    http = Net::HTTP.new(@uri.host, @uri.port)
    http.use_ssl = true if @uri.scheme == 'https'
    request = Net::HTTP::Post.new(request_path, headers)
    request.body = poll_payload

    if @enable_long_polling
      buffer = +""

      http.request(request) do |response|
        response.read_body do |chunk|
          unless chunk.empty?
            buffer << chunk
            process_buffer(buffer)
          end
        end
      end
    else
      response = http.request(request)
      notify_channels(JSON.parse(response.body))
    end
  end

  def is_chunked?
    !headers["Dont-Chunk"]
  end

  def process_buffer(buffer)
    index = buffer.index(CHUNK_SEPARATOR)

    if is_chunked?
      return unless index

      messages = buffer[0..(index - 1)]
      buffer.slice!("#{messages}#{CHUNK_SEPARATOR}")
    else
      messages = buffer[0..-1]
      buffer.slice!(messages)
    end

    notify_channels(JSON.parse(messages))
  end

  def notify_channels(messages)
    messages.each do |message|
      current_channel = message['channel']

      if current_channel == STATUS_CHANNEL
        message["data"].each do |channel_name, last_message_id|
          if (channel = @channels[channel_name])
            channel.last_message_id = last_message_id
          end
        end
      else
        @channels.each do |channel_name, channel|
          next unless channel_name == current_channel

          channel.last_message_id = message['message_id']

          channel.callbacks.each do |callback|
            callback.call(
              message['data'],
              channel.last_message_id,
              message['global_id']
            )
          end
        end
      end
    end
  end

  def poll_payload
    payload = {}

    @channels.each do |channel_name, channel|
      payload[channel_name] = channel.last_message_id
    end

    payload.to_json
  end

  def request_path
    "/message-bus/#{@client_id}/poll"
  end

  def headers
    headers = {}
    headers['Content-Type'] = 'application/json'
    headers['X-Silence-logger'] = 'true'

    if !@enable_long_polling || !@enable_chunked_encoding
      headers['Dont-Chunk'] = 'true'
    end

    headers.merge!(@headers)
  end
end

Instance Method Details

#startObject

Starts a background thread that polls the message bus endpoint for the given base_url.

Intervals for long polling can be configured via min_poll_interval and max_poll_interval.

Intervals for polling can be configured via background_callback_interval.

Returns:

  • (Object)

    Instance of MessageBus::HTTPClient



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/message_bus/http_client.rb', line 97

def start
  @mutex.synchronize do
    return if started?

    @status = STARTED

    thread = Thread.new do
      begin
        while started?
          unless @channels.empty?
            poll
            @stats.success += 1
            @stats.failed = 0
          end

          sleep interval
        end
      rescue StandardError => e
        @stats.failed += 1
        warn("#{e.class} #{e.message}: #{e.backtrace.join("\n")}")
        sleep interval
        retry
      ensure
        stop
      end
    end

    thread.abort_on_exception = true
  end

  self
end

#stopInteger

Stops the client from polling the message bus endpoint.

Returns:

  • (Integer)

    the current status of the client



133
134
135
# File 'lib/message_bus/http_client.rb', line 133

def stop
  @status = STOPPED
end

#subscribe(channel, last_message_id: nil) {|data, message_id, global_id| ... } ⇒ Integer

Subscribes to a channel which executes the given callback when a message is published to the channel

A last_message_id may be provided.

* -1 will subscribe to all new messages
* -2 will receive last message + all new messages
* -3 will receive last 2 message + all new messages

Examples:

Subscribing to a channel for message

client = MessageBus::HTTPClient.new('http://some.test.com')

client.subscribe("/test") do |payload, _message_id, _global_id|
  puts payload
end

Subscribing to a channel with ‘last_message_id`

client.subscribe("/test", last_message_id: -2) do |payload|
  puts payload
end

Parameters:

  • channel (String)

    channel to listen for messages on

  • last_message_id (Integer) (defaults to: nil)

    last message id to start polling on.

Yields:

  • (data, message_id, global_id)

    callback to be executed whenever a message is received

Yield Parameters:

  • data (Hash)

    data payload of the message received on the channel

  • message_id (Integer)

    id of the message in the channel

  • global_id (Integer)

    id of the message in the global backlog

Yield Returns:

  • (void)

Returns:

  • (Integer)

    the current status of the client

Raises:



169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/message_bus/http_client.rb', line 169

def subscribe(channel, last_message_id: nil, &callback)
  raise InvalidChannel unless channel.to_s.start_with?("/")
  raise MissingBlock unless block_given?

  last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

  @channels[channel] ||= Channel.new
  channel = @channels[channel]
  channel.last_message_id = last_message_id if last_message_id
  channel.callbacks.push(callback)
  start if stopped?
end

#unsubscribe(channel) {|data, global_id, message_id| ... } ⇒ Integer

unsubscribes from a channel

If a callback is given, only the specific callback will be unsubscribed.

When the client does not have any channels left, it will stop polling and waits until a new subscription is started.

Examples:

Unsubscribing from a channel

client = MessageBus::HTTPClient.new('http://some.test.com')
callback = -> { |payload| puts payload }
client.subscribe("/test", &callback)
client.unsubscribe("/test")

Unsubscribing a callback from a channel

client.unsubscribe("/test", &callback)

Parameters:

  • channel (String)

    channel to unsubscribe

Yields:

  • (data, global_id, message_id)

    specific callback to unsubscribe

Returns:

  • (Integer)

    the current status of the client



202
203
204
205
206
207
208
209
210
211
212
# File 'lib/message_bus/http_client.rb', line 202

def unsubscribe(channel, &callback)
  if callback
    @channels[channel].callbacks.delete(callback)
    remove_channel(channel) if @channels[channel].callbacks.empty?
  else
    remove_channel(channel)
  end

  stop if @channels.empty?
  @status
end