class Concurrent::TimerSet

Executes a collection of tasks, each after a given delay. A master task monitors the set and schedules each task for execution at the appropriate time. Tasks are run on the global thread pool or on the supplied executor. Each task is represented as a `ScheduledTask`.

@see Concurrent::ScheduledTask

@!macro monotonic_clock_warning

Public Class Methods

new(opts = {})

Create a new set of timed tasks.

@!macro executor_options

@param [Hash] opts the options used to specify the executor on which to perform actions
@option opts [Executor] :executor when set use the given `Executor` instance.
  Three special values are also supported: `:task` returns the global task pool,
  `:operation` returns the global operation pool, and `:immediate` returns a new
  `ImmediateExecutor` object.
Calls superclass method Concurrent::RubyExecutorService::new
# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 30
def initialize(opts = {})
  super(opts)
end

Public Instance Methods

kill()

Begin an immediate shutdown. In-progress tasks will be allowed to complete but enqueued tasks will be dismissed and no new tasks will be accepted. Has no additional effect if the thread pool is not running.

# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 62
def kill
  shutdown
end

post(delay, *args, &task)

Post a task to be executed after a given delay (in seconds). If the delay is 1/100th of a second or less, the task is posted to the executor immediately.

@param [Float] delay the number of seconds to wait before executing the task.
@param [Array<Object>] args the arguments passed to the task on execution.

@yield the task to be performed.

@return [Concurrent::ScheduledTask, false] IVar representing the task if the post is successful; false after shutdown.

@raise [ArgumentError] if the intended execution time is not in the future.
@raise [ArgumentError] if no block is given.

# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 48
def post(delay, *args, &task)
  raise ArgumentError.new('no block given') unless block_given?
  return false unless running?
  opts = { executor:  @task_executor,
           args:      args,
           timer_set: self }
  task = ScheduledTask.execute(delay, opts, &task) # may raise exception
  task.unscheduled? ? false : task
end

Private Instance Methods

ns_initialize(opts)

Initialize the object.

@param [Hash] opts the options to create the object with.

@!visibility private

# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 74
def ns_initialize(opts)
  @queue              = Collection::NonConcurrentPriorityQueue.new(order: :min)
  @task_executor      = Options.executor_from_options(opts) || Concurrent.global_io_executor
  @timer_executor     = SingleThreadExecutor.new
  @condition          = Event.new
  @ruby_pid           = $$ # detects if Ruby has forked
end

ns_post_task(task)

@!visibility private

# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 94
def ns_post_task(task)
  return false unless ns_running?
  ns_reset_if_forked
  if (task.initial_delay) <= 0.01
    task.executor.post { task.process_task }
  else
    @queue.push(task)
    # only post the process method when the queue is empty
    @timer_executor.post(&method(:process_tasks)) if @queue.length == 1
    @condition.set
  end
  true
end
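
The dispatch rule in the listing above can be illustrated in isolation. The following is a minimal sketch using only the standard library, not the library's implementation: `SketchTask`, `SketchScheduler`, and `IMMEDIATE_THRESHOLD` are hypothetical names, a sorted array stands in for the priority queue, and the block is called inline rather than posted to an executor.

```ruby
# Simplified model of the dispatch rule; all names here are hypothetical.
IMMEDIATE_THRESHOLD = 0.01 # seconds, mirrors the 0.01 comparison above

# Stand-in for a scheduled task: a delay plus a callable.
SketchTask = Struct.new(:initial_delay, :block)

class SketchScheduler
  attr_reader :queue, :ran

  def initialize
    @queue = [] # tasks waiting for the timer loop
    @ran   = [] # results of tasks that were run immediately
  end

  # Returns :immediate or :queued so the chosen path is visible.
  def post_task(task)
    if task.initial_delay <= IMMEDIATE_THRESHOLD
      @ran << task.block.call          # short delay: run right away
      :immediate
    else
      @queue << task
      @queue.sort_by!(&:initial_delay) # keep the earliest task first
      :queued
    end
  end
end

scheduler = SketchScheduler.new
fast = scheduler.post_task(SketchTask.new(0.005, -> { :fast }))
slow = scheduler.post_task(SketchTask.new(2.0, -> { :slow }))
```

Here `fast` takes the immediate path and `slow` is queued. In the real class the queued path also starts the timer thread when the queue goes from empty to one entry and signals the condition so a sleeping loop re-evaluates its wait.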

ns_reset_if_forked()
# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 129
def ns_reset_if_forked
  if $$ != @ruby_pid
    @queue.clear
    @condition.reset
    @ruby_pid = $$
  end
end
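
The PID comparison above is a common way to notice that the current process is a forked child. The sketch below shows the idea on its own, assuming a hypothetical `ForkAwareQueue` class; the `current_pid` parameter is an addition for illustration so the check can be exercised without actually forking.

```ruby
# Hypothetical sketch of PID-based fork detection; not the library's code.
class ForkAwareQueue
  attr_reader :items

  def initialize
    @items     = []
    @owner_pid = Process.pid # PID of the process that built the queue
  end

  def push(item)
    reset_if_forked
    @items << item
  end

  # current_pid is injectable so the check can be tested without forking.
  def reset_if_forked(current_pid = Process.pid)
    return false if current_pid == @owner_pid

    @items.clear             # inherited entries belong to the parent
    @owner_pid = current_pid # remember the new owner
    true
  end
end

queue = ForkAwareQueue.new
queue.push(:a)
same_process = queue.reset_if_forked                  # PID unchanged
after_fork   = queue.reset_if_forked(Process.pid + 1) # pretend we forked
```

A reset of this kind is useful because only the thread that calls `fork` continues in the child process, so state tied to a background thread in the parent is stale there.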

ns_shutdown_execution()

`ExecutorService` callback called during shutdown.

@!visibility private

# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 122
def ns_shutdown_execution
  ns_reset_if_forked
  @queue.clear
  @timer_executor.kill
  stopped_event.set
end

post_task(task)

Post the task to the internal queue.

@note This is intended as a callback method from `ScheduledTask` only. It is not intended to be used directly. Post a task by using the `ScheduledTask#execute` method.

@!visibility private

# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 89
def post_task(task)
  synchronize { ns_post_task(task) }
end

process_tasks()

Run a loop and execute tasks in the scheduled order and at the approximate scheduled time. If no tasks remain the thread will exit gracefully so that garbage collection can occur. If there are no ready tasks it will sleep for up to 60 seconds waiting for the next scheduled task.

@!visibility private

# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 143
def process_tasks
  loop do
    task = synchronize { @condition.reset; @queue.peek }
    break unless task

    now  = Concurrent.monotonic_time
    diff = task.schedule_time - now

    if diff <= 0
      # We need to remove the task from the queue before passing
      # it to the executor, to avoid race conditions where we pass
      # the peek'ed task to the executor and then pop a different
      # one that's been added in the meantime.
      #
      # Note that there's no race condition between the peek and
      # this pop - this pop could retrieve a different task from
      # the peek, but that task would be due to fire now anyway
      # (because @queue is a priority queue, and this thread is
      # the only reader, so whatever timer is at the head of the
      # queue now must have the same pop time, or a closer one, as
      # when we peeked).
      task = synchronize { @queue.pop }
      task.executor.post { task.process_task }
    else
      @condition.wait([diff, 60].min)
    end
  end
end
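
The peek, compare, then pop-or-wait cycle described above can be modelled without threads. This is a simplified single-threaded sketch under stated assumptions: `SketchEntry`, `monotonic_now`, and `run_timer_loop` are hypothetical names, a sorted array replaces the priority queue, `sleep` replaces the condition wait, and "executing" a task just records its label.

```ruby
# Simplified, single-threaded model of the timer loop; names are hypothetical.
SketchEntry = Struct.new(:schedule_time, :label)

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Fires entries in schedule order, sleeping until the earliest one is due.
# max_wait caps a single sleep, like the 60 second cap in the listing above.
def run_timer_loop(entries, max_wait: 60)
  pending = entries.sort_by(&:schedule_time)
  fired   = []
  until pending.empty?
    diff = pending.first.schedule_time - monotonic_now
    if diff <= 0
      fired << pending.shift.label # due: remove it, then "execute" it
    else
      sleep([diff, max_wait].min)  # not due: wait, then check again
    end
  end
  fired
end

start = monotonic_now
order = run_timer_loop([
  SketchEntry.new(start + 0.06, :third),
  SketchEntry.new(start + 0.02, :first),
  SketchEntry.new(start + 0.04, :second)
])
elapsed = monotonic_now - start
```

The entries fire in schedule order regardless of insertion order. Unlike this sketch, the real loop waits on an event rather than sleeping, so posting a task with an earlier schedule time can wake it before the wait expires.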

remove_task(task)

Remove the given task from the queue.

@note This is intended as a callback method from `ScheduledTask` only. It is not intended to be used directly. Cancel a task by using the `ScheduledTask#cancel` method.

@!visibility private

# File lib/concurrent-ruby/concurrent/executor/timer_set.rb, line 115
def remove_task(task)
  synchronize { @queue.delete(task) }
end