class Dynflow::World

Constants

TriggerResult

Attributes

action_classes[R]
auto_rescue[R]
auto_validity_check[R]
client_dispatcher[R]
clock[R]
config[R]
connector[R]
coordinator[R]
dead_letter_handler[R]
delayed_executor[R]
execution_plan_cleaner[R]
executor[R]
executor_dispatcher[R]
id[R]
logger_adapter[R]
meta[R]
middleware[R]
persistence[R]
subscription_index[R]
terminated[R]
termination_timeout[R]
throttle_limiter[R]
transaction_adapter[R]
validity_check_timeout[R]

Public Class Methods

new(config) click to toggle source
# File lib/dynflow/world.rb, line 16
# Builds a world from +config+ and brings up its components in a fixed order.
#
# The configuration is wrapped in Config::ForWorld, telemetry is set up, the
# clock, persistence, coordinator, optional executor, middleware and the
# dispatchers are created, the world is registered through +update_register+
# and finally +post_initialization+ is run.
#
# @param config the configuration object handed to Config::ForWorld.new
def initialize(config)
  @config = Config::ForWorld.new(config, self)

  # Set the telemetry instance as soon as possible
  Dynflow::Telemetry.set_adapter @config.telemetry_adapter
  Dynflow::Telemetry.register_metrics!

  @id                     = SecureRandom.uuid
  @logger_adapter         = @config.logger_adapter
  # The clock actor is spawned before the configuration is validated.
  @clock                  = spawn_and_wait(Clock, 'clock', logger)
  @config.validate
  @transaction_adapter    = @config.transaction_adapter
  @persistence            = Persistence.new(self, @config.persistence_adapter,
                                            :backup_deleted_plans => @config.backup_deleted_plans,
                                            :backup_dir => @config.backup_dir)
  @coordinator            = Coordinator.new(@config.coordinator_adapter)
  # An executor is only built when the configuration provides one; otherwise
  # @executor stays unset and the executor-only steps below are skipped.
  if @config.executor
    @executor = Executors::Parallel.new(self,
                                        executor_class: @config.executor,
                                        heartbeat_interval: @config.executor_heartbeat_interval,
                                        queues_options: @config.queues)
  end
  @action_classes         = @config.action_classes
  @auto_rescue            = @config.auto_rescue
  @exit_on_terminate      = Concurrent::AtomicBoolean.new(@config.exit_on_terminate)
  @connector              = @config.connector
  @middleware             = Middleware::World.new
  # The transaction middleware is registered only when a transaction adapter is configured.
  @middleware.use Middleware::Common::Transaction if @transaction_adapter
  @client_dispatcher      = spawn_and_wait(Dispatcher::ClientDispatcher, "client-dispatcher", self, @config.ping_cache_age)
  @dead_letter_handler    = spawn_and_wait(DeadLetterSilencer, 'default_dead_letter_handler', @config.silent_dead_letter_matchers)
  @auto_validity_check    = @config.auto_validity_check
  @validity_check_timeout = @config.validity_check_timeout
  @throttle_limiter       = @config.throttle_limiter
  @terminated             = Concurrent::Promises.resolvable_event
  @termination_timeout    = @config.termination_timeout
  calculate_subscription_index

  # With an executor present, spawn its dispatcher and block until the
  # executor reports it is initialized.
  if executor
    @executor_dispatcher = spawn_and_wait(Dispatcher::ExecutorDispatcher, "executor-dispatcher", self, @config.executor_semaphore)
    executor.initialized.wait
  end
  update_register
  perform_validity_checks if auto_validity_check

  # State used by start_termination and before_termination.
  @termination_barrier = Mutex.new
  @before_termination_hooks = Queue.new

  if @config.auto_terminate
    # On interpreter exit, terminate the world and wait for it to finish.
    at_exit do
      @exit_on_terminate.make_false # make sure we don't terminate twice
      self.terminate.wait
    end
  end
  post_initialization
end

Public Instance Methods

action_logger() click to toggle source
# File lib/dynflow/world.rb, line 113
# Returns the logger that the logger adapter exposes as +action_logger+.
def action_logger
  adapter = logger_adapter
  adapter.action_logger
end
auto_execute() click to toggle source

24119 - ensure delayed executor is preserved after invalidation. Executes plans that are planned/paused and haven't reported any error yet (usually when no executor was available at the time of planning or terminating).

# File lib/dynflow/world.rb, line 265
# Executes stored plans that are in the planned or paused state and whose
# result is anything but :error, skipping plans that already hold an
# execution lock. The whole scan runs under the AutoExecuteLock.
#
# @return [Array] the non-nil results of +execute+ for the started plans;
#   an empty array when the auto-execute lock could not be acquired
def auto_execute
  coordinator.acquire(Coordinator::AutoExecuteLock.new(self)) do
    results_without_error = (ExecutionPlan.results - [:error]).map(&:to_s)
    filters = { 'state' => %w(planned paused), 'result' => results_without_error }
    candidates = persistence.find_execution_plans(filters: filters)

    started = candidates.map do |plan|
      lock_filter = Dynflow::Coordinator::ExecutionLock.unique_filter(plan.id)
      # A plan with an existing execution lock is left alone.
      next unless coordinator.find_locks(lock_filter).empty?

      execute(plan.id)
    end
    started.compact
  end
rescue Coordinator::LockError => lock_error
  logger.info "auto-executor lock already aquired: #{lock_error.message}"
  []
end
before_termination(&block) click to toggle source
# File lib/dynflow/world.rb, line 83
# Adds +block+ to the queue of hooks kept in @before_termination_hooks.
#
# @return the hook queue itself
def before_termination(&block)
  @before_termination_hooks.push(block)
end
delay(action_class, delay_options, *args) click to toggle source
# File lib/dynflow/world.rb, line 191
# Positional-argument convenience wrapper around +delay_with_options+.
#
# @param action_class the action to schedule
# @param delay_options options forwarded as +delay_options:+
# @param args remaining arguments, forwarded as the +args:+ array
# @return whatever +delay_with_options+ returns
def delay(action_class, delay_options, *args)
  delay_with_options(delay_options: delay_options,
                     args: args,
                     action_class: action_class)
end
delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil) click to toggle source
# File lib/dynflow/world.rb, line 195
# Creates an execution plan, delays it and wraps its id in +Scheduled+.
#
# @param action_class the action to schedule; must not be nil
# @param args [Array] arguments splatted into ExecutionPlan#delay
# @param delay_options options passed through to ExecutionPlan#delay
# @param id optional id handed to ExecutionPlan.new
# @param caller_action optional first argument for ExecutionPlan#delay
# @return Scheduled built from the plan id
# @raise [RuntimeError] when +action_class+ is nil
def delay_with_options(action_class:, args:, delay_options:, id: nil, caller_action: nil)
  raise 'No action_class given' if action_class.nil?

  delayed_plan = ExecutionPlan.new(self, id).tap do |plan|
    plan.delay(caller_action, action_class, delay_options, *args)
  end
  Scheduled[delayed_plan.id]
end
event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future) click to toggle source
# File lib/dynflow/world.rb, line 221
# Publishes a Dispatcher::Event request for the given plan and step without
# waiting for it to be accepted.
#
# @return whatever +publish_request+ returns
def event(execution_plan_id, step_id, event, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Event[execution_plan_id, step_id, event]
  publish_request(request, done, false)
end
execute(execution_plan_id, done = Concurrent::Promises.resolvable_future) click to toggle source

@return [Concurrent::Promises::ResolvableFuture] containing execution_plan when finished; raises when ExecutionPlan is not accepted for execution

# File lib/dynflow/world.rb, line 217
# Publishes a Dispatcher::Execution request for the plan and asks
# +publish_request+ to wait until the request is accepted.
#
# @return whatever +publish_request+ returns
def execute(execution_plan_id, done = Concurrent::Promises.resolvable_future)
  request = Dispatcher::Execution[execution_plan_id]
  publish_request(request, done, true)
end
get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future) click to toggle source
# File lib/dynflow/world.rb, line 237
# Publishes a Dispatcher::Status request for the given world and plan,
# passing +timeout+ along and not waiting for acceptance.
#
# @return whatever +publish_request+ returns
def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent::Promises.resolvable_future)
  status_request = Dispatcher::Status[world_id, execution_plan_id]
  publish_request(status_request, done, false, timeout)
end
logger() click to toggle source
# File lib/dynflow/world.rb, line 109
# Returns the logger that the logger adapter exposes as +dynflow_logger+.
def logger
  adapter = logger_adapter
  adapter.dynflow_logger
end
ping(world_id, timeout, done = Concurrent::Promises.resolvable_future) click to toggle source
# File lib/dynflow/world.rb, line 229
# Publishes a Dispatcher::Ping request for +world_id+ with the second Ping
# field set to true, passing +timeout+ along and not waiting for acceptance.
#
# @return whatever +publish_request+ returns
def ping(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  ping_request = Dispatcher::Ping[world_id, true]
  publish_request(ping_request, done, false, timeout)
end
ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future) click to toggle source
# File lib/dynflow/world.rb, line 233
# Same request as +ping+, but with the second Dispatcher::Ping field set to
# false.
#
# @return whatever +publish_request+ returns
def ping_without_cache(world_id, timeout, done = Concurrent::Promises.resolvable_future)
  ping_request = Dispatcher::Ping[world_id, false]
  publish_request(ping_request, done, false, timeout)
end
plan(action_class, *args) click to toggle source
# File lib/dynflow/world.rb, line 202
# Positional-argument convenience wrapper around +plan_with_options+.
#
# @param action_class the action to plan
# @param args remaining arguments, forwarded as the +args:+ array
# @return whatever +plan_with_options+ returns
def plan(action_class, *args)
  plan_with_options(args: args, action_class: action_class)
end
plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future) click to toggle source
# File lib/dynflow/world.rb, line 225
# Publishes a Dispatcher::Event request that also carries +time+, without
# waiting for it to be accepted.
#
# @return whatever +publish_request+ returns
def plan_event(execution_plan_id, step_id, event, time, accepted = Concurrent::Promises.resolvable_future)
  timed_event = Dispatcher::Event[execution_plan_id, step_id, event, time]
  publish_request(timed_event, accepted, false)
end
plan_with_options(action_class:, args:, id: nil, caller_action: nil) click to toggle source
# File lib/dynflow/world.rb, line 206
# Creates an execution plan and, while holding a PlanningLock for its id,
# prepares it for +action_class+ and plans it with +args+.
#
# @param action_class the action passed to ExecutionPlan#prepare
# @param args [Array] arguments splatted into ExecutionPlan#plan
# @param id optional id handed to ExecutionPlan.new
# @param caller_action optional value forwarded to ExecutionPlan#prepare
# @return the execution plan object
def plan_with_options(action_class:, args:, id: nil, caller_action: nil)
  execution_plan = ExecutionPlan.new(self, id)
  planning_lock = Coordinator::PlanningLock.new(self, execution_plan.id)

  coordinator.acquire(planning_lock) do
    execution_plan.prepare(action_class, caller_action: caller_action)
    execution_plan.plan(*args)
  end

  execution_plan
end
post_initialization() click to toggle source

Performs steps once the executor is ready and invalidation of previous worlds is finished. Needs to be idempotent, as it can be called several times (especially when auto_validity_check is false, as it should be called after the `perform_validity_checks` method).

# File lib/dynflow/world.rb, line 75
# Spawns the delayed executor and the execution plan cleaner unless they are
# already set, refreshes the world registration, starts the delayed executor
# if it exists and has not started yet, and finally runs +auto_execute+ when
# the configuration asks for it.
#
# @return the result of +auto_execute+, or nil when auto execution is off
def post_initialization
  @delayed_executor ||= try_spawn(:delayed_executor, Coordinator::DelayedExecutorLock)
  @execution_plan_cleaner ||= try_spawn(:execution_plan_cleaner, Coordinator::ExecutionPlanCleanerLock)
  update_register

  if @delayed_executor
    @delayed_executor.start unless @delayed_executor.started?
  end

  return unless @config.auto_execute

  auto_execute
end
publish_request(request, done, wait_for_accepted, timeout = nil) click to toggle source
# File lib/dynflow/world.rb, line 241
# Hands +request+ to the client dispatcher together with a fresh "accepted"
# future, and returns +done+.
#
# @param request the dispatcher request to publish
# @param done the future that is sent along with the request
# @param wait_for_accepted when truthy, wait on the accepted future before returning
# @param timeout optional value included in the message to the dispatcher
# @return +done+ on the normal path; when an error is rescued, the return
#   value is whatever rejecting the accepted future returns
def publish_request(request, done, wait_for_accepted, timeout = nil)
  accepted = Concurrent::Promises.resolvable_future
  # A rejection of the accepted future is passed on to +done+.
  accepted.rescue { |reason| done.reject(reason) if reason }

  message = [:publish_request, done, request, timeout]
  client_dispatcher.ask(message, accepted)
  accepted.wait if wait_for_accepted
  done
rescue => error
  accepted.reject(error)
end
registered_world() click to toggle source
# File lib/dynflow/world.rb, line 101
# Builds the coordinator record for this world: an ExecutorWorld when the
# world has an executor, a ClientWorld otherwise.
def registered_world
  record_class = executor ? Coordinator::ExecutorWorld : Coordinator::ClientWorld
  record_class.new(self)
end
reload!() click to toggle source

reload actions classes, intended only for devel

# File lib/dynflow/world.rb, line 122
# Re-resolves every known action class by name, dropping the ones that can
# no longer be resolved, then clears the middleware cache and rebuilds the
# subscription index.
#
# @return the result of +calculate_subscription_index+
def reload!
  # TODO what happens with newly loaded classes
  reloaded = []
  @action_classes.each do |klass|
    begin
      constant = Utils.constantize(klass.to_s)
    rescue NameError
      next # ignore missing classes
    end
    reloaded << constant unless constant.nil?
  end
  @action_classes = reloaded

  middleware.clear_cache!
  calculate_subscription_index
end
subscribed_actions(action_class) click to toggle source
# File lib/dynflow/world.rb, line 117
# Looks up the actions subscribed to +action_class+ in the subscription index.
#
# @return the stored list, or a new empty array when the class has no entry
def subscribed_actions(action_class)
  # fetch with a block never triggers the hash's default proc, so a missing
  # key does not write to the index.
  @subscription_index.fetch(action_class) { [] }
end
terminate(future = Concurrent::Promises.resolvable_future) click to toggle source
# File lib/dynflow/world.rb, line 253
# Starts termination and tangles its result with +future+.
#
# @param future the future to tangle with; a new resolvable future by default
# @return +future+
def terminate(future = Concurrent::Promises.resolvable_future)
  termination = start_termination
  termination.tangle(future)
  future
end
terminating?() click to toggle source
# File lib/dynflow/world.rb, line 258
# Tells whether termination has already been started, i.e. whether
# @terminating has been assigned.
#
# @return [Boolean] true once @terminating is set, false before that
def terminating?
  # `defined?` yields the string "instance-variable" or nil; return a real
  # boolean with the same truthiness instead.
  instance_variable_defined?(:@terminating)
end
trigger(action_class = nil, *args, &block) click to toggle source

@return [TriggerResult] blocks until action_class is planned; if no arguments are given, the plan is expected to be returned by a block

# File lib/dynflow/world.rb, line 174
# Plans an action (or takes a plan returned by the block) and, when the plan
# ended up in the :planned state, starts executing it.
#
# @param action_class the action to plan; when nil, +block+ is called with
#   the world and must return the execution plan
# @param args arguments forwarded to +plan+
# @return Triggered with the plan id and the execution future when the plan
#   is in the :planned state, PlaningFailed with the plan id and the
#   exception of its first error otherwise
# @raise [RuntimeError] when neither +action_class+ nor a block is given
def trigger(action_class = nil, *args, &block)
  execution_plan =
    if action_class.nil?
      raise 'Neither action_class nor a block given' unless block

      block.call(self)
    else
      plan(action_class, *args)
    end

  unless execution_plan.state == :planned
    return PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
  end

  finished = execute(execution_plan.id, Concurrent::Promises.resolvable_future)
  Triggered[execution_plan.id, finished]
end
try_spawn(what, lock_class = nil) click to toggle source
# File lib/dynflow/world.rb, line 280
# Spawns the component that the configuration returns for +what+, optionally
# acquiring a coordinator lock of +lock_class+ first.
#
# @param what [Symbol] name of the configuration reader that returns the component
# @param lock_class optional lock class, instantiated with the world
# @return the spawned component; nil when the world has no executor, the
#   configuration returns nil, or a Coordinator::LockError is raised
def try_spawn(what, lock_class = nil)
  return nil unless executor

  component = @config.public_send(what)
  return nil if component.nil?

  coordinator.acquire(lock_class.new(self)) if lock_class
  component.spawn.wait
  component
rescue Coordinator::LockError
  nil
end
update_register() click to toggle source
# File lib/dynflow/world.rb, line 87
# Refreshes the world's metadata and writes it to the coordinator: the first
# call registers the world, later calls update the existing record.
#
# @return the result of +coordinator.update_record+ on later calls, true on
#   the first call
def update_register
  @meta ||= @config.meta
  @meta['queues'] = @config.queues if @executor

  # Flag the optional components that are present.
  { 'delayed_executor' => @delayed_executor,
    'execution_plan_cleaner' => @execution_plan_cleaner }.each do |key, component|
    @meta[key] = true if component
  end
  @meta['last_seen'] = Dynflow::Dispatcher::ClientDispatcher::PingCache.format_time

  return coordinator.update_record(registered_world) if @already_registered

  coordinator.register_world(registered_world)
  @already_registered = true
end

Private Instance Methods

calculate_subscription_index() click to toggle source
# File lib/dynflow/world.rb, line 349
# Rebuilds @subscription_index: a frozen hash that maps each subscribed-to
# class to the list of action classes subscribing to it.
#
# @return [Hash] the frozen index
def calculate_subscription_index
  index = Hash.new { |hash, key| hash[key] = [] }

  action_classes.each do |klass|
    next unless klass.subscribe

    # subscribe may give one class or several; Array() handles both.
    Array(klass.subscribe).each do |subscribed_class|
      target = Utils.constantize(subscribed_class.to_s)
      index[target] << klass
    end
  end

  @subscription_index = index.freeze
end
run_before_termination_hooks() click to toggle source
# File lib/dynflow/world.rb, line 359
# Runs the queued before-termination hooks one by one, each inside its own
# future. An exception raised by a hook is logged; a hook run that does not
# finish within +termination_timeout+ is logged as a timeout.
#
# @return [nil]
def run_before_termination_hooks
  hooks = @before_termination_hooks

  until hooks.empty?
    # The hook is popped inside the future, not before it is created.
    finished_in_time = Concurrent::Promises.future do
      begin
        hooks.pop.call
      rescue => error
        logger.error error
      end
    end.wait(termination_timeout)

    next if finished_in_time

    logger.error "timeout running before_termination_hook"
  end
end
spawn_and_wait(klass, name, *args) click to toggle source
# File lib/dynflow/world.rb, line 372
# Spawns +klass+ under +name+ with +args+ and waits on the +initialized+
# future handed to it before returning.
#
# @param klass class responding to +spawn+
# @param name value passed as +name:+
# @param args remaining arguments, passed as the +args:+ array
# @return whatever +klass.spawn+ returned
def spawn_and_wait(klass, name, *args)
  ready = Concurrent::Promises.resolvable_future
  klass.spawn(name: name, args: args, initialized: ready).tap { ready.wait }
end
start_termination() click to toggle source
# File lib/dynflow/world.rb, line 293
# Starts the shutdown sequence at most once and returns the value stored in
# @terminating; later calls return that stored value without redoing the work.
#
# The sequence runs inside a future and stops components in this order:
# before-termination hooks, delayed executor, throttle limiter, executor and
# executor dispatcher (only when the world has an executor), client
# dispatcher, connector listening, clock, and finally the world's record in
# the coordinator. Each wait is bounded by +termination_timeout+.
def start_termination
  # The mutex makes concurrent callers see a single @terminating value.
  @termination_barrier.synchronize do
    return @terminating if @terminating
    # NOTE(review): termination_future is a fresh local here, so `||=` acts
    # as a plain assignment.
    termination_future ||= Concurrent::Promises.future do
      begin
        run_before_termination_hooks

        if delayed_executor
          logger.info "start terminating delayed_executor..."
          delayed_executor.terminate.wait(termination_timeout)
        end

        logger.info "start terminating throttle_limiter..."
        throttle_limiter.terminate.wait(termination_timeout)

        if executor
          # Stop taking new work before the executor itself is terminated.
          connector.stop_receiving_new_work(self, termination_timeout)

          logger.info "start terminating executor..."
          executor.terminate.wait(termination_timeout)

          logger.info "start terminating executor dispatcher..."
          executor_dispatcher_terminated = Concurrent::Promises.resolvable_future
          executor_dispatcher.ask([:start_termination, executor_dispatcher_terminated])
          executor_dispatcher_terminated.wait(termination_timeout)
        end

        logger.info "start terminating client dispatcher..."
        client_dispatcher_terminated = Concurrent::Promises.resolvable_future
        client_dispatcher.ask([:start_termination, client_dispatcher_terminated])
        client_dispatcher_terminated.wait(termination_timeout)

        logger.info "stop listening for new events..."
        connector.stop_listening(self, termination_timeout)

        if @clock
          logger.info "start terminating clock..."
          clock.ask(:terminate!).wait(termination_timeout)
        end

        coordinator.delete_world(registered_world)
        @terminated.resolve
        true
      rescue => e
        # Any error in the sequence is logged and not re-raised.
        logger.fatal(e)
      end
    end
    # A second future bounds the whole sequence by termination_timeout.
    @terminating = Concurrent::Promises.future do
      termination_future.wait(termination_timeout)
    end.on_resolution do
      # NOTE(review): @terminated.resolve is also called at the end of the
      # sequence above; confirm that resolving the event a second time here
      # cannot raise and prevent the exit below.
      @terminated.resolve
      Thread.new { Kernel.exit } if @exit_on_terminate.true?
    end
  end
end