Method: MessageBus::HTTPClient#subscribe

Defined in:
lib/message_bus/http_client.rb

#subscribe(channel, last_message_id: nil) {|data, message_id, global_id| ... } ⇒ Integer

Subscribes to a channel and executes the given callback whenever a message is published to that channel.

A last_message_id may be provided to control where delivery starts. Negative values are relative to the most recent message:

* -1 will subscribe to all new messages
* -2 will receive the last message + all new messages
* -3 will receive the last 2 messages + all new messages
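
The negative values above can be read as "how many already-published messages to replay before new ones arrive": -1 replays none, -2 replays one, -3 replays two. The following is a minimal sketch of that arithmetic on a plain array. `replay_count` and `replayed` are hypothetical helpers written for illustration and are not part of MessageBus; extending the pattern beyond the three values listed is an assumption.

```ruby
# Hypothetical helpers illustrating the negative last_message_id convention.
# They are NOT part of MessageBus; they only model the arithmetic.

# Number of already-published messages to replay for a negative id.
def replay_count(last_message_id)
  unless last_message_id.is_a?(Integer) && last_message_id < 0
    raise ArgumentError, "expected a negative Integer"
  end

  -last_message_id - 1
end

# Messages from an in-memory backlog that would be replayed before new ones.
def replayed(backlog, last_message_id)
  backlog.last(replay_count(last_message_id))
end

backlog = %w[a b c d]
p replayed(backlog, -1) # => []
p replayed(backlog, -2) # => ["d"]
p replayed(backlog, -3) # => ["c", "d"]
```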

Examples:

Subscribing to a channel for messages

client = MessageBus::HTTPClient.new('http://some.test.com')

client.subscribe("/test") do |payload, _message_id, _global_id|
  puts payload
end

Subscribing to a channel with last_message_id

client.subscribe("/test", last_message_id: -2) do |payload|
  puts payload
end

Parameters:

  • channel (String)

    channel to listen for messages on

  • last_message_id (Integer) (defaults to: nil)

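last message id to start polling from. A value that is not an Integer is treated as -1; when omitted, the channel's current last message id is left unchanged.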

Yields:

  • (data, message_id, global_id)

    callback to be executed whenever a message is received

Yield Parameters:

  • data (Hash)

    data payload of the message received on the channel

  • message_id (Integer)

    id of the message in the channel

  • global_id (Integer)

    id of the message in the global backlog

Yield Returns:

  • (void)

Returns:

  • (Integer)

    the current status of the client

Raises:

  • (InvalidChannel)

    if the channel does not start with "/"

  • (MissingBlock)

    if no block is given

# File 'lib/message_bus/http_client.rb', line 169

def subscribe(channel, last_message_id: nil, &callback)
  raise InvalidChannel unless channel.to_s.start_with?("/")
  raise MissingBlock unless block_given?

  last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

  @channels[channel] ||= Channel.new
  channel = @channels[channel]
  channel.last_message_id = last_message_id if last_message_id
  channel.callbacks.push(callback)
  start if stopped?
end
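
The argument handling in the listing above can be exercised without a server. The sketch below uses `SketchClient`, a hypothetical stand-in that copies only the validation and bookkeeping steps; it does no networking, and its `Channel` struct and error classes are simplified assumptions rather than the library's own definitions.

```ruby
# SketchClient is a hypothetical stand-in that mirrors only the argument
# handling of the listing above. It is NOT MessageBus::HTTPClient.
class SketchClient
  class InvalidChannel < StandardError; end
  class MissingBlock < StandardError; end

  # Simplified assumption: the real Channel class may have other defaults.
  Channel = Struct.new(:last_message_id, :callbacks)

  attr_reader :channels

  def initialize
    @channels = {}
  end

  def subscribe(channel, last_message_id: nil, &callback)
    raise InvalidChannel unless channel.to_s.start_with?("/")
    raise MissingBlock unless block_given?

    # Any non-Integer id (for example "5") falls back to -1: new messages only.
    last_message_id = -1 if last_message_id && !last_message_id.is_a?(Integer)

    @channels[channel] ||= Channel.new(nil, [])
    entry = @channels[channel]
    # The stored id is overwritten only when a new one is supplied.
    entry.last_message_id = last_message_id if last_message_id
    entry.callbacks.push(callback)
    entry
  end
end

client = SketchClient.new
client.subscribe("/test", last_message_id: "5") { |payload| puts payload }
client.subscribe("/test") { |payload| puts payload }

p client.channels["/test"].last_message_id # => -1
p client.channels["/test"].callbacks.size  # => 2

begin
  client.subscribe("test") { |payload| puts payload }
rescue SketchClient::InvalidChannel
  puts "channel must start with /"
end
```

Two points stand out: subscribing to the same channel again appends a callback rather than replacing the existing one, and a later call without last_message_id keeps the id set by an earlier call.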