class Concurrent::ProcessingActor
A new implementation of an actor which also simulates a process, so it can be used in the same way as Erlang's actors but without occupying a thread. Tens of thousands of ProcessingActors can run at the same time, sharing a thread pool. @example
# Runs on a pool, does not consume 50_000 threads
actors = 50_000.times.map do |i|
  Concurrent::ProcessingActor.act(i) { |a, i| a.receive.then_on(:fast, i) { |m, i| m + i } }
end

actors.each { |a| a.tell! 1 }
values = actors.map(&:termination).map(&:value)
values[0, 5]  # => [1, 2, 3, 4, 5]
values[-5, 5] # => [49996, 49997, 49998, 49999, 50000]
@!macro warn.edge
Public Class Methods
Creates an actor with its own new mailbox. Behaves the same way as act_listening, but does not take a mailbox as its first argument. @see act_listening @return [ProcessingActor] @example
actor = Concurrent::ProcessingActor.act do |actor|
  actor.receive.then do |message|
    # the actor ends normally with message
    message
  end
end

actor.tell! :a_message
# => #<Concurrent::ProcessingActor:0x7fff11280560 termination: pending>
actor.termination.value! # => :a_message
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 50
def self.act(*args, &process)
  act_listening Promises::Channel.new, *args, &process
end
Creates an actor listening to a specified channel (mailbox). @param [Promises::Channel] channel which serves as the mailbox. The channel can have a limited size to achieve backpressure. @param [Object] args Arguments passed to the process. @yield [actor, *args] to the process to get back a future which represents the actor's execution. @yieldparam [ProcessingActor] actor @yieldparam [Object] *args @yieldreturn [Promises::Future(Object)] a future representing the next step of execution @return [ProcessingActor]
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 63
def self.act_listening(channel, *args, &process)
  ProcessingActor.new channel, *args, &process
end
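The backpressure mentioned above can be illustrated with a bounded queue. The following is only a sketch of the idea using Ruby's standard `SizedQueue` as a stand-in for a size-limited mailbox; it does not use `Promises::Channel`, and the capacity of 2 is an arbitrary assumption.

```ruby
# Stand-in for a bounded mailbox: Ruby's stdlib SizedQueue, not Promises::Channel.
mailbox = SizedQueue.new(2) # assumed capacity of 2 messages

mailbox.push(:first)
mailbox.push(:second)

# A non-blocking push on a full queue raises ThreadError instead of waiting,
# which is how the sender learns that the receiver is falling behind.
rejected =
  begin
    mailbox.push(:third, true)
    false
  rescue ThreadError
    true
  end

# Once the consumer takes a message, there is room for the sender again.
taken = mailbox.pop
mailbox.push(:third, true)
remaining = [mailbox.pop, mailbox.pop]
```

A blocking push on the same full queue would instead suspend the sending thread until space is available, which is the behavior the bounded mailbox relies on to slow producers down.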
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 173
def initialize(channel, *args, &process)
  @Mailbox = channel
  @Terminated = Promises.future(self, *args, &process).run
  super()
end
Public Instance Methods
actor.ask_op { |a| [:count, a] }
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 153
def ask_op(answer = Promises.resolvable_future, &message_provider)
  # TODO (pitr-ch 12-Dec-2018): is it ok to let the answers be unanswered when the actor terminates
  tell_op(message_provider.call(answer)).then(answer) { |_, a| a }

  # answer.chain { |v| [true, v] } | @Terminated.then
end
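The shape of ask_op (the block receives an answer slot and builds the message that carries it) can be sketched with plain threads and queues. This is an analogy under stated assumptions, not the library's implementation: the counter thread, the `ask` lambda, and the `:add`, `:count` and `:stop` messages are all hypothetical, and a stdlib `Queue` plays the role of the resolvable answer.

```ruby
# Hypothetical counter "actor": a plain thread draining a stdlib Queue.
mailbox = Queue.new

counter = Thread.new do
  count = 0
  loop do
    tag, reply = mailbox.pop
    case tag
    when :add   then count += 1
    when :count then reply.push(count) # answer through the reply slot
    when :stop  then break
    end
  end
  count
end

# Analogue of the message provider block: it receives the reply slot and
# returns the message that carries the slot to the receiver.
ask = lambda do |&message_provider|
  reply = Queue.new
  mailbox.push(message_provider.call(reply))
  reply
end

mailbox.push([:add])
mailbox.push([:add])
answer = ask.call { |reply| [:count, reply] }.pop
mailbox.push([:stop])
final = counter.value
```

As the TODO in the source notes, a receiver that terminates before replying leaves the answer unresolved; in this sketch that would show up as `pop` on the reply queue never returning.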
@return [Promises::Channel] actor's mailbox.
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 25
def mailbox
  @Mailbox
end
Receives a message when available, used in the actor's process. @return [Promises::Future(Object)] a future which will be fulfilled with a message from the mailbox when it is available.
# def receive(*channels)
#   channels = [@Mailbox] if channels.empty?
#   Promises::Channel.select(*channels)
#   # TODO (pitr-ch 27-Dec-2016): support patterns
#   #   - put any received message aside if it does not match
#   #   - on each receive call check the messages put aside
#   #   - track where the message came from, cannot later receive m from other channel only because it matches
# end
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 79
def receive(channel = mailbox)
  channel.pop_op
end
Sends a message to the actor. May block the current thread if the mailbox is full. {#tell_op} is a better option since it does not block. tell! is usually used to integrate with threaded code. @example
Thread.new(actor) do |actor|
  # ...
  actor.tell! :a_message # blocks until the message is told
  # (there is a space for it in the channel)
  # ...
end
@param [Object] message @return [self]
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 95
def tell!(message)
  @Mailbox.push(message)
  self
end
Tells a message to the actor. @param [Object] message @return [Promises::Future(ProcessingActor)] a future which will be fulfilled with the actor when the message is pushed to the mailbox.
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 104
def tell_op(message)
  @Mailbox.push_op(message).then(self) { |_ch, actor| actor }
end
@return [Promises::Future(Object)] a future which is resolved when the actor ends its processing. It is either fulfilled with a value when the actor ends normally, or rejected with a reason (exception) when the actor fails.
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 32
def termination
  @Terminated.with_hidden_resolvable
end
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 167
def to_ary
  [@Mailbox, @Terminated]
end
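to_ary is undocumented here. In Ruby, defining it lets an object be destructured implicitly, which is presumably why the actor exposes its mailbox and termination future this way. The sketch below shows only that language mechanism on a hypothetical `Pair` class; the symbols stand in for the real mailbox and future.

```ruby
# Hypothetical class, used only to show what defining to_ary enables.
class Pair
  def initialize(mailbox, terminated)
    @mailbox = mailbox
    @terminated = terminated
  end

  def to_ary
    [@mailbox, @terminated]
  end
end

pair = Pair.new(:mailbox, :terminated)

# Multiple assignment calls to_ary implicitly.
mailbox, terminated = pair

# Block parameters destructure the same way.
labels = [pair].map { |m, t| "#{m}/#{t}" }
```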
@return [String] string representation.
# File lib/concurrent-ruby-edge/concurrent/edge/processing_actor.rb, line 161
def to_s
  format '%s termination: %s>', super[0..-2], termination.state
end