LlamaIndex 的数据摄入管道是一种全新且改进的方法,用于高效摄入和管理数据文档。
这一管道特别适用于需要将输入文档经过一系列处理步骤(如切分、嵌入等)后才进行索引建立的情况,比如我们的 RAG 管道就是这样。
此外,它还具备缓存、文档存储管理、向量存储更新等实用功能。
数据变换
数据变换是数据摄入管道的核心组件。
每个数据变换都会接收一个节点列表,经过一系列必要的修改后,输出另一个节点列表。数据摄入管道就是由这些有序的数据变换步骤组成的。
我们在初始化管道时即定义这些数据变换。
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
]
)
我们可以使用的数据变换包括:
- 文本分割器(TextSplitter)
- 节点解析器(NodeParser)
- 元数据提取器(MetadataExtractor)
- 任何嵌入模型(Any embedding model)
我们也可以根据需要创建自定义的数据变换。相关指南即将发布。
缓存机制
在管道运行过程中,一个变换的输出会成为下一个变换的输入。
管道会缓存节点列表和变换对。因此,如果我们对相同的节点列表进行重复的变换操作,管道会直接从缓存中获取结果。
我们可以通过执行 ingest_cache.clear()
来清除缓存。
此外,我们还可以利用 Redis 等服务来实现更高级的缓存机制:
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
],
cache=IngestionCache(
cache=RedisCache(
redis_uri="redis://127.0.0.1:6379", collection="test_cache"
)
),
)
文档管理策略
为了避免对同一文档重复执行变换,数据摄入管道利用文档的 ID 和内容哈希来识别并处理重复文档。
启用文档管理功能,需要将文档存储系统(docstore)集成到管道中。
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
],
docstore=SimpleDocumentStore()
)
系统会比较文档存储中已有文档的哈希值和输入文档的哈希值,从而筛选出不需要变换的文档。
文档管理主要有三种策略:
- 仅检查文档重复情况
- 实现文档的更新操作
- 在更新文档的同时删除旧文档。
更多细节的指南即将发布。
集成向量存储
如果向管道中添加了向量存储(vectorstore),管道会自动将经过一系列变换后的最终输出节点加入到这个向量存储中。
import qdrant_client
client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
],
vector_store=vector_store,
)
# Ingest directly into a vector db
pipeline.run(documents=[Document.example()])
接着,我们可以利用这个已填充的向量存储来创建向量存储索引。
from llama_index import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(vector_store)
为了实现这一点,变换序列中的最后一个步骤必须是一个嵌入(embedding)变换。
数据持久化
数据摄入管道支持将缓存和文档存储持久化到一个文件夹中(默认路径为 ./pipeline_storage
)。
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
]
)
# Now let's save the pipeline (cache and docstore)
pipeline.persist('./pipeline_storage')
定义好管道后,我们需要像下面这样从存储中加载它:
pipeline.load('./pipeline_storage')
这样一来,当我们运行管道时,它会先利用已有的缓存数据,同时跳过文档存储中已存在的同一文档。
需要注意的是,如果我们使用的是远程缓存或文档存储系统,例如 Redis,那么上述的持久化步骤就不必要了。
结论
综上所述,LlamaIndex 的新型数据摄入管道不仅提高了文档摄入和管理的效率,还使得对文档执行一系列变换变得直观且高效。
欲了解更多,请查看官方文档:摄入管道
感谢您的阅读,敬请期待更多内容。
我会定期在 Twitter 上分享关于这些主题及我正在探索的内容。欢迎在 Twitter 上关注我。