Module: Wampus::Pubsub::ServerExt
- Extended by:
- ActiveSupport::Concern
- Defined in:
- lib/wampus/pubsub/server_ext.rb
Defined Under Namespace
Modules: ClassMethods
Instance Attribute Summary collapse
-
#topics ⇒ Object
Returns the value of attribute topics.
Instance Method Summary collapse
- #dispatch_event(topic_uri, event, exclude, include) ⇒ Object
- #handle_publish_msg(connection, data) ⇒ Object
- #handle_subscribe_msg(connection, data) ⇒ Object
- #handle_unsubscribe_msg(connection, data) ⇒ Object
- #initialize(*args) ⇒ Object
Instance Attribute Details
#topics ⇒ Object
Returns the value of attribute topics.
8 9 10 |
# File 'lib/wampus/pubsub/server_ext.rb', line 8
# Reader for the topic registry.
#
# @return [Object] the current value of the @topics instance variable
def topics
  @topics
end
Instance Method Details
#dispatch_event(topic_uri, event, exclude, include) ⇒ Object
135 136 137 138 139 140 |
# File 'lib/wampus/pubsub/server_ext.rb', line 135
# Sends an event message to the subscribers of a topic.
#
# The topic is looked up in +topics+ by +topic_uri+. If nothing is registered
# under that URI there is nobody to notify, so the call is a no-op instead of
# raising NoMethodError on nil.
#
# @param topic_uri [Object] key used to look the topic up in +topics+
# @param event [Object] payload handed to +event_msg+
# @param exclude [Object] forwarded unchanged to +topic.subscribers+
# @param include [Object] forwarded unchanged to +topic.subscribers+
# @return [Object, nil] nil when the topic is unknown, otherwise the result of
#   iterating the subscribers
def dispatch_event(topic_uri, event, exclude, include)
  topic = topics[topic_uri]
  # Unknown topic: nothing to deliver.
  return unless topic

  topic.subscribers(exclude, include).each do |subscriber|
    subscriber.write event_msg(topic_uri, event)
  end
end
#handle_publish_msg(connection, data) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/wampus/pubsub/server_ext.rb', line 104
# Processes a publish message received on a connection.
#
# +data+ is destructured into topic URI, event, exclude and include. The topic
# URI is resolved through the connection's prefix table, and the publish
# handler registered for it decides whether the event is dispatched
# immediately or only after a deferred check.
#
# @param connection [Object] must respond to +resolve_prefix+ and +id+
# @param data [Array] [topic_uri, event, exclude, include]
# @return [Object, nil] nil when the message is ignored
def handle_publish_msg(connection, data)
  topic_uri, event, exclude, include = data

  resolved_uri = connection.resolve_prefix(topic_uri)
  handler = get_publish_handler(resolved_uri)

  # Do nothing unless the server recognizes the topic
  return unless handler
  # Proceed only when handler[1] is the empty string and handler[4] is falsy
  # (presumably an exact, non-prefix match -- confirm against
  # get_publish_handler).
  return if handler[1] != '' || handler[4]

  topic = topic_for_uri(resolved_uri)

  # Exclude true means to exclude myself from the message
  exclude = [connection.id] if exclude == true

  if handler[2].nil? && handler[3].nil?
    # No handler object/callable registered in slots 2 and 3: dispatch directly.
    dispatch_event(topic.uri, event, exclude, include)
  else
    # NOTE(review): the unresolved topic_uri is passed here, whereas the
    # subscribe path passes the resolved URI -- confirm this is intended.
    work = -> { run_publish_deferred(connection, handler, topic_uri, event) }

    on_done = lambda do |result|
      # TODO Log errors
      next if result.is_a?(Exception)

      dispatch_event(topic.uri, event, exclude, include) if result
    end

    EM.defer(work, on_done)
  end
end
#handle_subscribe_msg(connection, data) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/wampus/pubsub/server_ext.rb', line 70
# Processes a subscribe message received on a connection.
#
# The first element of +data+ is resolved through the connection's prefix
# table. The subscribe handler registered for the resulting URI decides
# whether the connection is added to the topic immediately or only after a
# deferred check returns a truthy, non-exception result.
#
# @param connection [Object] must respond to +resolve_prefix+
# @param data [Array] data[0] is the (possibly prefixed) topic URI
# @return [Object, nil] nil when the message is ignored
def handle_subscribe_msg(connection, data)
  topic_uri = connection.resolve_prefix(data[0])
  handler = get_subscribe_handler(topic_uri)

  # Do nothing unless the server recognizes the topic
  return unless handler
  # Proceed only when handler[1] is the empty string and handler[4] is falsy
  # (presumably an exact, non-prefix match -- confirm against
  # get_subscribe_handler).
  return if handler[1] != '' || handler[4]

  topic = topic_for_uri(topic_uri)

  if handler[2].nil? && handler[3].nil?
    # No handler object/callable registered in slots 2 and 3: subscribe directly.
    topic.add_subscriber(connection)
  else
    work = -> { run_subscribe_deferred(connection, handler, topic_uri) }

    on_done = lambda do |result|
      # TODO Log errors
      next if result.is_a?(Exception)

      topic.add_subscriber(connection) if result
    end

    EM.defer(work, on_done)
  end
end
#handle_unsubscribe_msg(connection, data) ⇒ Object
97 98 99 100 101 102 |
# File 'lib/wampus/pubsub/server_ext.rb', line 97
# Processes an unsubscribe message received on a connection.
#
# Resolves data[0] through the connection's prefix table, looks the topic up
# with +topic_for_uri+ and, when one is returned, detaches the connection from
# it on both sides.
#
# @param connection [Object] must respond to +resolve_prefix+ and
#   +unsubscribe_from_topic+
# @param data [Array] data[0] is the (possibly prefixed) topic URI
# @return [Object, nil] nil when no topic is found
def handle_unsubscribe_msg(connection, data)
  resolved_uri = connection.resolve_prefix(data[0])
  topic = topic_for_uri(resolved_uri)

  if topic
    topic.remove_subscriber(connection)
    connection.unsubscribe_from_topic(topic)
  end
end
#initialize(*args) ⇒ Object
10 11 12 13 |
# File 'lib/wampus/pubsub/server_ext.rb', line 10
# Sets up an empty topic registry, then forwards construction to the next
# +initialize+ in the ancestor chain when one exists.
#
# @param args [Array] passed through unchanged to +super+
def initialize(*args)
  @topics = {}
  return unless defined?(super)

  super(*args)
end