Class: Droonga::Client::Connection::DroongaProtocol::Thread

Inherits:
Object
  • Object
show all
Defined in:
lib/droonga/client/connection/droonga-protocol/thread.rb

Defined Under Namespace

Classes: Receiver, Request

Instance Method Summary collapse

Constructor Details

#initialize(host, port, tag, options = {}) ⇒ Thread

Returns a new instance of Thread.



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 36

# Sets up a connection wrapper around a Fluent logger.
#
# @param host [Object] stored as +@host+.
# @param port [Object] stored as +@port+.
# @param tag [Object] stored as +@tag+ and handed to the Fluent logger as
#   its tag.
# @param options [Hash] merged over the built-in defaults
#   (+:timeout => 1+); the merged hash is passed as-is to
#   +Fluent::Logger::FluentLogger.new+.
#   NOTE(review): the timeout is presumably in seconds -- confirm against
#   the code that consumes +@timeout+.
def initialize(host, port, tag, options={})
  @host, @port, @tag = host, port, tag
  # Caller-supplied options win over the defaults.
  @options = {:timeout => 1}.merge(options)
  @logger = Fluent::Logger::FluentLogger.new(@tag, @options)
  # Read after the logger is built, from the same hash the logger received.
  @timeout = @options[:timeout]
end

Instance Method Details

#close ⇒ Object



115
116
117
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 115

# Shuts this connection down by closing the Fluent logger that was created
# in the constructor.
#
# @return [Object] whatever the logger's +close+ returns.
def close
  @logger.close
end

#request(message, options = {}, &block) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 48

# Sends +message+ and collects the response(s) delivered to a freshly
# created receiver.
#
# The message is copied before its "replyTo" field is set to the receiver's
# end point, so the caller's hash is left untouched.
#
# @param message [Hash] the message to send.
# @param options [Hash] forwarded to +send+ and to the +receive+ helper.
# @yield [response] when a block is given, responses are handed to it from
#   a background thread instead of being returned.
# @return [Object, Array, nil, Request] without a block: +nil+ when nothing
#   was received, the lone response when exactly one was received, or an
#   array when several were received. With a block: a +Request+ wrapping
#   the background thread.
def request(message, options={}, &block)
  receiver = create_receiver
  envelope = message.dup
  envelope["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga"
  send(envelope, options)

  if block
    # Asynchronous mode: receiving happens off the caller's thread.
    worker = ::Thread.new do
      receive(receiver, options, &block)
    end
    return Request.new(worker)
  end

  # Synchronous mode: gather everything, then unwrap a lone response.
  collected = []
  receive(receiver, options) do |response|
    collected << response
  end
  collected.size > 1 ? collected : collected.first
end

#send(message, options = {}, &block) ⇒ Object



107
108
109
110
111
112
113
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 107

# Posts +message+ through the Fluent logger under the "message" label.
#
# A missing (nil) "id" or "date" is filled in on a copy of the message; a
# value the caller already supplied is kept as-is. When both are present
# the very same hash object is posted.
#
# @param message [Hash] the message to post; never mutated.
# @param options [Hash] accepted for interface compatibility; not used here.
# @return [Object] whatever the logger's +post+ returns.
def send(message, options={}, &block)
  missing = {}
  # One clock reading so a generated id and date describe the same instant.
  now = Time.now
  missing["id"] = now.to_f.to_s if message["id"].nil?
  missing["date"] = now if message["date"].nil?
  # Merge only the absent fields so a caller-supplied id/date is not clobbered.
  message = message.merge(missing) unless missing.empty?
  @logger.post("message", message)
end

#subscribe(message, options = {}, &block) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 73

# Sends +message+ and then keeps receiving objects on a freshly created
# receiver.
#
# The message is copied before its "replyTo" and "from" fields are set to
# the receiver's end point, so the caller's hash is left untouched. Each
# +receiver.receive+ call is made with +:timeout => nil+.
#
# In both modes the receiver is closed once the receive loop stops (the
# consumer breaks out, the loop ends, or an error is raised). Because of
# that, the enumerator returned in synchronous mode is single-use: after
# an iteration has finished, the receiver is closed and a second iteration
# cannot receive anything. With external enumeration (+Enumerator#next+)
# the cleanup only runs when the underlying loop is actually unwound.
#
# @param message [Hash] the subscription message to send.
# @param options [Hash] forwarded to +send+.
# @yield [object] when a block is given, each received object is handed to
#   it from a background thread.
# @return [Enumerator, Request] without a block: a lazy enumerator over the
#   received objects. With a block: a +Request+ wrapping the background
#   thread.
def subscribe(message, options={}, &block)
  receiver = create_receiver
  receive_end_point = "#{receiver.host}:#{receiver.port}/droonga"
  message = message.dup
  message["replyTo"] = receive_end_point
  message["from"] = receive_end_point
  send(message, options)

  receive_options = {
    :timeout => nil,
  }
  if block.nil?
    Enumerator.new do |yielder|
      begin
        loop do
          receiver.receive(receive_options) do |object|
            yielder << object
          end
        end
      ensure
        # Mirror the threaded branch: the caller has no handle on the
        # receiver, so this is the only place it can be released.
        receiver.close
      end
    end
  else
    thread = ::Thread.new do
      begin
        loop do
          receiver.receive(receive_options, &block)
        end
      ensure
        receiver.close
      end
    end
    Request.new(thread)
  end
end