Class: LogStash::Outputs::Iothub

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/iothub.rb

Instance Method Summary collapse

Instance Method Details

#closeObject



66
67
68
69
70
71
72
73
74
# File 'lib/logstash/outputs/iothub.rb', line 66

def close()
  begin
    # waiting callbacks for all sent messages.
    sleep 1 unless $wait_queue.empty?

    @client.close() if @client
  rescue => e
  end
end

#receive(event) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/logstash/outputs/iothub.rb', line 53

def receive(event)
  m = event.to_json
  $wait_queue[m] = true
  msg = Message.new(m)
  msg.setExpiryTime(3000)
  @client.sendEventAsync(
    msg,
    EventCallback.new, m)

  return "Event received"
end

#registerObject



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/logstash/outputs/iothub.rb', line 31

def register
  # Todo: Get hang in an open with AMQPS.
  #protocol = IotHubClientProtocol::AMQPS

  protocol = IotHubClientProtocol::MQTT

  @client = DeviceClient.new(@connection_string, protocol)

  # Todo: use params for AMQPS protocol.
  # @client.setOption(
  #   "SetCertificatePath",
  #   "/Users/tac/Desktop/logstash-output-iothub/cert.crt")

  # @client.setOption(
  #   "SetSASTokenExpiryTime",
  #   @sas_token_expiry_time_sec)

  @client.open()
end