Class: MessageBus::Memory::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/backends/memory.rb

Defined Under Namespace

Classes: Listener

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Client

Returns a new instance of Client.



21
22
23
24
25
# File 'lib/message_bus/backends/memory.rb', line 21

def initialize(config)
  @mutex = Mutex.new
  @listeners = []
  reset!
end

Instance Method Details

#add(channel, value) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/message_bus/backends/memory.rb', line 27

def add(channel, value)
  listeners = nil
  id = nil
  sync do
    id = @global_id += 1
    chan(channel) << [id, value]
    listeners = @listeners.dup
  end
  msg = MessageBus::Message.new id, id, channel, value
  payload = msg.encode
  listeners.each { |l| l.push(payload) }
  id
end

#backlog(channel, backlog_id) ⇒ Object



59
60
61
# File 'lib/message_bus/backends/memory.rb', line 59

def backlog(channel, backlog_id)
  sync { chan(channel).select { |id, _| id > backlog_id } }
end

#clear_channel_backlog(channel, backlog_id, num_to_keep) ⇒ Object



53
54
55
56
57
# File 'lib/message_bus/backends/memory.rb', line 53

def clear_channel_backlog(channel, backlog_id, num_to_keep)
  oldest = backlog_id - num_to_keep
  sync { chan(channel).delete_if { |id, _| id <= oldest } }
  nil
end

#clear_global_backlog(backlog_id, num_to_keep) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/message_bus/backends/memory.rb', line 41

def clear_global_backlog(backlog_id, num_to_keep)
  if backlog_id > num_to_keep
    oldest = backlog_id - num_to_keep
    sync do
      @channels.each_value do |entries|
        entries.delete_if { |id, _| id <= oldest }
      end
    end
    nil
  end
end

#get_value(channel, id) ⇒ Object



71
72
73
# File 'lib/message_bus/backends/memory.rb', line 71

def get_value(channel, id)
  sync { chan(channel).find { |i, _| i == id }.last }
end

#global_backlog(backlog_id) ⇒ Object



63
64
65
66
67
68
69
# File 'lib/message_bus/backends/memory.rb', line 63

def global_backlog(backlog_id)
  sync do
    @channels.dup.flat_map do |channel, messages|
      messages.select { |id, _| id > backlog_id }.map { |id, value| [id, channel, value] }
    end.sort
  end
end

#max_id(channel = nil) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
# File 'lib/message_bus/backends/memory.rb', line 83

def max_id(channel = nil)
  if channel
    sync do
      if entry = chan(channel).last
        entry.first
      end
    end
  else
    sync { @global_id - 1 }
  end || 0
end

#reset!Object

Dangerous, drops the message_bus table containing the backlog if it exists.



76
77
78
79
80
81
# File 'lib/message_bus/backends/memory.rb', line 76

def reset!
  sync do
    @global_id = 0
    @channels = {}
  end
end

#subscribe {|listener| ... } ⇒ Object

Yields:

  • (listener)


95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/message_bus/backends/memory.rb', line 95

def subscribe
  listener = Listener.new
  yield listener

  q = Queue.new
  sync do
    @listeners << q
  end

  listener.do_sub.call
  while msg = q.pop
    listener.do_message.call(nil, msg)
  end
  listener.do_unsub.call
  sync do
    @listeners.delete(q)
  end

  nil
end

#unsubscribeObject



116
117
118
# File 'lib/message_bus/backends/memory.rb', line 116

def unsubscribe
  sync { @listeners.each { |l| l.push(nil) } }
end