Class: MessageBus::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ Client

Returns a new instance of Client.



6
7
8
9
10
11
12
13
14
15
# File 'lib/message_bus/client.rb', line 6

def initialize(opts)
  self.client_id = opts[:client_id]
  self.user_id = opts[:user_id]
  self.group_ids = opts[:group_ids] || []
  self.site_id = opts[:site_id]
  self.seq = opts[:seq].to_i
  self.connect_time = Time.now
  @bus = opts[:message_bus] || MessageBus
  @subscriptions = {}
end

Instance Attribute Details

#async_responseObject

Returns the value of attribute async_response.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def async_response
  @async_response
end

#cleanup_timerObject

Returns the value of attribute cleanup_timer.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def cleanup_timer
  @cleanup_timer
end

#client_idObject

Returns the value of attribute client_id.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def client_id
  @client_id
end

#connect_timeObject

Returns the value of attribute connect_time.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def connect_time
  @connect_time
end

#group_idsObject

Returns the value of attribute group_ids.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def group_ids
  @group_ids
end

#headersObject

Returns the value of attribute headers.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def headers
  @headers
end

#ioObject

Returns the value of attribute io.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def io
  @io
end

#seqObject

Returns the value of attribute seq.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def seq
  @seq
end

#site_idObject

Returns the value of attribute site_id.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def site_id
  @site_id
end

#subscribed_setsObject

Returns the value of attribute subscribed_sets.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def subscribed_sets
  @subscribed_sets
end

#user_idObject

Returns the value of attribute user_id.



2
3
4
# File 'lib/message_bus/client.rb', line 2

def user_id
  @user_id
end

Instance Method Details

#<<(msg) ⇒ Object



58
59
60
# File 'lib/message_bus/client.rb', line 58

def <<(msg)
  write_and_close messages_to_json([msg])
end

#allowed?(msg) ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
69
70
71
72
73
74
75
76
# File 'lib/message_bus/client.rb', line 66

def allowed?(msg)
  allowed = !msg.user_ids || msg.user_ids.include?(self.user_id)
  allowed &&= !msg.client_ids || msg.client_ids.include?(self.client_id)
  allowed && (
    msg.group_ids.nil? ||
    msg.group_ids.length == 0 ||
    (
      msg.group_ids - self.group_ids
    ).length < msg.group_ids.length
  )
end

#backlogObject



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/message_bus/client.rb', line 88

def backlog
  r = []
  @subscriptions.each do |k,v|
    next if v.to_i < 0
    messages = @bus.backlog(k,v)
    messages.each do |msg|
      r << msg if allowed?(msg)
    end
  end
  # stats message for all newly subscribed
  status_message = nil
  @subscriptions.each do |k,v|
    if v.to_i == -1
      status_message ||= {}
      status_message[k] = @bus.last_id(k)
    end
  end
  r << MessageBus::Message.new(-1, -1, '/__status', status_message) if status_message

  r.map!{|msg| filter(msg)}.compact!
  r || []
end

#cancelObject



17
18
19
20
21
22
23
24
# File 'lib/message_bus/client.rb', line 17

def cancel
  if cleanup_timer
    # concurrency may nil cleanup timer
    cleanup_timer.cancel rescue nil
    self.cleanup_timer = nil
  end
  ensure_closed!
end

#closeObject



39
40
41
42
# File 'lib/message_bus/client.rb', line 39

def close
  return unless in_async?
  write_and_close "[]"
end

#closedObject



44
45
46
# File 'lib/message_bus/client.rb', line 44

def closed
  !@async_response
end

#ensure_closed!Object



30
31
32
33
34
35
36
37
# File 'lib/message_bus/client.rb', line 30

def ensure_closed!
  return unless in_async?
  write_and_close "[]"
rescue
  # we may have a dead socket, just nil the @io
  @io = nil
  @async_response = nil
end

#filter(msg) ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/message_bus/client.rb', line 78

def filter(msg)
  filter = @bus.client_filter(msg.channel)

  if filter
    filter.call(self.user_id, msg)
  else
    msg
  end
end

#in_async?Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/message_bus/client.rb', line 26

def in_async?
  @async_response || @io
end

#subscribe(channel, last_seen_id) ⇒ Object



48
49
50
51
52
# File 'lib/message_bus/client.rb', line 48

def subscribe(channel, last_seen_id)
  last_seen_id = nil if last_seen_id == ""
  last_seen_id ||= @bus.last_id(channel)
  @subscriptions[channel] = last_seen_id.to_i
end

#subscriptionsObject



54
55
56
# File 'lib/message_bus/client.rb', line 54

def subscriptions
  @subscriptions
end