class Concurrent::Promises::Channel
@!macro warn.edge
Constants
- UNLIMITED
Default size of the Channel, makes it accept unlimited number of messages.
Public Class Methods
new(size = UNLIMITED)
click to toggle source
A channel to pass messages between promises. The size is limited to support back pressure. @param [Integer, UNLIMITED] size the maximum number of messages stored in the channel.
Calls superclass method
# File lib-edge/concurrent/edge/promises.rb, line 72 def initialize(size = UNLIMITED) super() @Size = size # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation @Mutex = Mutex.new @Probes = [] @Messages = [] @PendingPush = [] end
Public Instance Methods
<=>(other)
click to toggle source
# File lib-edge/concurrent/edge/promises.rb, line 61 def <=>(other) 1 end
pop(probe = Concurrent::Promises.resolvable_future)
click to toggle source
Returns a future witch will become fulfilled with a value from the channel when one is available. @param [ResolvableFuture] probe the future which will be fulfilled with a channel value @return [Future] the probe, its value will be the message when available.
# File lib-edge/concurrent/edge/promises.rb, line 111 def pop(probe = Concurrent::Promises.resolvable_future) # TODO (pitr-ch 26-Dec-2016): improve performance pop_for_select(probe).then(&:last) end
pop_for_select(probe = Concurrent::Promises.resolvable_future)
click to toggle source
@!visibility private
# File lib-edge/concurrent/edge/promises.rb, line 117 def pop_for_select(probe = Concurrent::Promises.resolvable_future) @Mutex.synchronize do if @Messages.empty? @Probes.push probe else message = @Messages.shift probe.fulfill [self, message] unless @PendingPush.empty? message, pushed = @PendingPush.shift @Messages.push message pushed.fulfill message end end end probe end
push(message)
click to toggle source
Returns future which will fulfill when the message is added to the channel. Its value is the message. @param [Object] message @return [Future]
# File lib-edge/concurrent/edge/promises.rb, line 86 def push(message) @Mutex.synchronize do while true if @Probes.empty? if @Size > @Messages.size @Messages.push message return Promises.fulfilled_future message else pushed = Promises.resolvable_future @PendingPush.push [message, pushed] return pushed.with_hidden_resolvable end else probe = @Probes.shift if probe.fulfill [self, message], false return Promises.fulfilled_future(message) end end end end end
to_s()
click to toggle source
# File lib-edge/concurrent/edge/promises.rb, line 65 def to_s 'unlimited' end
Also aliased as: inspect