Class: MessageBus::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ Client

Returns a new instance of Client.



3
4
5
6
7
8
9
10
11
# File 'lib/message_bus/client.rb', line 3

def initialize(opts)
  self.client_id = opts[:client_id]
  self.user_id = opts[:user_id]
  self.group_ids = opts[:group_ids] || []
  self.site_id = opts[:site_id]
  self.connect_time = Time.now
  @bus = opts[:message_bus] || MessageBus
  @subscriptions = {}
end

Instance Attribute Details

#async_responseObject

Returns the value of attribute async_response.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def async_response
  @async_response
end

#cleanup_timerObject

Returns the value of attribute cleanup_timer.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def cleanup_timer
  @cleanup_timer
end

#client_idObject

Returns the value of attribute client_id.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def client_id
  @client_id
end

#connect_timeObject

Returns the value of attribute connect_time.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def connect_time
  @connect_time
end

#group_idsObject

Returns the value of attribute group_ids.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def group_ids
  @group_ids
end

#ioObject

Returns the value of attribute io.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def io
  @io
end

#site_idObject

Returns the value of attribute site_id.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def site_id
  @site_id
end

#subscribed_setsObject

Returns the value of attribute subscribed_sets.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def subscribed_sets
  @subscribed_sets
end

#user_idObject

Returns the value of attribute user_id.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def user_id
  @user_id
end

Instance Method Details

#<<(msg) ⇒ Object



45
46
47
# File 'lib/message_bus/client.rb', line 45

def <<(msg)
  write_and_close messages_to_json([msg])
end

#allowed?(msg) ⇒ Boolean

Returns:

  • (Boolean)


53
54
55
56
57
58
59
60
61
62
# File 'lib/message_bus/client.rb', line 53

def allowed?(msg)
  allowed = !msg.user_ids || msg.user_ids.include?(self.user_id)
  allowed && (
    msg.group_ids.nil? ||
    msg.group_ids.length == 0 ||
    (
      msg.group_ids - self.group_ids
    ).length < msg.group_ids.length
  )
end

#backlogObject



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/message_bus/client.rb', line 74

def backlog
  r = []
  @subscriptions.each do |k,v|
    next if v.to_i < 0
    messages = @bus.backlog(k,v)
    messages.each do |msg|
      r << msg if allowed?(msg)
    end
  end
  # stats message for all newly subscribed
  status_message = nil
  @subscriptions.each do |k,v|
    if v.to_i == -1
      status_message ||= {}
      status_message[k] = @bus.last_id(k)
    end
  end
  r << MessageBus::Message.new(-1, -1, '/__status', status_message) if status_message

  r.map!{|msg| filter(msg)}.compact!
  r || []
end

#closeObject



26
27
28
29
# File 'lib/message_bus/client.rb', line 26

def close
  return unless in_async?
  write_and_close "[]"
end

#closedObject



31
32
33
# File 'lib/message_bus/client.rb', line 31

def closed
  !@async_response
end

#ensure_closed!Object



17
18
19
20
21
22
23
24
# File 'lib/message_bus/client.rb', line 17

def ensure_closed!
  return unless in_async?
  write_and_close "[]"
rescue
  # we may have a dead socket, just nil the @io
  @io = nil
  @async_response = nil
end

#filter(msg) ⇒ Object



64
65
66
67
68
69
70
71
72
# File 'lib/message_bus/client.rb', line 64

def filter(msg)
  filter = @bus.client_filter(msg.channel)

  if filter
    filter.call(self.user_id, msg)
  else
    msg
  end
end

#in_async?Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/message_bus/client.rb', line 13

def in_async?
  @async_response || @io
end

#subscribe(channel, last_seen_id) ⇒ Object



35
36
37
38
39
# File 'lib/message_bus/client.rb', line 35

def subscribe(channel, last_seen_id)
  last_seen_id = nil if last_seen_id == ""
  last_seen_id ||= @bus.last_id(channel)
  @subscriptions[channel] = last_seen_id.to_i
end

#subscriptionsObject



41
42
43
# File 'lib/message_bus/client.rb', line 41

def subscriptions
  @subscriptions
end