Class: Litecable

Inherits:
Object
  • Object
show all
Includes:
Litemetric::Measurable, Litesupport::Liteconnection
Defined in:
lib/litestack/litecable.rb

Constant Summary collapse

# Default option values for Litecable.
# NOTE(review): presumably these are combined with the hash passed to
# #initialize (via +init+, which is not visible here) — confirm.
DEFAULT_OPTIONS =
{
  config_path: "./litecable.yml", # NOTE(review): presumably the location of an optional config file — confirm
  path: Litesupport.root.join("cable.sqlite3"),
  sync: 0, # NOTE(review): likely the SQLite synchronous setting — confirm
  mmap_size: 16 * 1024 * 1024, # 16MB
  expire_after: 5, # remove messages older than 5 seconds
  listen_interval: 0.05, # check new messages every 50 milliseconds
  metrics: false # #initialize only calls collect_metrics when @options[:metrics] is truthy
}

Instance Method Summary collapse

Methods included from Litemetric::Measurable

#capture, #capture_snapshot, #collect_metrics, #create_snapshotter, #measure, #metrics_identifier

Methods included from Litesupport::Liteconnection

#close, #journal_mode, #options, #path, #size, #synchronous

Methods included from Litesupport::Forkable

#_fork

Constructor Details

#initialize(options = {}) ⇒ Litecable

Returns a new instance of Litecable.



24
25
26
27
28
# File 'lib/litestack/litecable.rb', line 24

# Sets up a new Litecable.
#
# Creates the pool that guards the array of pending outgoing messages,
# passes +options+ on to +init+, and starts metrics collection when the
# resulting @options has a truthy :metrics entry.
#
# @param options [Hash] settings forwarded unchanged to +init+
def initialize(options = {})
  @messages = Litesupport::Pool.new(1) do
    []
  end
  init(options)
  return unless @options[:metrics]

  collect_metrics
end

Instance Method Details

#broadcast(channel, payload = nil) ⇒ Object

Broadcasts a message to a specific channel.



31
32
33
34
35
36
37
# File 'lib/litestack/litecable.rb', line 31

# Broadcasts +payload+ on +channel+.
#
# The message is appended to the pending-messages array as a
# [channel name as String, Oj-serialized payload] pair, so that queued
# messages can be grouped and sent in batches (every 10 ms according to the
# original note; the code that drains the queue is not part of this method).
# Local delivery is not delayed: +local_broadcast+ is invoked right away.
#
# @param channel [#to_s] channel identifier
# @param payload [Object, nil] message body, serialized with Oj.dump for the queue
# @return [Object] whatever +local_broadcast+ returns
def broadcast(channel, payload = nil)
  @messages.acquire do |queue|
    topic = channel.to_s
    queue << [topic, Oj.dump(payload)]
  end
  capture(:broadcast, channel)
  local_broadcast(channel, payload)
end

#subscribe(channel, subscriber, success_callback = nil) ⇒ Object

Subscribes to a channel, optionally invoking a success callback proc once the subscription is registered.



40
41
42
43
44
45
46
47
# File 'lib/litestack/litecable.rb', line 40

# Registers +subscriber+ on +channel+.
#
# Subscribers are kept per channel as the keys of a hash (value +true+);
# the per-channel hash is created the first time a channel is seen. After
# registration the optional callback is invoked, then the event is
# reported through +capture+.
#
# @param channel [Object] channel key, used as given
# @param subscriber [Object] the object to register
# @param success_callback [#call, nil] invoked with no arguments after registration
# @return [Object] whatever +capture+ returns
def subscribe(channel, subscriber, success_callback = nil)
  @subscribers.acquire do |registry|
    members = (registry[channel] ||= {})
    members[subscriber] = true
  end
  success_callback.call unless success_callback.nil?
  capture(:subscribe, channel)
end

#unsubscribe(channel, subscriber) ⇒ Object

Unsubscribes a subscriber from a channel.



50
51
52
53
54
55
56
57
58
59
# File 'lib/litestack/litecable.rb', line 50

# Removes +subscriber+ from +channel+.
#
# Unsubscribing from a channel that has no subscriber hash is a no-op.
# The safe-navigation call covers exactly that case; unlike a blanket
# +rescue+, it does not hide unrelated errors raised while deleting.
# The event is reported through +capture+ afterwards.
#
# @param channel [Object] channel key, used as given
# @param subscriber [Object] the object to remove
# @return [Object] whatever +capture+ returns
def unsubscribe(channel, subscriber)
  @subscribers.acquire { |subs| subs[channel]&.delete(subscriber) }
  capture(:unsubscribe, channel)
end