Class: Wampproto::Broker
- Inherits:
-
Object
- Object
- Wampproto::Broker
- Defined in:
- lib/wampproto/broker.rb
Overview
Wampproto broker implementation
Instance Attribute Summary collapse
-
#id_gen ⇒ Object
readonly
Generator whose #next supplies publication IDs.
-
#sessions ⇒ Object
readonly
Session details keyed by session ID.
-
#subscriptions_by_session ⇒ Object
readonly
For each session ID, a map of subscription ID to topic.
-
#subscriptions_by_topic ⇒ Object
readonly
For each topic, a map of subscription ID to the subscribed session IDs.
Instance Method Summary collapse
- #add_session(details) ⇒ Object
-
#handle_publish(session_id, message) ⇒ Object
Builds the messages to send for a PUBLISH: an optional PUBLISHED acknowledgement plus an EVENT for each eligible subscriber.
- #handle_subscribe(session_id, message) ⇒ Object
-
#handle_unsubscribe(session_id, message) ⇒ Object
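Removes one of the session's subscriptions and replies with UNSUBSCRIBED, or with a wamp.error.no_such_subscription ERROR when the session does not hold that subscription.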
-
#initialize(id_gen = IdGenerator.new) ⇒ Broker
constructor
A new instance of Broker.
- #receive_message(session_id, message) ⇒ Object
- #remove_session(session_id) ⇒ Object
- #subscription?(topic) ⇒ Boolean
Constructor Details
#initialize(id_gen = IdGenerator.new) ⇒ Broker
Returns a new instance of Broker.
8 9 10 11 12 13 |
# File 'lib/wampproto/broker.rb', line 8
#
# Creates a broker with empty session and subscription tables.
#
# @param id_gen [#next] generator stored for later ID creation; a fresh
#   IdGenerator is built when none is supplied
def initialize(id_gen = IdGenerator.new)
  @id_gen = id_gen
  @subscriptions_by_session = {}
  @subscriptions_by_topic = {}
  @sessions = {}
end
Instance Attribute Details
#id_gen ⇒ Object (readonly)
Generator whose #next supplies publication IDs.
6 7 8 |
# File 'lib/wampproto/broker.rb', line 6
#
# @return [Object] the ID generator held in @id_gen
def id_gen
  @id_gen
end
#sessions ⇒ Object (readonly)
Session details keyed by session ID.
6 7 8 |
# File 'lib/wampproto/broker.rb', line 6
#
# @return [Object] the table held in @sessions (session ID => session details)
def sessions
  @sessions
end
#subscriptions_by_session ⇒ Object (readonly)
For each session ID, a map of subscription ID to topic.
6 7 8 |
# File 'lib/wampproto/broker.rb', line 6
#
# @return [Object] the table held in @subscriptions_by_session
#   (session ID => { subscription ID => topic })
def subscriptions_by_session
  @subscriptions_by_session
end
#subscriptions_by_topic ⇒ Object (readonly)
For each topic, a map of subscription ID to the subscribed session IDs.
6 7 8 |
# File 'lib/wampproto/broker.rb', line 6
#
# @return [Object] the table held in @subscriptions_by_topic, keyed by topic
def subscriptions_by_topic
  @subscriptions_by_topic
end
Instance Method Details
#add_session(details) ⇒ Object
15 16 17 18 19 20 21 22 23 |
# File 'lib/wampproto/broker.rb', line 15
#
# Registers a session with the broker, giving it an empty subscription table.
#
# @param details [#session_id] session details; stored under their session_id
# @return [Object] the details object that was stored
# @raise [KeyError] if the session ID is already registered
def add_session(details)
  session_id = details.session_id

  error_message = "cannot add session twice"
  raise KeyError, error_message if subscriptions_by_session.key?(session_id)

  subscriptions_by_session[session_id] = {}
  sessions[session_id] = details
end
#handle_publish(session_id, message) ⇒ Object
Builds the messages to send for a PUBLISH: an optional PUBLISHED acknowledgement plus an EVENT for each eligible subscriber.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/wampproto/broker.rb', line 53
#
# Builds the outgoing messages for a PUBLISH received from +session_id+.
#
# @param session_id [Object] ID of the publishing session
# @param message [#topic, #options, #request_id, #args, #kwargs] the PUBLISH message
# @return [Array<MessageWithRecipient>, nil] the PUBLISHED acknowledgement (only when
#   options[:acknowledge] is truthy) followed by one EVENT per non-excluded recipient;
#   nil when the topic has no subscriptions
# @raise [ValueError] if the publishing session is not registered
def handle_publish(session_id, message) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
  error_message = "cannot publish, session #{session_id} doesn't exist"
  raise ValueError, error_message unless subscriptions_by_session.include?(session_id)

  subscriptions = subscriptions_by_topic.fetch(message.topic, {})
  # NOTE(review): this returns before the :acknowledge option is looked at, so a
  # publish to a topic without subscribers yields no PUBLISHED — confirm intended.
  return if subscriptions.empty?

  publication_id = id_gen.next

  messages = []
  if message.options[:acknowledge]
    published = Message::Published.new(message.request_id, publication_id)
    messages << MessageWithRecipient.new(published, session_id)
  end

  # Only the first subscription entry recorded for the topic is used.
  subscription_id, session_ids = subscriptions.first

  event_options = event_details_for(session_id, message)
  event = Message::Event.new(subscription_id, publication_id, event_options, *message.args, **message.kwargs)

  # each_with_object returns the accumulator, so the method result is `messages`.
  session_ids.each_with_object(messages) do |recipient_id, list|
    list << MessageWithRecipient.new(event, recipient_id) unless exclude?(message, session_id, recipient_id)
  end
end
#handle_subscribe(session_id, message) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/wampproto/broker.rb', line 77
#
# Records a subscription for +session_id+ and builds the SUBSCRIBED reply.
#
# @param session_id [Object] ID of the subscribing session
# @param message [#topic, #request_id] the SUBSCRIBE message
# @return [MessageWithRecipient] a Message::Subscribed addressed to +session_id+
# @raise [ValueError] if the session is not registered
def handle_subscribe(session_id, message)
  error_message = "cannot subscribe, session #{session_id} doesn't exist"
  raise ValueError, error_message unless subscriptions_by_session.include?(session_id)

  # The subscription ID comes from a helper defined elsewhere in the class.
  subscription_id = find_subscription_id_from(message.topic)
  add_topic_subscriber(message.topic, subscription_id, session_id)
  subscriptions_by_session[session_id][subscription_id] = message.topic

  subscribed = Message::Subscribed.new(message.request_id, subscription_id)
  MessageWithRecipient.new(subscribed, session_id)
end
#handle_unsubscribe(session_id, message) ⇒ Object
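Removes one of the session's subscriptions and replies with UNSUBSCRIBED, or with a wamp.error.no_such_subscription ERROR when the session does not hold that subscription.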
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/wampproto/broker.rb', line 89
#
# Removes one subscription of +session_id+ and builds the reply.
#
# @param session_id [Object] ID of the unsubscribing session
# @param message [#subscription_id, #request_id] the UNSUBSCRIBE message
# @return [MessageWithRecipient] a Message::Unsubscribed on success, or a Message::Error
#   with URI "wamp.error.no_such_subscription" when the session does not hold the
#   subscription; both are addressed to +session_id+
# @raise [ValueError] if the session is not registered
def handle_unsubscribe(session_id, message) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize
  error_message = "cannot unsubscribe, session #{session_id} doesn't exist"
  raise ValueError, error_message unless subscriptions_by_session.include?(session_id)

  subscriptions = subscriptions_by_session.fetch(session_id)

  # An unknown subscription is answered with an ERROR message rather than an exception.
  unless subscriptions.include?(message.subscription_id)
    error = Message::Error.new(Message::Type::UNSUBSCRIBE, message.request_id, {}, "wamp.error.no_such_subscription")
    return MessageWithRecipient.new(error, session_id)
  end

  topic = subscriptions.fetch(message.subscription_id)

  remove_topic_subscriber(topic, message.subscription_id, session_id)
  subscriptions_by_session[session_id].delete(message.subscription_id)

  unsubscribed = Message::Unsubscribed.new(message.request_id)
  MessageWithRecipient.new(unsubscribed, session_id)
end
#receive_message(session_id, message) ⇒ Object
43 44 45 46 47 48 49 50 51 |
# File 'lib/wampproto/broker.rb', line 43
#
# Dispatches an incoming message to the handler matching its class.
#
# @param session_id [Object] ID of the session that sent the message
# @param message [Message::Subscribe, Message::Unsubscribe, Message::Publish]
# @return [Object] whatever the selected handler returns
# @raise [ValueError] if the message is of any other class
def receive_message(session_id, message)
  case message
  when Message::Subscribe then handle_subscribe(session_id, message)
  when Message::Unsubscribe then handle_unsubscribe(session_id, message)
  when Message::Publish then handle_publish(session_id, message)
  else
    raise ValueError, "message type not supported"
  end
end
#remove_session(session_id) ⇒ Object
25 26 27 28 29 30 31 32 33 34 |
# File 'lib/wampproto/broker.rb', line 25
#
# Unregisters a session and drops every subscription it held.
#
# @param session_id [Object] ID of the session to remove
# @return [Object, nil] the value deleted from the sessions table
# @raise [KeyError] if the session is not registered
def remove_session(session_id)
  error_message = "cannot remove non-existing session"
  raise KeyError, error_message unless subscriptions_by_session.include?(session_id)

  # `|| {}` covers a session whose stored subscription table is nil.
  subscriptions = subscriptions_by_session.delete(session_id) || {}
  subscriptions.each do |subscription_id, topic|
    remove_topic_subscriber(topic, subscription_id, session_id)
  end
  sessions.delete(session_id)
end
#subscription?(topic) ⇒ Boolean
36 37 38 39 40 41 |
# File 'lib/wampproto/broker.rb', line 36
#
# Tells whether at least one subscription is recorded for +topic+.
#
# @param topic [Object] key looked up in subscriptions_by_topic
# @return [Boolean] false when the topic is unknown or its subscription table is empty
def subscription?(topic)
  subscriptions = subscriptions_by_topic[topic]
  return false unless subscriptions

  subscriptions.any?
end