Class: MessageBus::Client
- Inherits:
- Object
- Object
- MessageBus::Client
- Defined in:
- lib/message_bus/client.rb
Instance Attribute Summary collapse
-
#async_response ⇒ Object
Returns the value of attribute async_response.
-
#cleanup_timer ⇒ Object
Returns the value of attribute cleanup_timer.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connect_time ⇒ Object
Returns the value of attribute connect_time.
-
#group_ids ⇒ Object
Returns the value of attribute group_ids.
-
#io ⇒ Object
Returns the value of attribute io.
-
#site_id ⇒ Object
Returns the value of attribute site_id.
-
#subscribed_sets ⇒ Object
Returns the value of attribute subscribed_sets.
-
#user_id ⇒ Object
Returns the value of attribute user_id.
Instance Method Summary collapse
- #<<(msg) ⇒ Object
- #allowed?(msg) ⇒ Boolean
- #backlog ⇒ Object
- #close ⇒ Object
- #closed ⇒ Object
- #ensure_closed! ⇒ Object
- #filter(msg) ⇒ Object
- #in_async? ⇒ Boolean
-
#initialize(opts) ⇒ Client
constructor
A new instance of Client.
- #subscribe(channel, last_seen_id) ⇒ Object
- #subscriptions ⇒ Object
Constructor Details
#initialize(opts) ⇒ Client
Returns a new instance of Client.
3 4 5 6 7 8 9 10 11 |
# File 'lib/message_bus/client.rb', line 3 def initialize(opts) self.client_id = opts[:client_id] self.user_id = opts[:user_id] self.group_ids = opts[:group_ids] || [] self.site_id = opts[:site_id] self.connect_time = Time.now @bus = opts[:message_bus] || MessageBus @subscriptions = {} end |
Instance Attribute Details
#async_response ⇒ Object
Returns the value of attribute async_response.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def async_response @async_response end |
#cleanup_timer ⇒ Object
Returns the value of attribute cleanup_timer.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def cleanup_timer @cleanup_timer end |
#client_id ⇒ Object
Returns the value of attribute client_id.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def client_id @client_id end |
#connect_time ⇒ Object
Returns the value of attribute connect_time.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def connect_time @connect_time end |
#group_ids ⇒ Object
Returns the value of attribute group_ids.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def group_ids @group_ids end |
#io ⇒ Object
Returns the value of attribute io.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def io @io end |
#site_id ⇒ Object
Returns the value of attribute site_id.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def site_id @site_id end |
#subscribed_sets ⇒ Object
Returns the value of attribute subscribed_sets.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def subscribed_sets @subscribed_sets end |
#user_id ⇒ Object
Returns the value of attribute user_id.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2 def user_id @user_id end |
Instance Method Details
#<<(msg) ⇒ Object
45 46 47 |
# File 'lib/message_bus/client.rb', line 45 def <<(msg) write_and_close ([msg]) end |
#allowed?(msg) ⇒ Boolean
53 54 55 56 57 58 59 60 61 62 |
# File 'lib/message_bus/client.rb', line 53 def allowed?(msg) allowed = !msg.user_ids || msg.user_ids.include?(self.user_id) allowed && ( msg.group_ids.nil? || msg.group_ids.length == 0 || ( msg.group_ids - self.group_ids ).length < msg.group_ids.length ) end |
#backlog ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/message_bus/client.rb', line 74 def backlog r = [] @subscriptions.each do |k,v| next if v.to_i < 0 messages = @bus.backlog(k,v) messages.each do |msg| r << msg if allowed?(msg) end end # stats message for all newly subscribed status_message = nil @subscriptions.each do |k,v| if v.to_i == -1 status_message ||= {} status_message[k] = @bus.last_id(k) end end r << MessageBus::Message.new(-1, -1, '/__status', status_message) if status_message r.map!{|msg| filter(msg)}.compact! r || [] end |
#close ⇒ Object
26 27 28 29 |
# File 'lib/message_bus/client.rb', line 26 def close return unless in_async? write_and_close "[]" end |
#closed ⇒ Object
31 32 33 |
# File 'lib/message_bus/client.rb', line 31 def closed !@async_response end |
#ensure_closed! ⇒ Object
17 18 19 20 21 22 23 24 |
# File 'lib/message_bus/client.rb', line 17 def ensure_closed! return unless in_async? write_and_close "[]" rescue # we may have a dead socket, just nil the @io @io = nil @async_response = nil end |
#filter(msg) ⇒ Object
64 65 66 67 68 69 70 71 72 |
# File 'lib/message_bus/client.rb', line 64 def filter(msg) filter = @bus.client_filter(msg.channel) if filter filter.call(self.user_id, msg) else msg end end |
#in_async? ⇒ Boolean
13 14 15 |
# File 'lib/message_bus/client.rb', line 13 def in_async? @async_response || @io end |
#subscribe(channel, last_seen_id) ⇒ Object
35 36 37 38 39 |
# File 'lib/message_bus/client.rb', line 35 def subscribe(channel, last_seen_id) last_seen_id = nil if last_seen_id == "" last_seen_id ||= @bus.last_id(channel) @subscriptions[channel] = last_seen_id.to_i end |
#subscriptions ⇒ Object
41 42 43 |
# File 'lib/message_bus/client.rb', line 41 def subscriptions @subscriptions end |