Class: Norikra::Listener::Base
- Inherits:
-
Object
- Object
- Norikra::Listener::Base
show all
- Defined in:
- lib/norikra/listener.rb
Constant Summary
collapse
- DEFAULT_ASYNC_INTERVAL =
0.1
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(argument, query_name, query_group) ⇒ Base
Returns a new instance of Base.
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
# File 'lib/norikra/listener.rb', line 80
# Sets up the state shared by listener instances.
#
# The processing mode is chosen from the methods the instance responds to:
# :sync when it responds to #process_sync, otherwise :async when it responds
# to #process_async.
#
# @param argument stored as-is in @argument
# @param query_name stored as-is in @query_name
# @param query_group stored as-is in @query_group
# @raise [RuntimeError] when the instance responds to neither #process_sync
#   nor #process_async
def initialize(argument, query_name, query_group)
  @argument = argument
  @query_name = query_name
  @query_group = query_group
  @async_interval = DEFAULT_ASYNC_INTERVAL

  # #process_sync is checked first, so it wins when both hooks exist.
  if respond_to?(:process_sync)
    @mode = :sync
  elsif respond_to?(:process_async)
    @mode = :async
  else
    raise "BUG: Invalid custom listener '#{self.class.to_s}'"
  end

  # Background worker state; the worker thread itself is created in #start.
  @thread = nil
  @events = []
  @mutex = Mutex.new
  @running = true
end
|
Instance Attribute Details
#events_statistics=(value) ⇒ Object
Sets the attribute events_statistics
72
73
74
|
# File 'lib/norikra/listener.rb', line 72
# Sets the attribute events_statistics.
#
# @param value the object to store in @events_statistics
def events_statistics=(value)
  instance_variable_set(:@events_statistics, value)
end
|
Class Method Details
.label ⇒ Object
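Not implemented in the base class: always raises NotImplementedError. (The source comments preceding this method are commented-out code: attr_writer :engine, attr_writer :output_pool.)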
76
77
78
|
# File 'lib/norikra/listener.rb', line 76
# Placeholder class method: this implementation never returns a value.
#
# @raise [NotImplementedError] always
def self.label
  fail NotImplementedError
end
|
Instance Method Details
#background ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/norikra/listener.rb', line 108
# Worker loop that #start runs in a separate thread in async mode.
#
# While @running is truthy, each pass takes everything queued in @events
# (swapping in a fresh array while holding @mutex) and hands that batch to
# #process_async. When nothing was queued it sleeps for @async_interval
# before the next pass.
#
# Any StandardError raised inside the loop ends it: the error is reported
# through #error and the method returns, so no further batches are processed.
def background
  trace "background thread starts", query: @query_name
  while @running
    batch = nil
    # Swap the queue under the lock so #push can keep appending to the new
    # array while this batch is processed outside the lock.
    @mutex.synchronize do
      batch = @events
      @events = []
    end
    if batch.empty?
      sleep @async_interval
    else
      trace("calling #process_async"){ {listener: self.class, query: @query_name, events: batch.size} }
      process_async(batch)
    end
  end
rescue => e
  error "exception in listener background thread, stopped", listener: self.class, query: @query_name, error: e
end
|
#push(events) ⇒ Object
128
129
130
131
132
|
# File 'lib/norikra/listener.rb', line 128
# Appends events to the pending queue held in @events.
#
# The append happens while holding @mutex and extends the existing array in
# place (Array#concat) instead of building a new array with +, so the whole
# backlog is not copied on every push.
#
# @param events [Array] events to append; the array itself is not modified
# @return [Array] the pending queue after the append
def push(events)
  @mutex.synchronize do
    @events.concat(events)
  end
end
|
#shutdown ⇒ Object
134
135
136
137
138
139
|
# File 'lib/norikra/listener.rb', line 134
# Stops the listener.
#
# Clears @running, then, if a thread is stored in @thread, waits for it to
# finish. @thread is reset to nil afterwards.
#
# @return [nil]
def shutdown
  trace("stopping listener", query: @query_name)
  @running = false
  if @thread
    @thread.join
  end
  @thread = nil
end
|
#start ⇒ Object
101
102
103
104
105
106
|
# File 'lib/norikra/listener.rb', line 101
# Starts the background worker when the listener is in async mode.
#
# In :async mode a new thread running #background is created and stored in
# @thread. In any other mode nothing happens.
#
# @return [Thread, nil] the new thread in async mode, nil otherwise
def start
  return unless @mode == :async

  trace("starting thread to process events in background", query: @query_name)
  @thread = Thread.new { background }
end
|
#type_convert(value) ⇒ Object
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
# File 'lib/norikra/listener.rb', line 141
# Recursively converts an event value into plain Ruby data.
#
# - Objects responding to #getUnderlying are replaced by its result first.
# - nil is returned unchanged.
# - Hash-like values (#to_hash) become a Hash whose keys are passed through
#   Norikra::Field.unescape_name and whose values are converted recursively.
# - Array-like values (#to_a) become an Array of converted elements.
# - String-like values (#force_encoding) are tagged as UTF-8. An unfrozen
#   string is re-tagged in place and returned itself; a frozen one cannot be
#   modified, so a re-tagged copy is returned instead of raising.
# - Anything else is returned unchanged.
#
# @param value the value to convert
# @return the converted value
def type_convert(value)
  value = value.getUnderlying if value.respond_to?(:getUnderlying)
  trace("converting"){ { value: value } }

  if value.nil?
    value
  elsif value.respond_to?(:to_hash)
    # Checked before #to_a because Hash responds to both.
    Hash[ value.to_hash.map{|k,v| [ Norikra::Field.unescape_name(k), type_convert(v) ] } ]
  elsif value.respond_to?(:to_a)
    value.to_a.map{|v| type_convert(v) }
  elsif value.respond_to?(:force_encoding)
    # force_encoding mutates its receiver, which raises on frozen strings.
    target = value.frozen? ? value.dup : value
    target.force_encoding('UTF-8')
  else
    value
  end
end
|
#update(new_events, old_events) ⇒ Object
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
# File 'lib/norikra/listener.rb', line 161
# Receives events produced for the query and dispatches them by mode.
#
# Outside :sync mode, each new event is converted with #type_convert, paired
# with the current time in integer seconds, and queued via #push;
# old_events is not used on this path.
#
# In :sync mode, new and old events are converted and passed directly to
# #process_sync. old_events is mapped element-wise when it responds to #map,
# otherwise it is converted as a single value.
#
# Either way @events_statistics[:output] is increased by the number of new
# events.
#
# @param new_events a collection responding to #map
# @param old_events a collection, or a single value
# @return the updated value of @events_statistics[:output]
def update(new_events, old_events)
  unless @mode == :sync
    stamp = Time.now.to_i
    queued = new_events.map{|event| [stamp, type_convert(event)] }
    trace("pushed events"){ { listener: self.class, query: @query_name, group: @query_group, event: queued } }
    push(queued)
    return @events_statistics[:output] += queued.size
  end

  news = new_events.map{|event| type_convert(event) }
  olds = old_events.respond_to?(:map) ? old_events.map{|event| type_convert(event) } : type_convert(old_events)
  trace("produced events"){ { listener: self.class, query: @query_name, group: @query_group, news: news, olds: olds } }
  process_sync(news, olds)
  @events_statistics[:output] += news.size
end
|