An indexing pipeline builds indexes derived from source data. The index should always converge to the current version of the source data: once a new version of the source data has been processed by the pipeline, no data derived from previous versions should remain in the target index storage. We call this the data consistency requirement for an indexing pipeline.

The Challenge of Concurrent Updates

Compared to building single-process in-memory transformation logic, an indexing pipeline faces additional challenges in maintaining data consistency, along two dimensions:

  1. Temporally, it's long-running, i.e. the execution may span multiple processes. For example, an earlier execution may terminate midway due to a SIGTERM, a failure to commit to the target storage, or a reboot. When the process restarts later, it should pick up the existing state and keep moving forward to bring the target storage to the desired state. This is both tedious and error-prone: for example, if a piece of source data is only partially processed, and it gets updated before the restarted pipeline processes it again, how can we make sure states derived from the old version are properly cleared?

  2. Spatially, execution happens concurrently, sometimes distributed across multiple machines for large workloads. This introduces the problem of out-of-order processing. Consider a scenario where your source data gets updated rapidly:

    1. Version 1 (V1) exists in the system
    2. Version 2 (V2) and Version 3 (V3) arrive within a second of each other
    3. V2 takes longer to process than V3

Without proper handling, you might end up with V2 data in your index even though V3 is the latest version - effectively rolling back to stale data.
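As a minimal illustration of this race, here is a sketch with a hypothetical in-memory index and a simulated processing delay (none of these names come from a real pipeline API); whichever commit finishes last wins, regardless of version:

```python
import threading
import time

# A naive target index with last-write-wins semantics.
index = {}

def process_and_commit(doc_id: str, version: int, content: str, work_seconds: float):
    time.sleep(work_seconds)  # simulate transformation work; V2 happens to be slower
    index[doc_id] = {"version": version, "content": content}  # blind overwrite on commit

# V2 and V3 arrive within a second of each other; V2 takes longer to process.
t2 = threading.Thread(target=process_and_commit, args=("doc1", 2, "content v2", 0.5))
t3 = threading.Thread(target=process_and_commit, args=("doc1", 3, "content v3", 0.1))
t2.start(); t3.start()
t2.join(); t3.join()

print(index["doc1"])  # {'version': 2, ...} - the index silently rolled back to V2
```

Guarding the commit with an ordinal check, as discussed in the approaches below, is what prevents the later-finishing V2 write from landing.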

Common Issues and Gaps

Data inconsistency in indexing systems can lead to several critical issues:

  • Data Exposure Risks
  • System Integrity
  • Security and Compliance
  • AI System Reliability
  • Business Operations

Approaches for Maintaining Data Consistency

1. Tracking Source-to-Target Key Mappings

If all your state management and storage systems live within a single transactional system (e.g. PostgreSQL), you can leverage database transactions for consistency. However, this is often not the case in real-world scenarios. For example, you may use an internal storage (e.g. PostgreSQL) to track per-source metadata, and an external vector store (e.g. Pinecone or Milvus) to store vector embeddings.

This split across systems brings extra challenges, so we need to carefully design the order of write operations during commit to avoid data leakage even when the process is only partially executed:

  1. Add new target keys to the internal store
  2. Write to target storage
    • Create new records
    • Delete old records

    Note that for any target storage that supports atomic writes, this allows all updates derived from the same source key to be written atomically, which avoids exposing mixed-version data at query time.
  3. Remove no-longer-exist target keys from the internal store

This ensures a key invariant: the set of keys tracked in the internal storage is always a superset of the keys that actually exist in the target store. As a result, every row in the target store is always tracked and can never be leaked.
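As a rough sketch of this ordering (the `internal_tracking` and `target_store` objects below are hypothetical interfaces, not CocoIndex's internals or any specific database API):

```python
def commit(source_key, new_records, internal_tracking, target_store):
    """Commit derived records for one source key while keeping the invariant that
    internally tracked keys are always a superset of keys in the target store."""
    new_keys = {record.key for record in new_records}
    old_keys = set(internal_tracking.get_target_keys(source_key))

    # Phase 1: track the new target keys *before* they exist in the target store,
    # so a crash after this point still leaves every future target row accounted for.
    internal_tracking.add_target_keys(source_key, new_keys - old_keys)

    # Phase 2: write to the target storage. If it supports atomic writes, upserting
    # new records and deleting stale ones in one batch means queries never observe
    # a mix of old and new versions for this source key.
    target_store.apply(upserts=new_records, deletes=old_keys - new_keys)

    # Phase 3: only after the target store reflects the new version, drop the
    # no-longer-existing keys from the internal tracking store.
    internal_tracking.remove_target_keys(source_key, old_keys - new_keys)
```

If the process dies between Phase 1 and Phase 2, the internal store temporarily tracks keys that do not yet exist in the target store, which is harmless; the dangerous opposite, rows living in the target store that nothing tracks, can never occur.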

2. Ordinal Bookkeeping

Ordinals are unique identifiers that establish a strict ordering of data updates in your pipeline. They bookkeep the sequence and timing of changes to ensure consistency. Common examples include file modification timestamps, monotonically increasing version or sequence numbers, and offsets from a change stream.

Ordinals need to be bookkept carefully at two key stages: when a source change is read (capture its ordinal alongside the data), and when the derived results are committed (compare it against the ordinal already recorded for that source key).

Ordinal bookkeeping happens in Phase 1, which rejects out-of-order commits: they won't produce any updates in Phase 1, and Phases 2 and 3 won't be executed at all.
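One way to implement that Phase-1 check is a conditional upsert on the tracking table. The sketch below assumes a psycopg2 connection and a `source_tracking(source_key, source_ordinal)` table; the table name and schema are illustrative, not CocoIndex's actual bookkeeping:

```python
def try_claim_ordinal(conn, source_key: str, ordinal: int) -> bool:
    """Record the ordinal for a source key; return False for out-of-order commits."""
    with conn, conn.cursor() as cur:  # psycopg2 commits the transaction on success
        cur.execute(
            """
            INSERT INTO source_tracking (source_key, source_ordinal)
            VALUES (%s, %s)
            ON CONFLICT (source_key) DO UPDATE
              SET source_ordinal = EXCLUDED.source_ordinal
              WHERE source_tracking.source_ordinal < EXCLUDED.source_ordinal
            """,
            (source_key, ordinal),
        )
        # rowcount == 0 means an equal or newer ordinal is already recorded:
        # this commit is out of order, so Phases 2 and 3 should be skipped.
        return cur.rowcount == 1
```

If `try_claim_ordinal` returns False, the pipeline simply drops the commit: the target already reflects an equal or newer version of that source key.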

3. Versioned Unique Key Assignment with Soft Deletion

When multiple processings of the same source key are in flight at once, writes to the target storage (Phase 2 above) may arrive out of order, causing results derived from old versions to overwrite newer ones.

To avoid this, we need to make the keys written to the target storage unique per version, and retire rows from older versions with soft deletion rather than overwriting or deleting them in place.

Specifically: each processing run writes its output under keys that include a version identifier, so concurrent runs for different versions never collide; once a newer version is committed, rows belonging to older versions are marked with a deleted field instead of being removed immediately, so queries can simply filter them out.

In addition, we need an offline GC process to garbage-collect rows in the target storage whose deleted field is set. For this reason, the deleted field can be a timestamp of the deletion time, so we can decide when to GC a specific version based on it.
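Putting these pieces together, here is a sketch of versioned key assignment with soft deletion and an offline GC pass. `target_store` and its methods (`upsert`, `mark_deleted`, `delete_where`) are hypothetical placeholders for whatever your target storage client provides:

```python
import time
import uuid

def commit_versioned(source_key: str, new_records, target_store):
    """Write one processed version under fresh unique keys, then soft-delete older versions."""
    version_id = uuid.uuid4().hex  # unique per processing run, so writes derived from
                                   # different source versions can never collide on a key
    for record in new_records:
        target_store.upsert(
            key=f"{source_key}/{version_id}/{record.key}",
            value=record.value,
            metadata={"source_key": source_key, "version_id": version_id, "deleted_at": None},
        )
    # Retire rows from other versions of this source key by stamping the deleted
    # field with the deletion time; queries filter on deleted_at being unset.
    target_store.mark_deleted(
        source_key=source_key,
        keep_version_id=version_id,
        deleted_at=time.time(),
    )

def garbage_collect(target_store, retention_seconds: float = 3600.0):
    """Offline GC: physically remove rows soft-deleted more than retention_seconds ago."""
    cutoff = time.time() - retention_seconds
    target_store.delete_where(deleted_before=cutoff)
```

The retention window gives in-flight queries time to stop referencing the old version before its rows physically disappear, and the deletion timestamp is exactly what lets the GC process decide when each version is safe to reclaim.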

How We Solved These Challenges at CocoIndex

The discussion above covers only part of the complexity of building and maintaining long-running or distributed indexing pipelines. There are other additional complexities compared to building single-process in-memory transformation logic.

The CocoIndex framework aims to handle these complexities so users can focus on pure transformations.

By exposing a data-driven programming paradigm, CocoIndex allows developers to focus on their core business logic while maintaining data consistency and freshness.

If you like our work, please support us with a star for 🥥 CocoIndex on GitHub. Thank you so much with a warm coconut hug 🥥🤗.