Class: Fluent::GroongaInput::BaseInput

Inherits:
Object
  • Object
show all
Includes:
Configurable, DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/in_groonga.rb

Direct Known Subclasses

GQTPInput, HTTPInput

Constant Summary collapse

DEFAULT_EMIT_COMMANDS =
[
  "clearlock",
  "column_copy",
  "column_create",
  "column_remove",
  "column_rename",
  "config_delete",
  "config_set",
  "delete",
  "load",
  "lock_acquire",
  "lock_clear",
  "lock_release",
  "logical_table_remove",
  "object_remove",
  "plugin_register",
  "plugin_unregister",
  "register",
  "reindex",
  "table_copy",
  "table_create",
  "table_remove",
  "table_rename",
  "truncate",
]

Instance Method Summary collapse

Constructor Details

#initialize(input_plugin) ⇒ BaseInput

Returns a new instance of BaseInput.



124
125
126
# File 'lib/fluent/plugin/in_groonga.rb', line 124

def initialize(input_plugin)
  @input_plugin = input_plugin
end

Instance Method Details

#configure(conf) ⇒ Object



128
129
130
131
132
133
# File 'lib/fluent/plugin/in_groonga.rb', line 128

def configure(conf)
  super

  @port ||= default_port
  @real_port ||= default_port
end

#create_repeater(client) ⇒ Object



160
161
162
163
164
# File 'lib/fluent/plugin/in_groonga.rb', line 160

def create_repeater(client)
  repeater = Repeater.connect(@real_host, @real_port, client)
  repeater.attach(@loop)
  repeater
end

#emit(command, params) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/fluent/plugin/in_groonga.rb', line 166

def emit(command, params)
  normalized_command = command.split(".")[0]
  return unless emit_command?(normalized_command)
  case @input_plugin.command_name_position
  when :tag
    tag = "groonga.command.#{normalized_command}"
    record = params
  else
    tag = "groonga.command"
    record = {
      "name" => normalized_command,
      "arguments" => params
    }
  end
  @input_plugin.router.emit(tag,
                            Engine.now,
                            record)
end

#shutdownObject



153
154
155
156
157
158
# File 'lib/fluent/plugin/in_groonga.rb', line 153

def shutdown
  @loop.stop
  @socket.close
  @shutdown_notifier.signal
  @thread.join
end

#startObject



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/fluent/plugin/in_groonga.rb', line 135

def start
  listen_socket = TCPServer.new(@bind, @port)
  detach_multi_process do
    @loop = Coolio::Loop.new

    @socket = Coolio::TCPServer.new(listen_socket, nil,
                                    handler_class, self)
    @loop.attach(@socket)

    @shutdown_notifier = Coolio::AsyncWatcher.new
    @loop.attach(@shutdown_notifier)

    @thread = Thread.new do
      run
    end
  end
end