Class: Krakow::Producer

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Utils::Lazy
Defined in:
lib/krakow/producer.rb,
lib/krakow/producer/http.rb

Overview

TCP based producer

Defined Under Namespace

Classes: Http

Instance Attribute Summary collapse

Attributes collapse

Instance Method Summary collapse

Methods included from Utils::Lazy

included

Methods included from Utils::Logging

level=, #log

Constructor Details

#initialize(args = {}) ⇒ Producer

Returns a new instance of Producer.



41
42
43
44
45
46
47
# File 'lib/krakow/producer.rb', line 41

# Build a producer and open its connection right away.
#
# `arguments` is provided outside this block (presumably populated by
# `super` from `args` -- confirm against Utils::Lazy).
#
# @param args [Hash] producer attributes, handed on to `super`
# @return [Producer]
def initialize(args={})
  super
  # #connect reads the :features, :config and :options keys, so make sure
  # each one exists; entries supplied by the caller override these defaults.
  defaults = {:features => {}, :config => {}, :options => {}}
  supplied = arguments.fetch(:connection_options, {})
  arguments[:connection_options] = defaults.merge(supplied)
  connect
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



22
23
24
# File 'lib/krakow/producer.rb', line 22

# Reader for the producer's current connection.
#
# @return [Connection, nil] the object assigned by #connect; nil once
#   #producer_cleanup has run or while #connection_failure is replacing it
def connection
  @connection
end

#notifier ⇒ Object (readonly)

Returns the value of attribute notifier.



23
24
25
# File 'lib/krakow/producer.rb', line 23

# Reader for the @notifier instance variable.
#
# NOTE(review): none of the methods shown on this page assign @notifier, so
# this presumably returns nil unless it is set elsewhere -- confirm it is
# still in use.
#
# @return [Object, nil]
def notifier
  @notifier
end

Instance Method Details

#connect ⇒ Object

Establish a connection to the configured `host` and `port`.

Returns:

  • nil



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/krakow/producer.rb', line 52

# Establish connection to configured `host` and `port`.
#
# Builds the connection arguments from `connection_options`, creates and
# initializes the connection, then links it to this producer. `@connecting`
# is true for the duration of the attempt, which makes #connected? report
# false, and is always reset afterwards -- also when the attempt fails.
#
# On failure the partially set up connection is discarded (terminated when
# still alive, and cleared from `@connection`) before the error is handed
# to `abort`, so repeated failed attempts do not pile up stale connections.
#
# @return [nil]
def connect
  @connecting = true
  info "Establishing connection to: #{host}:#{port}"
  pending = nil
  begin
    # Work on a copy so the configured :options hash is never mutated.
    con_args = connection_options[:options].dup.tap do |args|
      args[:host] = host
      args[:port] = port
      if(connection_options[:features])
        args[:features] = connection_options[:features]
      end
      if(connection_options[:config])
        args[:features_args] = connection_options[:config]
      end
    end
    @connection = pending = Connection.new(con_args)
    pending.init!
    self.link pending
    info "Connection established: #{pending}"
  rescue => e
    if(pending)
      # Only clear the slot when it still holds the connection created here.
      @connection = nil if @connection == pending
      pending.terminate if pending.alive?
    end
    abort e
  ensure
    # Runs on success and on failure, so the flag can never stay stuck.
    @connecting = false
  end
  nil
end

#connected? ⇒ TrueClass, FalseClass

Returns whether the producer is currently connected to the server.

Returns:

  • (TrueClass, FalseClass)

    whether the producer is currently connected to the server



83
84
85
86
87
88
89
90
91
92
# File 'lib/krakow/producer.rb', line 83

# Report whether the producer currently has a usable connection.
#
# The answer is false while a connection attempt is in progress, when no
# connection is set, when the connection is no longer alive, or when asking
# the connection raises Celluloid::DeadActorError.
#
# @return [TrueClass, FalseClass]
def connected?
  return false if @connecting
  current = connection
  return false unless current && current.alive?
  # Normalize whatever the connection answers to a strict boolean.
  current.connected? ? true : false
rescue Celluloid::DeadActorError
  false
end

#connection_failure(obj, reason) ⇒ TrueClass

Process connection failure and attempt reconnection

Returns:

  • (TrueClass)


97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/krakow/producer.rb', line 97

# Process connection failure and attempt reconnection.
#
# Only acts when `obj` is the current connection and a failure `reason` is
# given; otherwise it does nothing. The failed connection is dropped and
# terminated (when still alive), then #connect is retried, pausing
# `reconnect_interval` between attempts, until it succeeds.
#
# NOTE(review): the retry loop is unbounded and does not consult
# `reconnect_retries` -- confirm whether that is intended.
#
# @param obj [Object] the object reporting the failure
# @param reason [Object, nil] cause of the failure; nil is ignored
# @return [TrueClass]
def connection_failure(obj, reason)
  if(obj == connection && !reason.nil?)
    begin
      @connection = nil
      warn "Connection failure detected for #{host}:#{port} - #{reason}"
      obj.terminate if obj.alive?
      connect
    rescue => e
      # Use a separate variable: rebinding `reason` here would make the
      # warning above report the reconnect error as the original cause
      # once the block is retried.
      debug "Reconnect attempt failed: #{e.class}: #{e}"
      warn "Failed to establish connection to #{host}:#{port}. Pausing #{reconnect_interval} before retry"
      sleep reconnect_interval
      retry
    end
  end
  true
end

#connection_options ⇒ Hash

Returns the connection_options attribute.

Returns:

  • (Hash)

    the connection_options attribute



37
# File 'lib/krakow/producer.rb', line 37

# Declares the `connection_options` attribute: a Hash whose default is
# produced by a lambda returning an empty Hash. #connect reads its
# :options, :features and :config keys.
attribute :connection_options, Hash, :default => ->{ Hash.new }

#connection_options? ⇒ TrueClass, FalseClass

Returns truthiness of the connection_options attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the connection_options attribute



37
# File 'lib/krakow/producer.rb', line 37

# Declares the `connection_options` attribute (Hash, default produced by a
# lambda returning an empty Hash). The generated documentation lists the
# `connection_options?` truthiness predicate under this same declaration.
attribute :connection_options, Hash, :default => ->{ Hash.new }

#host ⇒ String

Returns the host attribute.

Returns:

  • (String)

    the host attribute



32
# File 'lib/krakow/producer.rb', line 32

# Declares the required String attribute `host`; #connect passes it to the
# connection as :host.
attribute :host, String, :required => true

#host? ⇒ TrueClass, FalseClass

Returns truthiness of the host attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the host attribute



32
# File 'lib/krakow/producer.rb', line 32

# Declares the required String attribute `host`. The generated documentation
# lists the `host?` truthiness predicate under this same declaration.
attribute :host, String, :required => true

#port ⇒ [String, Integer]

Returns the port attribute.

Returns:

  • ([String, Integer])

    the port attribute



33
# File 'lib/krakow/producer.rb', line 33

# Declares the required attribute `port`, accepted as a String or an
# Integer; #connect passes it to the connection as :port.
attribute :port, [String, Integer], :required => true

#port? ⇒ TrueClass, FalseClass

Returns truthiness of the port attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the port attribute



33
# File 'lib/krakow/producer.rb', line 33

# Declares the required attribute `port` (String or Integer). The generated
# documentation lists the `port?` truthiness predicate under this same
# declaration.
attribute :port, [String, Integer], :required => true

#producer_cleanup ⇒ Object

Instance destructor

Returns:

  • nil



115
116
117
118
119
120
121
122
123
# File 'lib/krakow/producer.rb', line 115

# Instance destructor: shut down the connection and forget it.
#
# A connection is only terminated when one is present and still alive;
# the stored reference is cleared either way.
#
# @return [nil]
def producer_cleanup
  debug 'Tearing down producer'
  active = connection
  active.terminate if active && active.alive?
  @connection = nil
  info 'Producer torn down'
  nil
end

#reconnect_interval ⇒ Integer

Returns the reconnect_interval attribute.

Returns:

  • (Integer)

    the reconnect_interval attribute



36
# File 'lib/krakow/producer.rb', line 36

# Declares the Integer attribute `reconnect_interval` (default 5);
# #connection_failure passes it to `sleep` between reconnect attempts.
attribute :reconnect_interval, Integer, :default => 5

#reconnect_interval? ⇒ TrueClass, FalseClass

Returns truthiness of the reconnect_interval attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the reconnect_interval attribute



36
# File 'lib/krakow/producer.rb', line 36

# Declares the Integer attribute `reconnect_interval` (default 5). The
# generated documentation lists the `reconnect_interval?` truthiness
# predicate under this same declaration.
attribute :reconnect_interval, Integer, :default => 5

#reconnect_retries ⇒ Integer

Returns the reconnect_retries attribute.

Returns:

  • (Integer)

    the reconnect_retries attribute



35
# File 'lib/krakow/producer.rb', line 35

# Declares the Integer attribute `reconnect_retries` (default 10).
# NOTE(review): no method shown on this page reads this value -- confirm
# where (or whether) the retry limit is enforced.
attribute :reconnect_retries, Integer, :default => 10

#reconnect_retries? ⇒ TrueClass, FalseClass

Returns truthiness of the reconnect_retries attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the reconnect_retries attribute



35
# File 'lib/krakow/producer.rb', line 35

# Declares the Integer attribute `reconnect_retries` (default 10). The
# generated documentation lists the `reconnect_retries?` truthiness
# predicate under this same declaration.
attribute :reconnect_retries, Integer, :default => 10

#to_s ⇒ String

Returns a string representation of the object.

Returns:

  • (String)

    string representation of the object



78
79
80
# File 'lib/krakow/producer.rb', line 78

# Describe this producer as a string: class name, object id, the
# `host:port` endpoint and the topic.
#
# @return [String]
def to_s
  format('<%s:%s {%s:%s} T:%s>', self.class.name, object_id, host, port, topic)
end

#topic ⇒ String

Returns the topic attribute.

Returns:

  • (String)

    the topic attribute



34
# File 'lib/krakow/producer.rb', line 34

# Declares the required String attribute `topic`; #write sends it as
# :topic_name with every publish command.
attribute :topic, String, :required => true

#topic? ⇒ TrueClass, FalseClass

Returns truthiness of the topic attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the topic attribute



34
# File 'lib/krakow/producer.rb', line 34

# Declares the required String attribute `topic`. The generated
# documentation lists the `topic?` truthiness predicate under this same
# declaration.
attribute :topic, String, :required => true

#write(*message) ⇒ Krakow::FrameType, TrueClass

Note:

If the connection's response wait is set to 0, writes will return a `true` value on completion.

Write message to server

Parameters:

  • message (String)

    message to write

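Returns:

  • (Krakow::FrameType, TrueClass)

Raises:

  • (ArgumentError) when no message is provided

  • (Krakow::Error::ConnectionUnavailable) when the connection is unavailable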



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/krakow/producer.rb', line 132

# Write message to server.
#
# One message is published with a Pub command, several with a single Mpub
# command. All failures are reported through `abort`.
#
# @param message [Array<String>] one or more messages to write
# @return [Object] whatever the connection's `transmit` returns
# @raise [ArgumentError] when no message is provided
# @raise [Error::ConnectionUnavailable] when no connection is set or the
#   transmitting task was terminated
def write(*message)
  if(message.empty?)
    abort ArgumentError.new 'Expecting one or more messages to send. None provided.'
  end
  con = connection
  # The connection is nil after #producer_cleanup and while
  # #connection_failure is reconnecting. Report that as an unavailable
  # connection instead of letting `nil.transmit` raise NoMethodError.
  # This check stays outside the begin block so the generic rescue below
  # does not re-abort it.
  unless(con)
    abort Error::ConnectionUnavailable.new 'Connection is currently unavailable'
  end
  begin
    command = if(message.size > 1)
      debug 'Multiple message publish'
      Command::Mpub.new(
        :topic_name => topic,
        :messages => message
      )
    else
      debug 'Single message publish'
      Command::Pub.new(
        :message => message.first,
        :topic_name => topic
      )
    end
    con.transmit(command)
  rescue Celluloid::Task::TerminatedError
    abort Error::ConnectionUnavailable.new 'Connection is currently unavailable'
  rescue => e
    abort e
  end
end