Copyright (c) 2013 Brian Durand
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.
The design goals of this gem are:
Stay true to the spirit of the languages providing inspiration
But implement in a way that makes sense for Ruby
Keep the semantics as idiomatic Ruby as possible
Support features that make sense in Ruby
Exclude features that don't make sense in Ruby
Be small, lean, and loosely coupled
@see AtExitImplementation @!visibility private
@see Concurrent::AtomicReference @deprecated Use Concurrent::AtomicReference instead.
create the default configuration on load
Raised when an asynchronous operation is cancelled before execution.
Raised when errors occur during configuration.
Clock that cannot be set and represents monotonic time since some unspecified starting point.
Raised when an attempt is made to violate an immutability guarantee.
Raised when an object's methods are called when it has not been properly initialized.
Raised when a lifecycle method (such as `stop`) is called in an improper sequence or when the object is in an inappropriate state.
Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.
Raised when an attempt is made to modify an immutable object (such as an `IVar`) after its final state has been set.
Suppresses all output when used for logging.
Raised by an `Executor` when it is unable to process a given task, possibly because of a reject policy or other internal error.
Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.
Raised when an operation times out.
Abort a currently running transaction - see `Concurrent::atomically`.
# File lib/concurrent/tvar.rb, line 146
def abort_transaction
  raise Transaction::AbortError.new
end
Run a block that reads and writes `TVar`s as a single atomic transaction. With respect to the value of `TVar` objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the `TVar` objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.
There are some very important and unusual semantics that you must be aware of:
Most importantly, the block that you pass to `atomically` may be executed more than once. In most cases your code should be free of side effects, except those made through `TVar`.
If an exception escapes an atomically block it will abort the transaction.
It is undefined behaviour to use callcc or Fiber with atomically.
If you create a new thread within an `atomically` block, it will not be part of the transaction. Creating a thread counts as a side effect.
Transactions within transactions are flattened to a single transaction.
@example
a = Concurrent::TVar.new(100_000)
b = Concurrent::TVar.new(100)

Concurrent::atomically do
  a.value -= 10
  b.value += 10
end
# File lib/concurrent/tvar.rb, line 89
def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction
  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?
  if transaction.nil?
    # New transaction
    begin
      # Retry loop
      loop do
        # Create a new transaction
        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions
        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue Transaction::LeaveError => e
          transaction.abort
          break result
        rescue => e
          transaction.abort
          raise e
        end

        # If we can commit, break out of the loop
        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction
      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block
    yield
  end
end
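The "may be executed more than once" rule follows from the retry loop above. The following is a minimal, standard-library-only sketch of the same optimistic retry idea. `VersionedCell` and `optimistic_update` are hypothetical names invented for this illustration and are not part of concurrent-ruby, whose `Transaction` class works differently in detail.

```ruby
# Hypothetical versioned cell used only for illustration.
class VersionedCell
  def initialize(value)
    @value = value
    @version = 0
    @lock = Mutex.new
  end

  # Returns the current value together with the version it was read at.
  def read
    @lock.synchronize { [@value, @version] }
  end

  # Stores new_value only if no other commit happened since expected_version.
  def commit(new_value, expected_version)
    @lock.synchronize do
      return false unless @version == expected_version
      @value = new_value
      @version += 1
      true
    end
  end
end

# Re-runs the block until its result can be committed without a conflict.
def optimistic_update(cell)
  loop do
    value, version = cell.read
    new_value = yield(value)
    return new_value if cell.commit(new_value, version)
  end
end

cell = VersionedCell.new(100)
attempts = 0
result = optimistic_update(cell) do |value|
  attempts += 1
  # Simulate a competing writer during the first attempt only.
  cell.commit(value + 1, 0) if attempts == 1
  value + 10
end
```

Because the first attempt loses the race against the simulated writer, the block runs twice. Any side effect inside it, such as the `attempts` counter here, is repeated, which is why transaction blocks should avoid side effects.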
# File lib/concurrent/dataflow.rb, line 55
def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end
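The method above defers `result.execute` until every input has notified a shared counter. A minimal sketch of that countdown idea, using only the standard library, might look like the following. `CountdownTrigger` is a hypothetical stand-in written for this illustration; it is not the gem's `DependencyCounter` and does not reproduce its observer signature.

```ruby
# Hypothetical countdown that fires a callback once all dependencies report in.
class CountdownTrigger
  def initialize(count, &on_zero)
    @remaining = count
    @on_zero = on_zero
    @lock = Mutex.new
  end

  # Called once per completed dependency; extra arguments are ignored.
  def update(*_args)
    fire = @lock.synchronize { (@remaining -= 1).zero? }
    # Only the caller that brings the counter to zero runs the callback.
    @on_zero.call if fire
  end
end

fired = []
trigger = CountdownTrigger.new(3) { fired << :ran }

# Three "dependencies" finish on separate threads, in any order.
threads = Array.new(3) { Thread.new { trigger.update } }
threads.each(&:join)
```

The callback runs exactly once, on whichever thread reports last, which mirrors how the dependent operation starts only after its final input completes.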
@return [Configuration]
# File lib/concurrent/configuration.rb, line 267
def self.configuration
  CONFIGURATION.value
end
Perform gem-level configuration.
@yield the configuration commands
@yieldparam [Configuration] the current configuration object
# File lib/concurrent/configuration.rb, line 275
def self.configure
  yield(configuration)
end
@return [Logger] Logger with provided level and output.
# File lib/concurrent/configuration.rb, line 46
def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr)
  logger = Logger.new(output)
  logger.level = level
  logger.formatter = lambda do |severity, datetime, progname, msg|
    formatted_message = case msg
                        when String
                          msg
                        when Exception
                          format "%s (%s)\n%s",
                                 msg.message, msg.class, (msg.backtrace || []).join("\n")
                        else
                          msg.inspect
                        end
    format "[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
           severity,
           progname,
           formatted_message
  end
  logger
end
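To see what a formatter of this shape produces, here is a small sketch that uses only Ruby's standard `Logger` and `StringIO`. The helper name `build_logger` and the simplified exception formatting (no backtrace) are assumptions made for this example. It does not call concurrent-ruby at all.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper: a stdlib Logger with a line format like the one above.
def build_logger(output, level = Logger::WARN)
  logger = Logger.new(output)
  logger.level = level
  logger.formatter = lambda do |severity, datetime, progname, msg|
    # Simplified: exceptions are shown as "message (Class)" without a backtrace.
    text = msg.is_a?(Exception) ? "#{msg.message} (#{msg.class})" : msg.to_s
    format("[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'), severity, progname, text)
  end
  logger
end

sink = StringIO.new
log = build_logger(sink)

log.info('app') { 'ignored because it is below WARN' }
log.warn('app') { 'disk almost full' }
log.error('app') { RuntimeError.new('boom') }

lines = sink.string.lines
```

Only the two messages at or above the configured level reach the sink, each prefixed with a timestamp and a right-aligned severity.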
{include:file:doc/dataflow.md}
@param [Future] inputs zero or more `Future` operations that this dataflow depends upon
@yield The operation to perform once all the dependencies are met
@yieldparam [Future] inputs each of the `Future` inputs to the dataflow
@yieldreturn [Object] the result of the block operation
@return [Object] the result of all the operations
@raise [ArgumentError] if no block is given
@raise [ArgumentError] if any of the inputs are not `IVar`s
# File lib/concurrent/dataflow.rb, line 33
def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end

# File lib/concurrent/dataflow.rb, line 43
def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end

# File lib/concurrent/dataflow.rb, line 38
def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end

# File lib/concurrent/dataflow.rb, line 48
def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end
Disables AtExit handlers, including pool auto-termination handlers. When disabled, it is the application programmer's responsibility to ensure that the handlers are shut down properly prior to application exit by calling the {AtExit.run} method.
@note This option should be needed only because of `at_exit` ordering issues which may arise when running some testing frameworks. For example, Minitest's test suite runs itself in an `at_exit` callback which executes after the pools are already terminated. In that case auto-termination needs to be disabled and the handlers run manually after the test suite ends.
@note This method should never be called from within a gem. It should *only* be used from within the main application, and even then only when necessary.
@see AtExit
# File lib/concurrent/configuration.rb, line 90
def self.disable_at_exit_handlers!
  AtExit.enabled = false
end
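The division of responsibility described above (automatic cleanup by default, manual cleanup once disabled) can be sketched with a small handler registry. `ExitHandlerRegistry` is a hypothetical class written for this illustration using only `Kernel#at_exit`. It is an assumption about the general pattern, not the gem's `AtExit` implementation.

```ruby
# Hypothetical registry that runs cleanup handlers at exit unless disabled.
class ExitHandlerRegistry
  attr_writer :enabled

  def initialize
    @handlers = []
    @enabled = true
    # Automatic path: runs at process exit only while still enabled.
    at_exit { run if enabled? }
  end

  def enabled?
    @enabled
  end

  def add(&handler)
    @handlers << handler
    self
  end

  # Manual path: runs and discards every pending handler.
  def run
    @handlers.shift.call until @handlers.empty?
    true
  end
end

registry = ExitHandlerRegistry.new
events = []
registry.add { events << :pool_stopped }

registry.enabled = false # opt out of automatic cleanup
# ... the application or test suite would run here ...
registry.run             # cleanup is now the caller's responsibility
```

Once the automatic path is disabled, nothing is cleaned up unless the application calls `run` itself, which is the situation the notes above warn about.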
# File lib/concurrent/configuration.rb, line 94
def self.disable_executor_auto_termination!
  deprecated_method 'disable_executor_auto_termination!', 'disable_at_exit_handlers!'
  disable_at_exit_handlers!
end
@return [true,false]
@see .disable_executor_auto_termination!
# File lib/concurrent/configuration.rb, line 101
def self.disable_executor_auto_termination?
  deprecated_method 'disable_executor_auto_termination?', 'Concurrent::AtExit.enabled?'
  AtExit.enabled?
end
General access point to global executors.
@param [Symbol, Executor] executor_identifier symbols:
- :fast - {Concurrent.global_fast_executor}
- :io - {Concurrent.global_io_executor}
- :immediate - {Concurrent.global_immediate_executor}
@return [Executor]
# File lib/concurrent/configuration.rb, line 144
def self.executor(executor_identifier)
  Executor.executor(executor_identifier)
end
Global thread pool optimized for short, fast operations.
@return [ThreadPoolExecutor] the thread pool
# File lib/concurrent/configuration.rb, line 116
def self.global_fast_executor
  GLOBAL_FAST_EXECUTOR.value
end

# File lib/concurrent/configuration.rb, line 127
def self.global_immediate_executor
  GLOBAL_IMMEDIATE_EXECUTOR
end
Global thread pool optimized for long, blocking (IO) tasks.
@return [ThreadPoolExecutor] the thread pool
# File lib/concurrent/configuration.rb, line 123
def self.global_io_executor
  GLOBAL_IO_EXECUTOR.value
end

# File lib/concurrent/configuration.rb, line 37
def self.global_logger
  GLOBAL_LOGGER.value
end

# File lib/concurrent/configuration.rb, line 41
def self.global_logger=(value)
  GLOBAL_LOGGER.value = value
end
Global thread pool used for global timers.
@return [Concurrent::TimerSet] the thread pool
# File lib/concurrent/configuration.rb, line 134
def self.global_timer_set
  GLOBAL_TIMER_SET.value
end
Leave a transaction without committing or aborting - see `Concurrent::atomically`.
# File lib/concurrent/tvar.rb, line 151
def leave_transaction
  raise Transaction::LeaveError.new
end
@!macro [attach] monotonic_get_time
Returns the current time as tracked by the application monotonic clock.
@return [Float] The current monotonic time when `since` is not given, otherwise the elapsed monotonic time between `since` and the current time
@!macro monotonic_clock_warning
# File lib/concurrent/utility/monotonic_time.rb, line 54
def monotonic_time
  GLOBAL_MONOTONIC_CLOCK.get_time
end
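A monotonic reading is mainly useful for measuring elapsed time, because it is not affected by wall-clock adjustments. As a rough sketch of that use, the following relies on Ruby's built-in `Process.clock_gettime` rather than on the gem. The helper names `monotonic_now` and `elapsed` are hypothetical.

```ruby
# Hypothetical helper: seconds since an unspecified starting point.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Measures how long the given block takes, in seconds.
def elapsed
  started = monotonic_now
  yield
  monotonic_now - started
end

duration = elapsed { sleep 0.01 }
```

The absolute value returned by `monotonic_now` has no meaning on its own; only the difference between two readings does.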
# File lib/concurrent/utility/monotonic_time.rb, line 6
def initialize
  super()
  @last_time = Time.now.to_f
  ensure_ivar_visibility!
end
# File lib/concurrent/configuration.rb, line 148
def self.new_fast_executor(opts = {})
  FixedThreadPool.new(
    [2, Concurrent.processor_count].max,
    auto_terminate: opts.fetch(:auto_terminate, true),
    idletime: 60, # 1 minute
    max_queue: 0, # unlimited
    fallback_policy: :abort # shouldn't matter -- 0 max queue
  )
end
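The sizing rule above (at least two threads, otherwise one per processor) combined with an unbounded queue can be illustrated with a very small pool built from the standard library. `TinyPool` is a hypothetical class for this sketch, and `Etc.nprocessors` stands in for `Concurrent.processor_count`. It makes no attempt to match `FixedThreadPool` features such as idle timeouts or fallback policies.

```ruby
require 'etc'

# Hypothetical fixed-size worker pool with an unbounded job queue.
class TinyPool
  def initialize(size = [2, Etc.nprocessors].max)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Each worker pulls jobs until it receives the :stop marker.
        while (job = @queue.pop) != :stop
          job.call
        end
      end
    end
  end

  def post(&job)
    @queue << job
    self
  end

  # Lets queued jobs finish, then stops and joins every worker.
  def shutdown
    @workers.size.times { @queue << :stop }
    @workers.each(&:join)
  end
end

results = Queue.new
pool = TinyPool.new(3)
5.times { |i| pool.post { results << i * i } }
pool.shutdown

squares = Array.new(results.size) { results.pop }.sort
```

Jobs may complete in any order, so the results are sorted before use. A job that raises would end its worker thread in this sketch, which a real pool has to guard against.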
# File lib/concurrent/configuration.rb, line 158
def self.new_io_executor(opts = {})
  ThreadPoolExecutor.new(
    min_threads: [2, Concurrent.processor_count].max,
    max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
    # max_threads: 1000,
    auto_terminate: opts.fetch(:auto_terminate, true),
    idletime: 60, # 1 minute
    max_queue: 0, # unlimited
    fallback_policy: :abort # shouldn't matter -- 0 max queue
  )
end
# File lib/concurrent/utility/processor_counter.rb, line 153
def self.physical_processor_count
  processor_counter.physical_processor_count
end

# File lib/concurrent/utility/processor_counter.rb, line 149
def self.processor_count
  processor_counter.processor_count
end
Terminates all pools and blocks until they are terminated.
@see .disable_executor_auto_termination!
# File lib/concurrent/configuration.rb, line 108
def self.terminate_pools!
  deprecated_method 'terminate_pools!', 'Concurrent::AtExit.run'
  AtExit.run
end
Wait the given number of seconds for the block operation to complete.
Intended to be a simpler and more reliable replacement for the Ruby standard library `Timeout::timeout` method. It does not kill the task, so the task still runs to completion after the timeout is reported. The advantage is that it cannot cause errors by killing threads.
@param [Integer] seconds The number of seconds to wait
@return [Object] The result of the block operation
@raise [Concurrent::TimeoutError] when the block operation does not complete in the allotted number of seconds.
@see ruby-doc.org/stdlib-2.2.0/libdoc/timeout/rdoc/Timeout.html Ruby Timeout::timeout
@!macro monotonic_clock_warning
@deprecated timeout is deprecated and will be removed
# File lib/concurrent/utility/timeout.rb, line 27
def timeout(seconds, &block)
  deprecated 'timeout is deprecated and will be removed'

  future = Future.execute(&block)
  future.wait(seconds)
  if future.complete?
    future.value!
  else
    raise TimeoutError
  end
end
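The key property described above is that the caller stops waiting while the task itself is left alone. A minimal sketch of that behaviour with plain threads might look like this. `wait_up_to` and `DeadlineExceeded` are hypothetical names for this example, and `Thread#join` with a limit stands in for `Future#wait`.

```ruby
# Hypothetical error raised when the caller gives up waiting.
class DeadlineExceeded < StandardError; end

# Runs the block on another thread and waits at most `seconds` for it.
# The worker thread is never killed; it finishes in the background.
def wait_up_to(seconds)
  outcome = {}
  worker = Thread.new do
    begin
      outcome[:value] = yield
    rescue StandardError => e
      outcome[:error] = e
    end
  end

  # Thread#join returns nil when the limit passes before the thread ends.
  raise DeadlineExceeded, "no result within #{seconds}s" unless worker.join(seconds)
  raise outcome[:error] if outcome.key?(:error)

  outcome[:value]
end

answer = wait_up_to(1) { 21 * 2 }

timed_out = begin
  wait_up_to(0.01) { sleep 0.2; :late }
  false
rescue DeadlineExceeded
  true
end
```

In the second call the caller is released after roughly 10 ms, but the block keeps sleeping and would still complete. That is the trade-off: no thread is interrupted, and no work is cancelled either.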
Perform the given operation asynchronously after the given number of seconds.
@param [Fixnum] seconds the interval in seconds to wait before executing the task
@yield the task to execute
@return [Concurrent::ScheduledTask] IVar representing the task
@see Concurrent::ScheduledTask
@deprecated use `ScheduledTask` instead
# File lib/concurrent/utility/timer.rb, line 19
def timer(seconds, *args, &block)
  deprecated_method 'Concurrent.timer', 'ScheduledTask'
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('interval must be greater than or equal to zero') if seconds < 0

  Concurrent.global_timer_set.post(seconds, *args, &block)
end
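As a rough picture of what "run this block after a delay" means, the sketch below uses one plain thread per task and applies the same argument checks as the method above. `run_after` is a hypothetical helper. The gem instead posts the task to a shared timer set, which this example does not attempt to reproduce.

```ruby
# Hypothetical helper: runs the block with args after `seconds` on a new thread.
def run_after(seconds, *args, &task)
  raise ArgumentError, 'no block given' unless task
  raise ArgumentError, 'interval must be greater than or equal to zero' if seconds < 0

  Thread.new do
    sleep(seconds)
    task.call(*args)
  end
end

handle = run_after(0.01, 2, 3) { |a, b| a * b }
product = handle.value # blocks until the delayed task has finished
```

The call returns immediately with a handle, and the result only becomes available once the delay has passed and the block has run.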
Use logger created by create_stdlib_logger to log concurrent-ruby messages.
# File lib/concurrent/configuration.rb, line 69
def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr)
  logger = create_stdlib_logger level, output
  Concurrent.global_logger = lambda do |level, progname, message = nil, &block|
    logger.add level, message, progname, &block
  end
end