Module: Wampus::Pubsub::ServerExt

Extended by:
ActiveSupport::Concern
Defined in:
lib/wampus/pubsub/server_ext.rb

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#topics ⇒ Object

Returns the value of attribute topics.



8
9
10
# File 'lib/wampus/pubsub/server_ext.rb', line 8

# Reader for the +@topics+ instance variable.
#
# @return [Object] the current value of +@topics+
def topics
  @topics
end

Instance Method Details

#dispatch_event(topic_uri, event, exclude, include) ⇒ Object



135
136
137
138
139
140
# File 'lib/wampus/pubsub/server_ext.rb', line 135

# Writes an event message to every selected subscriber of a topic.
#
# The topic is looked up in +topics+ by +topic_uri+. When no topic is
# registered under that URI the call is a no-op instead of raising
# NoMethodError on nil.
#
# @param topic_uri [Object] key used to look the topic up in +topics+
# @param event [Object] payload handed to +event_msg+
# @param exclude [Object] forwarded unchanged to +topic.subscribers+
#   (presumably the connections to skip; confirm against the topic class)
# @param include [Object] forwarded unchanged to +topic.subscribers+
#   (presumably the only connections eligible; confirm against the topic class)
# @return [Object, nil] the result of iterating the subscribers, or nil when
#   the topic is unknown
def dispatch_event(topic_uri, event, exclude, include)
  topic = topics[topic_uri]

  # Unknown topic: nothing to deliver.
  return unless topic

  topic.subscribers(exclude, include).each do |subscriber|
    subscriber.write event_msg(topic_uri, event)
  end
end

#handle_publish_msg(connection, data) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/wampus/pubsub/server_ext.rb', line 104

# Handles a publish message received on +connection+.
#
# +data+ is destructured into the client-supplied topic URI, the event, and
# the exclude / include values. The topic URI is resolved through
# +connection.resolve_prefix+, and the resolved URI is used for the handler
# lookup, the topic lookup and the deferred handler call, so every step sees
# the same URI (the subscribe handler does the same).
#
# @param connection [Object] the publishing connection; must respond to
#   +resolve_prefix+ and +id+
# @param data [Array] +[topic_uri, event, exclude, include]+
# @return [Object, nil] nil when the message is ignored; otherwise the result
#   of +dispatch_event+ or of +EM.defer+
def handle_publish_msg(connection, data)
  raw_uri, event, exclude, eligible = data
  uri = connection.resolve_prefix raw_uri
  handler = get_publish_handler uri

  # Do nothing unless the server recognizes the topic
  return unless handler

  # Proceed only when handler[1] is the empty string and handler[4] is falsy.
  # NOTE(review): the earlier comment here spoke of handlers that "cover
  # partial uri matches", which does not obviously match this test - confirm
  # the intended condition against the handler registration code.
  return unless handler[1] == '' && !handler[4]

  topic = topic_for_uri uri

  # A literal +true+ means "exclude myself"; any other value is passed on
  # untouched, hence the explicit comparison rather than a truthiness check.
  exclude = [connection.id] if exclude == true

  # No handler[2] / handler[3] registered: dispatch straight away.
  if handler[2].nil? && handler[3].nil?
    return dispatch_event(topic.uri, event, exclude, eligible)
  end

  # Otherwise run the publish handler via EM.defer and dispatch only when it
  # returns a truthy, non-Exception result.
  operation = lambda { run_publish_deferred(connection, handler, uri, event) }
  callback = lambda do |result|
    if result.is_a? Exception
      # TODO: log errors
    elsif result
      dispatch_event topic.uri, event, exclude, eligible
    end
  end

  EM.defer operation, callback
end

#handle_subscribe_msg(connection, data) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/wampus/pubsub/server_ext.rb', line 70

# Handles a subscribe message received on +connection+.
#
# The first element of +data+ is resolved through
# +connection.resolve_prefix+ and used to look up the subscribe handler and
# the topic. The connection is added to the topic either immediately or,
# when the handler has a handler[2] / handler[3] entry, after the deferred
# handler call returns a truthy, non-Exception result.
#
# @param connection [Object] the subscribing connection; must respond to
#   +resolve_prefix+
# @param data [Array] message payload; only +data[0]+ (the topic URI) is read
# @return [Object, nil] nil when the message is ignored; otherwise the result
#   of +topic.add_subscriber+ or of +EM.defer+
def handle_subscribe_msg(connection, data)
  uri = connection.resolve_prefix(data[0])
  handler = get_subscribe_handler(uri)

  # Unknown topics are silently ignored.
  return unless handler

  # Proceed only when handler[1] is the empty string and handler[4] is falsy.
  # NOTE(review): the earlier comment here spoke of handlers that "cover
  # partial uri matches", which does not obviously match this test - confirm.
  return unless handler[1] == '' && !handler[4]

  topic = topic_for_uri(uri)

  # Nothing registered in handler[2] / handler[3]: subscribe right away.
  return topic.add_subscriber(connection) if handler[2].nil? && handler[3].nil?

  authorize = lambda { run_subscribe_deferred(connection, handler, uri) }
  on_result = lambda do |result|
    # TODO Log errors
    next if result.is_a?(Exception)

    topic.add_subscriber(connection) if result
  end

  EM.defer(authorize, on_result)
end

#handle_unsubscribe_msg(connection, data) ⇒ Object



97
98
99
100
101
102
# File 'lib/wampus/pubsub/server_ext.rb', line 97

# Handles an unsubscribe message received on +connection+.
#
# Resolves +data[0]+ through +connection.resolve_prefix+, looks the topic up
# with +topic_for_uri+ and, when a topic comes back, removes the connection
# from the topic and then tells the connection to drop the topic.
#
# @param connection [Object] the unsubscribing connection
# @param data [Array] message payload; only +data[0]+ (the topic URI) is read
# @return [Object, nil] the result of +connection.unsubscribe_from_topic+, or
#   nil when no topic was found
def handle_unsubscribe_msg(connection, data)
  resolved_uri = connection.resolve_prefix(data[0])
  topic = topic_for_uri(resolved_uri)

  if topic
    topic.remove_subscriber(connection)
    connection.unsubscribe_from_topic(topic)
  end
end

#initialize(*args) ⇒ Object



10
11
12
13
# File 'lib/wampus/pubsub/server_ext.rb', line 10

# Sets up the topic registry and then chains to the next +initialize+ in the
# ancestor chain, if there is one.
#
# +@topics+ is assigned before the super call, so it is already in place
# while the parent initializer runs.
#
# @param args [Array] positional arguments, forwarded unchanged to super
def initialize(*args)
  @topics = {}

  if defined?(super)
    super(*args)
  end
end