Module: Witch

Defined in:
lib/witch.rb,
lib/version.rb

Defined Under Namespace

Classes: ConsumerDisconnected

Constant Summary collapse

VERSION =
'0.0.4'

Class Method Summary collapse

Class Method Details

.disconnect ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/witch.rb', line 87

# Unregisters every event that currently has :once callbacks and, when a
# listener thread exists, kills that thread while holding the lock.
#
# Does nothing (returns nil) when @id is not set. Also returns nil when
# there is no thread; otherwise returns whatever lock.synchronize returns.
def disconnect
  return unless @id

  # Hash#keys yields a separate array, so the loop below does not walk the
  # hash directly (unregister_from may modify it -- TODO confirm).
  once_events = @callbacks[:once].keys
  once_events.each { |name| unregister_from(name) }

  if @thread
    logger.warn '[consumer] Disconnecting!'
    lock.synchronize { @thread.kill }
  end
end

.init(options = {}) ⇒ Object



10
11
12
13
14
15
16
17
18
19
# File 'lib/witch.rb', line 10

# Sets up the module-level state used by the other methods.
#
# @param options [Hash] settings; :queue (defaults to 'eventstream') and
#   :logger are extracted, the remaining entries are stored in @options.
#   The hash passed in by the caller is left untouched.
# @return [String] the generated id, "<hostname>-<pid>"
# @raise [RuntimeError] 'Already initialized!' if a queue is already set
def init(options = {})
  raise 'Already initialized!' if @queue

  # Work on a copy: extracting :queue/:logger with delete would otherwise
  # mutate the caller's hash, and would raise if that hash were frozen.
  options = options.dup

  @queue     = options.delete(:queue) || 'eventstream'
  @logger    = options.delete(:logger)
  @options   = options
  @callbacks = {:once => {}, :on => {}}
  @channels  = []
  @thread    = nil
  @id        = "#{Socket.gethostname}-#{Process.pid}"
end

.off(*events) ⇒ Object



47
48
49
50
51
52
# File 'lib/witch.rb', line 47

# Removes all :on callbacks stored for the given events.
#
# @param events [Array<#to_sym>] event names; each is converted with to_sym
# @return [Array] the events exactly as they were passed in
# @raise [RuntimeError] 'Not initialized' if @callbacks is not set
def off(*events)
  raise 'Not initialized' unless @callbacks

  events.each { |name| @callbacks[:on].delete(name.to_sym) }
end

.on(*events, &block) ⇒ Object



38
39
40
41
42
43
44
45
# File 'lib/witch.rb', line 38

# Stores the block as an :on callback for each given event and starts
# listening on @queue unless listening_on?(@queue) is already true.
#
# @param events [Array<#to_sym>] event names; each is converted with to_sym
# @param block [Proc] callback appended to each event's list
# @return [Object, nil] the result of listen, or nil when already listening
# @raise [RuntimeError] 'Not initialized' if @callbacks is not set
def on(*events, &block)
  raise 'Not initialized' unless @callbacks

  events.each do |event|
    key = event.to_sym
    # Create the per-event list on first use, then append.
    (@callbacks[:on][key] ||= []) << block
  end

  return if listening_on?(@queue)

  listen(:on, @queue)
end

.once(*events, &block) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/witch.rb', line 28

# Stores the block as a :once callback for each given event, calls
# register_for with the event as passed in, and starts listening on @id
# unless listening_on?(@id) is already true.
#
# @param events [Array<#to_sym>] event names; each is converted with to_sym
#   for storage
# @param block [Proc] callback appended to each event's list
# @return [Object, nil] the result of listen, or nil when already listening
# @raise [RuntimeError] 'Not initialized' if @callbacks is not set
def once(*events, &block)
  raise 'Not initialized' unless @callbacks

  events.each do |event|
    key = event.to_sym
    # Create the per-event list on first use, then append.
    (@callbacks[:once][key] ||= []) << block
    # register_for receives the original (unconverted) event value.
    register_for(event)
  end

  return if listening_on?(@id)

  listen(:once, @id)
end

.publish(event, data = nil) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/witch.rb', line 54

# Publishes an event on the main queue and then to at most one consumer
# taken from the 'subscribers:<event>' set.
#
# The payload is "<event>___<data>". The main-queue publish happens exactly
# once per call. Candidate consumers are tried in random order; a consumer
# whose publish returns 0 is treated as disconnected, removed from the set,
# and the next candidate is tried. The loop stops at the first consumer
# whose publish returns a non-zero count.
#
# @param event [String, Symbol] event name (interpolated, so symbols work)
# @param data [Object, nil] payload, converted to a string
# @return [nil]
# @raise [RuntimeError] 'Not initialized' if @queue is not set
def publish(event, data = nil)
  raise 'Not initialized' unless @queue

  message         = "#{event}___#{data}"
  subscribers_key = "subscribers:#{event}"

  lock.synchronize do
    # first, send to everyone on the main queue
    logger.debug "[publisher] Publishing on queue #{@queue}"
    connection.publish(@queue, message)

    # if one or more clients are subscribed to the 'once' queue, choose one
    # and notify. Walking a shuffled snapshot (instead of restarting the
    # whole method) keeps the number of attempts bounded and avoids
    # re-sending the event on the main queue for every stale consumer.
    connection.smembers(subscribers_key).shuffle.each do |consumer|
      logger.info "[publisher] Found consumer for #{event}: #{consumer}"
      count = connection.publish(consumer, message)
      break unless count == 0

      logger.warn "[publisher] Consumer #{consumer} disconnected! Removing and retrying..."
      connection.srem(subscribers_key, consumer)
    end
  end # lock

  nil
end

.reset ⇒ Object



21
22
23
24
25
26
# File 'lib/witch.rb', line 21

# Tears down the current state so that @queue is unset again.
#
# Does nothing when @queue is not set. Otherwise calls disconnect, replaces
# the :on callbacks with an empty hash and clears @queue. Always returns nil.
def reset
  if @queue
    # disconnect is expected to take care of the :once callbacks
    # (its effect on @callbacks[:once] is not visible here -- TODO confirm).
    disconnect
    @callbacks[:on] = {}
    @queue = nil
  end
end

.wait ⇒ Object



83
84
85
# File 'lib/witch.rb', line 83

# Blocks until the listener thread finishes.
#
# @return [Thread, nil] the joined thread, or nil when no thread is set
def wait
  # Same guard as disconnect: without a thread there is nothing to join,
  # and calling join on nil would raise NoMethodError.
  return unless @thread

  @thread.join
end