class Concurrent::Promises::Channel

@!macro warn.edge

Constants

UNLIMITED

Default size of the Channel, makes it accept unlimited number of messages.

Public Class Methods

new(size = UNLIMITED) click to toggle source

A channel to pass messages between promises. The size is limited to support back pressure. @param [Integer, UNLIMITED] size the maximum number of messages stored in the channel.

Calls superclass method
# File lib-edge/concurrent/edge/promises.rb, line 72
def initialize(size = UNLIMITED)
  super()
  @Size        = size
  # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation
  @Mutex       = Mutex.new
  @Probes      = []
  @Messages    = []
  @PendingPush = []
end

Public Instance Methods

<=>(other) click to toggle source
# File lib-edge/concurrent/edge/promises.rb, line 61
def <=>(other)
  1
end
inspect()
Alias for: to_s
pop(probe = Concurrent::Promises.resolvable_future) click to toggle source

Returns a future witch will become fulfilled with a value from the channel when one is available. @param [ResolvableFuture] probe the future which will be fulfilled with a channel value @return [Future] the probe, its value will be the message when available.

# File lib-edge/concurrent/edge/promises.rb, line 111
def pop(probe = Concurrent::Promises.resolvable_future)
  # TODO (pitr-ch 26-Dec-2016): improve performance
  pop_for_select(probe).then(&:last)
end
pop_for_select(probe = Concurrent::Promises.resolvable_future) click to toggle source

@!visibility private

# File lib-edge/concurrent/edge/promises.rb, line 117
def pop_for_select(probe = Concurrent::Promises.resolvable_future)
  @Mutex.synchronize do
    if @Messages.empty?
      @Probes.push probe
    else
      message = @Messages.shift
      probe.fulfill [self, message]

      unless @PendingPush.empty?
        message, pushed = @PendingPush.shift
        @Messages.push message
        pushed.fulfill message
      end
    end
  end
  probe
end
push(message) click to toggle source

Returns future which will fulfill when the message is added to the channel. Its value is the message. @param [Object] message @return [Future]

# File lib-edge/concurrent/edge/promises.rb, line 86
def push(message)
  @Mutex.synchronize do
    while true
      if @Probes.empty?
        if @Size > @Messages.size
          @Messages.push message
          return Promises.fulfilled_future message
        else
          pushed = Promises.resolvable_future
          @PendingPush.push [message, pushed]
          return pushed.with_hidden_resolvable
        end
      else
        probe = @Probes.shift
        if probe.fulfill [self, message], false
          return Promises.fulfilled_future(message)
        end
      end
    end
  end
end
to_s() click to toggle source
# File lib-edge/concurrent/edge/promises.rb, line 65
def to_s
  'unlimited'
end
Also aliased as: inspect