Class: Sublist

Inherits:
Object
Defined in:
lib/nats/server/sublist.rb

Overview

Sublist implementation for a publish-subscribe system. This container class holds subscriptions and matches candidate subjects against them. Two wildcards are supported in subscription subjects: ‘*’ matches any single token at the level where it appears, and ‘>’ matches all subsequent tokens. See the included tests for example usage.
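The wildcard rules above can be illustrated with a small standalone matcher. This is a minimal sketch, not the class's own algorithm: `subject_matches?` is a hypothetical helper that compares one pattern with one subject, and it assumes that ‘>’ must be followed by at least one token in the subject.

```ruby
# Hypothetical helper, not part of Sublist: checks a single subscription
# pattern against a single dot-separated subject.
def subject_matches?(pattern, subject)
  p_tokens = pattern.split('.')
  s_tokens = subject.split('.')
  p_tokens.each_with_index do |tok, i|
    # '>' matches everything that remains (assumed: at least one token)
    return i < s_tokens.size if tok == '>'
    # The pattern is longer than the subject
    return false if i >= s_tokens.size
    # '*' accepts any token at this position
    next if tok == '*'
    return false unless tok == s_tokens[i]
  end
  # Without '>', pattern and subject must have the same number of tokens
  p_tokens.size == s_tokens.size
end

subject_matches?('foo.*.baz', 'foo.bar.baz')  # => true
subject_matches?('foo.>', 'foo.bar.baz')      # => true
subject_matches?('foo.*', 'foo.bar.baz')      # => false
```

Sublist itself stores patterns in a tree so that one lookup finds every matching subscription, rather than testing patterns one by one as this helper does.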

Defined Under Namespace

Classes: SublistLevel, SublistNode

Constant Summary

PWC = '*'.freeze
FWC = '>'.freeze
CACHE_SIZE = 4096
EMPTY_LEVEL = SublistLevel.new({})

Constructor Details

#initialize(options = {}) ⇒ Sublist

Returns a new instance of Sublist.



# File 'lib/nats/server/sublist.rb', line 25

def initialize(options = {})
  @count = 0
  @results = []
  @root = SublistLevel.new({})
  @cache = {}
end

Instance Attribute Details

#count ⇒ Object (readonly)

Returns the value of attribute count.



# File 'lib/nats/server/sublist.rb', line 18

def count
  @count
end

Instance Method Details

#clear_cache ⇒ Object



# File 'lib/nats/server/sublist.rb', line 44

def clear_cache; @cache = {} if @cache; end

#disable_cache ⇒ Object

Ruby is a good language for making selective trade-offs of space versus time. We do that here with a low-tech front-end cache. The cache holds results until it fills up or until the instance inserts or removes a subscription. The assumption is that the cache is best suited to high-speed matching, and that once it is cleared it will naturally refill with the most frequently matched subjects. This could be improved with a smarter LRU structure that does not need to be discarded completely whenever a subscription is removed.

Front-end caching is on by default, but it can be turned off here if needed.



# File 'lib/nats/server/sublist.rb', line 42

def disable_cache; @cache = nil; end
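The caching policy described above (cache every result, discard everything on a change, evict at random when full) can be sketched in isolation. `CachedLookup` and its block argument are hypothetical names used only for illustration, and the size limit is shrunk to 4 so the eviction is easy to observe; the source uses 4096.

```ruby
# Hypothetical wrapper illustrating the front-end cache policy; not part of
# the Sublist API.
class CachedLookup
  CACHE_SIZE = 4 # tiny on purpose; the source uses 4096

  attr_reader :cache

  def initialize(&slow_lookup)
    @slow_lookup = slow_lookup
    @cache = {} # caching is on by default
  end

  def disable_cache; @cache = nil; end
  def enable_cache;  @cache ||= {}; end
  def clear_cache;   @cache = {} if @cache; end

  def lookup(key)
    return @slow_lookup.call(key) unless @cache
    return @cache[key] if @cache.key?(key)
    # Evict one random entry once the cache has grown past its limit
    @cache.delete(@cache.keys.sample) if @cache.size > CACHE_SIZE
    # Freeze the cached value so callers cannot tamper with it
    @cache[key] = @slow_lookup.call(key).freeze
  end
end

calls = 0
lookup = CachedLookup.new { |key| calls += 1; key.upcase }
lookup.lookup('foo')  # computed
lookup.lookup('foo')  # served from the cache
lookup.clear_cache    # analogous to what insert does in the source
lookup.lookup('foo')  # computed again
```

Random eviction avoids the bookkeeping of an LRU list at the cost of occasionally dropping a hot entry, which is the trade-off the note above acknowledges.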

#enable_cache ⇒ Object



# File 'lib/nats/server/sublist.rb', line 43

def enable_cache;  @cache ||= {};  end

#insert(subject, subscriber) ⇒ Object

Insert a subscriber into the sublist for the given subject.



# File 'lib/nats/server/sublist.rb', line 54

def insert(subject, subscriber)
  # TODO - validate subject as correct.
  level, tokens = @root, subject.split('.')
  for token in tokens
    # This is slightly slower than direct if statements, but looks cleaner.
    case token
      when FWC then node = (level.fwc || (level.fwc = SublistNode.new([])))
      when PWC then node = (level.pwc || (level.pwc = SublistNode.new([])))
      else node  = ((level.nodes[token]) || (level.nodes[token] = SublistNode.new([])))
    end
    level = (node.next_level || (node.next_level = SublistLevel.new({})))
  end
  node.leaf_nodes.push(subscriber)
  @count += 1
  clear_cache # Clear the cache
  node.next_level = nil if node.next_level == EMPTY_LEVEL
end
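To make the resulting structure concrete, the following sketch rebuilds the descent performed by insert on standalone structs. `DemoNode` and `DemoLevel` are hypothetical stand-ins whose fields are inferred from the calls above (`leaf_nodes`, `next_level`, `nodes`, `pwc`, `fwc`); the real SublistNode and SublistLevel definitions are not shown on this page. The sketch omits the count, the cache handling, and the trimming of an empty trailing level.

```ruby
# Assumed shapes, inferred from how insert uses them.
DemoNode  = Struct.new(:leaf_nodes, :next_level)
DemoLevel = Struct.new(:nodes, :pwc, :fwc)

# Standalone copy of the descent in insert: one node per token, with
# wildcard tokens stored in dedicated slots instead of the nodes hash.
def trie_insert(root, subject, subscriber)
  level = root
  node = nil
  subject.split('.').each do |token|
    node = case token
           when '>' then level.fwc ||= DemoNode.new([])
           when '*' then level.pwc ||= DemoNode.new([])
           else level.nodes[token] ||= DemoNode.new([])
           end
    level = (node.next_level ||= DemoLevel.new({}))
  end
  # The subscriber is attached to the node of the last token
  node.leaf_nodes.push(subscriber)
end

root = DemoLevel.new({})
trie_insert(root, 'foo.bar', :a)
trie_insert(root, 'foo.*', :b)
trie_insert(root, 'foo.>', :c)

foo_level = root.nodes['foo'].next_level
foo_level.nodes['bar'].leaf_nodes  # => [:a]
foo_level.pwc.leaf_nodes           # => [:b]
foo_level.fwc.leaf_nodes           # => [:c]
```

Keeping ‘*’ and ‘>’ in their own slots means a lookup can check both wildcards at every level without scanning the literal tokens.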

#match(subject) ⇒ Object

Match a subject to all subscribers, return the array of matches.



# File 'lib/nats/server/sublist.rb', line 79

def match(subject)
  return @cache[subject] if (@cache && @cache[subject])
  tokens = subject.split('.')
  @results.clear
  matchAll(@root, tokens)
  # FIXME: This is too low tech, will revisit when needed.
  if @cache
    prune_cache if @cache.size > CACHE_SIZE
    @cache[subject] = Array.new(@results).freeze # Avoid tampering of copy
  end
  @results
end
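The helper matchAll that match delegates to is not shown on this page. As a rough illustration of how such a walk could work, here is a minimal recursive sketch; `collect_matches`, `TrieNode`, and `TrieLevel` are hypothetical names, the struct fields are inferred from insert, and the source's actual implementation may be structured differently.

```ruby
# Assumed shapes, inferred from how insert uses them.
TrieNode  = Struct.new(:leaf_nodes, :next_level)
TrieLevel = Struct.new(:nodes, :pwc, :fwc)

# Hypothetical recursive walk that gathers every subscriber whose pattern
# matches the given tokens.
def collect_matches(level, tokens, results = [])
  return results if level.nil? || tokens.empty?
  token = tokens.first
  rest = tokens.drop(1)
  # '>' at this level matches whatever tokens remain
  results.concat(level.fwc.leaf_nodes) if level.fwc
  # Both '*' and the literal token are candidates for this position
  [level.pwc, level.nodes[token]].compact.each do |node|
    if rest.empty?
      results.concat(node.leaf_nodes)
    else
      collect_matches(node.next_level, rest, results)
    end
  end
  results
end

# Hand-built trie holding 'foo.bar' => :a, 'foo.*' => :b, 'foo.>' => :c
foo_level = TrieLevel.new({ 'bar' => TrieNode.new([:a]) },
                          TrieNode.new([:b]),
                          TrieNode.new([:c]))
root = TrieLevel.new({ 'foo' => TrieNode.new([], foo_level) })

collect_matches(root, %w[foo bar])      # => [:c, :b, :a]
collect_matches(root, %w[foo bar baz])  # => [:c]
collect_matches(root, %w[baz])          # => []
```

Unlike match above, which reuses the `@results` array between calls, this sketch returns a fresh array each time.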

#prune_cache ⇒ Object

Removes one randomly chosen entry from the cache.



# File 'lib/nats/server/sublist.rb', line 47

def prune_cache
  return unless @cache
  keys = @cache.keys
  @cache.delete(keys[rand(keys.size)])
end

#remove(subject, subscriber) ⇒ Object

Remove a given subscriber from the sublist for the given subject.



# File 'lib/nats/server/sublist.rb', line 73

def remove(subject, subscriber)
  return unless subject && subscriber
  remove_level(@root, subject.split('.'), subscriber)
end