mirror of
https://github.com/gitlabhq/gitlabhq.git
synced 2025-08-06 11:10:08 +00:00
104 lines
3.3 KiB
Ruby
104 lines
3.3 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module ConcurrencyLimit
|
|
class ResumeWorker
|
|
include ApplicationWorker
|
|
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext -- There is no onward scheduling and this cron handles work from across the
|
|
# application, so there's no useful context to add.
|
|
|
|
BATCH_SIZE = 5_000
|
|
RESCHEDULE_DELAY = 1.second
|
|
|
|
feature_category :scalability
|
|
data_consistency :sticky
|
|
idempotent!
|
|
urgency :low
|
|
|
|
def perform(worker_name = nil)
|
|
if worker_name
|
|
process_worker(worker_name)
|
|
else
|
|
schedule_workers
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
def schedule_workers
|
|
workers.each do |worker|
|
|
limit = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)
|
|
queue_size = queue_size(worker)
|
|
report_prometheus_metrics(worker, queue_size, limit)
|
|
|
|
next unless queue_size > 0
|
|
next if limit < 0 # do not re-queue jobs if circuit-broken
|
|
|
|
self.class.perform_async(worker.name)
|
|
end
|
|
end
|
|
|
|
def process_worker(worker_name)
|
|
worker = worker_name.safe_constantize
|
|
return unless worker
|
|
|
|
limit = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker)
|
|
queue_size = queue_size(worker)
|
|
current = concurrent_worker_count(worker)
|
|
|
|
return unless queue_size > 0
|
|
return if limit < 0 # do not re-queue jobs if circuit-broken
|
|
|
|
Gitlab::SidekiqLogging::ConcurrencyLimitLogger.instance.worker_stats_log(
|
|
worker.name, limit, queue_size, current
|
|
)
|
|
|
|
processing_limit = if limit > 0
|
|
limit - current
|
|
else
|
|
BATCH_SIZE
|
|
end
|
|
|
|
return unless processing_limit > 0
|
|
|
|
resume_processing!(worker, limit: processing_limit)
|
|
cleanup_stale_trackers(worker)
|
|
|
|
queue_remaining_count = queue_size - processing_limit
|
|
self.class.perform_in(RESCHEDULE_DELAY, worker_name) if queue_remaining_count > 0
|
|
end
|
|
|
|
def concurrent_worker_count(worker)
|
|
Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.concurrent_worker_count(worker.name)
|
|
end
|
|
|
|
def queue_size(worker)
|
|
Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.queue_size(worker.name)
|
|
end
|
|
|
|
def cleanup_stale_trackers(worker)
|
|
Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.cleanup_stale_trackers(worker.name)
|
|
end
|
|
|
|
def resume_processing!(worker, limit:)
|
|
Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.resume_processing!(worker.name, limit: limit)
|
|
end
|
|
|
|
def workers
|
|
Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.workers
|
|
end
|
|
|
|
def report_prometheus_metrics(worker, queue_size, limit)
|
|
queue_size_metric = Gitlab::Metrics.gauge(:sidekiq_concurrency_limit_queue_jobs,
|
|
'Number of jobs queued by the concurrency limit middleware.',
|
|
{},
|
|
:max)
|
|
queue_size_metric.set({ worker: worker.name }, queue_size)
|
|
|
|
limit_metric = Gitlab::Metrics.gauge(:sidekiq_concurrency_limit_max_concurrent_jobs,
|
|
'Max number of concurrent running jobs.',
|
|
{})
|
|
limit_metric.set({ worker: worker.name }, limit || BATCH_SIZE)
|
|
end
|
|
end
|
|
end
|