Class: MessageBus::Rack::Middleware

Inherits:
Object
Defined in:
lib/message_bus/rack/middleware.rb

Overview

Accepts requests from subscribers, validates and authenticates them, delivers existing messages from the backlog, and informs a `MessageBus::ConnectionManager` of any connection that remains open.

Constructor Details

#initialize(app, config = {}) ⇒ Middleware

Sets up the middleware to receive subscriber client requests and begins listening for messages published on the bus for re-distribution (unless the bus is disabled).

Parameters:

  • app (Proc)

    the rack app

  • config (Hash) (defaults to: {})

Options Hash (config):

  • :message_bus (MessageBus::Instance) — default: `MessageBus`

    a specific instance of message_bus



# File 'lib/message_bus/rack/middleware.rb', line 37

def initialize(app, config = {})
  @app = app
  @bus = config[:message_bus] || MessageBus
  @connection_manager = MessageBus::ConnectionManager.new(@bus)
  @started_listener = false
  @base_route = "#{@bus.base_route}message-bus/"
  @base_route_length = @base_route.length
  @broadcast_route = "#{@base_route}broadcast"
  start_listener unless @bus.off?
end
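
The route strings built in the constructor can be traced with a small standalone sketch. The helper `derived_routes` is hypothetical and only mirrors the string interpolation shown above; the sample values `'/'` and `'/my/app/'` are assumed inputs for `base_route`, not values taken from the library.

```ruby
# Hypothetical helper that repeats the constructor's route arithmetic
# so the resulting strings can be inspected without loading the gem.
def derived_routes(bus_base_route)
  base_route = "#{bus_base_route}message-bus/"
  {
    base_route: base_route,
    base_route_length: base_route.length,
    broadcast_route: "#{base_route}broadcast"
  }
end

# Assumed sample values for the bus's base_route setting.
root = derived_routes('/')
nested = derived_routes('/my/app/')

puts root[:base_route]        # /message-bus/
puts root[:broadcast_route]   # /message-bus/broadcast
puts nested[:base_route]      # /my/app/message-bus/
```

Because the constructor concatenates without inserting a separator, this sketch suggests the configured base route needs a trailing slash for the resulting paths to be well formed.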

Instance Attribute Details

#started_listener ⇒ Boolean (readonly)

Returns whether the message listener (subscriber) is started or not.

Returns:

  • (Boolean)

    whether the message listener (subscriber) is started or not



# File 'lib/message_bus/rack/middleware.rb', line 28

def started_listener
  @started_listener
end

Class Method Details

.backlog_to_json(backlog) ⇒ JSON

Returns a JSON representation of the backlog, compliant with the subscriber API specification.

Parameters:

  • backlog

    the messages to serialize; each must respond to global_id, message_id, channel, and data

Returns:

  • (JSON)

    a JSON representation of the backlog, compliant with the subscriber API specification



# File 'lib/message_bus/rack/middleware.rb', line 15

def self.backlog_to_json(backlog)
  m = backlog.map do |msg|
    {
      global_id: msg.global_id,
      message_id: msg.message_id,
      channel: msg.channel,
      data: msg.data
    }
  end.to_a
  JSON.dump(m)
end
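
To illustrate the shape of the serialized backlog, here is a minimal sketch that copies the method body into a free-standing function and feeds it stand-in messages. `SketchMessage` is a hypothetical `Struct`, not the library's message class; it only provides the four readers the method calls, and the sample channels and payloads are made up.

```ruby
require 'json'

# Hypothetical stand-in for a bus message: just the four readers used above.
SketchMessage = Struct.new(:global_id, :message_id, :channel, :data)

# Same body as the class method above, lifted out so it runs without the gem.
def backlog_to_json(backlog)
  m = backlog.map do |msg|
    {
      global_id: msg.global_id,
      message_id: msg.message_id,
      channel: msg.channel,
      data: msg.data
    }
  end.to_a
  JSON.dump(m)
end

backlog = [
  SketchMessage.new(7, 3, '/chat', 'hello'),
  SketchMessage.new(8, 1, '/alerts', { 'level' => 'warn' })
]

json = backlog_to_json(backlog)
puts json

# Symbol keys become string keys once the JSON is parsed again.
parsed = JSON.parse(json)
puts parsed.first['channel']
```

The return value is a Ruby `String` containing a JSON array, one object per message, in the same order as the input.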

Instance Method Details

#call(env) ⇒ Object

Processes an HTTP request from a subscriber client.

Parameters:

  • env (Rack::Request::Env)

    the request environment



# File 'lib/message_bus/rack/middleware.rb', line 59

def call(env)
  return @app.call(env) unless env['PATH_INFO'].start_with? @base_route

  handle_request(env)
end
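
The prefix check in `call` is the usual Rack pass-through pattern: requests outside the bus route go to the wrapped app untouched. The sketch below reproduces only that check. `PrefixRouter` is a hypothetical class, the fixed response stands in for `handle_request`, and the request paths are illustrative.

```ruby
# Hypothetical middleware that keeps only the routing decision from #call.
class PrefixRouter
  def initialize(app, base_route)
    @app = app
    @base_route = base_route
  end

  def call(env)
    # Anything outside the base route is passed to the wrapped app.
    return @app.call(env) unless env['PATH_INFO'].start_with?(@base_route)

    # Placeholder for the real request handling.
    [200, { 'content-type' => 'text/plain' }, ['handled by bus']]
  end
end

# A trivial downstream Rack app.
app = ->(_env) { [200, { 'content-type' => 'text/plain' }, ['handled by app']] }
router = PrefixRouter.new(app, '/message-bus/')

bus_response = router.call('PATH_INFO' => '/message-bus/abc123/poll')
app_response = router.call('PATH_INFO' => '/posts/1')

puts bus_response[2].first
puts app_response[2].first
```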

#stop_listener ⇒ void

This method returns an undefined value.

Stops listening for messages on the bus.



# File 'lib/message_bus/rack/middleware.rb', line 50

def stop_listener
  if @subscription
    @bus.unsubscribe(&@subscription)
    @started_listener = false
  end
end
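
The `&@subscription` call passes the stored block back to the bus so that the same handler that was registered can be removed. A rough sketch of that round trip follows. `FakeBus` and `ListenerSketch` are hypothetical stand-ins, and the sketch assumes `subscribe` returns the block it was given; the real listener setup is not shown in this section and may differ.

```ruby
# Hypothetical bus that stores handler blocks in an array.
class FakeBus
  attr_reader :subscribers

  def initialize
    @subscribers = []
  end

  # Assumption: subscribe returns the block so the caller can keep it.
  def subscribe(&blk)
    @subscribers << blk
    blk
  end

  def unsubscribe(&blk)
    @subscribers.delete(blk)
  end
end

# Hypothetical listener owner; stop_listener matches the method above.
class ListenerSketch
  attr_reader :started_listener

  def initialize(bus)
    @bus = bus
    @started_listener = false
  end

  def start_listener
    return if @started_listener

    @subscription = @bus.subscribe { |_msg| }
    @started_listener = true
  end

  def stop_listener
    if @subscription
      @bus.unsubscribe(&@subscription)
      @started_listener = false
    end
  end
end

bus = FakeBus.new
listener = ListenerSketch.new(bus)

listener.start_listener
count_while_running = bus.subscribers.size

listener.stop_listener
count_after_stop = bus.subscribers.size

puts count_while_running
puts count_after_stop
```

Calling `stop_listener` before any listener was started is a no-op, since `@subscription` is still `nil`.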