Class: MessageBus::Memory::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/backends/memory.rb

Defined Under Namespace

Classes: Listener

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Client

Returns a new instance of Client.



20
21
22
23
24
# File 'lib/message_bus/backends/memory.rb', line 20

def initialize(config)
  @mutex = Mutex.new
  @listeners = []
  reset!
end

Instance Method Details

#add(channel, value) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/message_bus/backends/memory.rb', line 26

def add(channel, value)
  listeners = nil
  id = nil
  sync do
    id = @global_id += 1
    chan(channel) << [id, value]
    listeners = @listeners.dup
  end
  msg = MessageBus::Message.new id, id, channel, value
  payload = msg.encode
  listeners.each{|l| l.push(payload)}
  id
end

#backlog(channel, backlog_id) ⇒ Object



58
59
60
# File 'lib/message_bus/backends/memory.rb', line 58

def backlog(channel, backlog_id)
  sync{chan(channel).select{|id, _| id > backlog_id}}
end

#clear_channel_backlog(channel, backlog_id, num_to_keep) ⇒ Object



52
53
54
55
56
# File 'lib/message_bus/backends/memory.rb', line 52

def clear_channel_backlog(channel, backlog_id, num_to_keep)
  oldest = backlog_id - num_to_keep
  sync{chan(channel).delete_if{|id, _| id <= oldest}}
  nil
end

#clear_global_backlog(backlog_id, num_to_keep) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
# File 'lib/message_bus/backends/memory.rb', line 40

def clear_global_backlog(backlog_id, num_to_keep)
  if backlog_id > num_to_keep
    oldest = backlog_id - num_to_keep
    sync do
      @channels.each_value do |entries|
        entries.delete_if{|id, _| id <= oldest}
      end
    end
    nil
  end
end

#get_value(channel, id) ⇒ Object



70
71
72
# File 'lib/message_bus/backends/memory.rb', line 70

def get_value(channel, id)
  sync{chan(channel).find{|i, _| i == id}.last}
end

#global_backlog(backlog_id) ⇒ Object



62
63
64
65
66
67
68
# File 'lib/message_bus/backends/memory.rb', line 62

def global_backlog(backlog_id)
  sync do
    @channels.dup.flat_map do |channel, messages|
      messages.select{|id, _| id > backlog_id}.map{|id, value| [id, channel, value]}
    end.sort
  end
end

#max_id(channel = nil) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/message_bus/backends/memory.rb', line 82

def max_id(channel=nil)
  if channel
    sync do
      if entry = chan(channel).last
        entry.first
      end
    end
  else
    sync{@global_id - 1}
  end || 0
end

#reset!Object

Dangerous, drops the message_bus table containing the backlog if it exists.



75
76
77
78
79
80
# File 'lib/message_bus/backends/memory.rb', line 75

def reset!
  sync do
    @global_id = 0
    @channels = {}
  end
end

#subscribe {|listener| ... } ⇒ Object

Yields:

  • (listener)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/message_bus/backends/memory.rb', line 94

def subscribe
  listener = Listener.new
  yield listener

  q = Queue.new
  sync do
    @listeners << q
  end

  listener.do_sub.call
  while msg = q.pop
    listener.do_message.call(nil, msg)
  end
  listener.do_unsub.call
  sync do
    @listeners.delete(q)
  end

  nil
end

#unsubscribeObject



115
116
117
# File 'lib/message_bus/backends/memory.rb', line 115

def unsubscribe
  sync{@listeners.each{|l| l.push(nil)}}
end