Class: MessageBus::Backends::Memory::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/backends/memory.rb

Defined Under Namespace

Classes: Channel, Listener

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(_config) ⇒ Client

Returns a new instance of Client.



55
56
57
58
59
60
61
62
63
64
# File 'lib/message_bus/backends/memory.rb', line 55

# Sets up an empty in-memory client.
#
# @param _config ignored by this implementation (note the leading underscore)
def initialize(_config)
  @listeners = []
  @mutex = Mutex.new
  @timer_thread = MessageBus::TimerThread.new
  # Errors raised by timer jobs are logged as warnings.
  @timer_thread.on_error { |error| logger.warn "Failed to process job: #{error} #{error.backtrace}" }
  # Schedules `expire` on the timer thread; the argument 1 is presumably an
  # interval in seconds -- confirm against MessageBus::TimerThread.
  @timer_thread.every(1) { expire }
  # Initializes @global_id and @channels.
  reset!
end

Instance Attribute Details

#max_backlog_age ⇒ Object

Returns the value of attribute max_backlog_age.



16
17
18
# File 'lib/message_bus/backends/memory.rb', line 16

# Reader for the @max_backlog_age instance variable.
#
# @return the value currently stored in @max_backlog_age
def max_backlog_age
  @max_backlog_age
end

Instance Method Details

#add(channel, value, max_backlog_age:) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/message_bus/backends/memory.rb', line 66

# Records a message on a channel and hands it to every current listener.
#
# @param channel the channel name, resolved to a channel object via `chan`
# @param value the message payload
# @param max_backlog_age when truthy, assigned as the channel's ttl
# @return the id assigned to the new message
def add(channel, value, max_backlog_age:)
  message_id = nil
  subscribers = nil

  sync do
    message_id = @global_id += 1
    target = chan(channel)
    target.backlog << [message_id, value, Time.now]
    target.ttl = max_backlog_age if max_backlog_age
    # Snapshot the listener list so delivery can happen outside the sync block.
    subscribers = @listeners.dup
  end

  # The same id is passed as both the first and second Message argument.
  encoded = MessageBus::Message.new(message_id, message_id, channel, value).encode
  subscribers.each { |subscriber| subscriber.push(encoded) }

  message_id
end

#backlog(channel, backlog_id) ⇒ Object



108
109
110
# File 'lib/message_bus/backends/memory.rb', line 108

# Lists the backlog entries of a channel that are newer than a given id.
#
# @param channel the channel name, resolved via `chan`
# @param backlog_id only entries whose id is strictly greater are returned
# @return [Array] the matching backlog entries, unmodified
def backlog(channel, backlog_id)
  sync do
    chan(channel).backlog.select { |entry_id, _| entry_id > backlog_id }
  end
end

#clear_channel_backlog(channel, backlog_id, num_to_keep) ⇒ Object



102
103
104
105
106
# File 'lib/message_bus/backends/memory.rb', line 102

# Trims old entries from a single channel's backlog.
#
# Every entry whose id is at or below `backlog_id - num_to_keep` is removed.
#
# @param channel the channel name, resolved via `chan`
# @param backlog_id the id the cutoff is measured from
# @param num_to_keep how far below backlog_id the cutoff sits
# @return [nil]
def clear_channel_backlog(channel, backlog_id, num_to_keep)
  cutoff = backlog_id - num_to_keep
  sync do
    chan(channel).backlog.reject! { |entry_id, _| entry_id <= cutoff }
  end
  nil
end

#clear_global_backlog(backlog_id, num_to_keep) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
# File 'lib/message_bus/backends/memory.rb', line 90

# Trims old entries from the backlog of every channel.
#
# Does nothing unless backlog_id exceeds num_to_keep. Otherwise every entry
# whose id is at or below `backlog_id - num_to_keep` is removed from each
# channel in @channels.
#
# @param backlog_id the id the cutoff is measured from
# @param num_to_keep how far below backlog_id the cutoff sits
# @return [nil]
def clear_global_backlog(backlog_id, num_to_keep)
  return unless backlog_id > num_to_keep

  cutoff = backlog_id - num_to_keep
  sync do
    @channels.each_value do |chan_obj|
      chan_obj.backlog.reject! { |entry_id, _| entry_id <= cutoff }
    end
  end
  nil
end

#expire ⇒ Object



84
85
86
87
88
# File 'lib/message_bus/backends/memory.rb', line 84

# Removes from @channels every channel that reports itself as expired.
def expire
  sync { @channels.delete_if { |_, chan_obj| chan_obj.expired? } }
end

#expire_all_backlogs! ⇒ Object

Use with extreme care: this discards every channel together with its backlog.



133
134
135
136
137
# File 'lib/message_bus/backends/memory.rb', line 133

# Discards every channel (and therefore every backlog) by replacing @channels
# with a fresh empty hash. @global_id is left untouched.
def expire_all_backlogs!
  sync { @channels = {} }
end

#get_value(channel, id) ⇒ Object



120
121
122
# File 'lib/message_bus/backends/memory.rb', line 120

# Looks up the value stored under a given id in a channel's backlog.
#
# @param channel the channel name, resolved via `chan`
# @param id the message id to look for
# @return the stored value, or nil when no backlog entry has that id
#   (previously a missing id raised NoMethodError on nil)
def get_value(channel, id)
  sync do
    entry = chan(channel).backlog.find { |entry_id, _| entry_id == id }
    # Backlog entries are arrays whose second element is the value.
    entry && entry[1]
  end
end

#global_backlog(backlog_id) ⇒ Object



112
113
114
115
116
117
118
# File 'lib/message_bus/backends/memory.rb', line 112

# Collects, across all channels, the backlog entries newer than a given id.
#
# @param backlog_id only entries whose id is strictly greater are returned
# @return [Array<Array>] sorted `[id, channel_name, value]` triples
def global_backlog(backlog_id)
  sync do
    collected = @channels.dup.flat_map do |name, chan_obj|
      newer = chan_obj.backlog.select { |entry_id, _| entry_id > backlog_id }
      newer.map { |entry_id, payload| [entry_id, name, payload] }
    end
    collected.sort
  end
end

#max_id(channel = nil) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
# File 'lib/message_bus/backends/memory.rb', line 139

# Reports the highest message id, either for one channel or overall.
#
# `add` assigns ids with `@global_id += 1`, so the highest id handed out so far
# equals @global_id itself. The previous `@global_id - 1` was off by one and
# returned -1 for a client with no messages.
#
# @param channel optional channel name; when given, the id of the last entry
#   in that channel's backlog is reported
# @return [Integer] the highest id, or 0 when there is none
def max_id(channel = nil)
  sync do
    if channel
      newest = chan(channel).backlog.last
      newest ? newest.first : 0
    else
      @global_id
    end
  end
end

#reset! ⇒ Object

Dangerous: discards every channel together with its backlog and resets the global message id counter to 0.



125
126
127
128
129
130
# File 'lib/message_bus/backends/memory.rb', line 125

# Returns the client to its initial empty state: the global id counter goes
# back to 0 and all channels (with their backlogs) are dropped.
# @listeners is not touched here.
def reset!
  sync do
    @global_id = 0
    @channels = {}
  end
end

#subscribe {|listener| ... } ⇒ Object

Yields:

  • (listener)


151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/message_bus/backends/memory.rb', line 151

# Registers a listener and blocks, dispatching messages until a nil arrives.
#
# A new Listener is yielded to the caller so it can install its callbacks.
# A queue is then added to @listeners, `do_sub` is called, and each message
# popped from the queue is passed to `do_message` as `(nil, msg)`. A nil (or
# false) message ends the loop, after which `do_unsub` is called.
#
# The queue is removed from @listeners in an `ensure` clause, so a callback
# that raises no longer leaves a dead queue behind for `add` to keep filling.
#
# @yield [listener] the Listener to configure
# @return [nil]
def subscribe
  listener = Listener.new
  yield listener

  queue = Queue.new
  sync { @listeners << queue }

  begin
    listener.do_sub.call
    while (msg = queue.pop)
      listener.do_message.call(nil, msg)
    end
    listener.do_unsub.call
  ensure
    # Always deregister, including when a callback raised.
    sync { @listeners.delete(queue) }
  end

  nil
end

#unsubscribe ⇒ Object



172
173
174
# File 'lib/message_bus/backends/memory.rb', line 172

# Pushes nil onto every registered listener queue. A nil message is what ends
# the dispatch loop in `subscribe`.
def unsubscribe
  sync do
    @listeners.each { |queue| queue.push(nil) }
  end
end