Class: GnipApi::PowerTrack::Stream
- Inherits: Object
  - Object
  - GnipApi::PowerTrack::Stream
- Defined in: lib/gnip_api/power_track/stream.rb
Overview
Handles a stream connection to PowerTrack to receive the data.
There are 3 ways to connect and consume the connection provided:
- :common
- :io
- :pty
Each method uses a different backend, the result of experimentation to mitigate disconnect issues. The methods handle keep-alive signals differently and behave a bit differently at the low level. The recommended method is :common, which is intended to become the default once it is polished enough.
In addition to the methods above, a further strategy built on the :common method, #thread_consume, uses threads to detach the stream reading from any processing you do on your end.
Instance Method Summary
- #build_message(params) ⇒ Object
  Builds a Gnip::Message object from the item params received.
- #consume(stream_method = :common) ⇒ Object
  Consumes the stream and yields processed Gnip objects. One of three consuming methods that return slightly different data.
- #consume_json(stream_method = :common) ⇒ Object
  Similar to #consume but parses the JSON to Hash with no further processing.
- #consume_raw(stream_method = :common) ⇒ Object
  Similar to #consume, except that it yields raw JSON and does no parsing of the data received.
- #initialize ⇒ Stream (constructor)
  A new instance of Stream.
- #logger ⇒ Object
  Returns the configured logger.
- #parse_json(json) ⇒ Object
  Returns a Hash from a parsed JSON string.
- #process_entries(entries) ⇒ Object
  Processes the items received after splitting them up, returning appropriate Gnip objects.
- #read_io_stream ⇒ Object
  Opens the connection to the PowerTrack stream and returns any data received using CURL IO transfer method.
- #read_pty_stream ⇒ Object
  Opens the connection to the PowerTrack stream and returns any data received using CURL PTY transfer method.
- #read_stream ⇒ Object
  Opens the connection to the PowerTrack stream and returns any data received using HTTParty and standard net/http.
- #thread_consume ⇒ Object
  Consumes the stream using a streamer thread instead of a simple block.
Constructor Details
#initialize ⇒ Stream
    # File 'lib/gnip_api/power_track/stream.rb', line 19
    def initialize
      @user = GnipApi.configuration.user
      @password = GnipApi.configuration.password
      @account = GnipApi.configuration.account
      @adapter = GnipApi::Adapter.new
      @buffer = GnipApi::PowerTrack::Buffer.new
      @running = false
    end
Instance Method Details
#build_message(params) ⇒ Object
Builds a Gnip::Message object from the item params received.
    # File 'lib/gnip_api/power_track/stream.rb', line 204
    def build_message params
      Gnip::Message.build(params)
    end
#consume(stream_method = :common) ⇒ Object
#consume is one of three ways of consuming the stream (#consume, #consume_raw and #consume_json), each returning data in a slightly different form. The stream_method param selects the backend.
The :common method uses a simple HTTParty request, reading chunks and decoding the GZip. Its flaw is that it has to wait until Zlib has buffered enough data before it can return a decoded chunk. A chunk returned by :common may contain more than one object.
The :io method uses curl under the hood, in combination with IO.popen to capture stdout. This method returns a single line at a time, which is one object sent to the stream. Curl handles the GZip decoding better; however, the IO read buffers up the keep-alive signals because STDOUT is not flushed.
The :pty method is an alternative to :io in which stdout is captured as it arrives using PTY features. It works almost the same as :io, but the keep-alive signals are now captured properly.
    # File 'lib/gnip_api/power_track/stream.rb', line 85
    def consume stream_method=:common
      raise ArgumentError, "Block required, non given" unless block_given?
      if stream_method == :common
        read_stream do |data|
          yield(process_entries(data))
        end
      elsif stream_method == :io
        read_io_stream do |data|
          yield(process_entries([data]))
        end
      elsif stream_method == :pty
        read_pty_stream do |data|
          yield(process_entries([data]))
        end
      else
        raise ArgumentError, "Undefined stream method #{stream_method}"
      end
    end
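The buffering behaviour described for :common can be reproduced with Ruby's standard Zlib alone. The following is a minimal sketch, not the gem's adapter code: the payload, the "\r\n" delimiter and the 8-byte chunk size are assumptions chosen for illustration. It shows that feeding compressed bytes piecemeal may decode to nothing for some chunks and to several objects at once for others.

```ruby
require "zlib"

# Assumed stand-in for the stream body: two JSON objects separated by "\r\n".
payload = %({"id":1}\r\n{"id":2}\r\n)
compressed = Zlib.gzip(payload)

# MAX_WBITS + 16 tells zlib to expect a gzip header rather than raw deflate.
inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 16)

decoded = +""
sizes = []

# Feed the compressed bytes in small pieces, as a network read would.
compressed.bytes.each_slice(8) do |slice|
  out = inflater.inflate(slice.pack("C*"))
  sizes << out.bytesize # may be 0 while zlib is still waiting for more input
  decoded << out
end

# Flush whatever zlib still holds, then release the stream.
decoded << inflater.finish
inflater.close

puts sizes.inspect
puts decoded == payload
```

The list of sizes printed makes the flaw visible: the consumer only sees data when zlib has enough input to emit it, regardless of where object boundaries fall.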
#consume_json(stream_method = :common) ⇒ Object
Similar to #consume but parses the JSON to Hash with no further processing. stream_method param accepts the same options as #consume.
    # File 'lib/gnip_api/power_track/stream.rb', line 129
    def consume_json stream_method=:common
      raise ArgumentError, "Block required, non given" unless block_given?
      if stream_method == :common
        read_stream do |data|
          yield(data.map{|item| parse_json(item)})
        end
      elsif stream_method == :io
        read_io_stream do |data|
          yield(parse_json(data))
        end
      elsif stream_method == :pty
        read_pty_stream do |data|
          yield(parse_json(data))
        end
      else
        raise ArgumentError, "Undefined stream method #{stream_method}"
      end
    end
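Going by the listing above, the block passed to #consume_json receives an Array of parsed items with :common but a single parsed item with :io and :pty, and an item that failed to parse arrives as nil. A caller that wants one code path for all backends could normalise the payload first. The helper below is a hypothetical sketch (normalize_payload is not part of the gem):

```ruby
# Hypothetical helper: turns whatever a #consume_json block receives into a
# flat Array of Hashes, dropping nil entries left by failed parses.
def normalize_payload(payload)
  # Array(payload) is avoided on purpose: Array({"id" => 3}) would turn the
  # Hash into an Array of key/value pairs.
  items = payload.is_a?(Array) ? payload : [payload]
  items.compact
end

from_common = normalize_payload([{ "id" => 1 }, nil, { "id" => 2 }])
from_io     = normalize_payload({ "id" => 3 })
from_failed = normalize_payload(nil)

p from_common # => [{"id"=>1}, {"id"=>2}]
p from_io     # => [{"id"=>3}]
p from_failed # => []
```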
#consume_raw(stream_method = :common) ⇒ Object
Similar to #consume, except that it yields raw JSON and does no parsing of the data received. Use it for faster consumption. The stream_method param accepts the same options as #consume.
    # File 'lib/gnip_api/power_track/stream.rb', line 107
    def consume_raw stream_method=:common
      raise ArgumentError, "Block required, non given" unless block_given?
      if stream_method == :common
        read_stream do |data|
          yield(data)
        end
      elsif stream_method == :io
        read_io_stream do |data|
          yield(data)
        end
      elsif stream_method == :pty
        read_pty_stream do |data|
          yield(data)
        end
      else
        raise ArgumentError, "Undefined stream method #{stream_method}"
      end
    end
#logger ⇒ Object
Returns the configured logger.
    # File 'lib/gnip_api/power_track/stream.rb', line 29
    def logger
      GnipApi.logger
    end
#parse_json(json) ⇒ Object
Returns a Hash from a parsed JSON string.
    # File 'lib/gnip_api/power_track/stream.rb', line 209
    def parse_json json
      begin
        GnipApi::JsonParser.new.parse json
      rescue GnipApi::Errors::JsonParser::ParseError
        nil
      end
    end
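The rescue-to-nil pattern in #parse_json means a malformed item does not abort the stream; it simply becomes nil and can be dropped later. A rough equivalent using only the standard library looks like this. It is a sketch under assumptions: safe_parse is a hypothetical name, and stdlib JSON stands in for GnipApi::JsonParser, whose internals are not shown here.

```ruby
require "json"

# Hypothetical stand-in for #parse_json built on the standard JSON library.
# Returns the parsed Hash, or nil when the text is not valid JSON.
def safe_parse(json)
  JSON.parse(json)
rescue JSON::ParserError
  nil
end

# The middle line is truncated on purpose to trigger the rescue branch.
lines = ['{"id":1}', '{"id":', '{"id":2}']

# compact drops the nil left behind by the bad line.
parsed = lines.map { |line| safe_parse(line) }.compact

p parsed # => [{"id"=>1}, {"id"=>2}]
```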
#process_entries(entries) ⇒ Object
Processes the items received after splitting them up, returning appropriate Gnip objects.
    # File 'lib/gnip_api/power_track/stream.rb', line 195
    def process_entries entries
      logger.debug "PowerTrack Stream: #{entries.size} items received"
      data = entries.map{|e| parse_json(e)}.compact
      data.map!{|e| build_message(e)}
      data.select(&:system_message?).each(&:log!)
      return data
    end
#read_io_stream ⇒ Object
Opens the connection to the PowerTrack stream and returns any data received using CURL IO transfer method.
    # File 'lib/gnip_api/power_track/stream.rb', line 150
    def read_io_stream
      request = create_request
      logger.info "Opening PowerTrack parsed stream"
      begin
        @adapter.io_curl_stream(request) do |data|
          yield data
        end
      ensure
        logger.warn "Closing stream"
      end
    end
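The line-at-a-time behaviour of the :io backend comes from reading a child process's stdout through IO.popen. The sketch below illustrates that mechanism only; it is not the adapter's io_curl_stream. As an assumption made so the example runs without network access, a child Ruby process stands in for curl and prints three JSON lines.

```ruby
require "rbconfig"

# Assumed stand-in for the curl command: a child Ruby process that writes
# one JSON object per line. $stdout.sync makes it flush after every write;
# a program that does not flush has its output held back in the pipe,
# which is the keep-alive issue noted for :io.
child = [
  RbConfig.ruby, "-e",
  '$stdout.sync = true; 3.times { |i| puts %({"id":#{i}}) }'
]

lines = []

# Passing an Array runs the command directly, without a shell.
IO.popen(child) do |io|
  # each_line blocks until a full line is available, so the block sees
  # exactly one object per iteration.
  io.each_line { |line| lines << line.chomp }
end

p lines
```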
#read_pty_stream ⇒ Object
Opens the connection to the PowerTrack stream and returns any data received using CURL PTY transfer method.
    # File 'lib/gnip_api/power_track/stream.rb', line 164
    def read_pty_stream
      request = create_request
      logger.info "Opening PowerTrack parsed stream"
      begin
        @adapter.pty_curl_stream(request) do |data|
          yield data
        end
      ensure
        logger.warn "Closing stream"
      end
    end
#read_stream ⇒ Object
Opens the connection to the PowerTrack stream and returns any data received using HTTParty and standard net/http. The buffer is used in this case to collect the chunks and later split them into items.
    # File 'lib/gnip_api/power_track/stream.rb', line 179
    def read_stream
      request = create_request
      logger.info "Opening PowerTrack parsed stream"
      begin
        @adapter.stream_get request do |chunk|
          @buffer.insert! chunk
          yield @buffer.read! if block_given?
        end
      ensure
        logger.warn "Closing stream"
        @running = false
      end
    end
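The insert!/read! pair used by #read_stream suggests a buffer that accumulates decoded chunks and hands back only complete items. GnipApi::PowerTrack::Buffer itself is not shown on this page, so the class below is a hypothetical stand-in: LineBuffer is an invented name, and the "\r\n" delimiter is an assumption about how items are separated.

```ruby
# Hypothetical stand-in for GnipApi::PowerTrack::Buffer.
class LineBuffer
  def initialize(delimiter = "\r\n")
    @delimiter = delimiter
    @data = +""
  end

  # Appends a decoded chunk exactly as received.
  def insert!(chunk)
    @data << chunk
    self
  end

  # Returns every complete item and keeps a trailing partial item for the
  # next call. Empty entries (bare delimiters) are dropped.
  def read!
    *complete, rest = @data.split(@delimiter, -1)
    @data = rest || +""
    complete.reject(&:empty?)
  end
end

buffer = LineBuffer.new

# The first chunk ends in the middle of the second object.
buffer.insert!(%({"id":1}\r\n{"id"))
first = buffer.read!

# The second chunk completes it and is followed by a bare delimiter.
buffer.insert!(%(:2}\r\n\r\n))
second = buffer.read!

p first  # => ["{\"id\":1}"]
p second # => ["{\"id\":2}"]
```

Keeping the partial tail in the buffer is what allows a chunk boundary to fall anywhere, including inside an object, without corrupting the items yielded to the caller.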
#thread_consume ⇒ Object
Consumes the stream using a streamer thread instead of a simple block. This way the streamer can fill in the buffer and the block consumes it periodically.
    # File 'lib/gnip_api/power_track/stream.rb', line 35
    def thread_consume
      @pool = []
      streamer = Thread.new do
        logger.info "Starting streamer Thread"
        begin
          read_stream do |items|
            items.each{|i| @pool << i}
          end
        ensure
          logger.warn "Streamer exited"
        end
      end

      begin
        loop do
          logger.warn "Streamer is down" unless streamer.alive?
          raise GnipApi::Errors::PowerTrack::StreamDown unless streamer.alive?
          entries = []
          while @pool.any?
            entries << @pool.shift
          end
          if entries.any?
            processed = process_entries(entries)
            yield(processed)
          else
            sleep(0.1)
            next
          end
        end
      ensure
        streamer.kill if streamer.alive?
      end
    end
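The streamer/consumer split in #thread_consume can be sketched in isolation as follows. This is an illustration under assumptions, not the gem's code: it swaps the plain Array pool for Ruby's thread-safe Queue, uses a producer that emits five fake items in place of read_stream, and ends the loop quietly when the producer finishes instead of raising StreamDown.

```ruby
queue = Queue.new

# Producer thread: stands in for the streamer that would call read_stream.
streamer = Thread.new do
  5.times { |i| queue << %({"id":#{i}}) }
end

batches = []

loop do
  # Sample liveness before draining, so items pushed just before the
  # producer exits are still picked up on this pass.
  alive = streamer.alive?

  entries = []
  entries << queue.pop(true) until queue.empty?

  if entries.any?
    batches << entries # here the real method would process and yield
  elsif alive
    sleep 0.01         # nothing to do yet, wait for the producer
  else
    break              # producer finished and the queue is drained
  end
end

streamer.join
p batches.flatten
```

Draining in batches keeps slow processing on the consumer side from blocking the thread that reads the connection, which is the point of detaching the two.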