class Dynflow::Executors::Sidekiq::Core
Constants
- TELEMETRY_UPDATE_INTERVAL
Attributes
logger[R]
Public Class Methods
new(world, *_args)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core::new
# File lib/dynflow/executors/sidekiq/core.rb, line 26 def initialize(world, *_args) @world = world @logger = world.logger wait_for_orchestrator_lock super schedule_update_telemetry begin_startup! end
Public Instance Methods
begin_startup!()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 79 def begin_startup! WorkerJobs::DrainMarker.perform_async(@world.id) @recovery = true end
execution_status(execution_plan_id = nil)
click to toggle source
TODO: needs thoughs on how to implement it
# File lib/dynflow/executors/sidekiq/core.rb, line 47 def execution_status(execution_plan_id = nil) {} end
feed_pool(work_items)
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 51 def feed_pool(work_items) work_items.each do |new_work| WorkerJobs::PerformWork.set(queue: suggest_queue(new_work)).perform_async(new_work) end end
heartbeat()
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#heartbeat
# File lib/dynflow/executors/sidekiq/core.rb, line 35 def heartbeat super reacquire_orchestrator_lock end
start_termination(*args)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#start_termination
# File lib/dynflow/executors/sidekiq/core.rb, line 40 def start_termination(*args) super release_orchestrator_lock finish_termination end
startup_complete()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 84 def startup_complete logger.info('Performing validity checks') @world.perform_validity_checks logger.info('Finished performing validity checks') @recovery = false end
update_telemetry()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 57 def update_telemetry sidekiq_queues = ::Sidekiq::Stats.new.queues @queues_options.keys.each do |queue| queue_size = sidekiq_queues[queue.to_s] if queue_size Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_queue_size, queue_size, telemetry_options(queue)) } end end schedule_update_telemetry end
work_finished(work, delayed_events = nil)
click to toggle source
Calls superclass method
Dynflow::Executors::Abstract::Core#work_finished
# File lib/dynflow/executors/sidekiq/core.rb, line 68 def work_finished(work, delayed_events = nil) # If the work item is sent in reply to a request from the current orchestrator, proceed if work.sender_orchestrator_id == @world.id super else # If we're in recovery, we can drop the work as the execution plan will be resumed during validity checks performed when leaving recovery # If we're not in recovery and receive an event from another orchestrator, it means it survived the queue draining. handle_unknown_work_item(work) unless @recovery end end
Private Instance Methods
fallback_queue()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 93 def fallback_queue :default end
handle_unknown_work_item(work)
click to toggle source
We take a look if an execution lock is already being held by an orchestrator (it should be the current one). If no lock is held we try to resume the execution plan if possible
# File lib/dynflow/executors/sidekiq/core.rb, line 107 def handle_unknown_work_item(work) # We are past recovery now, if we receive an event here, the execution plan will be most likely paused # We can either try to rescue it or turn it over to stopped execution_lock = @world.coordinator.find_locks(class: Coordinator::ExecutionLock.name, id: "execution-plan:#{work.execution_plan_id}").first if execution_lock.nil? plan = @world.persistence.load_execution_plan(work.execution_plan_id) should_resume = !plan.error? || plan.prepare_for_rescue == :running @world.execute(plan.id) if should_resume end end
schedule_update_telemetry()
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 97 def schedule_update_telemetry @world.clock.ping(reference, TELEMETRY_UPDATE_INTERVAL, [:update_telemetry]) end
telemetry_options(queue)
click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 101 def telemetry_options(queue) { queue: queue.to_s, world: @world.id } end