Class: MessageBus::Client
- Inherits:
-
Object
- Object
- MessageBus::Client
- Defined in:
- lib/message_bus/client.rb
Instance Attribute Summary
-
#async_response ⇒ Object
Returns the value of attribute async_response.
-
#cleanup_timer ⇒ Object
Returns the value of attribute cleanup_timer.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#connect_time ⇒ Object
Returns the value of attribute connect_time.
-
#group_ids ⇒ Object
Returns the value of attribute group_ids.
-
#headers ⇒ Object
Returns the value of attribute headers.
-
#io ⇒ Object
Returns the value of attribute io.
-
#seq ⇒ Object
Returns the value of attribute seq.
-
#site_id ⇒ Object
Returns the value of attribute site_id.
-
#subscribed_sets ⇒ Object
Returns the value of attribute subscribed_sets.
-
#user_id ⇒ Object
Returns the value of attribute user_id.
Instance Method Summary
- #<<(msg) ⇒ Object
- #allowed?(msg) ⇒ Boolean
- #backlog ⇒ Object
- #cancel ⇒ Object
- #close ⇒ Object
- #closed ⇒ Object
- #ensure_closed! ⇒ Object
- #filter(msg) ⇒ Object
- #in_async? ⇒ Boolean
-
#initialize(opts) ⇒ Client
constructor
A new instance of Client.
- #subscribe(channel, last_seen_id) ⇒ Object
- #subscriptions ⇒ Object
Constructor Details
#initialize(opts) ⇒ Client
Returns a new instance of Client.
6 7 8 9 10 11 12 13 14 15 |
# File 'lib/message_bus/client.rb', line 6

# Builds a client from an options hash.
#
# Keys read from +opts+: :client_id, :user_id, :group_ids, :site_id, :seq
# and :message_bus.
#
# @param opts [Hash] construction options (anything responding to #[])
# @return [Client] a new instance of Client
def initialize(opts)
  # Internal state: the bus to query and the channel => last-seen-id map.
  @subscriptions = {}
  @bus = opts[:message_bus] || MessageBus

  # Identity attributes are copied through the attribute writers unchanged.
  self.client_id, self.user_id, self.site_id =
    opts[:client_id], opts[:user_id], opts[:site_id]

  # A missing group list becomes an empty Array; seq is coerced with #to_i.
  self.group_ids = opts[:group_ids] || []
  self.seq = opts[:seq].to_i
  self.connect_time = Time.now
end
Instance Attribute Details
#async_response ⇒ Object
Returns the value of attribute async_response.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +async_response+ attribute.
#
# @return [Object] whatever is currently stored in @async_response
def async_response
  @async_response
end
#cleanup_timer ⇒ Object
Returns the value of attribute cleanup_timer.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +cleanup_timer+ attribute.
#
# @return [Object] whatever is currently stored in @cleanup_timer
def cleanup_timer
  @cleanup_timer
end
#client_id ⇒ Object
Returns the value of attribute client_id.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +client_id+ attribute (the constructor assigns it from
# opts[:client_id]).
#
# @return [Object] whatever is currently stored in @client_id
def client_id
  @client_id
end
#connect_time ⇒ Object
Returns the value of attribute connect_time.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +connect_time+ attribute (the constructor assigns Time.now).
#
# @return [Object] whatever is currently stored in @connect_time
def connect_time
  @connect_time
end
#group_ids ⇒ Object
Returns the value of attribute group_ids.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +group_ids+ attribute (the constructor assigns
# opts[:group_ids], or an empty Array when that option is missing).
#
# @return [Object] whatever is currently stored in @group_ids
def group_ids
  @group_ids
end
#headers ⇒ Object
Returns the value of attribute headers.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +headers+ attribute.
#
# @return [Object] whatever is currently stored in @headers
def headers
  @headers
end
#io ⇒ Object
Returns the value of attribute io.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +io+ attribute.
#
# @return [Object] whatever is currently stored in @io
def io
  @io
end
#seq ⇒ Object
Returns the value of attribute seq.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +seq+ attribute (the constructor assigns opts[:seq].to_i).
#
# @return [Object] whatever is currently stored in @seq
def seq
  @seq
end
#site_id ⇒ Object
Returns the value of attribute site_id.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +site_id+ attribute (the constructor assigns it from
# opts[:site_id]).
#
# @return [Object] whatever is currently stored in @site_id
def site_id
  @site_id
end
#subscribed_sets ⇒ Object
Returns the value of attribute subscribed_sets.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +subscribed_sets+ attribute.
#
# @return [Object] whatever is currently stored in @subscribed_sets
def subscribed_sets
  @subscribed_sets
end
#user_id ⇒ Object
Returns the value of attribute user_id.
2 3 4 |
# File 'lib/message_bus/client.rb', line 2

# Reader for the +user_id+ attribute (the constructor assigns it from
# opts[:user_id]).
#
# @return [Object] whatever is currently stored in @user_id
def user_id
  @user_id
end
Instance Method Details
#<<(msg) ⇒ Object
58 59 60 |
# File 'lib/message_bus/client.rb', line 58

# Hands a single message, wrapped in a one-element Array, to
# +write_and_close+.
#
# NOTE(review): the extracted listing read `write_and_close ([msg])`; the
# stray space before the parenthesis suggests a token (possibly a
# serialisation helper) was lost when this page was converted to text.
# Confirm against lib/message_bus/client.rb before relying on this body.
#
# @param msg [Object] the message to deliver
# @return [Object] whatever +write_and_close+ returns
def <<(msg)
  write_and_close([msg])
end
#allowed?(msg) ⇒ Boolean
66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/message_bus/client.rb', line 66

# Decides whether this client may receive +msg+.
#
# Each restriction on the message is optional (nil means "no restriction"):
# * +msg.user_ids+   - when set, must include this client's user_id
# * +msg.client_ids+ - when set, must include this client's client_id
# * +msg.group_ids+  - when set and non-empty, must share at least one
#   entry with this client's group_ids
#
# @param msg [#user_ids, #client_ids, #group_ids] the candidate message
# @return [Boolean] true when every restriction present on +msg+ is met
def allowed?(msg)
  return false if msg.user_ids && !msg.user_ids.include?(user_id)
  return false if msg.client_ids && !msg.client_ids.include?(client_id)

  targeted_groups = msg.group_ids
  return true if targeted_groups.nil? || targeted_groups.empty?

  # Any overlap between the targeted groups and ours is enough.
  !(targeted_groups & group_ids).empty?
end
#backlog ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/message_bus/client.rb', line 88

# Collects the messages this client should receive for its subscriptions.
#
# For every subscription whose last-seen id is >= 0, the bus backlog for
# that channel is fetched and only messages passing +allowed?+ are kept.
# If any subscription has a last-seen id of -1, one extra message on the
# '/__status' channel is appended whose data maps each such channel to the
# bus's current last id. Every collected message (including the status
# message) is then passed through +filter+, and nil results are dropped.
#
# NOTE(review): the extracted listing had lost its local variable names;
# +status+ and the inlined backlog call are reconstructions of those locals.
#
# @return [Array] the filtered messages, possibly empty
def backlog
  r = []
  @subscriptions.each do |channel, last_seen_id|
    next if last_seen_id.to_i < 0

    @bus.backlog(channel, last_seen_id).each do |msg|
      r << msg if allowed?(msg)
    end
  end

  # status message for all newly subscribed channels
  status = nil
  @subscriptions.each do |channel, last_seen_id|
    next unless last_seen_id.to_i == -1

    status ||= {}
    status[channel] = @bus.last_id(channel)
  end
  r << MessageBus::Message.new(-1, -1, '/__status', status) if status

  # map! returns the receiver, so compact! prunes the same Array in place.
  r.map! { |msg| filter(msg) }.compact!
  r
end
#cancel ⇒ Object
17 18 19 20 21 22 23 24 |
# File 'lib/message_bus/client.rb', line 17

# Cancels the pending cleanup timer (if one is set), clears it, and then
# makes sure the connection is closed.
#
# @return [Object] whatever +ensure_closed!+ returns
def cancel
  if cleanup_timer
    begin
      # Another thread may have cleared the timer between the check above
      # and this call, so any StandardError raised here is ignored.
      cleanup_timer.cancel
    rescue StandardError
      nil
    end
    self.cleanup_timer = nil
  end

  ensure_closed!
end
#close ⇒ Object
39 40 41 42 |
# File 'lib/message_bus/client.rb', line 39

# Writes an empty JSON array ("[]") via +write_and_close+, but only while
# the client is in an async state. Errors are not rescued here.
#
# @return [Object, nil] the result of +write_and_close+, or nil when the
#   client is not in an async state
def close
  write_and_close("[]") if in_async?
end
#closed ⇒ Object
44 45 46 |
# File 'lib/message_bus/client.rb', line 44

# Reports whether no async response is currently held.
#
# NOTE(review): only @async_response is consulted, whereas +in_async?+ also
# looks at @io - confirm that difference is intended.
#
# @return [Boolean] true when @async_response is nil or false
def closed
  @async_response ? false : true
end
#ensure_closed! ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/message_bus/client.rb', line 30

# Best-effort close: while in an async state, writes an empty JSON array
# ("[]") via +write_and_close+. Any StandardError raised along the way is
# swallowed and the connection handles are cleared instead.
#
# @return [Object, nil] the result of +write_and_close+ on success; nil when
#   the client is not in an async state or an error was rescued
def ensure_closed!
  begin
    write_and_close("[]") if in_async?
  rescue StandardError
    # we may have a dead socket, just nil the @io
    @io = nil
    @async_response = nil
  end
end
#filter(msg) ⇒ Object
78 79 80 81 82 83 84 85 86 |
# File 'lib/message_bus/client.rb', line 78

# Runs +msg+ through the client filter the bus has for the message's
# channel, if there is one.
#
# @param msg [#channel] the message to filter
# @return [Object] the result of calling the channel's filter with
#   (user_id, msg), or +msg+ itself when the bus returns no filter
def filter(msg)
  channel_filter = @bus.client_filter(msg.channel)
  channel_filter ? channel_filter.call(user_id, msg) : msg
end
#in_async? ⇒ Boolean
26 27 28 |
# File 'lib/message_bus/client.rb', line 26

# Tells whether the client currently holds an async response or an IO.
#
# The stored object itself is returned rather than a strict true/false.
#
# @return [Object, nil] @async_response when it is truthy, otherwise @io
def in_async?
  return @async_response if @async_response

  @io
end
#subscribe(channel, last_seen_id) ⇒ Object
48 49 50 51 52 |
# File 'lib/message_bus/client.rb', line 48

# Records a subscription to +channel+ starting from +last_seen_id+.
#
# When +last_seen_id+ is nil, false or an empty string, the bus's current
# last id for the channel is used instead. The stored value is always
# coerced with #to_i.
#
# @param channel [Object] the channel key to subscribe to
# @param last_seen_id [Object, nil] the last message id the client has seen
# @return [Integer] the id stored for the channel
def subscribe(channel, last_seen_id)
  missing = !last_seen_id || last_seen_id == ""
  start_from = missing ? @bus.last_id(channel) : last_seen_id
  @subscriptions[channel] = start_from.to_i
end
#subscriptions ⇒ Object
54 55 56 |
# File 'lib/message_bus/client.rb', line 54

# Exposes the subscription map (populated by +subscribe+ as
# channel => last-seen id). The stored object itself is returned, not a copy.
#
# @return [Hash] whatever is currently stored in @subscriptions
def subscriptions
  @subscriptions
end