Class: BirdGrinder::Tweeter::StreamProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/bird_grinder/tweeter/stream_processor.rb

Instance Method Summary collapse

Constructor Details

#initialize(parent, stream_name, stream_meta = {}) ⇒ StreamProcessor

Returns a new instance of StreamProcessor.



6
7
8
9
10
11
# File 'lib/bird_grinder/tweeter/stream_processor.rb', line 6

# Builds a processor bound to one named stream.
#
# @param parent the owning object; stored as-is and later asked for its
#   +delegate+ when items are dispatched
# @param stream_name the stream's name; stored after conversion with +to_sym+
# @param stream_meta extra data attached to the stream; stored after
#   conversion with +to_nash+ (defaults to an empty Hash)
def initialize(parent, stream_name, stream_meta = {})
  @parent, @stream_name, @stream_meta =
    parent, stream_name.to_sym, stream_meta.to_nash
  # setup_parser is defined elsewhere in this class; presumably it creates
  # the @parser that receive_chunk appends to -- confirm against its source.
  setup_parser
end

Instance Method Details

#process_stream_item(json) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/bird_grinder/tweeter/stream_processor.rb', line 19

# Normalizes one decoded stream payload, tags it with stream information and
# forwards it to the parent's delegate as an +:incoming_stream+ message.
#
# Anything that is not a Hash is ignored and +nil+ is returned.
#
# @param json [Hash] a decoded stream payload
# @return [Object, nil] whatever the delegate's +receive_message+ returns,
#   or +nil+ when +json+ is not a Hash
def process_stream_item(json)
  return unless json.is_a?(Hash)

  item = json.to_nash.normalized
  kind = lookup_type_for_steam_response(item)
  # For :delete and :limit the forwarded item is the nested object rather
  # than the full payload; every other type is forwarded unchanged.
  item =
    case kind
    when :delete then item[:delete].status
    when :limit then item.limit
    else item
    end

  item.stream_type = kind
  item.streaming_source = @stream_name
  item.meta = @stream_meta
  @parent.delegate.receive_message(:incoming_stream, item)
end

#receive_chunk(chunk) ⇒ Object



13
14
15
16
17
# File 'lib/bird_grinder/tweeter/stream_processor.rb', line 13

# Appends a raw chunk of stream data to the parser.
#
# A Yajl::ParseError raised while appending is caught and reported through
# the error log instead of being propagated to the caller.
#
# @param chunk the data to append to +@parser+
# @return [Object] the result of +@parser << chunk+, or of +logger.error+
#   when parsing failed
def receive_chunk(chunk)
  begin
    @parser << chunk
  rescue Yajl::ParseError => parse_error
    logger.error("Couldn't parse json: #{parse_error.message}")
  end
end