Class: Fluent::GroongaInput::BaseInput

Inherits:
Object
  • Object
show all
Includes:
Configurable, DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/in_groonga.rb

Direct Known Subclasses

GQTPInput, HTTPInput

Constant Summary collapse

# Default list of command matchers: Regexp entries are name-prefix patterns
# (table_*, column_*, plugin_*), String entries are literal command names.
# Frozen so the shared default cannot be mutated in place by accident.
DEFAULT_EMIT_COMMANDS = [
  /\Atable_/,
  /\Acolumn_/,
  "delete",
  /\Aplugin_/,
  "register",
  "truncate",
  "load",
].freeze

Instance Method Summary collapse

Constructor Details

#initialize(input_plugin) ⇒ BaseInput

Returns a new instance of BaseInput.



112
113
114
# File 'lib/fluent/plugin/in_groonga.rb', line 112

# Builds the input and remembers the plugin that owns it.
#
# @param input_plugin [Object] kept in @input_plugin; #emit calls
#   `router` on it when forwarding a command.
def initialize(input_plugin)
  @input_plugin = input_plugin
end

Instance Method Details

#configure(conf) ⇒ Object



116
117
118
119
120
121
# File 'lib/fluent/plugin/in_groonga.rb', line 116

# Applies the configuration through the parent implementation, then fills
# in any port that is still unset.
#
# @param conf [Object] configuration handed on to `super`.
# @return [Object] the resulting value of @real_port.
def configure(conf)
  super(conf)

  # Both ports fall back to default_port (defined outside this block --
  # presumably by each concrete input; verify) only when still nil/false.
  @port ||= default_port
  @real_port ||= default_port
end

#create_repeater(client) ⇒ Object



148
149
150
151
152
# File 'lib/fluent/plugin/in_groonga.rb', line 148

# Connects a Repeater to @real_host:@real_port on behalf of +client+ and
# attaches it to @loop.
#
# @param client [Object] passed through to Repeater.connect as third argument.
# @return [Object] the connected repeater, already attached to @loop.
def create_repeater(client)
  Repeater.connect(@real_host, @real_port, client).tap do |connected|
    connected.attach(@loop)
  end
end

#emit(command, params) ⇒ Object



154
155
156
157
158
159
# File 'lib/fluent/plugin/in_groonga.rb', line 154

# Forwards +command+ to the owning plugin's router when emit_command?
# accepts it; otherwise does nothing.
#
# The event is emitted under the tag "groonga.command.<command>" with
# Engine.now as its time and +params+ as its record.
#
# @param command [Object] command name, interpolated into the tag.
# @param params [Object] passed through unchanged as the record.
# @return [Object, nil] whatever the router's emit returns, or nil when the
#   command is filtered out.
def emit(command, params)
  if emit_command?(command)
    router = @input_plugin.router
    router.emit("groonga.command.#{command}", Engine.now, params)
  end
end

#shutdown ⇒ Object



141
142
143
144
145
146
# File 'lib/fluent/plugin/in_groonga.rb', line 141

# Tears down what #start set up, in this order: stops @loop, closes
# @socket, signals @shutdown_notifier, then waits for @thread to finish.
#
# NOTE(review): the call order is kept exactly as written; presumably the
# signal wakes the loop so the joined thread can return -- confirm before
# reordering any of these calls.
#
# @return [Object] the result of @thread.join.
def shutdown
  @loop.stop
  @socket.close
  @shutdown_notifier.signal
  @thread.join
end

#start ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/fluent/plugin/in_groonga.rb', line 123

# Opens the listening socket on @bind:@port, then, inside
# detach_multi_process, builds the event loop and starts the worker thread.
#
# Inside the block it sets @loop, @socket (a Coolio::TCPServer wrapping the
# listening socket, given handler_class and self), @shutdown_notifier and
# @thread; #shutdown later uses all four.
#
# @return [Object] whatever detach_multi_process returns.
def start
  # The plain TCPServer is created before the block is handed over.
  server_socket = TCPServer.new(@bind, @port)
  detach_multi_process do
    @loop = Coolio::Loop.new

    @socket = Coolio::TCPServer.new(server_socket, nil, handler_class, self)
    @loop.attach(@socket)

    @shutdown_notifier = Coolio::AsyncWatcher.new
    @loop.attach(@shutdown_notifier)

    @thread = Thread.new { run }
  end
end