class Dynflow::Executors::Sidekiq::Core
Constants
- TELEMETRY_UPDATE_INTERVAL
Attributes
logger[R]
Public Class Methods
new(world, *_args)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core.new
# File lib/dynflow/executors/sidekiq/core.rb, line 25
#
# Sets up the Sidekiq executor core for +world+.
#
# The world and its logger are stored first, then the orchestrator lock is
# awaited *before* the superclass initializer runs. Afterwards the first
# telemetry update is scheduled and startup begins.
#
# @param world the world this core belongs to; must respond to +logger+
# @param _args extra arguments, unused here (the bare +super+ forwards them)
def initialize(world, *_args)
  @world = world
  @logger = world.logger

  # Keep this call ahead of +super+: the statement order is part of the behavior.
  wait_for_orchestrator_lock
  super

  schedule_update_telemetry
  begin_startup!
end
Public Instance Methods
begin_startup!()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 78
#
# Enqueues a +WorkerJobs::DrainMarker+ job carrying this world's id and
# switches the core into recovery mode (+@recovery = true+).
#
# NOTE(review): presumably the marker signals when previously queued work has
# been drained, after which +startup_complete+ is called — confirm against
# WorkerJobs::DrainMarker.
#
# @return [true]
def begin_startup!
  world_id = @world.id
  WorkerJobs::DrainMarker.perform_async(world_id)
  @recovery = true
end
execution_status(execution_plan_id = nil)
click to toggle source
TODO: needs some thought on how to implement it
# File lib/dynflow/executors/sidekiq/core.rb, line 46
#
# Placeholder implementation: always returns an empty Hash, regardless of
# which execution plan (if any) is asked about.
#
# @param execution_plan_id optional execution plan id; currently ignored
# @return [Hash] always empty
def execution_status(execution_plan_id = nil)
  # Not implemented yet; the argument is accepted only to keep the interface.
  {}
end
feed_pool(work_items)
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 50
#
# Hands every work item over to Sidekiq as a +WorkerJobs::PerformWork+ job,
# on the queue that +suggest_queue+ picks for that item.
#
# @param work_items enumerable of work items to enqueue
# @return the +work_items+ collection itself (result of +each+)
def feed_pool(work_items)
  work_items.each do |item|
    target_queue = suggest_queue(item)
    WorkerJobs::PerformWork.set(queue: target_queue).perform_async(item)
  end
end
heartbeat()
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#heartbeat
# File lib/dynflow/executors/sidekiq/core.rb, line 34
#
# Runs the superclass heartbeat and then re-acquires the orchestrator lock.
#
# @return whatever +reacquire_orchestrator_lock+ returns
def heartbeat
  super
  # Called after the inherited heartbeat, on every beat.
  reacquire_orchestrator_lock
end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#start_termination
# File lib/dynflow/executors/sidekiq/core.rb, line 39
#
# Starts termination via the superclass, then releases the orchestrator lock
# and immediately finishes termination.
#
# @param args forwarded unchanged to the superclass by the bare +super+
# @return whatever +finish_termination+ returns
def start_termination(*args)
  super

  # Give up the lock before finishing termination.
  release_orchestrator_lock
  finish_termination
end
startup_complete()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 83
#
# Runs the world's validity checks, logging before and after, and then
# leaves recovery mode (+@recovery = false+).
#
# @return [false]
def startup_complete
  logger.info('Performing validity checks')
  @world.perform_validity_checks
  logger.info('Finished performing validity checks')

  # Recovery ends only once the validity checks have finished.
  @recovery = false
end
update_telemetry()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 56
#
# Publishes the current size of every configured queue as the
# +:dynflow_queue_size+ gauge and schedules the next telemetry update.
#
# Queues listed in +@queues_options+ that Sidekiq's stats do not report are
# skipped.
def update_telemetry
  sizes_by_queue = ::Sidekiq::Stats.new.queues

  @queues_options.keys.each do |queue_name|
    size = sizes_by_queue[queue_name.to_s]
    next unless size

    Dynflow::Telemetry.with_instance do |telemetry|
      telemetry.set_gauge(:dynflow_queue_size, size, telemetry_options(queue_name))
    end
  end

  # Re-arm the timer so telemetry keeps being refreshed.
  schedule_update_telemetry
end
work_finished(work, delayed_events = nil)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#work_finished
# File lib/dynflow/executors/sidekiq/core.rb, line 67
#
# Handles a finished work item coming back from a worker.
#
# @param work the finished work item; must respond to +sender_orchestrator_id+
# @param delayed_events optional delayed events, forwarded to the superclass
def work_finished(work, delayed_events = nil)
  # Work sent in reply to a request from the current orchestrator takes the
  # regular path.
  return super if work.sender_orchestrator_id == @world.id

  # While in recovery the work can be dropped: the execution plan will be
  # resumed by the validity checks performed when leaving recovery.
  return if @recovery

  # Outside recovery, an item from another orchestrator means it survived the
  # queue draining.
  handle_unknown_work_item(work)
end
Private Instance Methods
fallback_queue()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 92
#
# Name of the fallback queue.
#
# @return [Symbol] always +:default+
def fallback_queue
  :default
end
handle_unknown_work_item(work)
click to toggle source
Checks whether an execution lock for the work item's execution plan is already held by an orchestrator (it should be the current one). If no lock is held, tries to resume the execution plan if possible.
# File lib/dynflow/executors/sidekiq/core.rb, line 106
#
# Deals with a work item that was not sent by the current orchestrator and
# arrived after recovery finished.
#
# If some orchestrator already holds an execution lock for the item's
# execution plan, nothing is done. Otherwise the plan is loaded and executed
# again, unless it is in error and preparing it for rescue does not yield
# +:running+.
#
# @param work the work item; must respond to +execution_plan_id+
def handle_unknown_work_item(work)
  # We are past recovery now; if an event arrives here, the execution plan is
  # most likely paused. We either try to rescue it or leave it stopped.
  lock_id = "execution-plan:#{work.execution_plan_id}"
  execution_lock = @world.coordinator
                         .find_locks(class: Coordinator::ExecutionLock.name, id: lock_id)
                         .first
  return unless execution_lock.nil?

  plan = @world.persistence.load_execution_plan(work.execution_plan_id)
  # +prepare_for_rescue+ is consulted only when the plan is in error.
  should_resume = !plan.error? || plan.prepare_for_rescue == :running
  @world.execute(plan.id) if should_resume
end
schedule_update_telemetry()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 96
#
# Asks the world's clock to ping this core's +reference+ with the
# +:update_telemetry+ message, passing +TELEMETRY_UPDATE_INTERVAL+.
#
# NOTE(review): presumably the clock delivers the message after the interval
# has elapsed — confirm against the clock's +ping+ implementation.
def schedule_update_telemetry
  @world.clock.ping(
    reference,
    TELEMETRY_UPDATE_INTERVAL,
    [:update_telemetry]
  )
end
telemetry_options(queue)
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 100
#
# Builds the option hash attached to a queue's telemetry gauge.
#
# @param queue the queue identifier; converted with +to_s+
# @return [Hash] +:queue+ (the name as a String) and +:world+ (this world's id)
def telemetry_options(queue)
  queue_name = queue.to_s
  { queue: queue_name, world: @world.id }
end