Class: Fog::AWS::Kinesis::Mock
- Inherits: Object
  - Object
  - Fog::AWS::Kinesis::Mock
- Defined in:
- lib/fog/aws/kinesis.rb,
lib/fog/aws/requests/kinesis/put_record.rb,
lib/fog/aws/requests/kinesis/get_records.rb,
lib/fog/aws/requests/kinesis/put_records.rb,
lib/fog/aws/requests/kinesis/split_shard.rb,
lib/fog/aws/requests/kinesis/list_streams.rb,
lib/fog/aws/requests/kinesis/merge_shards.rb,
lib/fog/aws/requests/kinesis/create_stream.rb,
lib/fog/aws/requests/kinesis/delete_stream.rb,
lib/fog/aws/requests/kinesis/describe_stream.rb,
lib/fog/aws/requests/kinesis/add_tags_to_stream.rb,
lib/fog/aws/requests/kinesis/get_shard_iterator.rb,
lib/fog/aws/requests/kinesis/list_tags_for_stream.rb,
lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb
Class Method Summary collapse
- .data ⇒ Object
- .mutex ⇒ Object
- .next_sequence_number ⇒ Object
- .next_shard_id ⇒ Object
- .reset ⇒ Object
Instance Method Summary collapse
- #add_tags_to_stream(options = {}) ⇒ Object
- #create_stream(options = {}) ⇒ Object
- #data ⇒ Object
- #delete_stream(options = {}) ⇒ Object
- #describe_stream(options = {}) ⇒ Object
- #get_records(options = {}) ⇒ Object
- #get_shard_iterator(options = {}) ⇒ Object
- #initialize(options = {}) ⇒ Mock (constructor): a new instance of Mock.
- #list_streams(options = {}) ⇒ Object
- #list_tags_for_stream(options = {}) ⇒ Object
- #merge_shards(options = {}) ⇒ Object
- #mutex ⇒ Object
- #next_sequence_number ⇒ Object
- #next_shard_id ⇒ Object
- #put_record(options = {}) ⇒ Object
- #put_records(options = {}) ⇒ Object
- #remove_tags_from_stream(options = {}) ⇒ Object
- #reset_data ⇒ Object
- #split_shard(options = {}) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Mock
Returns a new instance of Mock.
147 148 149 150 151 152 153 154 155 |
# File 'lib/fog/aws/kinesis.rb', line 147
#
# Sets up a mock Kinesis connection for one region and access key.
#
# @param options [Hash] connection settings
# @option options [String] :aws_access_key_id stored as @aws_access_key_id
# @option options [String] :region region name; 'us-east-1' when omitted
# @raise [ArgumentError] if the region is not in the supported list below
def initialize(options={})
  @account_id        = Fog::AWS::Mock.owner_id
  @aws_access_key_id = options[:aws_access_key_id]
  @region            = options[:region] || 'us-east-1'

  supported_regions = [
    'ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2',
    'eu-central-1', 'eu-west-1', 'sa-east-1',
    'us-east-1', 'us-west-1', 'us-west-2'
  ]
  return if supported_regions.include?(@region)

  raise ArgumentError, "Unknown region: #{@region.inspect}"
end
Class Method Details
.data ⇒ Object
133 134 135 136 137 138 139 140 141 |
# File 'lib/fog/aws/kinesis.rb', line 133
#
# Process-wide mock store, created on first use. It is a two-level Hash that
# auto-populates on lookup: the first key is the region, the second key is
# whatever the caller indexes with (instances use their access key id).
#
# @return [Hash] region => key => { :kinesis_streams => [...] }
def self.data
  @data ||= Hash.new do |hash, region|
    hash[region] = Hash.new do |region_hash, key|
      # Seeded as an Array: the request methods search, map over and delete
      # from this collection as a list of stream hashes.
      region_hash[key] = { :kinesis_streams => [] }
    end
  end
end
.mutex ⇒ Object
128 129 130 |
# File 'lib/fog/aws/kinesis.rb', line 128
#
# Returns the class-level Mutex, creating it on the first call and reusing
# the same object afterwards.
#
# @return [Mutex]
def self.mutex
  return @mutex if @mutex

  @mutex = Mutex.new
end
.next_sequence_number ⇒ Object
165 166 167 168 169 170 171 |
# File 'lib/fog/aws/kinesis.rb', line 165
#
# Hands out the next value of a class-wide counter as a String. The counter
# starts at "0" and is advanced while holding the class mutex.
#
# @return [String] decimal sequence number
def self.next_sequence_number
  mutex.synchronize do
    @sequence_number = @sequence_number ? @sequence_number + 1 : 0
    @sequence_number.to_s
  end
end
.next_shard_id ⇒ Object
174 175 176 177 178 179 180 |
# File 'lib/fog/aws/kinesis.rb', line 174
#
# Hands out the next shard identifier from a class-wide counter, advanced
# while holding the class mutex. The counter starts at 0 and is rendered
# zero-padded to 12 digits, e.g. "shardId-000000000000".
#
# @return [String] shard id
def self.next_shard_id
  mutex.synchronize do
    @shard_id = @shard_id ? @shard_id + 1 : 0
    format("shardId-%012d", @shard_id)
  end
end
.reset ⇒ Object
143 144 145 |
# File 'lib/fog/aws/kinesis.rb', line 143
#
# Drops the memoized mock store so the next call to .data builds a fresh one.
# Only @data is cleared; the sequence-number and shard-id counters are left
# untouched.
#
# @return [nil]
def self.reset
  instance_variable_set(:@data, nil)
end
Instance Method Details
#add_tags_to_stream(options = {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fog/aws/requests/kinesis/add_tags_to_stream.rb', line 30
#
# Mock: merges the given tags into the named stream's "Tags" hash.
# Note that the consumed keys are removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" name of the stream to tag
# @option options [Hash] "Tags" key/value pairs to merge into existing tags
# @return [Excon::Response] status 200 with an empty body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def add_tags_to_stream(options={})
  stream_name = options.delete("StreamName")
  tags = options.delete("Tags")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  stream["Tags"] = stream["Tags"].merge(tags)

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#create_stream(options = {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fog/aws/requests/kinesis/create_stream.rb', line 30
#
# Mock: registers a new ACTIVE stream with the requested number of shards.
# Every shard is given the full hash-key range and an empty "Records" list.
# Note that the consumed keys are removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" name of the stream to create
# @option options [Integer] "ShardCount" number of shards; defaults to 1
# @return [Excon::Response] status 200 with an empty body
# @raise [Fog::AWS::Kinesis::ResourceInUse] if the name is already taken
def create_stream(options={})
  stream_name = options.delete("StreamName")
  shard_count = options.delete("ShardCount") || 1
  stream_arn = "arn:aws:kinesis:#{@region}:#{@account_id}:stream/#{stream_name}"

  if data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceInUse.new("Stream #{stream_name} under account #{@account_id} already exists.")
  end

  shards = (0...shard_count).map do
    {
      "HashKeyRange" => {
        "EndingHashKey" => "340282366920938463463374607431768211455",
        "StartingHashKey" => "0"
      },
      "SequenceNumberRange" => {
        "StartingSequenceNumber" => next_sequence_number
      },
      "ShardId" => next_shard_id,
      "Records" => []
    }
  end

  # Append instead of replacing the collection, so streams created earlier
  # survive. Array() also copes with a store whose stream collection is not
  # (yet) an Array, e.g. an empty placeholder Hash.
  streams = Array(data[:kinesis_streams])
  streams << {
    "HasMoreShards" => false,
    "StreamARN" => stream_arn,
    "StreamName" => stream_name,
    "StreamStatus" => "ACTIVE",
    "Shards" => shards,
    "Tags" => {}
  }
  data[:kinesis_streams] = streams

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#data ⇒ Object
157 158 159 |
# File 'lib/fog/aws/kinesis.rb', line 157
#
# Returns this instance's slice of the class-wide mock store, looked up by
# @region and then by @aws_access_key_id.
#
# @return [Hash] the per-region, per-key store
def data
  region_store = self.class.data[@region]
  region_store[@aws_access_key_id]
end
#delete_stream(options = {}) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/fog/aws/requests/kinesis/delete_stream.rb', line 28
#
# Mock: removes the named stream from the store.
# Note that "StreamName" is removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" name of the stream to delete
# @return [Excon::Response] status 200 with an empty body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def delete_stream(options={})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  data[:kinesis_streams].delete(stream)

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#describe_stream(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fog/aws/requests/kinesis/describe_stream.rb', line 36
#
# Mock: returns the stored description of the named stream. The mock keeps
# each shard's records under a "Records" key; that key is left out of the
# response, and the stored stream is not modified.
# Note that "StreamName" is removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" name of the stream to describe
# @return [Excon::Response] status 200, body { "StreamDescription" => {...} }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def describe_stream(options={})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # "Shards" is an Array of shard hashes, so the "Records" key has to be
  # filtered out of each shard rather than out of the Array itself.
  shards = stream["Shards"].map do |shard|
    shard.reject { |key, _| key == "Records" }
  end

  response = Excon::Response.new
  response.status = 200
  response.body = { "StreamDescription" => stream.dup.merge("Shards" => shards) }
  response
end
#get_records(options = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fog/aws/requests/kinesis/get_records.rb', line 32
#
# Mock: reads records from one shard. The shard iterator is the JSON string
# produced by get_shard_iterator; it carries "StreamName", "ShardId" and an
# optional "StartingSequenceNumber" (treated as 1 when absent). Records whose
# sequence number is below the starting number are skipped.
# Note that the consumed keys are removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "ShardIterator" JSON-encoded iterator
# @option options [Integer] "Limit" stop once this many records are collected;
#   when omitted, all matching records are returned
# @return [Excon::Response] status 200; body has "MillisBehindLatest",
#   "NextShardIterator" and "Records"
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if the stream or shard is unknown
def get_records(options={})
  shard_iterator = Fog::JSON.decode(options.delete("ShardIterator"))
  limit = options.delete("Limit") || -1
  stream_name = shard_iterator["StreamName"]
  shard_id = shard_iterator["ShardId"]
  starting_sequence_number = (shard_iterator["StartingSequenceNumber"] || 1).to_i

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  shard = stream["Shards"].detect { |candidate| candidate["ShardId"] == shard_id }
  unless shard
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  records = []
  # A shard may carry no "Records" key at all; treat that as "no records".
  (shard["Records"] || []).each do |record|
    next if record["SequenceNumber"].to_i < starting_sequence_number

    records << record
    break if records.size == limit
  end

  # The next iterator resumes just past the last record returned, or stays
  # where it was when nothing matched.
  shard_iterator["StartingSequenceNumber"] = if records.empty?
    starting_sequence_number.to_s
  else
    (records.last["SequenceNumber"].to_i + 1).to_s
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "MillisBehindLatest" => 0,
    "NextShardIterator" => Fog::JSON.encode(shard_iterator),
    "Records" => records
  }
  response
end
#get_shard_iterator(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fog/aws/requests/kinesis/get_shard_iterator.rb', line 36
#
# Mock: builds a shard iterator by JSON-encoding the options exactly as
# given; get_records decodes that string again. Only the stream's existence
# is checked here.
#
# @param options [Hash] must include "StreamName"; all entries are encoded
#   into the iterator
# @return [Excon::Response] status 200, body { "ShardIterator" => String }
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def get_shard_iterator(options={})
  stream_name = options["StreamName"]

  unless data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    # just encode the options that were given, we decode them in get_records
    "ShardIterator" => Fog::JSON.encode(options)
  }
  response
end
#list_streams(options = {}) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/fog/aws/requests/kinesis/list_streams.rb', line 28
#
# Mock: lists the names of all streams in this instance's store.
# The options argument is accepted but not read.
#
# @param options [Hash] ignored
# @return [Excon::Response] status 200; body has "HasMoreStreams" (always
#   false) and "StreamNames"
def list_streams(options={})
  stream_names = data[:kinesis_streams].map { |stream| stream["StreamName"] }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "HasMoreStreams" => false,
    "StreamNames" => stream_names
  }
  response
end
#list_tags_for_stream(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fog/aws/requests/kinesis/list_tags_for_stream.rb', line 36
#
# Mock: returns the named stream's tags as a list of
# { "Key" => ..., "Value" => ... } hashes.
# Note that "StreamName" is removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" name of the stream
# @return [Excon::Response] status 200; body has "HasMoreTags" (always false)
#   and "Tags"
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def list_tags_for_stream(options={})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  tag_list = stream["Tags"].map { |k, v| { "Key" => k, "Value" => v } }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "HasMoreTags" => false,
    "Tags" => tag_list
  }
  response
end
#merge_shards(options = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fog/aws/requests/kinesis/merge_shards.rb', line 32
#
# Mock: merges two shards of a stream. Both parents are closed by giving them
# an "EndingSequenceNumber", and a new shard is appended whose hash-key range
# runs from the smaller starting key to the larger ending key of the parents.
# Note that the consumed keys are removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" stream that owns the shards
# @option options [String] "ShardToMerge" id of the first shard
# @option options [String] "AdjacentShardToMerge" id of the second shard
# @return [Excon::Response] status 200 with an empty body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if the stream or either shard
#   is unknown
def merge_shards(options={})
  stream_name = options.delete("StreamName")
  shard_to_merge_id = options.delete("ShardToMerge")
  adjacent_shard_to_merge_id = options.delete("AdjacentShardToMerge")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  shard_to_merge = stream["Shards"].detect { |shard| shard["ShardId"] == shard_to_merge_id }
  unless shard_to_merge
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  adjacent_shard_to_merge = stream["Shards"].detect { |shard| shard["ShardId"] == adjacent_shard_to_merge_id }
  unless adjacent_shard_to_merge
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{adjacent_shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close shards (set an EndingSequenceNumber on them)
  shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number
  adjacent_shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  # Hash keys are stored as decimal strings; compare them numerically.
  new_starting_hash_key = [
    shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i
  ].min.to_s

  new_ending_hash_key = [
    shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i
  ].max.to_s

  # create a new shard with ParentShardId and AdjacentParentShardID
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey" => new_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_to_merge_id,
    "AdjacentParentShardId" => adjacent_shard_to_merge_id,
    # Give the new shard a record list so records can be stored on and read
    # from it like on any shard made by create_stream.
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#mutex ⇒ Object
131 |
# File 'lib/fog/aws/kinesis.rb', line 131
#
# Instance-level shortcut that returns the class-level mutex.
#
# @return [Object] whatever self.class.mutex returns
def mutex
  self.class.mutex
end
#next_sequence_number ⇒ Object
172 |
# File 'lib/fog/aws/kinesis.rb', line 172
#
# Instance-level shortcut that delegates to the class-level
# next_sequence_number.
#
# @return [Object] whatever self.class.next_sequence_number returns
def next_sequence_number
  self.class.next_sequence_number
end
#next_shard_id ⇒ Object
181 |
# File 'lib/fog/aws/kinesis.rb', line 181
#
# Instance-level shortcut that delegates to the class-level next_shard_id.
#
# @return [Object] whatever self.class.next_shard_id returns
def next_shard_id
  self.class.next_shard_id
end
#put_record(options = {}) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fog/aws/requests/kinesis/put_record.rb', line 38
#
# Mock: stores one record on a randomly chosen shard of the named stream.
# The partition key is stored with the record but does not influence which
# shard is picked.
# Note that the consumed keys are removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" target stream
# @option options [Object] "Data" record payload, stored as given
# @option options [Object] "PartitionKey" stored as given
# @return [Excon::Response] status 200; body has "SequenceNumber" and "ShardId"
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def put_record(options={})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  sequence_number = next_sequence_number
  # Named `payload` so the local does not shadow the #data store accessor.
  payload = options.delete("Data")
  partition_key = options.delete("PartitionKey")

  # Array#sample is called Array#choice on Ruby 1.8.7.
  sample_method = RUBY_VERSION == "1.8.7" ? :choice : :sample
  shard = stream["Shards"].send(sample_method)
  shard_id = shard["ShardId"]

  # store the records on the shard(s); a shard may lack a "Records" list,
  # so create one on demand
  (shard["Records"] ||= []) << {
    "SequenceNumber" => sequence_number,
    "Data" => payload,
    "PartitionKey" => partition_key
  }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "SequenceNumber" => sequence_number,
    "ShardId" => shard_id
  }
  response
end
#put_records(options = {}) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fog/aws/requests/kinesis/put_records.rb', line 36
#
# Mock: stores several records, each on an independently and randomly chosen
# shard of the named stream. Each stored record is the given record hash plus
# a "SequenceNumber" entry.
# Note that the consumed keys are removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" target stream
# @option options [Array<Hash>] "Records" records to store
# @return [Excon::Response] status 200; body has "FailedRecordCount" (always
#   0) and "Records", one { "SequenceNumber", "ShardId" } hash per input
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def put_records(options={})
  stream_name = options.delete("StreamName")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # Array#sample is called Array#choice on Ruby 1.8.7.
  sample_method = RUBY_VERSION == "1.8.7" ? :choice : :sample

  records = options.delete("Records")
  record_results = records.map do |record|
    sequence_number = next_sequence_number

    shard = stream["Shards"].send(sample_method)
    shard_id = shard["ShardId"]

    # store the records on the shard(s); a shard may lack a "Records" list,
    # so create one on demand
    (shard["Records"] ||= []) << record.merge("SequenceNumber" => sequence_number)

    {
      "SequenceNumber" => sequence_number,
      "ShardId" => shard_id
    }
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "FailedRecordCount" => 0,
    "Records" => record_results
  }
  response
end
#remove_tags_from_stream(options = {}) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fog/aws/requests/kinesis/remove_tags_from_stream.rb', line 30
#
# Mock: deletes the listed tag keys from the named stream's "Tags" hash.
# Note that the consumed keys are removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" name of the stream
# @option options [Array<String>] "TagKeys" keys of the tags to remove
# @return [Excon::Response] status 200 with an empty body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if no stream has that name
def remove_tags_from_stream(options={})
  stream_name = options.delete("StreamName")
  tags = options.delete("TagKeys")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  stream["Tags"] = stream["Tags"].delete_if { |k, _| tags.include?(k) }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
#reset_data ⇒ Object
161 162 163 |
# File 'lib/fog/aws/kinesis.rb', line 161
#
# Removes this instance's entry (keyed by @aws_access_key_id) from the
# class-wide store for @region.
#
# @return [Object] the result of the underlying delete call
def reset_data
  region_store = self.class.data[@region]
  region_store.delete(@aws_access_key_id)
end
#split_shard(options = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fog/aws/requests/kinesis/split_shard.rb', line 32
#
# Mock: splits one shard into two. The parent is closed by giving it an
# "EndingSequenceNumber"; two child shards are appended that together cover
# the parent's hash-key range, divided at "NewStartingHashKey" (the first
# child ends one below it, the second child starts at it).
# Note that the consumed keys are removed from the passed-in options hash.
#
# @param options [Hash]
# @option options [String] "StreamName" stream that owns the shard
# @option options [String] "ShardToSplit" id of the shard to split
# @option options [String] "NewStartingHashKey" first hash key of the second
#   child, as a decimal string
# @return [Excon::Response] status 200 with an empty body
# @raise [Fog::AWS::Kinesis::ResourceNotFound] if the stream or shard is unknown
def split_shard(options={})
  stream_name = options.delete("StreamName")
  shard_id = options.delete("ShardToSplit")

  stream = data[:kinesis_streams].detect { |s| s["StreamName"] == stream_name }
  unless stream
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  shard = stream["Shards"].detect { |candidate| candidate["ShardId"] == shard_id }
  unless shard
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close original shard (set an EndingSequenceNumber on it)
  shard["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  # Calculate new shard ranges
  parent_starting_hash_key = shard["HashKeyRange"]["StartingHashKey"]
  parent_ending_hash_key = shard["HashKeyRange"]["EndingHashKey"]
  new_starting_hash_key = options.delete("NewStartingHashKey")

  # Create two new shards using contiguous hash space based on the original
  # shard. Each child gets its own record list so records can be stored on
  # and read from it like on any shard made by create_stream.
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s,
      "StartingHashKey" => parent_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    "Records" => []
  }
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey" => parent_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id,
    "Records" => []
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end