Class: Wukong::Load::KafkaLoader

Inherits:
Loader
  • Object
Defined in:
lib/wukong-load/loaders/kafka.rb

Overview

Loads data into Kafka.

Uses the kafka-rb gem to create a Kafka::MultiProducer that writes to Kafka.

Loads records into a configured topic and partition. A record may carry _topic and _partition fields, which override the configured topic and partition for that record only.

The names of these routing fields (_topic and _partition by default) can be customized.
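The override rule can be sketched as a small standalone class. `RoutingResolver` and its keyword options (`topic_field:`, `partition_field:`) are hypothetical: they mirror the `topic_for` and `partition_for` bodies documented on this page but are not the loader's actual configuration API. String-keyed records are also an assumption.

```ruby
# Hypothetical stand-in for the loader's routing logic; not part of wukong-load.
class RoutingResolver
  attr_reader :topic, :partition, :topic_field, :partition_field

  def initialize(topic:, partition: 0, topic_field: '_topic', partition_field: '_partition')
    @topic           = topic
    @partition       = partition
    @topic_field     = topic_field
    @partition_field = partition_field
  end

  # A per-record topic wins; otherwise fall back to the configured topic.
  def topic_for(record)
    record[topic_field] || topic
  end

  # A per-record partition wins (coerced to Integer); otherwise the default.
  def partition_for(record)
    record[partition_field] ? record[partition_field].to_i : partition
  end
end

default = RoutingResolver.new(topic: 'events')
custom  = RoutingResolver.new(topic: 'events', topic_field: 'dest', partition_field: 'shard')

default.topic_for({ 'id' => 1 })                      # => "events"
default.topic_for({ 'id' => 2, '_topic' => 'audit' }) # => "audit"
custom.topic_for({ 'dest' => 'audit' })               # => "audit"
custom.partition_for({ 'shard' => '4' })              # => 4
custom.topic_for({ '_topic' => 'audit' })             # => "events"
```

The last call shows that once the field name is customized, the default `_topic` key is no longer consulted.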

Instance Attribute Summary

Instance Method Summary

Methods inherited from Loader

#process

Instance Attribute Details

#producer ⇒ Object

The Kafka producer used to send messages to Kafka.



# File 'lib/wukong-load/loaders/kafka.rb', line 45

def producer
  @producer
end

Instance Method Details

#load(record) ⇒ Object

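Loads a single record into Kafka. Any error raised while routing or sending the record is passed to handle_error rather than raised.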

Parameters:

  • record (Hash)


# File 'lib/wukong-load/loaders/kafka.rb', line 65

def load record
  begin
    topic     = topic_for(record)      # per-record override or configured topic
    partition = partition_for(record)  # per-record override or configured partition
    bytes     = producer.send(topic, messages_for(record), :partition => partition)
    log.info("Wrote #{bytes} bytes to #{topic}/#{partition}")
  rescue => e
    handle_error(record, e)            # delegate failures instead of raising
  end
end
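To see how `load` ties the helpers together without a running broker, here is a sketch built on two hypothetical classes, `FakeProducer` (records calls in memory) and `SketchLoader` (mirrors the methods on this page). Stand-ins: stdlib `JSON.generate` replaces `MultiJson.dump`, a plain string replaces `Kafka::Message`, and `handle_error` just collects failures. It illustrates that an exception raised during routing or sending is handed to `handle_error` instead of escaping `load`.

```ruby
require 'json'

# Hypothetical producer that records calls instead of talking to Kafka.
# It defines #send(topic, messages, options) so the loader's
# producer.send(topic, messages, :partition => n) call works unchanged.
class FakeProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send(topic, messages, options = {})
    raise ArgumentError, 'topic must be a String' unless topic.is_a?(String)
    @sent << [topic, options[:partition], messages]
    messages.sum(&:bytesize) # fake "bytes written" return value
  end
end

# Hypothetical loader mirroring KafkaLoader#load and its helpers.
class SketchLoader
  attr_reader :producer, :errors

  def initialize(topic:, partition: 0)
    @topic     = topic
    @partition = partition
    @producer  = FakeProducer.new
    @errors    = []
  end

  # Same shape as KafkaLoader#load: route, serialize, send, or hand off the error.
  def load(record)
    topic     = topic_for(record)
    partition = partition_for(record)
    producer.send(topic, messages_for(record), :partition => partition)
  rescue => e
    handle_error(record, e)
  end

  private

  def topic_for(record)
    record['_topic'] || @topic
  end

  def partition_for(record)
    record['_partition'] ? record['_partition'].to_i : @partition
  end

  def messages_for(record)
    [JSON.generate(record)] # stand-in for [Kafka::Message.new(MultiJson.dump(record))]
  end

  def handle_error(record, error)
    @errors << [record, error]
  end
end

loader = SketchLoader.new(topic: 'events')
loader.load({ 'id' => 1, '_partition' => '2' }) # sent to events/2
loader.load({ 'id' => 2, '_topic' => 42 })       # FakeProducer raises; captured in loader.errors
```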

#messages_for(record) ⇒ Object

:nodoc:



# File 'lib/wukong-load/loaders/kafka.rb', line 82

def messages_for record
  [Kafka::Message.new(MultiJson.dump(record))]
end
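Because `messages_for` serializes the entire record, routing fields such as `_topic` and `_partition` travel inside the message body along with the data; as written, the method does not strip them. The sketch below uses stdlib `JSON.generate` as a stand-in for `MultiJson.dump` and a plain string in place of `Kafka::Message`; both substitutions are assumptions made to keep it self-contained.

```ruby
require 'json'

# Stand-in for messages_for: one JSON-encoded message per record.
# (The real method wraps the string in Kafka::Message and uses MultiJson.)
def messages_for(record)
  [JSON.generate(record)]
end

messages_for({ 'id' => 7, '_topic' => 'audit', '_partition' => 2 })
# => ["{\"id\":7,\"_topic\":\"audit\",\"_partition\":2}"]
```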

#partition_for(record) ⇒ Object

:nodoc:



# File 'lib/wukong-load/loaders/kafka.rb', line 87

def partition_for record
  record[partition_field] ? record[partition_field].to_i : partition
end
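The truthiness check and `to_i` coercion in `partition_for` have a few consequences: `0` is truthy in Ruby, so an explicit partition `0` is honored; a string such as `"3"` becomes `3`; and a non-numeric string silently becomes `0` rather than raising. The lambda below reproduces the method body; the field name and the default of `5` are illustrative values, not the loader's configuration.

```ruby
# Reproduces partition_for's expression with illustrative settings.
partition_field   = '_partition'
default_partition = 5

partition_for = lambda do |record|
  record[partition_field] ? record[partition_field].to_i : default_partition
end

partition_for.call({})                         # => 5 (field missing)
partition_for.call({ '_partition' => nil })    # => 5 (nil falls back)
partition_for.call({ '_partition' => 0 })      # => 0 (0 is truthy in Ruby)
partition_for.call({ '_partition' => '3' })    # => 3
partition_for.call({ '_partition' => 'east' }) # => 0 ("east".to_i is 0)
```

If silently mapping bad values to partition 0 is undesirable, `Integer(value)` raises `ArgumentError` for non-numeric strings instead; inside `load`, such an error would then reach `handle_error`.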

#setup ⇒ Object

Requires the kafka-rb gem and creates the producer, connecting to the Kafka broker at host:port.



# File 'lib/wukong-load/loaders/kafka.rb', line 48

def setup
  # Require kafka-rb only when setup runs, with a clear message if it is missing.
  begin
    require 'kafka'
  rescue LoadError => e
    raise Error.new("Please ensure that the 'kafka-rb' gem is installed and available (in your Gemfile)")
  end
  log.debug("Connecting to Kafka broker at #{host}:#{port}...")
  # Re-raise any connection failure as Error, keeping the original message.
  begin
    self.producer = Kafka::MultiProducer.new(:host => host, :port => port)
  rescue => e
    raise Error.new(e.message)
  end
end
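`setup` follows a common Ruby pattern: require an optional dependency lazily and turn a bare `LoadError` into an actionable message. A generic sketch of that pattern follows; `DependencyError`, `require_optional`, and the gem name `no_such_gem_xyz` are hypothetical and stand in for Wukong's `Error` and the `kafka` require.

```ruby
# Hypothetical error class standing in for Wukong's Error.
class DependencyError < StandardError; end

# Require a library on first use, replacing LoadError with install advice.
def require_optional(lib, gem_name)
  require lib
rescue LoadError => e
  raise DependencyError,
        "Please ensure that the '#{gem_name}' gem is installed and available " \
        "(in your Gemfile): #{e.message}"
end

require_optional('json', 'json') # stdlib, loads without error

begin
  require_optional('no_such_gem_xyz', 'no_such_gem_xyz')
rescue DependencyError => e
  puts e.message
end
```

Unlike `setup`, this sketch appends the original `LoadError` message, which can help when the gem is installed but fails to load for another reason.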

#topic_for(record) ⇒ Object

:nodoc:



# File 'lib/wukong-load/loaders/kafka.rb', line 77

def topic_for record
  record[topic_field] || self.topic
end
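`topic_for` relies on `||`, so only `nil` or `false` in the topic field triggers the fallback. An empty string is truthy in Ruby, so a record with an empty `_topic` would be routed to a topic named by the empty string. The sketch reproduces that behavior and adds a hypothetical stricter variant, `strict_topic_for`, that also treats blank strings as missing; the field name and default topic are illustrative.

```ruby
# Mirrors topic_for; '_topic' and 'events' are illustrative values.
TOPIC_FIELD   = '_topic'
DEFAULT_TOPIC = 'events'

def topic_for(record)
  record[TOPIC_FIELD] || DEFAULT_TOPIC
end

# Hypothetical stricter variant: nil and blank strings both fall back.
def strict_topic_for(record)
  value = record[TOPIC_FIELD].to_s.strip
  value.empty? ? DEFAULT_TOPIC : value
end

topic_for({ '_topic' => 'audit' })          # => "audit"
topic_for({ '_topic' => nil })              # => "events"
topic_for({ '_topic' => '' })               # => "" (empty string is truthy)
strict_topic_for({ '_topic' => '' })        # => "events"
strict_topic_for({ '_topic' => ' audit ' }) # => "audit"
```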