In Files

Namespace

Files

Class/Module Index [+]

Quicksearch

Concurrent

Copyright (c) 2013 Brian Durand

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.

The design goals of this gem are:

Constants

AtExit

@see AtExitImplementation @!visibility private

Atomic

@see Concurrent::AtomicReference @deprecated Use Concurrent::AtomicReference instead.

AtomicBooleanImplementation

@!visibility private @!macro internal_implementation_note

AtomicFixnumImplementation

@!visibility private @!macro internal_implementation_note

CONFIGURATION

create the default configuration on load

CancelledOperationError

Raised when an asynchronous operation is cancelled before execution.

ConfigurationError

Raised when errors occur during configuration.

EDGE_VERSION
Error
GLOBAL_FAST_EXECUTOR

@!visibility private

GLOBAL_IMMEDIATE_EXECUTOR

@!visibility private

GLOBAL_IO_EXECUTOR

@!visibility private

GLOBAL_LOGGER

@!visibility private

GLOBAL_MONOTONIC_CLOCK

Clock that cannot be set and represents monotonic time since some unspecified starting point.

@!visibility private

GLOBAL_TIMER_SET

@!visibility private

ImmutabilityError

Raised when an attempt is made to violate an immutability guarantee.

InitializationError

Raised when an object's methods are called when it has not been properly initialized.

LifecycleError

Raised when a lifecycle method (such as `stop`) is called in an improper sequence or when the object is in an inappropriate state.

MaxRestartFrequencyError

Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.

MultipleAssignmentError

Raised when an attempt is made to modify an immutable object (such as an `IVar`) after its final state has been set.

NULL_LOGGER

Suppresses all output when used for logging.

PromiseExecutionError
RejectedExecutionError

Raised by an `Executor` when it is unable to process a given task, possibly because of a reject policy or other internal error.

ResourceLimitError

Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.

SemaphoreImplementation

@!visibility private @!macro internal_implementation_note

SingleThreadExecutorImplementation
ThreadLocalVarImplementation

@!visibility private @!macro internal_implementation_note

ThreadPoolExecutorImplementation
TimeoutError

Raised when an operation times out.

VERSION

Public Class Methods

abort_transaction() click to toggle source

Abort a currently running transaction - see `Concurrent::atomically`.

# File lib/concurrent/tvar.rb, line 146
def abort_transaction
  raise Transaction::AbortError.new
end
atomically() click to toggle source

Run a block that reads and writes `TVar`s as a single atomic transaction. With respect to the value of `TVar` objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the `TVar` objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed

    more than once. In most cases your code should be free of
    side-effects, except for via TVar.
  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically, it will not be part of

    the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

@example

a = new TVar(100_000)
b = new TVar(100)

Concurrent::atomically do
  a.value -= 10
  b.value += 10
end
# File lib/concurrent/tvar.rb, line 89
def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction

  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?

  if transaction.nil?
    # New transaction

    begin
      # Retry loop

      loop do

        # Create a new transaction

        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions

        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue Transaction::LeaveError => e
          transaction.abort
          break result
        rescue => e
          transaction.abort
          raise e
        end
        # If we can commit, break out of the loop

        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction

      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block

    yield
  end
end
call_dataflow(method, executor, *inputs, &block) click to toggle source
# File lib/concurrent/dataflow.rb, line 55
def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end
configuration() click to toggle source

@return [Configuration]

# File lib/concurrent/configuration.rb, line 267
def self.configuration
  CONFIGURATION.value
end
configure() click to toggle source

Perform gem-level configuration.

@yield the configuration commands @yieldparam [Configuration] the current configuration object

# File lib/concurrent/configuration.rb, line 275
def self.configure
  yield(configuration)
end
create_stdlib_logger(level = Logger::FATAL, output = $stderr) click to toggle source

@return [Logger] Logger with provided level and output.

# File lib/concurrent/configuration.rb, line 15
def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr)
  logger           = Logger.new(output)
  logger.level     = level
  logger.formatter = lambda do |severity, datetime, progname, msg|
    formatted_message = case msg
                        when String
                          msg
                        when Exception
                          format "%s (%s)\n%s",
                                 msg.message, msg.class, (msg.backtrace || []).join("\n")
                        else
                          msg.inspect
                        end
    format "[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
           severity,
           progname,
           formatted_message
  end

  lambda do |level, progname, message = nil, &block|
    logger.add level, message, progname, &block
  end
end
dataflow(*inputs, &block) click to toggle source

{include:file:doc/dataflow.md}

@param [Future] inputs zero or more `Future` operations that this dataflow depends upon

@yield The operation to perform once all the dependencies are met @yieldparam [Future] inputs each of the `Future` inputs to the dataflow @yieldreturn [Object] the result of the block operation

@return [Object] the result of all the operations

@raise [ArgumentError] if no block is given @raise [ArgumentError] if any of the inputs are not `IVar`s

# File lib/concurrent/dataflow.rb, line 33
def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end
dataflow!(*inputs, &block) click to toggle source
# File lib/concurrent/dataflow.rb, line 43
def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end
dataflow_with(executor, *inputs, &block) click to toggle source
# File lib/concurrent/dataflow.rb, line 38
def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end
dataflow_with!(executor, *inputs, &block) click to toggle source
# File lib/concurrent/dataflow.rb, line 48
def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end
disable_at_exit_handlers!() click to toggle source

Disables AtExit handlers including pool auto-termination handlers. When disabled it will be the application programmer's responsibility to ensure that the handlers are shutdown properly prior to application exit by calling {AtExit.run} method.

@note this option should be needed only because of `at_exit` ordering

issues which may arise when running some of the testing frameworks.
E.g. Minitest's test-suite runs itself in `at_exit` callback which
executes after the pools are already terminated. Then auto termination
needs to be disabled and called manually after test-suite ends.

@note This method should never be called

from within a gem. It should *only* be used from within the main
application and even then it should be used only when necessary.

@see AtExit

# File lib/concurrent/configuration.rb, line 90
def self.disable_at_exit_handlers!
  AtExit.enabled = false
end
disable_executor_auto_termination!() click to toggle source
# File lib/concurrent/configuration.rb, line 94
def self.disable_executor_auto_termination!
  deprecated_method 'disable_executor_auto_termination!', 'disable_at_exit_handlers!'
  disable_at_exit_handlers!
end
disable_executor_auto_termination?() click to toggle source

@return [true,false] @see .disable_executor_auto_termination!

# File lib/concurrent/configuration.rb, line 101
def self.disable_executor_auto_termination?
  deprecated_method 'disable_executor_auto_termination?', 'Concurrent::AtExit.enabled?'
  AtExit.enabled?
end
executor(executor_identifier) click to toggle source

General access point to global executors. @param [Symbol, Executor] executor_identifier symbols:

- :fast - {Concurrent.global_fast_executor}
- :io - {Concurrent.global_io_executor}
- :immediate - {Concurrent.global_immediate_executor}

@return [Executor]

# File lib/concurrent/configuration.rb, line 144
def self.executor(executor_identifier)
  Executor.executor(executor_identifier)
end
global_fast_executor() click to toggle source

Global thread pool optimized for short, fast operations.

@return [ThreadPoolExecutor] the thread pool

# File lib/concurrent/configuration.rb, line 116
def self.global_fast_executor
  GLOBAL_FAST_EXECUTOR.value
end
global_immediate_executor() click to toggle source
# File lib/concurrent/configuration.rb, line 127
def self.global_immediate_executor
  GLOBAL_IMMEDIATE_EXECUTOR
end
global_io_executor() click to toggle source

Global thread pool optimized for long, blocking (IO) tasks.

@return [ThreadPoolExecutor] the thread pool

# File lib/concurrent/configuration.rb, line 123
def self.global_io_executor
  GLOBAL_IO_EXECUTOR.value
end
global_logger() click to toggle source
# File lib/concurrent/configuration.rb, line 52
def self.global_logger
  GLOBAL_LOGGER.value
end
global_logger=(value) click to toggle source
# File lib/concurrent/configuration.rb, line 56
def self.global_logger=(value)
  GLOBAL_LOGGER.value = value
end
global_timer_set() click to toggle source

Global thread pool user for global timers.

@return [Concurrent::TimerSet] the thread pool

# File lib/concurrent/configuration.rb, line 134
def self.global_timer_set
  GLOBAL_TIMER_SET.value
end
leave_transaction() click to toggle source

Leave a transaction without commiting or aborting - see `Concurrent::atomically`.

# File lib/concurrent/tvar.rb, line 151
def leave_transaction
  raise Transaction::LeaveError.new
end
monotonic_time() click to toggle source

@!macro [attach] monotonic_get_time

Returns the current time a tracked by the application monotonic clock.

@return [Float] The current monotonic time when `since` not given else
  the elapsed monotonic time between `since` and the current time

@!macro monotonic_clock_warning
# File lib/concurrent/utility/monotonic_time.rb, line 54
def monotonic_time
  GLOBAL_MONOTONIC_CLOCK.get_time
end
new() click to toggle source
# File lib/concurrent/utility/monotonic_time.rb, line 6
def initialize
  super()
  @last_time = Time.now.to_f
  ensure_ivar_visibility!
end
new_fast_executor(opts = {}) click to toggle source
# File lib/concurrent/configuration.rb, line 148
def self.new_fast_executor(opts = {})
  FixedThreadPool.new(
      [2, Concurrent.processor_count].max,
      auto_terminate:  opts.fetch(:auto_terminate, true),
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :abort # shouldn't matter -- 0 max queue
  )
end
new_io_executor(opts = {}) click to toggle source
# File lib/concurrent/configuration.rb, line 158
def self.new_io_executor(opts = {})
  ThreadPoolExecutor.new(
      min_threads:     [2, Concurrent.processor_count].max,
      max_threads:     ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
      # max_threads:     1000,
      auto_terminate:  opts.fetch(:auto_terminate, true),
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :abort # shouldn't matter -- 0 max queue
  )
end
physical_processor_count() click to toggle source
# File lib/concurrent/utility/processor_counter.rb, line 153
def self.physical_processor_count
  processor_counter.physical_processor_count
end
processor_count() click to toggle source
# File lib/concurrent/utility/processor_counter.rb, line 149
def self.processor_count
  processor_counter.processor_count
end
terminate_pools!() click to toggle source

terminates all pools and blocks until they are terminated @see .disable_executor_auto_termination!

# File lib/concurrent/configuration.rb, line 108
def self.terminate_pools!
  deprecated_method 'terminate_pools!', 'Concurrent::AtExit.run'
  AtExit.run
end
timeout(seconds, &block) click to toggle source
DEPRECATED

Wait the given number of seconds for the block operation to complete.

Intended to be a simpler and more reliable replacement to the Ruby standard library `Timeout::timeout` method. It does not kill the task so it finishes anyway. Advantage is that it cannot cause any ugly errors by killing threads.

@param [Integer] seconds The number of seconds to wait @return [Object] The result of the block operation

@raise [Concurrent::TimeoutError] when the block operation does not complete

in the allotted number of seconds.

@see ruby-doc.org/stdlib-2.2.0/libdoc/timeout/rdoc/Timeout.html Ruby Timeout::timeout

@!macro monotonic_clock_warning

@deprecated timeout is deprecated and will be removed

# File lib/concurrent/utility/timeout.rb, line 27
def timeout(seconds, &block)
  deprecated 'timeout is deprecated and will be removed'

  future = Future.execute(&block)
  future.wait(seconds)
  if future.complete?
    future.value!
  else
    raise TimeoutError
  end
end
timer(seconds, *args, &block) click to toggle source
DEPRECATED

Perform the given operation asynchronously after

the given number of seconds.

@param [Fixnum] seconds the interval in seconds to wait before executing the task

@yield the task to execute

@return [Concurrent::ScheduledTask] IVar representing the task

@see Concurrent::ScheduledTask

@deprecated use `ScheduledTask` instead

# File lib/concurrent/utility/timer.rb, line 19
def timer(seconds, *args, &block)
  deprecated_method 'Concurrent.timer', 'ScheduledTask'
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('interval must be greater than or equal to zero') if seconds < 0
  Concurrent.global_timer_set.post(seconds, *args, &block)
end
use_stdlib_logger(level = Logger::FATAL, output = $stderr) click to toggle source

Use logger created by create_stdlib_logger to log concurrent-ruby messages.

# File lib/concurrent/configuration.rb, line 41
def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr)
  Concurrent.global_logger = create_stdlib_logger level, output
end

Public Instance Methods

get_time() click to toggle source

@!visibility private

# File lib/concurrent/utility/monotonic_time.rb, line 14
def get_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.