Class: Droonga::Client::Connection::DroongaProtocol::Thread

Inherits:
Object
  • Object
show all
Defined in:
lib/droonga/client/connection/droonga-protocol/thread.rb

Defined Under Namespace

Classes: Receiver, Request

Instance Method Summary collapse

Constructor Details

#initialize(host, port, tag, options = {}) ⇒ Thread

Returns a new instance of Thread.



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 36

def initialize(host, port, tag, options={})
  @host = host
  @port = port
  @tag = tag
  default_options = {
    :timeout => 1,
  }
  @options = default_options.merge(options)
  @logger = Fluent::Logger::FluentLogger.new(@tag, @options)
  @timeout = @options[:timeout]
end

Instance Method Details

#closeObject



113
114
115
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 113

def close
  @logger.close
end

#request(message, options = {}, &block) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 48

def request(message, options={}, &block)
  receiver = create_receiver
  message = message.dup
  message["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga"
  send(message, options)

  sync = block.nil?
  if sync
    responses = []
    receive(receiver, options) do |response|
      responses << response
    end
    if responses.size > 1
      responses
    else
      responses.first
    end
  else
    thread = ::Thread.new do
      receive(receiver, options, &block)
    end
    Request.new(thread)
  end
end

#send(message, options = {}, &block) ⇒ Object



105
106
107
108
109
110
111
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 105

def send(message, options={}, &block)
  if message["id"].nil? or message["date"].nil?
    message = message.merge("id"   => Time.now.to_f.to_s,
                            "date" => Time.now)
  end
  @logger.post("message", message)
end

#subscribe(message, options = {}, &block) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 73

def subscribe(message, options={}, &block)
  receiver = create_receiver
  message = message.dup
  message["from"] = "#{receiver.host}:#{receiver.port}/droonga"
  send(message, options)

  receive_options = {
    :timeout => nil,
  }
  sync = block.nil?
  if sync
    Enumerator.new do |yielder|
      loop do
        receiver.receive(receive_options) do |object|
          yielder << object
        end
      end
    end
  else
    thread = ::Thread.new do
      begin
        loop do
          receiver.receive(receive_options, &block)
        end
      ensure
        receiver.close
      end
    end
    Request.new(thread)
  end
end