Class: GnipApi::PowerTrack::Stream

Inherits:
Object
Defined in:
lib/gnip_api/power_track/stream.rb

Overview

Handles a stream connection to PowerTrack to receive the data.

There are three ways to connect to and consume the stream:

  • :common

  • :io

  • :pty

Each method uses a different backend, the result of experimenting with ways to mitigate disconnect issues. The methods differ in how they handle keep-alive signals and in how they work at the low level. The recommended method is :common; it is already the default value of the stream_method parameter, although it still needs some polishing.

In addition to these three methods, #thread_consume offers a further strategy built on :common: it reads the stream in a separate thread, so that any processing you do on your end is decoupled from reading the connection.


Constructor Details

#initialize ⇒ Stream



# File 'lib/gnip_api/power_track/stream.rb', line 19

def initialize
  @user = GnipApi.configuration.user
  @password = GnipApi.configuration.password
  @account = GnipApi.configuration.
  @adapter = GnipApi::Adapter.new
  @buffer = GnipApi::PowerTrack::Buffer.new
  @running = false
end

Instance Method Details

#build_message(params) ⇒ Object

Builds a Gnip::Message object from the params of a received item (the Hash produced by #parse_json).



# File 'lib/gnip_api/power_track/stream.rb', line 204

def build_message params
  Gnip::Message.build(params)
end

#consume(stream_method = :common) ⇒ Object

#consume, #consume_json and #consume_raw are different ways of consuming the stream. Each accepts one of three stream methods, which return data slightly differently. #consume itself yields an Array of Gnip::Message objects built from the received items. The :common method uses a simple HTTParty request that reads chunks and decodes the GZip encoding. Its flaw is that Zlib waits until it has buffered enough data before returning a decoded chunk, so :common returns chunks that may contain more than one object.
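The Zlib buffering described for :common can be reproduced with Ruby's standard zlib library alone. This sketch does not use GnipApi: it gzips a few "\r\n"-terminated JSON objects (an assumed payload), feeds the compressed bytes to a Zlib::Inflate in arbitrary 8-byte pieces, and prints what each piece decodes to. Some pieces decode to nothing, and decoded boundaries do not line up with object boundaries, which is why the output has to be buffered and split afterwards.

```ruby
require "zlib"

# Five "\r\n"-terminated JSON objects, gzip-compressed as an HTTP body might be.
payload = (1..5).map { |i| %({"id":#{i}}\r\n) }.join
compressed = Zlib.gzip(payload)

# 16 + MAX_WBITS tells zlib to expect (and strip) a gzip header and trailer.
inflater = Zlib::Inflate.new(16 + Zlib::MAX_WBITS)

# Feed the compressed bytes in small pieces, as a network read might deliver them.
decoded_chunks = compressed.bytes.each_slice(8).map do |slice|
  inflater.inflate(slice.pack("C*"))
end
inflater.close

decoded_chunks.each_with_index do |chunk, i|
  # A decoded chunk can be empty, cut an object in half, or hold several objects.
  puts "chunk #{i}: #{chunk.inspect}"
end
```

The first piece falls entirely inside the 10-byte gzip header, so it always decodes to an empty string.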

The :io method uses curl under the hood, combined with IO.popen to capture curl's stdout. It returns a single line at a time, which corresponds to one object sent on the stream. curl handles the GZip decoding better; however, reading from the IO buffers up the keep-alive signals, because curl's STDOUT is not flushed.

The :pty method is an alternative to :io in which stdout is captured as it is produced, using PTY features. It works almost the same as :io, but the keep-alive signals are captured properly.
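The practical difference between :io and :pty is how the child process's stdout is attached. The sketch below is not GnipApi's adapter code: it substitutes a small Ruby child process for curl (an assumption, so it runs without network access) and reads its output line by line, first through IO.popen and then through PTY.spawn. PTY is part of Ruby's standard library but is only available on Unix-like systems.

```ruby
require "pty"
require "rbconfig"

# Hypothetical stand-in for curl: a Ruby child that prints three lines slowly.
CHILD_SCRIPT = 'puts "line 1"; sleep 0.1; puts "line 2"; sleep 0.1; puts "line 3"'
CMD = [RbConfig.ruby, "-e", CHILD_SCRIPT].freeze

# IO.popen: the child's stdout is a pipe, so Ruby buffers it inside the child
# and the lines typically arrive together when the child exits.
def read_with_popen
  lines = []
  IO.popen(CMD) { |io| io.each_line { |line| lines << line.chomp } }
  lines
end

# PTY.spawn: the child's stdout is a terminal, which Ruby writes to without
# buffering, so each line can be read as soon as it is printed.
def read_with_pty
  lines = []
  PTY.spawn(*CMD) do |reader, _writer, pid|
    begin
      # chomp also strips the "\r" that the terminal adds before "\n".
      reader.each_line { |line| lines << line.chomp }
    rescue Errno::EIO
      # Linux raises EIO on the reading side once the child closes the terminal.
    ensure
      Process.wait(pid)
    end
  end
  lines
end

p read_with_popen
p read_with_pty
```

Both readers return the same lines; what differs is when each line becomes available, which is what matters for timely keep-alive detection.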

Raises:

  • (ArgumentError)


# File 'lib/gnip_api/power_track/stream.rb', line 85

def consume stream_method=:common
  raise ArgumentError, "Block required, none given" unless block_given?
  if stream_method == :common
    read_stream do |data|
      yield(process_entries(data))
    end
  elsif stream_method == :io
    read_io_stream do |data|
      yield(process_entries([data]))
    end
  elsif stream_method == :pty
    read_pty_stream do |data|
      yield(process_entries([data]))
    end
  else 
    raise ArgumentError, "Undefined stream method #{stream_method}"
  end
end
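The if/elsif chain in #consume maps each stream_method symbol to a reader and decides whether the reader's output is already an Array (:common) or a single item to wrap (:io, :pty). One alternative is a Hash lookup. DispatchSketch below is hypothetical and is not how GnipApi is written: its readers are stubs yielding fake data, and its process_entries is a placeholder.

```ruby
class DispatchSketch
  # Maps each stream method to the reader it uses (names mirror Stream's).
  READERS = {
    common: :read_stream,
    io:     :read_io_stream,
    pty:    :read_pty_stream
  }.freeze

  def consume(stream_method = :common)
    raise ArgumentError, "Block required, none given" unless block_given?
    reader = READERS.fetch(stream_method) do
      raise ArgumentError, "Undefined stream method #{stream_method}"
    end
    send(reader) do |data|
      # :common yields an Array of items; :io and :pty yield a single item.
      items = stream_method == :common ? data : [data]
      yield(process_entries(items))
    end
  end

  private

  # Stubs: the real readers open a connection and yield as data arrives.
  def read_stream
    yield ['{"id":1}', '{"id":2}']
  end

  def read_io_stream
    yield '{"id":3}'
  end

  def read_pty_stream
    yield '{"id":4}'
  end

  # Placeholder for parsing items and building message objects.
  def process_entries(entries)
    entries.map { |e| "processed #{e}" }
  end
end

DispatchSketch.new.consume(:io) { |batch| p batch }
```

Adding a backend then means adding one Hash entry, and unknown symbols still raise the same ArgumentError.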

#consume_json(stream_method = :common) ⇒ Object

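Similar to #consume, but only parses each JSON item into a Hash, with no further processing; items that fail to parse become nil. With :common the block receives an Array of Hashes, while with :io and :pty it receives a single Hash. The stream_method parameter accepts the same options as #consume.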

Raises:

  • (ArgumentError)


# File 'lib/gnip_api/power_track/stream.rb', line 129

def consume_json stream_method=:common
  raise ArgumentError, "Block required, none given" unless block_given?
  if stream_method == :common
    read_stream do |data|
      yield(data.map{|item| parse_json(item)})
    end
  elsif stream_method == :io
    read_io_stream do |data|
      yield(parse_json(data))
    end
  elsif stream_method == :pty
    read_pty_stream do |data|
      yield(parse_json(data))
    end
  else
    raise ArgumentError, "Undefined stream method #{stream_method}"
  end
end
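Because #consume_json yields an Array with :common but a single Hash (or nil) with :io and :pty, a caller that wants to switch stream methods freely may want to normalize the two shapes. normalize_json_batch is a hypothetical helper, not part of GnipApi.

```ruby
# Normalizes what #consume_json yields into an Array of Hashes, dropping
# the nil entries left by items that failed to parse.
def normalize_json_batch(data)
  batch = data.is_a?(Array) ? data : [data]
  batch.compact
end

p normalize_json_batch([{ "id" => 1 }, nil, { "id" => 2 }])
p normalize_json_batch({ "id" => 3 })
p normalize_json_batch(nil)
```

Kernel#Array is avoided here on purpose: Array({ "id" => 3 }) would turn the Hash into an Array of key/value pairs.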

#consume_raw(stream_method = :common) ⇒ Object

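Similar to #consume, except that it yields the raw JSON without any parsing, which makes it the fastest way to consume the stream. With :common the block receives an Array of raw strings, while with :io and :pty it receives a single string. The stream_method parameter accepts the same options as #consume.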

Raises:

  • (ArgumentError)


# File 'lib/gnip_api/power_track/stream.rb', line 107

def consume_raw stream_method=:common
  raise ArgumentError, "Block required, none given" unless block_given?
  if stream_method == :common
    read_stream do |data|
      yield(data)
    end
  elsif stream_method == :io
    read_io_stream do |data|
      yield(data)
    end
  elsif stream_method == :pty
    read_pty_stream do |data|
      yield(data)
    end
  else 
    raise ArgumentError, "Undefined stream method #{stream_method}"
  end
end
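One way to take advantage of #consume_raw skipping the parse step is to persist the stream as JSON Lines and parse it later. RawSink is a hypothetical sketch, not part of GnipApi: it accepts either shape #consume_raw yields and writes to any IO (a StringIO here, standing in for a file). With a real stream, it might be wired up as stream.consume_raw(:io) { |data| sink.write(data) }.

```ruby
require "stringio"

# Appends raw items from #consume_raw to an IO, one JSON document per line.
class RawSink
  attr_reader :count

  def initialize(io)
    @io = io
    @count = 0
  end

  def write(data)
    items = data.is_a?(Array) ? data : [data]
    items.each do |item|
      line = item.strip
      next if line.empty? # skip blank keep-alive lines, if any reach the block
      @io.puts(line)
      @count += 1
    end
  end
end

out = StringIO.new
sink = RawSink.new(out)
sink.write(['{"id":1}', '{"id":2}']) # shape yielded by :common
sink.write("{\"id\":3}\r\n")         # shape yielded by :io and :pty
sink.write("\r\n")                   # a keep-alive line
puts out.string
```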

#logger ⇒ Object

Returns the configured logger.



# File 'lib/gnip_api/power_track/stream.rb', line 29

def logger
  GnipApi.logger
end

#parse_json(json) ⇒ Object

Returns a Hash from a parsed JSON string, or nil if the string cannot be parsed.



# File 'lib/gnip_api/power_track/stream.rb', line 209

def parse_json json
  begin 
    GnipApi::JsonParser.new.parse json
  rescue GnipApi::Errors::JsonParser::ParseError
    nil
  end
end
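#parse_json swallows parse errors and returns nil, which is why #process_entries calls compact on its results. This sketch shows the same contract using Ruby's standard JSON library in place of GnipApi::JsonParser, whose internals are not shown on this page.

```ruby
require "json"

# Returns the parsed value (a Hash for a JSON object), or nil when the
# input is not valid JSON, e.g. a truncated chunk.
def parse_json(json)
  JSON.parse(json)
rescue JSON::ParserError
  nil
end

p parse_json('{"id":1}') # valid object
p parse_json('{"id":')   # truncated chunk
```

Only JSON::ParserError is rescued, so passing something that is not a String (such as nil) still raises.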

#process_entries(entries) ⇒ Object

Processes the received items (already split apart) and returns the corresponding Gnip objects. Items that fail to parse are dropped, and any system messages among the results are logged.



# File 'lib/gnip_api/power_track/stream.rb', line 195

def process_entries entries
  logger.debug "PowerTrack Stream: #{entries.size} items received"
  data = entries.map{|e| parse_json(e)}.compact
  data.map!{|e| build_message(e)} 
  data.select(&:system_message?).each(&:log!)
  return data
end
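#process_entries parses each raw item, drops the ones that fail to parse, wraps the rest in message objects, and logs those flagged as system messages. The sketch below reproduces that flow with Ruby's standard JSON and Logger libraries. Message is a hypothetical Struct standing in for Gnip::Message, and the rule that an item with an "error" or "info" key counts as a system message is an assumption made only for this example.

```ruby
require "json"
require "logger"
require "stringio"

LOG_OUTPUT = StringIO.new
LOGGER = Logger.new(LOG_OUTPUT)

# Hypothetical stand-in for Gnip::Message.
Message = Struct.new(:payload) do
  # Assumption for this sketch: system messages carry an "error" or "info" key.
  def system_message?
    payload.key?("error") || payload.key?("info")
  end

  def log!
    LOGGER.warn("System message: #{payload}")
  end
end

def parse_json(json)
  JSON.parse(json)
rescue JSON::ParserError
  nil
end

def process_entries(entries)
  LOGGER.debug("#{entries.size} items received")
  data = entries.map { |e| parse_json(e) }.compact # unparseable items become nil, then dropped
  data.map! { |e| Message.new(e) }
  data.select(&:system_message?).each(&:log!)
  data
end

messages = process_entries(['{"id":1}', "garbage", '{"info":{"message":"example"}}'])
p messages.size
```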

#read_io_stream ⇒ Object

Opens the connection to the PowerTrack stream and yields any data received, using curl through IO.popen (the :io method).



# File 'lib/gnip_api/power_track/stream.rb', line 150

def read_io_stream
  request = create_request
  logger.info "Opening PowerTrack parsed stream"
  begin
    @adapter.io_curl_stream(request) do |data|
      yield data
    end
  ensure
    logger.warn "Closing stream"
  end
end

#read_pty_stream ⇒ Object

Opens the connection to the PowerTrack stream and yields any data received, using curl through a pseudo-terminal (the :pty method).



# File 'lib/gnip_api/power_track/stream.rb', line 164

def read_pty_stream
  request = create_request
  logger.info "Opening PowerTrack parsed stream"
  begin
    @adapter.pty_curl_stream(request) do |data|
      yield data
    end
  ensure
    logger.warn "Closing stream"
  end
end

#read_stream ⇒ Object

Opens the connection to the PowerTrack stream and yields any data received, using HTTParty and the standard net/http library (the :common method). In this case the buffer collects the raw chunks and later splits them into items.



# File 'lib/gnip_api/power_track/stream.rb', line 179

def read_stream
  request = create_request
  logger.info "Opening PowerTrack parsed stream"
  begin
    @adapter.stream_get request do |chunk|
      @buffer.insert! chunk
      yield @buffer.read! if block_given?
    end
  ensure
    logger.warn "Closing stream"
    @running = false
  end
end
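The buffer used by #read_stream has to cope with chunks that end in the middle of an item. GnipApi::PowerTrack::Buffer's implementation is not shown on this page, so LineBuffer below is a hypothetical sketch of the idea, assuming items are separated by "\r\n" and that empty entries (keep-alive signals) should be dropped.

```ruby
# Hypothetical chunk buffer: accumulates raw text and returns complete items.
class LineBuffer
  DELIMITER = "\r\n"

  def initialize
    @data = +""
  end

  # Appends a raw chunk, which may end in the middle of an item.
  def insert!(chunk)
    @data << chunk
  end

  # Removes and returns every complete item; a trailing partial item stays buffered.
  def read!
    parts = @data.split(DELIMITER, -1) # -1 keeps trailing empty strings
    @data = parts.pop || +""           # the last part is incomplete (or empty)
    parts.reject(&:empty?)             # empty entries are keep-alive signals
  end
end

buffer = LineBuffer.new
buffer.insert!(%({"id":1}\r\n{"id))
p buffer.read!
buffer.insert!(%(":2}\r\n\r\n))
p buffer.read!
```

Re-splitting the whole buffer on each read is cheap here, because after every read! it holds at most one partial item; it also handles a delimiter that is split across two chunks.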

#thread_consume ⇒ Object

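Consumes the stream using a streamer thread instead of a simple block. The streamer thread reads the stream with #read_stream (the :common method) and fills an internal pool with items, while the calling thread drains the pool, passes the items through #process_entries, and yields the result to the block, sleeping for 0.1 seconds whenever the pool is empty. If the streamer thread dies, GnipApi::Errors::PowerTrack::StreamDown is raised.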



# File 'lib/gnip_api/power_track/stream.rb', line 35

def thread_consume
  @pool = []
  streamer = Thread.new do
    logger.info "Starting streamer Thread"
    begin
      read_stream do |items|
        items.each{|i| @pool << i}
      end
    ensure
      logger.warn "Streamer exited"
    end
  end

  begin
    loop do
      logger.warn "Streamer is down" unless streamer.alive?
      raise GnipApi::Errors::PowerTrack::StreamDown unless streamer.alive?
      entries = []
      while @pool.any?
        entries << @pool.shift
      end
      if entries.any?
        processed = process_entries(entries)
        yield(processed)
      else
        sleep(0.1)
        next
      end
    end
  ensure
    streamer.kill if streamer.alive?
  end
end
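#thread_consume shares a plain Array (@pool) between the streamer thread and the consuming loop and polls it every 0.1 seconds. A possible variation, sketched below, swaps in Ruby's Thread::Queue, which is thread-safe and lets the consumer block until items arrive instead of sleeping. The producer's ensure clause pushes a sentinel, so the consumer still notices when the streamer exits. The stub source, the DONE sentinel, StreamDown (standing in for GnipApi::Errors::PowerTrack::StreamDown) and thread_consume_sketch are all hypothetical, and the sketch yields raw items rather than passing them through #process_entries.

```ruby
# Stand-in for GnipApi::Errors::PowerTrack::StreamDown.
class StreamDown < StandardError; end

# Sentinel pushed when the producer exits, however it exits.
DONE = Object.new

def thread_consume_sketch(source)
  queue = Thread::Queue.new
  streamer = Thread.new do
    source.each { |batch| batch.each { |item| queue << item } }
  ensure
    queue << DONE
  end

  begin
    loop do
      batch = [queue.pop]                   # block until at least one item (or the sentinel)
      batch << queue.pop until queue.empty? # grab anything else already queued
      finished = batch.delete(DONE)         # nil unless the sentinel was in this batch
      yield batch unless batch.empty?
      raise StreamDown, "Streamer exited" if finished
    end
  ensure
    streamer.kill if streamer.alive?
  end
end

received = []
begin
  thread_consume_sketch([["a", "b"], ["c"]]) { |batch| received.concat(batch) }
rescue StreamDown
  # The stub source ran out, which the sketch treats like the stream going down.
end
p received
```

How items are grouped into batches depends on thread timing, but the queue preserves their order.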