Module: Witch
- Defined in:
- lib/witch.rb,
lib/version.rb
Defined Under Namespace
Classes: ConsumerDisconnected
Constant Summary
collapse
- VERSION =
'0.0.4'
Class Method Summary
collapse
Class Method Details
.disconnect ⇒ Object
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/witch.rb', line 87
# Unregisters every event that has `once` callbacks and kills the thread
# stored in @thread, if any.
#
# Does nothing when @id is unset.
#
# @return [Thread, nil] the killed thread, or nil when there was nothing to kill
def disconnect
  return nil unless @id

  # Iterate over a snapshot of the keys: unregister_from is defined elsewhere
  # and may touch the callbacks hash while we loop.
  once_events = @callbacks[:once].keys
  once_events.each { |name| unregister_from(name) }

  if @thread
    logger.warn '[consumer] Disconnecting!'
    lock.synchronize { @thread.kill }
  end
end
|
.init(options = {}) ⇒ Object
10
11
12
13
14
15
16
17
18
19
|
# File 'lib/witch.rb', line 10
# Sets up the module-level state. Must be called before on/once/off/publish,
# which raise until this state exists.
#
# @param options [Hash] settings. :queue (channel name, defaults to
#   'eventstream') and :logger are extracted; the remaining entries are
#   stored in @options.
# @return [String] the generated consumer id, "<hostname>-<pid>"
# @raise [RuntimeError] if a queue is already set (i.e. already initialized)
def init(options = {})
  raise 'Already initialized!' if @queue

  # Work on a copy so the caller's hash is not stripped of :queue/:logger
  # (and so a frozen hash can be passed in).
  options = options.dup
  @queue = options.delete(:queue) || 'eventstream'
  @logger = options.delete(:logger)
  @options = options
  @callbacks = { :once => {}, :on => {} }
  @channels = []
  @thread = nil
  @id = "#{Socket.gethostname}-#{Process.pid}"
end
|
.off(*events) ⇒ Object
47
48
49
50
51
52
|
# File 'lib/witch.rb', line 47
# Drops all callbacks registered via `on` for each of the given events.
# Callbacks registered via `once` are left alone.
#
# @param events [Array<#to_sym>] event names
# @return [Array] the events exactly as passed in
# @raise [RuntimeError] when the callback registry has not been set up
def off(*events)
  raise 'Not initialized' unless @callbacks

  events.each { |name| @callbacks[:on].delete(name.to_sym) }
end
|
.on(*events, &block) ⇒ Object
38
39
40
41
42
43
44
45
|
# File 'lib/witch.rb', line 38
# Appends the block to the `on` callback list of each given event, then
# starts listening on @queue unless that is already happening.
#
# @param events [Array<#to_sym>] event names
# @param block [Proc] callback stored for every listed event
# @return [Object, nil] whatever `listen` returns, or nil when already listening
# @raise [RuntimeError] when the callback registry has not been set up
def on(*events, &block)
  raise 'Not initialized' unless @callbacks

  events.each do |event|
    key = event.to_sym
    (@callbacks[:on][key] ||= []) << block
  end
  return if listening_on?(@queue)

  listen(:on, @queue)
end
|
.once(*events, &block) ⇒ Object
28
29
30
31
32
33
34
35
36
|
# File 'lib/witch.rb', line 28
# Appends the block to the `once` callback list of each given event and
# registers this process for that event, then starts listening on the
# channel named by @id unless that is already happening.
#
# @param events [Array<#to_sym>] event names
# @param block [Proc] callback stored for every listed event
# @return [Object, nil] whatever `listen` returns, or nil when already listening
# @raise [RuntimeError] when the callback registry has not been set up
def once(*events, &block)
  raise 'Not initialized' unless @callbacks

  events.each do |event|
    key = event.to_sym
    (@callbacks[:once][key] ||= []) << block
    register_for(event)
  end
  return if listening_on?(@id)

  listen(:once, @id)
end
|
.publish(event, data = nil) ⇒ Object
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/witch.rb', line 54
# Publishes an event: once on the shared queue, and once to a single,
# randomly chosen member of the "subscribers:<event>" set.
#
# The payload is "<event>___<data>". A chosen consumer whose channel reports
# zero receivers is removed from the set and another member is tried, until
# one accepts the message or the set is empty. The shared-queue publish
# happens exactly once regardless of how many stale consumers are pruned.
#
# @param event [String, Symbol] event name
# @param data [#to_s, nil] payload, serialized with to_s
# @return [nil]
# @raise [RuntimeError] when no queue is set (not initialized)
def publish(event, data = nil)
  raise 'Not initialized' unless @queue

  # Interpolate rather than use String#+ so Symbol event names work too.
  name = event.to_s
  str = "#{name}___#{data}"
  subscribers_key = "subscribers:#{name}"

  lock.synchronize do
    logger.debug "[publisher] Publishing on queue #{@queue}"
    connection.publish(@queue, str)

    while (consumer = connection.smembers(subscribers_key).sample)
      logger.info "[publisher] Found consumer for #{event}: #{consumer}"
      # A non-zero receiver count means the consumer got the message.
      break unless connection.publish(consumer, str) == 0

      logger.warn "[publisher] Consumer #{consumer} disconnected! Removing and retrying..."
      connection.srem(subscribers_key, consumer)
    end
  end
end
|
.reset ⇒ Object
21
22
23
24
25
26
|
# File 'lib/witch.rb', line 21
# Tears the module state down so that `init` can be called again:
# disconnects, empties the `on` callback registry and clears the queue name.
#
# Does nothing when no queue is set.
#
# @return [nil]
def reset
  if @queue
    disconnect
    @callbacks.store(:on, {})
    @queue = nil
  end
end
|
.wait ⇒ Object
83
84
85
|
# File 'lib/witch.rb', line 83
# Blocks the caller until the thread stored in @thread finishes
# (presumably the consumer thread started by `listen` — not visible here).
#
# Returns immediately when no thread is stored, mirroring the nil check
# `disconnect` performs, instead of failing with NoMethodError on nil.
#
# @return [Thread, nil] the joined thread, or nil when there is none
def wait
  @thread.join if @thread
end
|