class Fog::AWS::Kinesis::Real
Public Class Methods
# File lib/fog/aws/kinesis.rb, line 36
# Builds a Kinesis service object: records the connection settings, creates
# the Fog::XML::Connection for the endpoint and sets up the credentials.
#
# ==== Options
# * :use_iam_profile: stored as-is in @use_iam_profile.
# * :connection_options<~Hash>: passed to Fog::XML::Connection (default {}).
# * :instrumentor: optional object used by +request+ to instrument calls.
# * :instrumentor_name<~String>: event name prefix (default 'fog.aws.kinesis').
# * :region<~String>: defaults to 'us-east-1'; the default is also written
#   back into +options+.
# * :host<~String>: defaults to "kinesis.<region>.amazonaws.com".
# * :path<~String>: defaults to '/'.
# * :persistent: passed to Fog::XML::Connection; defaults to true when the
#   key is absent or nil. An explicit false is honoured.
# * :port<~Number>: defaults to 443.
# * :scheme<~String>: defaults to 'https'.
#
# +options+ is finally handed to +setup_credentials+.
def initialize(options={})
  @use_iam_profile    = options[:use_iam_profile]
  @connection_options = options[:connection_options] || {}

  @instrumentor       = options[:instrumentor]
  @instrumentor_name  = options[:instrumentor_name] || 'fog.aws.kinesis'

  options[:region] ||= 'us-east-1'
  @region = options[:region]
  @host   = options[:host] || "kinesis.#{options[:region]}.amazonaws.com"
  @path   = options[:path] || '/'

  # `options[:persistent] || true` evaluated to true for every input, so a
  # caller could never switch persistence off. Only fall back to the default
  # when no value was supplied.
  persistent  = options[:persistent]
  @persistent = persistent.nil? ? true : persistent

  @port       = options[:port] || 443
  @scheme     = options[:scheme] || 'https'
  @connection = Fog::XML::Connection.new("#{@scheme}://#{@host}:#{@port}#{@path}", @persistent, @connection_options)

  @version = "20131202"

  setup_credentials(options)
end
Public Instance Methods
Creates an Amazon Kinesis stream.
Options¶ ↑
-
ShardCount<~Number>: The number of shards that the stream will use.
-
StreamName<~String>: A name to identify the stream.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html
# File lib/fog/aws/requests/kinesis/create_stream.rb, line 16
# Issues a Kinesis CreateStream call through +request+.
#
# ==== Options
# * ShardCount<~Number>: number of shards for the stream; 1 is sent when the
#   key is missing.
# * StreamName<~String>: name identifying the stream; left out of the body
#   when nil.
#
# Both keys are removed from +options+; every remaining entry is merged into
# the parameters handed to +request+.
#
# ==== Returns
# * the value returned by +request+
def create_stream(options={})
  shard_count = options.delete("ShardCount") || 1
  stream_name = options.delete("StreamName")

  payload = { "ShardCount" => shard_count, "StreamName" => stream_name }
  payload.delete_if { |_, value| value.nil? }

  params = {
    'X-Amz-Target' => "Kinesis_#{@version}.CreateStream",
    :body          => payload
  }
  request(params.merge(options))
end
Deletes a stream and all its shards and data.
Options¶ ↑
-
StreamName<~String>: A name to identify the stream.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html
# File lib/fog/aws/requests/kinesis/delete_stream.rb, line 15
# Issues a Kinesis DeleteStream call through +request+.
#
# ==== Options
# * StreamName<~String>: name identifying the stream; left out of the body
#   when nil.
#
# The key is removed from +options+; every remaining entry is merged into the
# parameters handed to +request+.
#
# ==== Returns
# * the value returned by +request+
def delete_stream(options={})
  stream_name = options.delete("StreamName")

  payload = {}
  payload["StreamName"] = stream_name unless stream_name.nil?

  params = {
    'X-Amz-Target' => "Kinesis_#{@version}.DeleteStream",
    :body          => payload
  }
  request(params.merge(options))
end
Describes the specified stream.
Options¶ ↑
-
ExclusiveStartShardId<~String>: The shard ID of the shard to start with.
-
Limit<~Number>: The maximum number of shards to return.
-
StreamName<~String>: The name of the stream to describe.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html
# File lib/fog/aws/requests/kinesis/describe_stream.rb, line 17
# Issues an idempotent Kinesis DescribeStream call through +request+.
#
# ==== Options
# * ExclusiveStartShardId<~String>: shard ID of the shard to start with.
# * Limit<~Number>: maximum number of shards to return.
# * StreamName<~String>: name of the stream to describe.
#
# The three keys are removed from +options+ and sent in the request body
# (nil values are dropped); every remaining entry is merged into the
# parameters handed to +request+.
#
# ==== Returns
# * the response from +request+, with its body replaced by the
#   Fog::JSON-decoded value unless the body is nil
def describe_stream(options={})
  body = {
    "ExclusiveStartShardId" => options.delete("ExclusiveStartShardId"),
    "Limit"                 => options.delete("Limit"),
    "StreamName"            => options.delete("StreamName")
  }.reject{ |_,v| v.nil? }

  response = request({
    :idempotent    => true,
    'X-Amz-Target' => "Kinesis_#{@version}.DescribeStream",
    :body          => body,
  }.merge(options))
  response.body = Fog::JSON.decode(response.body) unless response.body.nil?
  # A stray bare `response.body` statement used to sit here; its value was
  # discarded, so it has been removed.
  response
end
Gets data records from a shard.
Options¶ ↑
-
Limit<~Number>: The maximum number of records to return.
-
ShardIterator<~String>: The position in the shard from which you want to start sequentially reading data records.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
# File lib/fog/aws/requests/kinesis/get_records.rb, line 16
# Issues a Kinesis GetRecords call through +request+.
#
# ==== Options
# * Limit<~Number>: maximum number of records to return.
# * ShardIterator<~String>: position in the shard from which to start reading
#   data records sequentially.
#
# Both keys are removed from +options+ and sent in the request body (nil
# values are dropped); every remaining entry is merged into the parameters
# handed to +request+.
#
# ==== Returns
# * the response from +request+, with its body replaced by the
#   Fog::JSON-decoded value unless the body is nil
def get_records(options={})
  payload = {}
  ["Limit", "ShardIterator"].each do |key|
    value = options.delete(key)
    payload[key] = value unless value.nil?
  end

  params = {
    'X-Amz-Target' => "Kinesis_#{@version}.GetRecords",
    :body          => payload
  }
  response = request(params.merge(options))

  unless response.body.nil?
    response.body = Fog::JSON.decode(response.body)
  end
  response
end
Gets a shard iterator.
Options¶ ↑
-
ShardId<~String>: The shard ID of the shard to get the iterator for.
-
ShardIteratorType<~String>: Determines how the shard iterator is used to start reading data records from the shard.
-
StartingSequenceNumber<~String>: The sequence number of the data record in the shard from which to start reading.
-
StreamName<~String>: A name to identify the stream.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
# File lib/fog/aws/requests/kinesis/get_shard_iterator.rb, line 18
# Issues a Kinesis GetShardIterator call through +request+.
#
# ==== Options
# * ShardId<~String>: shard ID of the shard to get the iterator for.
# * ShardIteratorType<~String>: how the iterator is used to start reading
#   records from the shard.
# * StartingSequenceNumber<~String>: sequence number of the record to start
#   reading from.
# * StreamName<~String>: name identifying the stream.
#
# The four keys are removed from +options+ and sent in the request body (nil
# values are dropped); every remaining entry is merged into the parameters
# handed to +request+.
#
# ==== Returns
# * the response from +request+, with its body replaced by the
#   Fog::JSON-decoded value unless the body is nil
def get_shard_iterator(options={})
  api_keys = ["ShardId", "ShardIteratorType", "StartingSequenceNumber", "StreamName"]

  payload = {}
  api_keys.each do |key|
    value = options.delete(key)
    payload[key] = value unless value.nil?
  end

  params = {
    'X-Amz-Target' => "Kinesis_#{@version}.GetShardIterator",
    :body          => payload
  }
  response = request(params.merge(options))

  unless response.body.nil?
    response.body = Fog::JSON.decode(response.body)
  end
  response
end
Lists available streams.
Options¶ ↑
-
ExclusiveStartStreamName<~String>: The name of the stream to start the list with.
-
Limit<~Number>: The maximum number of streams to list.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html
# File lib/fog/aws/requests/kinesis/list_streams.rb, line 16
# Issues an idempotent Kinesis ListStreams call through +request+.
#
# ==== Options
# * ExclusiveStartStreamName<~String>: name of the stream to start the list
#   with.
# * Limit<~Number>: maximum number of streams to list.
#
# Both keys are removed from +options+ and sent in the request body (nil
# values are dropped); every remaining entry is merged into the parameters
# handed to +request+.
#
# ==== Returns
# * the response from +request+, with its body replaced by the
#   Fog::JSON-decoded value unless the body is nil
def list_streams(options={})
  # The body used to be hard-coded to {}, so the documented paging options
  # were merged into the connection parameters and never reached the API.
  # Build the body from them, as the sibling request methods do.
  body = {
    "ExclusiveStartStreamName" => options.delete("ExclusiveStartStreamName"),
    "Limit"                    => options.delete("Limit")
  }.reject{ |_,v| v.nil? }

  response = request({
    :idempotent    => true,
    'X-Amz-Target' => "Kinesis_#{@version}.ListStreams",
    :body          => body,
  }.merge(options))
  response.body = Fog::JSON.decode(response.body) unless response.body.nil?
  response
end
Merges two adjacent shards in a stream and combines them into a single shard to reduce the stream's capacity to ingest and transport data.
Options¶ ↑
-
AdjacentShardToMerge<~String>: The shard ID of the adjacent shard for the merge.
-
ShardToMerge<~String>: The shard ID of the shard to combine with the adjacent shard for the merge.
-
StreamName<~String>: The name of the stream for the merge.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html
# File lib/fog/aws/requests/kinesis/merge_shards.rb, line 17
# Issues a Kinesis MergeShards call through +request+.
#
# ==== Options
# * AdjacentShardToMerge<~String>: shard ID of the adjacent shard for the
#   merge.
# * ShardToMerge<~String>: shard ID of the shard to combine with the adjacent
#   shard.
# * StreamName<~String>: name of the stream for the merge.
#
# The three keys are removed from +options+ and sent in the request body (nil
# values are dropped); every remaining entry is merged into the parameters
# handed to +request+.
#
# ==== Returns
# * the value returned by +request+
def merge_shards(options={})
  adjacent_shard = options.delete("AdjacentShardToMerge")
  shard_to_merge = options.delete("ShardToMerge")
  stream_name    = options.delete("StreamName")

  payload = {
    "AdjacentShardToMerge" => adjacent_shard,
    "ShardToMerge"         => shard_to_merge,
    "StreamName"           => stream_name
  }
  payload.delete_if { |_, value| value.nil? }

  params = {
    'X-Amz-Target' => "Kinesis_#{@version}.MergeShards",
    :body          => payload
  }
  request(params.merge(options))
end
Writes a single data record from a producer into an Amazon Kinesis
stream.
Options¶ ↑
-
Data<~Blob>: The data blob to put into the record, which is base64-encoded when the blob is serialized.
-
ExplicitHashKey<~String>: The hash value used to determine explicitly the shard that the data record is assigned to by overriding the partition key hash.
-
PartitionKey<~String>: Determines which shard in the stream the data record is assigned to.
-
SequenceNumberForOrdering<~String>: Guarantees strictly increasing sequence numbers, for puts from the same client and to the same partition key.
-
StreamName<~String>: The stream name associated with the request.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
# File lib/fog/aws/requests/kinesis/put_record.rb, line 19
# Issues a Kinesis PutRecord call through +request+.
#
# ==== Options
# * Data<~Blob>: data blob to put into the record.
# * ExplicitHashKey<~String>: hash value that explicitly selects the shard,
#   overriding the partition key hash.
# * PartitionKey<~String>: determines which shard the record is assigned to.
# * SequenceNumberForOrdering<~String>: sequence number used for ordering
#   puts from the same client and partition key.
# * StreamName<~String>: stream name associated with the request.
#
# The five keys are removed from +options+ and sent in the request body (nil
# values are dropped); every remaining entry is merged into the parameters
# handed to +request+.
#
# ==== Returns
# * the response from +request+, with its body replaced by the
#   Fog::JSON-decoded value unless the body is nil
def put_record(options={})
  api_keys = [
    "Data",
    "ExplicitHashKey",
    "PartitionKey",
    "SequenceNumberForOrdering",
    "StreamName"
  ]

  payload = {}
  api_keys.each do |key|
    value = options.delete(key)
    payload[key] = value unless value.nil?
  end

  params = {
    'X-Amz-Target' => "Kinesis_#{@version}.PutRecord",
    :body          => payload
  }
  response = request(params.merge(options))

  unless response.body.nil?
    response.body = Fog::JSON.decode(response.body)
  end
  response
end
Writes multiple data records from a producer into an Amazon Kinesis
stream in a single call (also referred to as a PutRecords request).
Options¶ ↑
-
Records<~Array>: The records associated with the request.
-
Record<~Hash>: A record.
-
Data<~Blob>: The data blob to put into the record, which is base64-encoded when the blob is serialized.
-
ExplicitHashKey<~String>: The hash value used to determine explicitly the shard that the data record is assigned to by overriding the partition key hash.
-
PartitionKey<~String>: Determines which shard in the stream the data record is assigned to.
-
-
-
StreamName<~String>: The stream name associated with the request.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
# File lib/fog/aws/requests/kinesis/put_records.rb, line 20
# Issues a Kinesis PutRecords call through +request+.
#
# ==== Options
# * Records<~Array>: the records associated with the request.
# * StreamName<~String>: stream name associated with the request.
#
# Both keys are removed from +options+ and sent in the request body (nil
# values are dropped); every remaining entry is merged into the parameters
# handed to +request+.
#
# ==== Returns
# * the response from +request+, with its body replaced by the
#   Fog::JSON-decoded value unless the body is nil
def put_records(options={})
  records     = options.delete("Records")
  stream_name = options.delete("StreamName")

  payload = { "Records" => records, "StreamName" => stream_name }
  payload.delete_if { |_, value| value.nil? }

  params = {
    'X-Amz-Target' => "Kinesis_#{@version}.PutRecords",
    :body          => payload
  }
  response = request(params.merge(options))

  unless response.body.nil?
    response.body = Fog::JSON.decode(response.body)
  end
  response
end
Splits a shard into two new shards in the stream, to increase the stream's capacity to ingest and transport data.
Options¶ ↑
-
NewStartingHashKey<~String>: A hash key value for the starting hash key of one of the child shards created by the split.
-
ShardToSplit<~String>: The shard ID of the shard to split.
-
StreamName<~String>: The name of the stream for the shard split.
Returns¶ ↑
-
response<~Excon::Response>:
See Also¶ ↑
docs.aws.amazon.com/kinesis/latest/APIReference/API_SplitShard.html
# File lib/fog/aws/requests/kinesis/split_shard.rb, line 17
# Issues a Kinesis SplitShard call through +request+.
#
# ==== Options
# * NewStartingHashKey<~String>: starting hash key for one of the child
#   shards created by the split.
# * ShardToSplit<~String>: shard ID of the shard to split.
# * StreamName<~String>: name of the stream for the shard split.
#
# The three keys are removed from +options+ and sent in the request body (nil
# values are dropped); every remaining entry is merged into the parameters
# handed to +request+.
#
# ==== Returns
# * the value returned by +request+
def split_shard(options={})
  payload = {}
  ["NewStartingHashKey", "ShardToSplit", "StreamName"].each do |key|
    value = options.delete(key)
    payload[key] = value unless value.nil?
  end

  params = {
    'X-Amz-Target' => "Kinesis_#{@version}.SplitShard",
    :body          => payload
  }
  request(params.merge(options))
end
Private Instance Methods
# File lib/fog/aws/kinesis.rb, line 93
# Performs the POST on @connection and translates HTTP status errors into
# Fog::AWS::Kinesis error classes.
#
# ==== Parameters
# * body: request body passed through as :body.
# * headers: request headers passed through as :headers.
# * idempotent: passed through as :idempotent.
# * parser: passed through as :parser.
#
# ==== Returns
# * the value returned by @connection.request (a 200 status is expected)
#
# ==== Raises
# * the original Excon::Errors::HTTPStatusError when
#   Fog::AWS::Errors.match_error returns an empty match
# * the Fog::AWS::Kinesis error class mapped from the matched code, or
#   Fog::AWS::Kinesis::Error for any other code
def _request(body, headers, idempotent, parser)
  @connection.request({
    :body       => body,
    :expects    => 200,
    :headers    => headers,
    :idempotent => idempotent,
    :method     => 'POST',
    :parser     => parser
  })
rescue Excon::Errors::HTTPStatusError => error
  match = Fog::AWS::Errors.match_error(error)
  # Nothing recognisable in the error: re-raise it untouched.
  raise if match.empty?
  # 'ExpiredIteratorException' was listed twice; the second, unreachable
  # branch has been dropped.
  raise case match[:code]
        when 'ExpiredIteratorException'
          Fog::AWS::Kinesis::ExpiredIterator.slurp(error, match[:message])
        when 'LimitExceededException'
          Fog::AWS::Kinesis::LimitExceeded.slurp(error, match[:message])
        when 'ResourceInUseException'
          Fog::AWS::Kinesis::ResourceInUse.slurp(error, match[:message])
        when 'ResourceNotFoundException'
          Fog::AWS::Kinesis::ResourceNotFound.slurp(error, match[:message])
        when 'InvalidArgumentException'
          Fog::AWS::Kinesis::InvalidArgument.slurp(error, match[:message])
        when 'ProvisionedThroughputExceededException'
          Fog::AWS::Kinesis::ProvisionedThroughputExceeded.slurp(error, match[:message])
        else
          Fog::AWS::Kinesis::Error.slurp(error, "#{match[:code]} => #{match[:message]}")
        end
end
# File lib/fog/aws/kinesis.rb, line 68
# Signs a Kinesis call and sends it through +_request+.
#
# ==== Parameters
# * params<~Hash>: must carry 'X-Amz-Target' and :body; :idempotent and
#   :parser are removed from the hash and forwarded to +_request+.
#
# The body is serialised with MultiJson, the headers (including the session
# token when one is set) are signed with @signer, and the call is wrapped in
# @instrumentor.instrument when an instrumentor is configured.
#
# ==== Returns
# * the value returned by +_request+ (or by the instrumentor wrapping it)
def request(params)
  refresh_credentials_if_expired

  idempotent = params.delete(:idempotent)
  parser     = params.delete(:parser)
  timestamp  = Fog::Time.now

  headers = {
    'X-Amz-Target' => params['X-Amz-Target'],
    'Content-Type' => 'application/x-amz-json-1.1',
    'Host'         => @host,
    'x-amz-date'   => timestamp.to_iso8601_basic
  }
  if @aws_session_token
    headers['x-amz-security-token'] = @aws_session_token
  end

  payload = MultiJson.dump(params[:body])

  # The signature must cover every header set above, so it is added last.
  signing_input = {
    :method  => "POST",
    :headers => headers,
    :body    => payload,
    :query   => {},
    :path    => @path
  }
  headers['Authorization'] = @signer.sign(signing_input, timestamp)

  return _request(payload, headers, idempotent, parser) unless @instrumentor

  @instrumentor.instrument("#{@instrumentor_name}.request", params) do
    _request(payload, headers, idempotent, parser)
  end
end
# File lib/fog/aws/kinesis.rb, line 59
# Copies the AWS credentials out of +options+ and builds the request signer.
#
# ==== Parameters
# * options: read with [] for :aws_access_key_id, :aws_secret_access_key,
#   :aws_session_token and :aws_credentials_expire_at.
#
# Sets @signer to a Fog::AWS::SignatureV4 built from the key pair, @region
# and the service name 'kinesis'.
def setup_credentials(options)
  @aws_access_key_id         = options[:aws_access_key_id]
  @aws_secret_access_key     = options[:aws_secret_access_key]
  @aws_session_token         = options[:aws_session_token]
  @aws_credentials_expire_at = options[:aws_credentials_expire_at]

  signer_args = [@aws_access_key_id, @aws_secret_access_key, @region, 'kinesis']
  @signer = Fog::AWS::SignatureV4.new(*signer_args)
end