Class: Droonga::WatchHandler
Instance Method Summary
collapse
#prefer_synchronous?
#command, extended, #inherited, #method_name, #processable?, #repository
Methods inherited from Plugin
#process, #processable?, #shutdown, #start
Constructor Details
Returns a new instance of WatchHandler.
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# File 'lib/droonga/plugin/handler/watch.rb', line 27
def initialize(*args)
super
if $0 !~ /\AServer/
ensure_schema_created
else
until @context["Keyword"]
sleep 0.1
end
sleep 1
end
@watcher = Watcher.new(@context)
@sweeper = Sweeper.new(@context)
end
|
Instance Method Details
#feed(message, messenger) ⇒ Object
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/droonga/plugin/handler/watch.rb', line 75
def feed(message, messenger)
request = message.request
@watcher.feed(:targets => request["targets"]) do |route, subscribers|
notification_message = {
"to" => subscribers,
"body" => request,
}
notification_message = message.raw.merge(notification_message)
messenger.forward(notification_message,
"to" => route, "type" => "watch.notification")
end
end
|
#subscribe(message, messenger) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/droonga/plugin/handler/watch.rb', line 50
def subscribe(message, messenger)
subscriber, condition, query, route = parse_message(message)
normalized_request = {
:subscriber => subscriber,
:condition => condition,
:query => query,
:route => route,
}
@watcher.subscribe(normalized_request)
messenger.emit([true])
end
|
#sweep(message, messenger) ⇒ Object
89
90
91
|
# File 'lib/droonga/plugin/handler/watch.rb', line 89
def sweep(message, messenger)
@sweeper.sweep_expired_subscribers
end
|
#unsubscribe(message, messenger) ⇒ Object
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/droonga/plugin/handler/watch.rb', line 63
def unsubscribe(message, messenger)
subscriber, condition, query, route = parse_message(message)
normalized_request = {
:subscriber => subscriber,
:condition => condition,
:query => query,
}
@watcher.unsubscribe(normalized_request)
messenger.emit([true])
end
|