class Concurrent::ErlangActor::OnThread

Constants

TERMINATE

Public Class Methods

new(channel, environment, name, executor)
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1188
# Builds the thread-backed actor.
#
# All four arguments are handed unchanged to the superclass initializer.
# The actor's thread slot starts out empty; it is assigned in #run.
#
# @param channel passed through to the superclass
# @param environment passed through to the superclass
# @param name passed through to the superclass
# @param executor passed through to the superclass
def initialize(channel, environment, name, executor)
  super(channel, environment, name, executor)
  # No thread exists until #run spawns one.
  @Thread = nil
end

Public Instance Methods

receive(*rules, timeout: nil, timeout_value: nil, &given_block)
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1219
# Waits for a mailbox message that matches one of the given rules and
# returns the result of evaluating that rule's job against it.
#
# The rules, timeout, timeout value and block are first passed to
# canonical_rules; whatever truthy value it returns is raised.
# Each rule is then treated as a [pattern, job] pair.
#
# @param rules [Array] rule definitions (normalised by canonical_rules)
# @param timeout forwarded to canonical_rules and to @Mailbox.pop_matching
# @param timeout_value forwarded to canonical_rules
# @param given_block [Proc, nil] forwarded to canonical_rules
# @return the value of eval_task for the first rule whose pattern matches
# @raise whatever truthy value canonical_rules returns
def receive(*rules, timeout: nil, timeout_value: nil, &given_block)
  clean_reply

  problem = canonical_rules(rules, timeout, timeout_value, given_block)
  raise problem if problem

  any_rule = Or[*rules.map(&:first)]
  # An Ask is matched on the message it carries, anything else on itself.
  accepts = lambda do |candidate|
    subject = candidate.is_a?(Ask) ? candidate.message : candidate
    any_rule === subject
  end

  # Keep popping until some rule accepts the (signal-processed) message.
  while true
    received = @Mailbox.pop_matching(accepts, timeout, TIMEOUT)
    log(Logger::DEBUG, pid, got: received)

    received = consume_signal(received)
    # consume_signal answered NOTHING: there is nothing to dispatch this round.
    next if received == NOTHING

    rules.each do |pattern, job|
      return eval_task(received, job) if pattern === received
    end
  end
end
run(*args, &body)
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1196
# Starts the actor body on a newly created Thread.
#
# initial_signal_consumption is called on the calling thread first. The new
# thread then evaluates +body+ via @Environment.instance_exec, wrapped in
# catch(TERMINATE) so that terminate_self can unwind out of the body with a
# [reason, value] pair. Afterwards exit messages are sent, the +terminated+
# object is resolved or rejected, and after_termination is called with the
# final reason (or the rescued exception).
#
# @param args [Array] arguments passed on to +body+
# @param body [Proc] block evaluated through @Environment.instance_exec
# @return [Thread] the spawned thread (the value of the @Thread assignment)
def run(*args, &body)
  initial_signal_consumption
  @Thread = Thread.new(@Terminated, self) do |terminated, _actor| # sync point
    # NOTE(review): this is the class-level setter, so it changes the
    # process-wide abort_on_exception flag, not just this thread's — confirm
    # that is intended.
    Thread.abort_on_exception = true

    final_reason = begin
      # Normal completion yields [:normal, <body result>]; a throw of
      # TERMINATE (see terminate_self) yields the thrown [reason, value].
      reason, value = catch(TERMINATE) do
        [:normal, @Environment.instance_exec(*args, &body)]
      end
      send_exit_messages reason
      # First argument is true only when the body ended with reason :normal.
      terminated.resolve(reason == :normal, value, reason)
      reason
    rescue => e
      # A StandardError raised anywhere in the begin section above becomes
      # the termination reason.
      send_exit_messages e
      terminated.reject e
      e
    end

    after_termination final_reason
    # Clear the reference once the thread's work is finished.
    @Thread = nil
  end
end

Private Instance Methods

terminate_self(reason, value)
# File lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb, line 1240
# Unwinds to the enclosing catch(TERMINATE), handing it the pair
# [reason, value] as the result of that catch.
#
# @param reason first element of the thrown pair
# @param value second element of the thrown pair
# @raise [UncaughtThrowError] if no catch(TERMINATE) is active
def terminate_self(reason, value)
  outcome = [reason, value]
  throw(TERMINATE, outcome)
end