Class: Wampproto::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/wampproto/broker.rb

Overview

Wampproto broker implementation

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id_gen = IdGenerator.new) ⇒ Broker

Returns a new instance of Broker.



8
9
10
11
12
13
# File 'lib/wampproto/broker.rb', line 8

def initialize(id_gen = IdGenerator.new)
  @id_gen = id_gen
  @subscriptions_by_session = {}
  @subscriptions_by_topic = {}
  @sessions = {}
end

Instance Attribute Details

#id_genObject (readonly)

rubocop:disable Metrics/ClassLength



6
7
8
# File 'lib/wampproto/broker.rb', line 6

def id_gen
  @id_gen
end

#sessionsObject (readonly)

rubocop:disable Metrics/ClassLength



6
7
8
# File 'lib/wampproto/broker.rb', line 6

def sessions
  @sessions
end

#subscriptions_by_sessionObject (readonly)

rubocop:disable Metrics/ClassLength



6
7
8
# File 'lib/wampproto/broker.rb', line 6

def subscriptions_by_session
  @subscriptions_by_session
end

#subscriptions_by_topicObject (readonly)

rubocop:disable Metrics/ClassLength



6
7
8
# File 'lib/wampproto/broker.rb', line 6

def subscriptions_by_topic
  @subscriptions_by_topic
end

Instance Method Details

#add_session(details) ⇒ Object

Raises:

  • (KeyError)


15
16
17
18
19
20
21
22
23
# File 'lib/wampproto/broker.rb', line 15

def add_session(details)
  session_id = details.session_id

  error_message = "cannot add session twice"
  raise KeyError, error_message if subscriptions_by_session.include?(session_id)

  subscriptions_by_session[session_id] = {}
  sessions[session_id] = details
end

#handle_publish(session_id, message) ⇒ Object

rubocop:disable Metrics/MethodLength, Metrics/AbcSize

Raises:



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/wampproto/broker.rb', line 53

def handle_publish(session_id, message) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
  error_message = "cannot publish, session #{session_id} doesn't exist"
  raise ValueError, error_message unless subscriptions_by_session.include?(session_id)

  subscriptions = subscriptions_by_topic.fetch(message.topic, {})
  return if subscriptions.empty?

  publication_id = id_gen.next

  messages = []
  if message.options[:acknowledge]
    published = Message::Published.new(message.request_id, publication_id)
    messages << MessageWithRecipient.new(published, session_id)
  end
  subscription_id, session_ids = subscriptions.first

  event_options = event_details_for(session_id, message)
  event = Message::Event.new(subscription_id, publication_id, event_options, *message.args, **message.kwargs)

  session_ids.each_with_object(messages) do |recipient_id, list|
    list << MessageWithRecipient.new(event, recipient_id) unless exclude?(message, session_id, recipient_id)
  end
end

#handle_subscribe(session_id, message) ⇒ Object

Raises:



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/wampproto/broker.rb', line 77

def handle_subscribe(session_id, message)
  error_message = "cannot subscribe, session #{session_id} doesn't exist"
  raise ValueError, error_message unless subscriptions_by_session.include?(session_id)

  subscription_id = find_subscription_id_from(message.topic)
  add_topic_subscriber(message.topic, subscription_id, session_id)
  subscriptions_by_session[session_id][subscription_id] = message.topic

  subscribed = Message::Subscribed.new(message.request_id, subscription_id)
  MessageWithRecipient.new(subscribed, session_id)
end

#handle_unsubscribe(session_id, message) ⇒ Object

rubocop:disable Metrics/MethodLength, Metrics/AbcSize

Raises:



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/wampproto/broker.rb', line 89

def handle_unsubscribe(session_id, message) # rubocop:disable  Metrics/MethodLength, Metrics/AbcSize
  error_message = "cannot unsubscribe, session #{session_id} doesn't exist"
  raise ValueError, error_message unless subscriptions_by_session.include?(session_id)

  subscriptions = subscriptions_by_session.fetch(session_id)

  unless subscriptions.include?(message.subscription_id)
    error = Message::Error.new(Message::Type::UNSUBSCRIBE, message.request_id, {},
                               "wamp.error.no_such_subscription")
    return MessageWithRecipient.new(error, session_id)
  end

  topic = subscriptions.fetch(message.subscription_id)

  remove_topic_subscriber(topic, message.subscription_id, session_id)
  subscriptions_by_session[session_id].delete(message.subscription_id)

  unsubscribed = Message::Unsubscribed.new(message.request_id)
  MessageWithRecipient.new(unsubscribed, session_id)
end

#receive_message(session_id, message) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/wampproto/broker.rb', line 43

def receive_message(session_id, message)
  case message
  when Message::Subscribe then handle_subscribe(session_id, message)
  when Message::Unsubscribe then handle_unsubscribe(session_id, message)
  when Message::Publish then handle_publish(session_id, message)
  else
    raise ValueError, "message type not supported"
  end
end

#remove_session(session_id) ⇒ Object

Raises:

  • (KeyError)


25
26
27
28
29
30
31
32
33
34
# File 'lib/wampproto/broker.rb', line 25

def remove_session(session_id)
  error_message = "cannot remove non-existing session"
  raise KeyError, error_message unless subscriptions_by_session.include?(session_id)

  subscriptions = subscriptions_by_session.delete(session_id) || {}
  subscriptions.each do |subscription_id, topic|
    remove_topic_subscriber(topic, subscription_id, session_id)
  end
  sessions.delete(session_id)
end

#subscription?(topic) ⇒ Boolean

Returns:

  • (Boolean)


36
37
38
39
40
41
# File 'lib/wampproto/broker.rb', line 36

def subscription?(topic)
  subscriptions = subscriptions_by_topic[topic]
  return false unless subscriptions

  subscriptions.any?
end