class Dynflow::World

Constants

TriggerResult

Attributes

action_classes[R]
auto_rescue[R]
auto_validity_check[R]
client_dispatcher[R]
clock[R]
config[R]
connector[R]
coordinator[R]
dead_letter_handler[R]
delayed_executor[R]
execution_plan_cleaner[R]
executor[R]
executor_dispatcher[R]
id[R]
logger_adapter[R]
meta[R]
middleware[R]
persistence[R]
subscription_index[R]
terminated[R]
termination_timeout[R]
throttle_limiter[R]
transaction_adapter[R]
validity_check_timeout[R]

Public Class Methods

new(config) click to toggle source
# File lib/dynflow/world.rb, line 15
# Builds a world from +config+: reads the adapters and settings, spawns the
# clock, client dispatcher and dead-letter handler, registers the world with
# the coordinator and finally calls +post_initialization+.
#
# @param config the raw configuration; it is wrapped in Config::ForWorld
#   together with this world before any setting is read from it
def initialize(config)
  @config = Config::ForWorld.new(config, self)

  # Set the telemetry instance as soon as possible
  Dynflow::Telemetry.set_adapter @config.telemetry_adapter
  Dynflow::Telemetry.register_metrics!

  @id                     = SecureRandom.uuid
  @clock                  = spawn_and_wait(Clock, 'clock')
  @logger_adapter         = @config.logger_adapter
  # Validation runs only after the id, clock and logger adapter are set.
  @config.validate
  @transaction_adapter    = @config.transaction_adapter
  @persistence            = Persistence.new(self, @config.persistence_adapter,
                                            :backup_deleted_plans => @config.backup_deleted_plans,
                                            :backup_dir => @config.backup_dir)
  @coordinator            = Coordinator.new(@config.coordinator_adapter)
  @executor               = @config.executor
  @action_classes         = @config.action_classes
  @auto_rescue            = @config.auto_rescue
  @exit_on_terminate      = Concurrent::AtomicBoolean.new(@config.exit_on_terminate)
  @connector              = @config.connector
  @middleware             = Middleware::World.new
  # The transaction middleware is installed only when a transaction adapter is configured.
  @middleware.use Middleware::Common::Transaction if @transaction_adapter
  @client_dispatcher      = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age)
  @dead_letter_handler    = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers)
  @auto_validity_check    = @config.auto_validity_check
  @validity_check_timeout = @config.validity_check_timeout
  @throttle_limiter       = @config.throttle_limiter
  @terminated             = Concurrent.event
  @termination_timeout    = @config.termination_timeout
  calculate_subscription_index

  # Only worlds with an executor get an executor dispatcher; the constructor
  # blocks until the executor reports it is initialized.
  if executor
    @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore)
    executor.initialized.wait
  end
  update_register
  perform_validity_checks if auto_validity_check

  # State used by start_termination and before_termination.
  @termination_barrier = Mutex.new
  @before_termination_hooks = Queue.new

  # When auto_terminate is configured, terminate the world at process exit and
  # wait for the termination to finish.
  if @config.auto_terminate
    at_exit do
      @exit_on_terminate.make_false # make sure we don't terminate twice
      self.terminate.wait
    end
  end
  post_initialization
end

Public Instance Methods

action_logger() click to toggle source
# File lib/dynflow/world.rb, line 107
# Logger for action-level messages.
#
# @return the +action_logger+ exposed by the world's logger adapter
def action_logger
  adapter = logger_adapter
  adapter.action_logger
end
auto_execute() click to toggle source

24119 - ensure delayed executor is preserved after invalidation. Executes plans that are planned/paused and haven't reported any error yet (usually when no executor was available at the time of planning or terminating).

# File lib/dynflow/world.rb, line 256
# Executes stored plans that are waiting to run.
#
# While holding the coordinator's AutoExecuteLock, looks up execution plans in
# state "planned" or "paused" whose result is any of ExecutionPlan.results
# except :error, and calls +execute+ for each plan that has no execution lock.
#
# @return the non-nil results of +execute+ for the plans that were started
#   (as returned through +coordinator.acquire+), or an empty array when the
#   lock could not be acquired (Coordinator::LockError is logged at info level)
def auto_execute
  coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do
    acceptable_results = (ExecutionPlan.results - [:error]).map(&:to_s)
    candidates = self.persistence.find_execution_plans filters: { 'state' => %w(planned paused), 'result' => acceptable_results }
    candidates.each_with_object([]) do |candidate, started|
      lock_filter = Dynflow::Coordinator::ExecutionLock.unique_filter(candidate.id)
      # Skip plans that already hold an execution lock.
      next unless coordinator.find_locks(lock_filter).empty?
      outcome = execute(candidate.id)
      started << outcome unless outcome.nil?
    end
  end
rescue Coordinator::LockError => e
  logger.info "auto-executor lock already aquired: #{e.message}"
  []
end
before_termination(&block) click to toggle source
# File lib/dynflow/world.rb, line 77
# Registers a hook by appending the given block to @before_termination_hooks
# (the collection drained by +run_before_termination_hooks+).
#
# @yield the hook body; it is stored, not called, here
# @return the hooks collection the block was appended to
def before_termination(&block)
  @before_termination_hooks.push(block)
end
delay(*args) click to toggle source
# File lib/dynflow/world.rb, line 185
# Same as +delay_with_caller+ with no caller action.
#
# @param args forwarded unchanged after a +nil+ caller action
# @return whatever +delay_with_caller+ returns
def delay(*args)
  no_caller_action = nil
  delay_with_caller(no_caller_action, *args)
end
delay_with_caller(caller_action, action_class, delay_options, *args) click to toggle source
# File lib/dynflow/world.rb, line 189
# Creates a new ExecutionPlan for this world and delays it.
#
# @param caller_action passed through to ExecutionPlan#delay
# @param action_class the action to delay; must not be nil
# @param delay_options passed through to ExecutionPlan#delay
# @param args remaining arguments passed through to ExecutionPlan#delay
# @return Scheduled built from the new plan's id
# @raise [RuntimeError] when +action_class+ is nil
def delay_with_caller(caller_action, action_class, delay_options, *args)
  if action_class.nil?
    raise 'No action_class given'
  end

  delayed_plan = ExecutionPlan.new(self).tap do |new_plan|
    new_plan.delay(caller_action, action_class, delay_options, *args)
  end
  Scheduled[delayed_plan.id]
end
event(execution_plan_id, step_id, event, done = Concurrent.future) click to toggle source
# File lib/dynflow/world.rb, line 216
# Publishes a Dispatcher::Event request for the given plan step.
#
# @param execution_plan_id id placed in the request
# @param step_id id placed in the request
# @param event payload placed in the request
# @param done future handed to +publish_request+ (a new one by default)
# @return whatever +publish_request+ returns; +false+ is passed as its
#   wait_for_accepted argument
def event(execution_plan_id, step_id, event, done = Concurrent.future)
  request = Dispatcher::Event[execution_plan_id, step_id, event]
  publish_request(request, done, false)
end
execute(execution_plan_id, done = Concurrent.future) click to toggle source

@return [Concurrent::Edge::Future] containing the execution_plan when finished. Raises when the ExecutionPlan is not accepted for execution.

# File lib/dynflow/world.rb, line 212
# Publishes a Dispatcher::Execution request for the given plan.
#
# @param execution_plan_id id placed in the request
# @param done future handed to +publish_request+ (a new one by default)
# @return whatever +publish_request+ returns; +true+ is passed as its
#   wait_for_accepted argument
def execute(execution_plan_id, done = Concurrent.future)
  request = Dispatcher::Execution[execution_plan_id]
  publish_request(request, done, true)
end
get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent.future) click to toggle source
# File lib/dynflow/world.rb, line 228
# Publishes a Dispatcher::Status request.
#
# @param world_id id placed in the request
# @param execution_plan_id id placed in the request
# @param timeout forwarded to +publish_request+
# @param done future handed to +publish_request+ (a new one by default)
# @return whatever +publish_request+ returns; +false+ is passed as its
#   wait_for_accepted argument
def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent.future)
  request = Dispatcher::Status[world_id, execution_plan_id]
  publish_request(request, done, false, timeout)
end
logger() click to toggle source
# File lib/dynflow/world.rb, line 103
# Logger for the world's own messages.
#
# @return the +dynflow_logger+ exposed by the world's logger adapter
def logger
  adapter = logger_adapter
  adapter.dynflow_logger
end
ping(world_id, timeout, done = Concurrent.future) click to toggle source
# File lib/dynflow/world.rb, line 220
# Publishes a Dispatcher::Ping request for +world_id+.
#
# The second Ping field is +true+ here and +false+ in +ping_without_cache+,
# so it is presumably a "use the ping cache" flag (confirm against Dispatcher).
#
# @param world_id id placed in the request
# @param timeout forwarded to +publish_request+
# @param done future handed to +publish_request+ (a new one by default)
# @return whatever +publish_request+ returns; +false+ is passed as its
#   wait_for_accepted argument
def ping(world_id, timeout, done = Concurrent.future)
  request = Dispatcher::Ping[world_id, true]
  publish_request(request, done, false, timeout)
end
ping_without_cache(world_id, timeout, done = Concurrent.future) click to toggle source
# File lib/dynflow/world.rb, line 224
# Publishes a Dispatcher::Ping request for +world_id+ with the second Ping
# field set to +false+ (+ping+ sends +true+; presumably this bypasses the
# ping cache; confirm against Dispatcher).
#
# @param world_id id placed in the request
# @param timeout forwarded to +publish_request+
# @param done future handed to +publish_request+ (a new one by default)
# @return whatever +publish_request+ returns; +false+ is passed as its
#   wait_for_accepted argument
def ping_without_cache(world_id, timeout, done = Concurrent.future)
  request = Dispatcher::Ping[world_id, false]
  publish_request(request, done, false, timeout)
end
plan(action_class, *args) click to toggle source
# File lib/dynflow/world.rb, line 196
# Creates a new ExecutionPlan for this world, prepares it for +action_class+
# and plans it with +args+.
#
# @param action_class passed to ExecutionPlan#prepare
# @param args passed to ExecutionPlan#plan
# @return the new ExecutionPlan
def plan(action_class, *args)
  execution_plan = ExecutionPlan.new(self)
  execution_plan.prepare(action_class)
  execution_plan.plan(*args)
  execution_plan
end
plan_with_caller(caller_action, action_class, *args) click to toggle source
# File lib/dynflow/world.rb, line 203
# Like +plan+, but passes +caller_action+ to ExecutionPlan#prepare as the
# +caller_action:+ option.
#
# @param caller_action value for the +caller_action:+ option of prepare
# @param action_class passed to ExecutionPlan#prepare
# @param args passed to ExecutionPlan#plan
# @return the new ExecutionPlan
def plan_with_caller(caller_action, action_class, *args)
  execution_plan = ExecutionPlan.new(self)
  execution_plan.prepare(action_class, caller_action: caller_action)
  execution_plan.plan(*args)
  execution_plan
end
post_initialization() click to toggle source

Performs steps once the executor is ready and invalidation of previous worlds is finished. Needs to be idempotent, as it can be called several times (especially when #auto_validity_check is false, as it should be called after the `perform_validity_checks` method).

# File lib/dynflow/world.rb, line 69
# Spawns the optional components and kicks off automatic execution.
#
# The delayed executor and the execution plan cleaner are each spawned via
# +try_spawn+ only while their instance variable is still unset, so repeated
# calls do not re-spawn a component that is already present. The register is
# then updated, the delayed executor is started unless it reports
# +started?+, and +auto_execute+ runs when the configuration asks for it.
#
# @return the result of +auto_execute+ when it ran, otherwise nil
def post_initialization
  unless @delayed_executor
    @delayed_executor = try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock)
  end
  unless @execution_plan_cleaner
    @execution_plan_cleaner = try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock)
  end
  update_register

  if @delayed_executor && !@delayed_executor.started?
    @delayed_executor.start
  end

  if @config.auto_execute
    self.auto_execute
  end
end
publish_request(request, done, wait_for_accepted, timeout = nil) click to toggle source
# File lib/dynflow/world.rb, line 232
# Sends +request+ to the client dispatcher.
#
# A fresh "accepted" future is passed to the dispatcher along with the
# message; when that future fails with a reason, +done+ is failed with the
# same reason.
#
# @param request the request object to publish
# @param done future included in the message sent to the dispatcher
# @param wait_for_accepted when truthy, block until the accepted future settles
# @param timeout included in the message sent to the dispatcher
# @return +done+ on the normal path; when a StandardError is raised in the
#   body, the result of failing the accepted future with that error
def publish_request(request, done, wait_for_accepted, timeout = nil)
  accepted = Concurrent.future
  # Propagate a rejection of the request to the caller's future.
  accepted.rescue { |reason| done.fail reason if reason }

  dispatcher = client_dispatcher
  dispatcher.ask([:publish_request, done, request, timeout], accepted)

  if wait_for_accepted
    accepted.wait
  end
  done
rescue => e
  accepted.fail e
end
registered_world() click to toggle source
# File lib/dynflow/world.rb, line 95
# Builds the coordinator record describing this world.
#
# @return a Coordinator::ExecutorWorld when the world has an executor,
#   otherwise a Coordinator::ClientWorld; both are constructed with this world
def registered_world
  return Coordinator::ExecutorWorld.new(self) if executor

  Coordinator::ClientWorld.new(self)
end
reload!() click to toggle source

Reloads action classes; intended only for development.

# File lib/dynflow/world.rb, line 116
# Re-resolves every known action class by name, dropping the ones that can no
# longer be resolved, then clears the middleware cache and rebuilds the
# subscription index.
#
# @return whatever +calculate_subscription_index+ returns
def reload!
  # TODO what happens with newly loaded classes
  refreshed = @action_classes.each_with_object([]) do |klass, found|
    resolved = begin
      Utils.constantize(klass.to_s)
    rescue NameError
      nil # ignore missing classes
    end
    found << resolved unless resolved.nil?
  end
  @action_classes = refreshed

  middleware.clear_cache!
  calculate_subscription_index
end
subscribed_actions(action_class) click to toggle source
# File lib/dynflow/world.rb, line 111
# Looks up the actions subscribed to +action_class+.
#
# The lookup never goes through the index's default value, so an unknown
# class does not add an entry to the index.
#
# @param action_class the key to look up in the subscription index
# @return the stored entry for +action_class+, or a new empty array when the
#   index has no such key
def subscribed_actions(action_class)
  @subscription_index.fetch(action_class) { [] }
end
terminate(future = Concurrent.future) click to toggle source
# File lib/dynflow/world.rb, line 244
# Starts (or joins) termination of the world.
#
# Calls +tangle(future)+ on the result of +start_termination+; presumably
# this makes +future+ settle together with the termination (confirm against
# the concurrent-ruby edge API).
#
# @param future the future to tangle (a new one by default)
# @return the same +future+
def terminate(future = Concurrent.future)
  future.tap { |target| start_termination.tangle(target) }
end
terminating?() click to toggle source
# File lib/dynflow/world.rb, line 249
# Tells whether termination of this world has been started.
#
# The check is whether @terminating has ever been assigned, which
# +start_termination+ does. The result is coerced to a strict boolean so
# callers do not receive the raw result of +defined?+ (a String or nil).
#
# @return [Boolean] true once @terminating has been assigned, false otherwise
def terminating?
  !!defined?(@terminating)
end
trigger(action_class = nil, *args, &block) click to toggle source

@return [TriggerResult] Blocks until action_class is planned. If no arguments are given, the plan is expected to be returned by a block.

# File lib/dynflow/world.rb, line 168
# Plans an action and, when planning succeeded, hands the plan to +execute+.
#
# @param action_class the action to plan; when nil, the block must supply
#   the execution plan instead
# @param args forwarded to +plan+ together with +action_class+
# @yield [world] called with this world when +action_class+ is nil; must
#   return the execution plan
# @return Triggered built from the plan id and the result of +execute+ when
#   the plan's state is :planned, otherwise PlaningFailed built from the plan
#   id and the exception of the plan's first error
# @raise [RuntimeError] when neither +action_class+ nor a block is given
def trigger(action_class = nil, *args, &block)
  if action_class.nil? && block.nil?
    raise 'Neither action_class nor a block given'
  end

  execution_plan = action_class.nil? ? block.call(self) : plan(action_class, *args)

  unless execution_plan.state == :planned
    return PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
  end

  finished = execute(execution_plan.id, Concurrent.future)
  Triggered[execution_plan.id, finished]
end
try_spawn(what, lock_class = nil) click to toggle source
# File lib/dynflow/world.rb, line 271
# Spawns an optional component taken from the configuration.
#
# @param what [Symbol] name of the configuration reader returning the component
# @param lock_class optional lock class; when given, a lock built for this
#   world is acquired from the coordinator before spawning
# @return the spawned component, or nil when the world has no executor, the
#   configuration returns nil for +what+, or acquiring the lock raises
#   Coordinator::LockError
def try_spawn(what, lock_class = nil)
  return nil unless executor

  component = @config.public_send(what)
  return nil if component.nil?

  coordinator.acquire(lock_class.new(self)) if lock_class
  component.spawn.wait
  component
rescue Coordinator::LockError
  nil
end
update_register() click to toggle source
# File lib/dynflow/world.rb, line 81
# Refreshes the world's metadata and writes the world record to the coordinator.
#
# The metadata hash is taken from the configuration on first use. It gets the
# configured queues when the world has an executor, a +true+ flag for each of
# the delayed executor and execution plan cleaner that is present, and a
# fresh 'last_seen' timestamp. The first call registers the world; later
# calls update the existing record.
#
# @return true on the first (registering) call, otherwise the result of
#   +coordinator.update_record+
def update_register
  @meta ||= @config.meta
  @meta['queues'] = @config.queues if @executor

  optional_components = {
    'delayed_executor'       => @delayed_executor,
    'execution_plan_cleaner' => @execution_plan_cleaner
  }
  optional_components.each do |meta_key, component|
    @meta[meta_key] = true if component
  end

  @meta['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time

  return coordinator.update_record(registered_world) if @already_registered

  coordinator.register_world(registered_world)
  @already_registered = true
end

Private Instance Methods

calculate_subscription_index() click to toggle source
# File lib/dynflow/world.rb, line 339
# Builds @subscription_index: a frozen hash mapping each subscribed-to class
# to the list of action classes that subscribe to it.
#
# For every action class with a truthy +subscribe+ value, each entry of that
# value (a single entry or an array) is resolved through Utils.constantize
# and the action class is appended under the resolved key.
#
# The auto-vivifying default block is only needed while building. It is
# removed before freezing, because on a frozen hash it would raise on any
# lookup of a missing key instead of returning nil.
#
# @return [Hash] the frozen index, also stored in @subscription_index
def calculate_subscription_index
  index = Hash.new { |hash, key| hash[key] = [] }

  action_classes.each do |klass|
    subscriptions = klass.subscribe
    next unless subscriptions

    Array(subscriptions).each do |subscribed_class|
      index[Utils.constantize(subscribed_class.to_s)] << klass
    end
  end

  index.default_proc = nil
  @subscription_index = index.freeze
end
run_before_termination_hooks() click to toggle source
# File lib/dynflow/world.rb, line 349
# Runs the registered before-termination hooks until the hook collection is
# empty.
#
# Each hook is taken from the collection and called inside its own future.
# An error raised by a hook is logged rather than propagated. The caller
# waits for each future for up to +termination_timeout+ and logs an error
# when that wait reports failure.
#
# @return [nil]
def run_before_termination_hooks
  until @before_termination_hooks.empty?
    pending_hook = Concurrent.future do
      begin
        hook = @before_termination_hooks.pop
        hook.call
      rescue => error
        logger.error error
      end
    end

    completed_in_time = pending_hook.wait(termination_timeout)
    logger.error "timeout running before_termination_hook" unless completed_in_time
  end
end
spawn_and_wait(klass, name, *args) click to toggle source
# File lib/dynflow/world.rb, line 362
# Spawns an actor and blocks until its "initialized" future has been waited on.
#
# @param klass the class to spawn; receives +name:+, +args:+ and +initialized:+
# @param name the name given to the spawned actor
# @param args extra arguments passed to the actor as +args:+
# @return the value returned by +klass.spawn+
def spawn_and_wait(klass, name, *args)
  ready = Concurrent.future
  klass.spawn(name: name, args: args, initialized: ready).tap { ready.wait }
end
start_termination() click to toggle source
# File lib/dynflow/world.rb, line 284
# Starts termination of the world, at most once.
#
# Under @termination_barrier, an already stored @terminating future is
# returned as is. Otherwise a background future shuts the components down in
# a fixed order (before-termination hooks, delayed executor, throttle
# limiter, executor and its dispatcher, client dispatcher, connector, clock),
# then removes the world from the coordinator and completes @terminated.
# Each shutdown wait is bounded by +termination_timeout+.
#
# @return the @terminating future, which waits for the background shutdown
#   for up to +termination_timeout+
def start_termination
  @termination_barrier.synchronize do
    # Termination already started: hand back the same future.
    return @terminating if @terminating
    # NOTE(review): `termination_future` is a local first assigned here, so
    # `||=` behaves like a plain `=`.
    termination_future ||= Concurrent.future do
      begin
        run_before_termination_hooks

        if delayed_executor
          logger.info "start terminating delayed_executor..."
          delayed_executor.terminate.wait(termination_timeout)
        end

        logger.info "start terminating throttle_limiter..."
        throttle_limiter.terminate.wait(termination_timeout)

        # Executor-specific shutdown: stop taking new work first, then stop
        # the executor and finally its dispatcher.
        if executor
          connector.stop_receiving_new_work(self, termination_timeout)

          logger.info "start terminating executor..."
          executor.terminate.wait(termination_timeout)

          logger.info "start terminating executor dispatcher..."
          executor_dispatcher_terminated = Concurrent.future
          executor_dispatcher.ask([:start_termination, executor_dispatcher_terminated])
          executor_dispatcher_terminated.wait(termination_timeout)
        end

        logger.info "start terminating client dispatcher..."
        client_dispatcher_terminated = Concurrent.future
        client_dispatcher.ask([:start_termination, client_dispatcher_terminated])
        client_dispatcher_terminated.wait(termination_timeout)

        logger.info "stop listening for new events..."
        connector.stop_listening(self, termination_timeout)

        if @clock
          logger.info "start terminating clock..."
          clock.ask(:terminate!).wait(termination_timeout)
        end

        coordinator.delete_world(registered_world)
        @terminated.complete
        true
      rescue => e
        # NOTE(review): on this path the steps after the failing one are
        # skipped, including `@terminated.complete`.
        logger.fatal(e)
      end
    end
    # The stored future bounds the whole shutdown by termination_timeout; once
    # it completes, the process exits from a new thread if @exit_on_terminate
    # is still true.
    @terminating = Concurrent.future do
      termination_future.wait(termination_timeout)
    end.on_completion do
      Thread.new { Kernel.exit } if @exit_on_terminate.true?
    end
  end
end