Class: Norikra::Listener::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/norikra/listener.rb

Direct Known Subclasses

Loopback, MemoryPool, Stdout

Constant Summary collapse

DEFAULT_ASYNC_INTERVAL =
0.1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(argument, query_name, query_group) ⇒ Base

Returns a new instance of Base.



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/norikra/listener.rb', line 80

# Sets up the state shared by every listener.
#
# The processing mode is chosen from the hooks the instance responds to:
# :sync when #process_sync is available, otherwise :async when
# #process_async is available. #process_sync wins if both are present.
#
# @param argument the listener argument, stored as-is in @argument
# @param query_name name of the query this listener is attached to
# @param query_group group of that query
# @raise [RuntimeError] when the instance responds to neither hook
def initialize(argument, query_name, query_group)
  @argument, @query_name, @query_group = argument, query_name, query_group
  @async_interval = DEFAULT_ASYNC_INTERVAL

  handles_sync = respond_to?(:process_sync)
  unless handles_sync || respond_to?(:process_async)
    raise "BUG: Invalid custom listener '#{self.class.to_s}'"
  end
  @mode = handles_sync ? :sync : :async

  # State used only in :async mode: the worker thread, the buffer of pending
  # events, the lock guarding that buffer, and the flag polled by #background.
  @thread = nil
  @events = []
  @mutex = Mutex.new
  @running = true
end

Instance Attribute Details

#events_statistics=(value) ⇒ Object (writeonly)

Sets the attribute events_statistics

Parameters:

  • value

    the value to set the attribute events_statistics to.



72
73
74
# File 'lib/norikra/listener.rb', line 72

# Injects the statistics container that #update increments.
#
# #update reads and writes the :output entry via `[]` / `[]=` and adds an
# Integer to it, so the value must already hold a numeric :output entry.
#
# @param value the statistics container (presumably a Hash; confirm against the caller)
def events_statistics=(value)
  @events_statistics = value
end

Class Method Details

.label ⇒ Object

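The base implementation only raises NotImplementedError. (The source comment attached to this method is commented-out code: `attr_writer :engine` and `attr_writer :output_pool`.)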

Raises:

  • (NotImplementedError)


76
77
78
# File 'lib/norikra/listener.rb', line 76

# Class-level label hook. This base implementation always raises, so it is
# presumably meant to be overridden by concrete listener classes (confirm
# against the subclasses).
#
# Note that NotImplementedError descends from ScriptError, not StandardError,
# so a bare `rescue` does not catch it.
#
# @raise [NotImplementedError] always
def self.label
  raise NotImplementedError
end

Instance Method Details

#background ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/norikra/listener.rb', line 108

# Worker loop for :async listeners; #start runs it in its own thread.
#
# Each pass swaps the pending buffer for a fresh array while holding @mutex
# and then hands the drained batch to #process_async outside the lock, so
# #push is blocked only for the duration of the swap. When nothing was
# pending, the loop sleeps @async_interval seconds before polling again.
# The loop ends once @running is false (see #shutdown).
#
# A StandardError raised inside the loop (including from #process_async) is
# logged and ends the loop; the failed batch is not retried.
def background
  trace "background thread starts", query: @query_name
  while @running
    batch = nil
    @mutex.synchronize do
      batch = @events
      @events = []
    end

    if batch.empty?
      # Nothing to do: back off instead of spinning on the lock.
      sleep @async_interval
      next
    end

    trace("calling #process_async"){ {listener: self.class, query: @query_name, events: batch.size} }
    process_async(batch)
  end
rescue => e
  error "exception in listener background thread, stopped", listener: self.class, query: @query_name, error: e
end

#push(events) ⇒ Object



128
129
130
131
132
# File 'lib/norikra/listener.rb', line 128

# Appends a batch of events to the pending buffer that #background drains.
#
# @param events [Array] events to enqueue
# @return [Array] the pending buffer after the append
def push(events)
  @mutex.synchronize do
    # Append in place: `@events += events` would copy the whole backlog on
    # every call while the lock is held. This is safe because #background
    # replaces @events with a fresh array when it takes a batch.
    @events.concat(events)
  end
end

#shutdown ⇒ Object



134
135
136
137
138
139
# File 'lib/norikra/listener.rb', line 134

# Stops the listener: clears the @running flag polled by #background, waits
# for the worker thread (if one was started) to finish, and drops the
# reference to it.
#
# @return [nil]
def shutdown
  trace "stopping listener", query: @query_name
  @running = false

  if (worker = @thread)
    worker.join
  end
  @thread = nil
end

#start ⇒ Object



101
102
103
104
105
106
# File 'lib/norikra/listener.rb', line 101

# Launches the worker thread running #background when the listener is in
# :async mode. Does nothing in any other mode.
#
# @return [Thread, nil] the started thread, or nil when not in :async mode
def start
  return unless @mode == :async

  trace "starting thread to process events in background", query: @query_name
  worker_body = method(:background)
  @thread = Thread.new(&worker_body)
end

#type_convert(value) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/norikra/listener.rb', line 141

# Recursively converts an event value into plain Ruby data.
#
# Conversion rules, checked in this order:
# * a value responding to #getUnderlying is unwrapped first;
# * nil is returned as nil;
# * hash-like values (#to_hash) become a Hash whose keys go through
#   Norikra::Field.unescape_name and whose values are converted recursively;
# * array-like values (#to_a) become an Array of converted elements;
# * string-like values (#force_encoding) are labelled as UTF-8;
# * anything else is returned unchanged.
#
# @param value the value to convert
# @return the converted value
def type_convert(value)
  value = value.getUnderlying if value.respond_to?(:getUnderlying)

  trace("converting"){ { value: value } }

  if value.nil?
    nil
  elsif value.respond_to?(:to_hash)
    Hash[ value.to_hash.map{|k,v| [ Norikra::Field.unescape_name(k), type_convert(v)] } ]
  elsif value.respond_to?(:to_a)
    value.to_a.map{|v| type_convert(v) }
  elsif value.respond_to?(:force_encoding)
    # force_encoding mutates its receiver and raises on a frozen one, so
    # relabel a copy in that case. Unfrozen values are still relabelled in
    # place, as before.
    text = value.frozen? ? value.dup : value
    text.force_encoding('UTF-8')
  else
    value
  end
end

#update(new_events, old_events) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/norikra/listener.rb', line 161

# Receives a batch of query results and routes it according to @mode.
#
# In :sync mode the converted events go straight to #process_sync. In any
# other mode each converted new event is paired with the current epoch
# second and queued through #push for the background worker; old events are
# not forwarded in that mode. Either way, the number of new events is added
# to the :output counter of @events_statistics.
#
# NOTE(review): this looks like the Esper UpdateListener callback, where
# either argument may be nil. A nil new_events is therefore treated as an
# empty batch instead of raising NoMethodError. Confirm against the engine.
#
# @param new_events [#map, nil] newly produced events
# @param old_events [#map, Object, nil] removed events; a value without #map
#   is converted as a single value
# @return [Integer] the updated :output counter
def update(new_events, old_events)
  incoming = new_events || []

  if @mode == :sync
    news = incoming.map{|e| type_convert(e) }
    olds = if old_events.respond_to?(:map)
             old_events.map{|e| type_convert(e) }
           else
             type_convert(old_events)
           end
    trace("produced events"){ { listener: self.class, query: @query_name, group: @query_group, news: news, olds: olds } }
    process_sync(news, olds)
    @events_statistics[:output] += news.size
  else # async
    # The timestamp is only needed when queueing, so take it here.
    t = Time.now.to_i
    events = incoming.map{|e| [t, type_convert(e)]}
    trace("pushed events"){ { listener: self.class, query: @query_name, group: @query_group, event: events } }
    push(events)
    @events_statistics[:output] += events.size
  end
end