# Usage
## Creating a migration
Migrations are similar to database migrations: they create collections, update schemas, run backfills, and so on. See the migrations documentation for more details.
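As an illustration only, a migration might look like the sketch below. The exact DSL is covered in the migrations documentation; the class name, `milestone` call, `create_collection` options, and field helpers here are assumptions rather than the gem's confirmed API.

```ruby
# frozen_string_literal: true

# Hypothetical sketch only: consult the migrations documentation for the real DSL.
module Ai
  module ActiveContext
    module Migrations
      class CreateMergeRequestsCollection < ::ActiveContext::Migration[1.0]
        milestone '17.9' # assumed metadata helper

        def migrate!
          # Create the collection that backs merge request documents.
          create_collection :merge_requests, number_of_partitions: 2 do |c| # assumed API
            c.bigint :merge_request_id, index: true # assumed field helper
            c.vector :embedding, dimensions: 768    # assumed field helper
          end
        end
      end
    end
  end
end
```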
A migration worker applies migrations for the active connection. See Migrations.

If you want to run the worker manually, execute:

```ruby
Ai::ActiveContext::MigrationWorker.new.perform
```
## Registering a queue
Queues keep track of items that need to be processed asynchronously in bulk. A queue definition has a unique key and registers queues based on the number of shards defined; each shard creates a queue.

To create a new queue: add a file, include `ActiveContext::Concerns::Queue`, and define `number_of_shards`:
```ruby
# frozen_string_literal: true

module Ai
  module Context
    module Queues
      class MergeRequest
        class << self
          def number_of_shards
            2
          end
        end

        include ActiveContext::Concerns::Queue
      end
    end
  end
end
```
To access the unique queues:

```ruby
ActiveContext.queues
=> #<Set: {"ai_context_queues:{merge_request}"}>
```
To view sharded queues:

```ruby
ActiveContext.raw_queues
=> [#<Ai::Context::Queues::MergeRequest:0x0000000177cdf460 @shard=0>,
 #<Ai::Context::Queues::MergeRequest:0x0000000177cdf370 @shard=1>]
```
## Adding a new reference type
Create a class under `lib/active_context/references/` that inherits from the `Reference` class and defines the following methods:
Class methods required:

- `serialize_data`: defines a string representation of the reference object

Instance methods required:

- `init`: reads from `serialized_args`
- `as_indexed_json` or `as_indexed_jsons`: a hash or array of hashes containing the data representation of the object
- `operation`: determines the operation, which can be one of `upsert`, `update` or `delete`. See operation types for more details.
- `identifier`: unique identifier

Optional methods:

- `unique_identifiers`: array of identifiers to build a unique identifier for every document. For example, `[identifier, branch_name]`. Defaults to `[identifier]`.
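Pulling the required methods together, a minimal reference class might look like the skeleton below. The `Example` class and its data are placeholders; the full examples later in this section show real implementations.

```ruby
# frozen_string_literal: true

module Ai
  module Context
    module References
      # Placeholder skeleton showing only the required methods.
      class Example < ::ActiveContext::Reference
        # Class method: serialized representation used to enqueue the reference.
        def self.serialize_data(record)
          { identifier: record.id }
        end

        # Satisfies the required `identifier` instance method.
        attr_accessor :identifier

        # Rebuild state from `serialized_args`.
        def init
          @identifier, _ = serialized_args
        end

        # Data representation of the object (a hash, or an array of hashes for `as_indexed_jsons`).
        def as_indexed_json
          { id: identifier }
        end

        # One of :upsert, :update or :delete.
        def operation
          :upsert
        end
      end
    end
  end
end
```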
### Preprocessors
Existing preprocessors are:

- `Preload`: preloads from the database to prevent N+1 queries
- `Chunking`: splits content into chunks and assigns them to `ref.documents`
- `Embeddings`: generates embeddings for every document in bulk
#### Preload
Requires `model_klass` to be defined, and `model_klass` must define `preload_indexing_data`.
```ruby
add_preprocessor :preload do |refs|
  preload(refs)
end
```
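For example, the reference defines `model_klass` and the model exposes `preload_indexing_data`. The association names below are illustrative assumptions:

```ruby
# In the reference class: the model the preload preprocessor loads records from.
def self.model_klass
  ::MergeRequest
end

# On ::MergeRequest itself (illustrative): preload the associations the reference
# reads, so that building documents for a batch of refs avoids N+1 queries.
scope :preload_indexing_data, -> { preload(:project, :author) }
```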
#### Chunking
Requires passing a `chunker` instance, a `chunk_on` method that defines the content to chunk, and the `field` to assign the chunked content to.
```ruby
add_preprocessor :chunking do |refs|
  chunker = Chunkers::BySize.new(chunk_size: 1000, overlap: 20)

  chunk(refs: refs, chunker: chunker, chunk_on: :title_and_description, field: :content)
end

def title_and_description
  "Title: #{database_record.title}\n\nDescription: #{database_record.description}"
end
```
Chunkers use the `::ActiveContext::Concerns::Chunker` concern and should define a `chunks` method. The only existing chunker is `BySize`.
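If a different strategy is needed, a custom chunker can follow the same pattern. The sketch below is hypothetical: `ByParagraph` does not exist, and it assumes the concern exposes the text to split as `content`.

```ruby
module Chunkers
  # Hypothetical chunker that splits on blank lines instead of a fixed size.
  class ByParagraph
    include ::ActiveContext::Concerns::Chunker

    # Assumes the Chunker concern supplies the input text as `content`.
    def chunks
      content.split(/\n{2,}/).map(&:strip).reject(&:empty?)
    end
  end
end
```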
#### Embeddings
Generates embeddings either by specifying a content method or by specifying a content field on existing documents.
When documents with a populated content field already exist:
```ruby
add_preprocessor :embeddings do |refs|
  apply_embeddings(refs: refs, content_field: :content)
end
```
When the ref doesn't have existing documents:
```ruby
add_preprocessor :embeddings do |refs|
  apply_embeddings(refs: refs, content_method: :title_and_description)
end

def title_and_description
  "Title: #{database_record.title}\n\nDescription: #{database_record.description}"
end
```
See how to set the initial embedding model and how to migrate from one embedding model to another.
### Operation types
#### `upsert`

Creates or updates documents, handling cases where a single reference has fewer documents than before by performing a delete cleanup operation.

The document content can be full or partial JSON.
#### `update`

Updates documents that already exist.

The document content can be full or partial JSON.
#### `delete`
Deletes all documents belonging to a reference.
### Examples
Example for a reference reading from a database relation, with preloading and bulk embedding generation:
```ruby
# frozen_string_literal: true

module Ai
  module Context
    module References
      class MergeRequest < ::ActiveContext::Reference
        add_preprocessor :preload do |refs|
          preload(refs)
        end

        add_preprocessor :embeddings do |refs|
          apply_embeddings(refs: refs, target_field: :embeddings, content_method: :title_and_description)
        end

        def self.model_klass
          ::MergeRequest
        end

        def self.serialize_data(merge_request)
          { identifier: merge_request.id }
        end

        attr_accessor :identifier, :embedding
        attr_writer :database_record

        def init
          @identifier, _ = serialized_args
        end

        def serialized_attributes
          [identifier]
        end

        def title_and_description
          "Title: #{database_record.title}\n\nDescription: #{database_record.description}"
        end

        def shared_attributes
          {
            iid: database_record.iid,
            namespace_id: database_record.project.id,
            traversal_ids: database_record.project.elastic_namespace_ancestry
          }
        end

        def model_klass
          self.class.model_klass
        end

        def database_record
          @database_record ||= model_klass.find_by_id(identifier)
        end

        def operation
          database_record ? :upsert : :delete
        end
      end
    end
  end
end
```
Example for code embeddings:
```ruby
# frozen_string_literal: true

module Ai
  module Context
    module References
      class CodeEmbeddings < ::ActiveContext::Reference
        add_preprocessor :chunk_full_file_by_size do |refs|
          chunker = Chunkers::BySize.new

          chunk(refs: refs, chunker: chunker, chunk_on: :blob_content)
        end

        attr_accessor :project_id, :identifier, :repository, :blob

        def init
          @project_id, @identifier = serialized_args

          @repository = Project.find(project_id).repository
          @blob = Gitlab::Git::Blob.raw(repository, identifier)
        end

        def serialized_attributes
          [project_id, identifier]
        end

        def blob_content
          blob.data
        end

        def operation
          blob.data ? :upsert : :delete
        end

        def shared_attributes
          {
            project_id: project_id
          }
        end
      end
    end
  end
end
```
## Adding a new collection
A collection maps data to references and specifies a queue to track its references.
To add a new collection:
- Create a new file in the appropriate directory
- Define a class that includes `ActiveContext::Concerns::Collection`
- Implement the `self.queue` class method to return the associated queue
- Implement the `self.reference_klass` or `self.reference_klasses` class method to return the references for an object
- Implement the `self.routing(object)` class method to determine how an object should be routed
- Implement the `self.ids_to_objects(ids)` class method to convert ids into objects for redaction
Example:
```ruby
# frozen_string_literal: true

module Ai
  module Context
    module Collections
      class MergeRequest
        include ActiveContext::Concerns::Collection

        def self.collection_name
          'gitlab_active_context_merge_requests'
        end

        def self.queue
          Queues::MergeRequest
        end

        def self.reference_klass
          References::MergeRequest
        end

        def self.routing(object)
          object.project.root_ancestor.id
        end

        def self.ids_to_objects(ids)
          ::MergeRequest.id_in(ids)
        end
      end
    end
  end
end
```
## Adding documents to the vector store
Adding references to the queue can be done in a few ways.

The preferred method:

```ruby
Ai::Context::Collections::MergeRequest.track!(MergeRequest.first)

Ai::Context::Collections::MergeRequest.track!(MergeRequest.take(10))
```
Passing a collection:

```ruby
ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::MergeRequest)
```
Passing a collection and queue:

```ruby
ActiveContext.track!(MergeRequest.first, collection: Ai::Context::Collections::MergeRequest, queue: Ai::Context::Queues::Default)
```
Building a reference:

```ruby
ref = Ai::Context::References::CodeEmbeddings.new(collection_id: collection.id, routing: project.root_ancestor.id, project_id: project.id, identifier: blob.id)
Ai::Context::Collections::CodeEmbeddings.track!(ref)

ref = Ai::Context::References::CodeEmbeddings.new(collection_id: 24, routing: 24, project_id: 1, identifier: "9ab45314044d664a3b8ac1e05777411482bd0564")
Ai::Context::Collections::CodeEmbeddings.track!(ref)
```
Building a reference and passing a queue:

```ruby
ref = Ai::Context::References::MergeRequest.new(collection_id: collection.id, routing: project.root_ancestor.id, identifier: 1)
ActiveContext.track!(ref, queue: Ai::Context::Queues::MergeRequest)
```
To view all tracked references:

```ruby
ActiveContext::Queues.all_queued_items
```
Once references are tracked, they are processed asynchronously. See Async Processing.

To execute all references from all queues synchronously, run:

```ruby
ActiveContext.execute_all_queues!
```
To clear a queue:

```ruby
Ai::Context::Queues::MergeRequest.clear_tracking!
```
## Synchronising data
The `track!` method adds documents to the vector stores and can be called from anywhere: a service, a callback, an event, and so on.
The `::ActiveContext::Concerns::Syncable` concern can be added to ActiveRecord models to update a collection on callbacks.
For example, we can add the concern to the `MergeRequest` model to track merge request refs on create, update, and destroy:
```ruby
include ::ActiveContext::Concerns::Syncable

sync_with_active_context on: :create, using: ->(record) { record.track_merge_request! }
sync_with_active_context on: :update, condition: -> { saved_change_to_title? || saved_change_to_description? }, using: ->(record) { record.track_merge_request! }
sync_with_active_context on: :destroy, using: ->(record) { record.track_merge_request! }

def track_merge_request!
  Ai::Context::Collections::MergeRequest.track!(self)
end

def syncable?
  # some condition to determine whether to track an MR record
end
```
We can also keep merge requests up to date when an associated record is updated, using the same approach. Say a merge request document contains `project.visibility_level`; we can add the following to the `Project` model to update its associated merge requests:
```ruby
include ::ActiveContext::Concerns::Syncable

sync_with_active_context on: :update,
  condition: -> { saved_change_to_visibility_level? },
  using: ->(project) { Ai::Context::Collections::MergeRequest.track!(project.merge_requests) }

def syncable?
  # some condition to determine whether or not the project is being indexed
end
```
## Performing a search
Example: Find all documents in a project
```ruby
query = ActiveContext::Query.filter(project_id: 1).limit(1)
results = Ai::Context::Collections::MergeRequest.search(user: current_user, query: query)
results.to_a
```
Example: Find the document closest to a given embedding
```ruby
target_embedding = ::ActiveContext::Embeddings.generate_embeddings("some text")

query = ActiveContext::Query.filter(project_id: 1).knn(target: "embeddings", vector: target_embedding, k: 1)
results = Ai::Context::Collections::MergeRequest.search(user: current_user, query: query)
results.to_a
```