mirror of
https://gitlab.com/gitlab-org/gitlab-foss.git
synced 2025-07-25 16:03:48 +00:00
139 lines
4.5 KiB
Ruby
139 lines
4.5 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module Gitlab
  module BitbucketImport
    # Shared behaviour for importer stages that walk a paginated Bitbucket
    # collection and schedule one Sidekiq job per object, tracking progress
    # through Redis-backed caches so an interrupted import can resume without
    # enqueuing duplicates.
    #
    # Including classes must implement the private hook methods below:
    # `collection_method`, `representation_type`, `sidekiq_worker_class` and
    # `id_for_already_enqueued_cache`.
    module ParallelScheduling
      include Loggable
      include ErrorTracking

      attr_reader :project, :already_enqueued_cache_key, :job_waiter_cache_key, :job_waiter_remaining_cache_key,
        :page_keyset

      # The base cache key to use for tracking already enqueued objects.
      ALREADY_ENQUEUED_CACHE_KEY =
        'bitbucket-importer/already-enqueued/%{project}/%{collection}'

      # The base cache key to use for storing job waiter key
      JOB_WAITER_CACHE_KEY =
        'bitbucket-importer/job-waiter/%{project}/%{collection}'

      # The base cache key to use for storing job waiter remaining jobs
      JOB_WAITER_REMAINING_CACHE_KEY =
        'bitbucket-importer/job-waiter-remaining/%{project}/%{collection}'

      # project - An instance of `Project`.
      #
      # Cache keys are scoped per project and per collection so concurrent
      # imports (and different stages of the same import) never collide.
      def initialize(project)
        @project = project

        @already_enqueued_cache_key =
          format(ALREADY_ENQUEUED_CACHE_KEY, project: project.id, collection: collection_method)
        @job_waiter_cache_key =
          format(JOB_WAITER_CACHE_KEY, project: project.id, collection: collection_method)
        @job_waiter_remaining_cache_key = format(JOB_WAITER_REMAINING_CACHE_KEY, project: project.id,
          collection: collection_method)
        @page_keyset = Gitlab::Import::PageKeyset.new(project, collection_method, 'bitbucket-importer')
      end

      # The method that will be called for traversing through all the objects to
      # import, yielding them (as hashes) to the supplied block.
      #
      # Resumes from the last persisted page cursor, skips objects that were
      # already enqueued by a previous run, and keeps the job-waiter's
      # remaining-jobs counter in sync as it goes.
      def each_object_to_import
        repo = project.import_source

        # Resume pagination from wherever a previous (interrupted) run stopped.
        options = collection_options.merge(next_url: page_keyset.current)

        client.each_page(collection_method, representation_type, repo, options) do |page|
          page.items.each do |object|
            job_waiter.jobs_remaining = Gitlab::Cache::Import::Caching.increment(job_waiter_remaining_cache_key)

            object = object.to_hash

            next if already_enqueued?(object)

            yield object

            # We mark the object as imported immediately so we don't end up
            # scheduling it multiple times.
            mark_as_enqueued(object)
          end

          # Persist the cursor only when another page exists, so a retry
          # re-reads the current page rather than skipping past it.
          page_keyset.set(page.next) if page.next?
        end
      end

      private

      # Any options to be passed to the method used for retrieving the data to
      # import. Subclasses may override to add collection-specific filters.
      def collection_options
        {}
      end

      # Memoized Bitbucket API client built from the project's stored
      # import credentials.
      def client
        @client ||= Bitbucket::Client.new(project.import_data.credentials)
      end

      # Returns the ID to use for the cache used for checking if an object has
      # already been enqueued or not.
      #
      # object - The object we may want to import.
      def id_for_already_enqueued_cache(object)
        raise NotImplementedError
      end

      # The Sidekiq worker class used for scheduling the importing of objects in
      # parallel.
      def sidekiq_worker_class
        raise NotImplementedError
      end

      # The name of the method to call to retrieve the data to import.
      def collection_method
        raise NotImplementedError
      end

      # The name of the method to call to retrieve the representation object
      def representation_type
        raise NotImplementedError
      end

      # Memoized JobWaiter shared across runs via the cache: the waiter key and
      # remaining-jobs count are persisted so a resumed import keeps waiting on
      # jobs scheduled by earlier runs.
      def job_waiter
        @job_waiter ||= begin
          key = Gitlab::Cache::Import::Caching.read(job_waiter_cache_key)
          key ||= Gitlab::Cache::Import::Caching.write(job_waiter_cache_key, JobWaiter.generate_key)
          # `to_i` already maps a nil cache miss to 0, so no fallback is needed.
          jobs_remaining = Gitlab::Cache::Import::Caching.read(job_waiter_remaining_cache_key).to_i

          JobWaiter.new(jobs_remaining, key)
        end
      end

      # True if the object was enqueued by this or a previous run.
      def already_enqueued?(object)
        id = id_for_already_enqueued_cache(object)

        Gitlab::Cache::Import::Caching.set_includes?(already_enqueued_cache_key, id)
      end

      # Marks the given object as "already enqueued".
      def mark_as_enqueued(object)
        id = id_for_already_enqueued_cache(object)

        Gitlab::Cache::Import::Caching.set_add(already_enqueued_cache_key, id)
      end

      # Spreads job start times so at most `concurrent_import_jobs_limit` jobs
      # begin per minute, discounting time already spent in this process.
      # NOTE(review): may return a negative duration once runtime exceeds the
      # computed offset — presumably callers clamp or tolerate that; verify.
      def calculate_job_delay(job_index)
        runtime = Time.current - job_started_at
        multiplier = (job_index / concurrent_import_jobs_limit.to_f)

        (multiplier * 1.minute) + 1.second - runtime
      end

      # Timestamp of the first delay calculation, memoized per instance.
      def job_started_at
        @job_started_at ||= Time.current
      end

      # Application-wide cap on concurrently running Bitbucket import jobs.
      def concurrent_import_jobs_limit
        Gitlab::CurrentSettings.concurrent_bitbucket_import_jobs_limit
      end
    end
  end
end
|