class Fog::AWS::Kinesis::Mock
Public Class Methods
data() { |data| ... }
# File lib/fog/aws/kinesis.rb, line 130
def self.data
  @mutex.synchronize do
    @data ||= Hash.new do |hash, region|
      hash[region] = Hash.new do |region_hash, key|
        region_hash[key] = {
          :kinesis_streams => {}
        }
      end
    end
    yield @data if block_given?
  end
end
new(options={})
# File lib/fog/aws/kinesis.rb, line 150
def initialize(options={})
  @account_id = Fog::AWS::Mock.owner_id
  @aws_access_key_id = options[:aws_access_key_id]
  @region = options[:region] || 'us-east-1'
  Fog::AWS.validate_region!(@region)
end
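A minimal usage sketch: with Fog.mock! enabled, Fog::AWS::Kinesis.new builds an instance of this Mock class. The credential values below are placeholders; the mock only uses the access key id and region to pick its slot in the in-memory data store.

  require 'fog/aws'

  Fog.mock!
  kinesis = Fog::AWS::Kinesis.new(
    :aws_access_key_id     => 'fake-access-key',
    :aws_secret_access_key => 'fake-secret-key',
    :region                => 'us-east-1'        # optional, defaults to 'us-east-1'
  )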
next_sequence_number()
# File lib/fog/aws/kinesis.rb, line 170
def self.next_sequence_number
  @mutex.synchronize do
    @sequence_number ||= -1
    @sequence_number += 1
    @sequence_number.to_s
  end
end
next_shard_id()
# File lib/fog/aws/kinesis.rb, line 180
def self.next_shard_id
  @mutex.synchronize do
    @shard_id ||= -1
    @shard_id += 1
    "shardId-#{@shard_id.to_s.rjust(12, "0")}"
  end
end
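For illustration, successive calls return zero-padded, monotonically increasing ids. The counter is shared process-wide, so the exact values depend on what has already run:

  Fog::AWS::Kinesis::Mock.next_shard_id # => "shardId-000000000000"
  Fog::AWS::Kinesis::Mock.next_shard_id # => "shardId-000000000001"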
reset()
# File lib/fog/aws/kinesis.rb, line 144
def self.reset
  @mutex.synchronize do
    @data = nil
  end
end
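A typical test suite clears the shared mock data between examples. The RSpec hook below is only an illustrative sketch; any before/teardown mechanism works the same way:

  RSpec.configure do |config|
    config.before(:each) { Fog::AWS::Kinesis::Mock.reset }
  end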
Public Instance Methods
create_stream(options={})
# File lib/fog/aws/requests/kinesis/create_stream.rb, line 30
def create_stream(options={})
  stream_name = options.delete("StreamName")
  shard_count = options.delete("ShardCount") || 1
  stream_arn = "arn:aws:kinesis:#{@region}:#{@account_id}:stream/#{stream_name}"

  if data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceInUse.new("Stream #{stream_name} under account #{@account_id} already exists.")
  end

  shards = (0...shard_count).map do |shard|
    {
      "HashKeyRange"=>{
        "EndingHashKey"=>"340282366920938463463374607431768211455",
        "StartingHashKey"=>"0"
      },
      "SequenceNumberRange"=>{
        "StartingSequenceNumber"=> next_sequence_number
      },
      "ShardId"=>next_shard_id,
      "Records" => []
    }
  end

  data[:kinesis_streams] = [{
    "HasMoreShards" => false,
    "StreamARN" => stream_arn,
    "StreamName" => stream_name,
    "StreamStatus" => "ACTIVE",
    "Shards" => shards,
    "Tags" => {}
  }]

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
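A usage sketch, assuming the mocked connection from the example above (the stream name is illustrative). Note that the mock marks the stream ACTIVE immediately and, as written, replaces the stream list rather than appending to it, so only the most recently created stream is retained:

  kinesis.create_stream("StreamName" => "events", "ShardCount" => 2)
  kinesis.list_streams.body["StreamNames"] # => ["events"]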
data()
# File lib/fog/aws/kinesis.rb, line 158
def data
  self.class.data do |data|
    data[@region][@aws_access_key_id]
  end
end
delete_stream(options={})
# File lib/fog/aws/requests/kinesis/delete_stream.rb, line 28
def delete_stream(options={})
  stream_name = options.delete("StreamName")
  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  data[:kinesis_streams].delete(stream)

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
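A sketch, reusing the mocked connection from above; deleting a stream that does not exist raises the ResourceNotFound error shown in the source:

  kinesis.delete_stream("StreamName" => "events")
  kinesis.delete_stream("StreamName" => "events") # raises Fog::AWS::Kinesis::ResourceNotFound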
describe_stream(options={})
# File lib/fog/aws/requests/kinesis/describe_stream.rb, line 36
def describe_stream(options={})
  stream_name = options.delete("StreamName")
  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  # Strip Records key out of shards for response
  shards = stream["Shards"].reject{ |k,_| k == "Records" }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "StreamDescription" => stream.dup.merge("Shards" => shards)
  }
  response
end
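A sketch of reading the description back (the shard ids shown are illustrative, since the id counter is shared). Note that, as written, the reject block never matches an element of the Shards array, so each shard's internal "Records" array is still present in the mocked response:

  description = kinesis.describe_stream("StreamName" => "events").body["StreamDescription"]
  description["StreamStatus"]                    # => "ACTIVE"
  description["Shards"].map { |s| s["ShardId"] } # => e.g. ["shardId-000000000000", "shardId-000000000001"]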
get_records(options={})
# File lib/fog/aws/requests/kinesis/get_records.rb, line 32
def get_records(options={})
  shard_iterator = Fog::JSON.decode(options.delete("ShardIterator"))
  limit = options.delete("Limit") || -1
  stream_name = shard_iterator["StreamName"]
  shard_id = shard_iterator["ShardId"]
  starting_sequence_number = (shard_iterator["StartingSequenceNumber"] || 1).to_i

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end
  unless shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  records = []
  shard["Records"].each do |record|
    next if record["SequenceNumber"].to_i < starting_sequence_number
    records << record
    break if records.size == limit
  end
  shard_iterator["StartingSequenceNumber"] = if records.empty?
                                               starting_sequence_number.to_s
                                             else
                                               (records.last["SequenceNumber"].to_i + 1).to_s
                                             end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "MillisBehindLatest"=> 0,
    "NextShardIterator"=> Fog::JSON.encode(shard_iterator),
    "Records"=> records
  }
  response
end
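A sketch of reading from a shard, assuming the mocked connection and stream from the examples above; shard_id would come from describe_stream, and the iterator from get_shard_iterator (documented below):

  iterator = kinesis.get_shard_iterator(
    "StreamName"        => "events",
    "ShardId"           => shard_id,
    "ShardIteratorType" => "TRIM_HORIZON"
  ).body["ShardIterator"]

  result = kinesis.get_records("ShardIterator" => iterator, "Limit" => 10)
  result.body["Records"]           # records at or after the iterator position
  result.body["NextShardIterator"] # pass back in to continue reading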
get_shard_iterator(options={})
# File lib/fog/aws/requests/kinesis/get_shard_iterator.rb, line 36
def get_shard_iterator(options={})
  stream_name = options["StreamName"]
  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "ShardIterator" => Fog::JSON.encode(options) # just encode the options that were given, we decode them in get_records
  }
  response
end
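Because the mock simply JSON-encodes the request options, the returned iterator is transparent (the shard id below is illustrative; only StreamName is validated here):

  kinesis.get_shard_iterator(
    "StreamName"        => "events",
    "ShardId"           => "shardId-000000000000",
    "ShardIteratorType" => "TRIM_HORIZON"
  ).body["ShardIterator"]
  # => a JSON string containing the options above, e.g. "{\"StreamName\":\"events\",...}"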
list_streams(options={})
# File lib/fog/aws/requests/kinesis/list_streams.rb, line 28
def list_streams(options={})
  response = Excon::Response.new
  response.status = 200
  response.body = {
    "HasMoreStreams" => false,
    "StreamNames" => data[:kinesis_streams].map{ |stream| stream["StreamName"] }
  }
  response
end
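A sketch, assuming the "events" stream from the create_stream example still exists:

  kinesis.list_streams.body
  # => { "HasMoreStreams" => false, "StreamNames" => ["events"] }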
merge_shards(options={})
# File lib/fog/aws/requests/kinesis/merge_shards.rb, line 32
def merge_shards(options={})
  stream_name = options.delete("StreamName")
  shard_to_merge_id = options.delete("ShardToMerge")
  adjacent_shard_to_merge_id = options.delete("AdjacentShardToMerge")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard_to_merge = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_to_merge_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  unless adjacent_shard_to_merge = stream["Shards"].detect{ |shard| shard["ShardId"] == adjacent_shard_to_merge_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{adjacent_shard_to_merge_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close shards (set an EndingSequenceNumber on them)
  shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number
  adjacent_shard_to_merge["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  new_starting_hash_key = [
    shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["StartingHashKey"].to_i
  ].min.to_s
  new_ending_hash_key = [
    shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i,
    adjacent_shard_to_merge["HashKeyRange"]["EndingHashKey"].to_i
  ].max.to_s

  # create a new shard with ParentShardId and AdjacentParentShardID
  stream["Shards"] << {
    "HashKeyRange"=> {
      "EndingHashKey" => new_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_to_merge_id,
    "AdjacentParentShardId" => adjacent_shard_to_merge_id
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
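A sketch (shard ids are illustrative). The two parent shards are closed by giving them an EndingSequenceNumber, and a new child shard covering the union of their hash key ranges is appended with ParentShardId and AdjacentParentShardId set:

  kinesis.merge_shards(
    "StreamName"           => "events",
    "ShardToMerge"         => "shardId-000000000000",
    "AdjacentShardToMerge" => "shardId-000000000001"
  )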
next_sequence_number()
# File lib/fog/aws/kinesis.rb, line 178
def next_sequence_number; self.class.next_sequence_number; end
next_shard_id()
# File lib/fog/aws/kinesis.rb, line 188
def next_shard_id; self.class.next_shard_id; end
put_record(options={})
# File lib/fog/aws/requests/kinesis/put_record.rb, line 38
def put_record(options={})
  stream_name = options.delete("StreamName")
  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  sequence_number = next_sequence_number
  data = options.delete("Data")
  partition_key = options.delete("PartitionKey")

  shard_id = stream["Shards"].sample["ShardId"]
  shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id }
  # store the records on the shard(s)
  shard["Records"] << {
    "SequenceNumber" => sequence_number,
    "Data" => data,
    "PartitionKey" => partition_key
  }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "SequenceNumber" => sequence_number,
    "ShardId" => shard_id
  }
  response
end
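A sketch, assuming the mocked connection from above. The Data value is shown Base64-encoded as the real API expects, though the mock stores whatever string it is given; the target shard is chosen at random, so the PartitionKey is recorded but not used for routing:

  kinesis.put_record(
    "StreamName"   => "events",
    "Data"         => "cGF5bG9hZA==", # Base64 for "payload"
    "PartitionKey" => "user-123"
  ).body
  # => { "SequenceNumber" => "5", "ShardId" => "shardId-000000000001" } (values vary)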
put_records(options={})
# File lib/fog/aws/requests/kinesis/put_records.rb, line 36
def put_records(options={})
  stream_name = options.delete("StreamName")
  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  records = options.delete("Records")
  record_results = records.map { |r|
    sequence_number = next_sequence_number

    shard_id = stream["Shards"].sample["ShardId"]
    shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id }
    # store the records on the shard(s)
    shard["Records"] << r.merge("SequenceNumber" => sequence_number)
    {
      "SequenceNumber" => sequence_number,
      "ShardId" => shard_id
    }
  }

  response = Excon::Response.new
  response.status = 200
  response.body = {
    "FailedRecordCount" => 0,
    "Records" => record_results
  }
  response
end
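A sketch of the batch variant; as in put_record, each record lands on a randomly sampled shard and the mock always reports zero failures:

  records = [
    { "Data" => "YQ==", "PartitionKey" => "k1" }, # Base64 for "a"
    { "Data" => "Yg==", "PartitionKey" => "k2" }  # Base64 for "b"
  ]
  kinesis.put_records("StreamName" => "events", "Records" => records).body
  # => { "FailedRecordCount" => 0,
  #      "Records" => [{ "SequenceNumber" => "...", "ShardId" => "shardId-..." }, ...] }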
reset_data()
# File lib/fog/aws/kinesis.rb, line 164
def reset_data
  self.class.data do |data|
    data[@region].delete(@aws_access_key_id)
  end
end
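Unlike the class-level reset, this discards only the streams stored for the current connection's region and access key:

  kinesis.reset_data
  kinesis.list_streams.body["StreamNames"] # => []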
split_shard(options={})
# File lib/fog/aws/requests/kinesis/split_shard.rb, line 32
def split_shard(options={})
  stream_name = options.delete("StreamName")
  shard_id = options.delete("ShardToSplit")

  unless stream = data[:kinesis_streams].detect{ |s| s["StreamName"] == stream_name }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Stream #{stream_name} under account #{@account_id} not found.")
  end

  unless shard = stream["Shards"].detect{ |shard| shard["ShardId"] == shard_id }
    raise Fog::AWS::Kinesis::ResourceNotFound.new("Could not find shard #{shard_id} in stream #{stream_name} under account #{@account_id}.")
  end

  # Close original shard (set an EndingSequenceNumber on it)
  shard["SequenceNumberRange"]["EndingSequenceNumber"] = next_sequence_number

  # Calculate new shard ranges
  parent_starting_hash_key = shard["HashKeyRange"]["StartingHashKey"]
  parent_ending_hash_key = shard["HashKeyRange"]["EndingHashKey"]
  new_starting_hash_key = options.delete("NewStartingHashKey")

  # Create two new shards using contiguous hash space based on the original shard
  stream["Shards"] << {
    "HashKeyRange"=> {
      "EndingHashKey" => (new_starting_hash_key.to_i - 1).to_s,
      "StartingHashKey" => parent_starting_hash_key
    },
    "SequenceNumberRange" => {
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id
  }
  stream["Shards"] << {
    "HashKeyRange" => {
      "EndingHashKey" => parent_ending_hash_key,
      "StartingHashKey" => new_starting_hash_key
    },
    "SequenceNumberRange" =>{
      "StartingSequenceNumber" => next_sequence_number
    },
    "ShardId" => next_shard_id,
    "ParentShardId" => shard_id
  }

  response = Excon::Response.new
  response.status = 200
  response.body = ""
  response
end
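A sketch (the shard id is illustrative; NewStartingHashKey is shown as 2**127, the midpoint of the full hash key range). The parent shard is closed and two child shards splitting its hash key range are appended, each with ParentShardId set:

  kinesis.split_shard(
    "StreamName"         => "events",
    "ShardToSplit"       => "shardId-000000000000",
    "NewStartingHashKey" => "170141183460469231731687303715884105728"
  )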