250 lines
9.2 KiB
Python
250 lines
9.2 KiB
Python
"""
|
|
Local RAG setup with LangChain, Ollama, and FAISS
|
|
Minimal dependencies, simple code
|
|
"""
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from langchain_community.document_loaders import PyPDFLoader, TextLoader
|
|
from langchain_community.vectorstores import FAISS
|
|
from langchain_huggingface import HuggingFaceEmbeddings
|
|
from langchain_ollama import ChatOllama
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter
|
|
|
|
|
|
class LocalRAG:
|
|
def __init__(self, vectorstore_path="./vectorstore", ollama_model="mistral:7b"):
|
|
"""Initialize local RAG system"""
|
|
self.vectorstore_path = vectorstore_path
|
|
self.ollama_model = ollama_model
|
|
|
|
# Embeddings
|
|
print("Loading embeddings model...")
|
|
self.embeddings = HuggingFaceEmbeddings(
|
|
model_name="sentence-transformers/all-MiniLM-L6-v2"
|
|
)
|
|
|
|
# Text splitter
|
|
self.text_splitter = RecursiveCharacterTextSplitter(
|
|
chunk_size=1500,
|
|
chunk_overlap=300
|
|
)
|
|
|
|
# Ollama LLM
|
|
print(f"Connecting to Ollama (model: {ollama_model})...")
|
|
self.llm = ChatOllama(
|
|
model=ollama_model,
|
|
base_url="http://localhost:11434"
|
|
)
|
|
|
|
# Vector store (load if exists, otherwise None)
|
|
self.vectorstore = None
|
|
self._load_vectorstore()
|
|
|
|
def _load_vectorstore(self):
|
|
"""Load existing vector store if available"""
|
|
index_file = os.path.join(self.vectorstore_path, "index.faiss")
|
|
if os.path.exists(index_file):
|
|
try:
|
|
self.vectorstore = FAISS.load_local(
|
|
self.vectorstore_path,
|
|
self.embeddings,
|
|
allow_dangerous_deserialization=True
|
|
)
|
|
print(f"Loaded existing vector store from {self.vectorstore_path}")
|
|
except Exception as e:
|
|
print(f"Could not load vector store: {e}")
|
|
self.vectorstore = None
|
|
|
|
def add_documents(self, file_paths):
|
|
"""Add documents to the vector store"""
|
|
print(f"\nLoading {len(file_paths)} document(s)...")
|
|
all_docs = []
|
|
|
|
for file_path in file_paths:
|
|
path = Path(file_path)
|
|
if not path.exists():
|
|
print(f"Warning: {file_path} not found, skipping")
|
|
continue
|
|
|
|
# Load document
|
|
if path.suffix.lower() == '.pdf':
|
|
loader = PyPDFLoader(str(path))
|
|
elif path.suffix.lower() in ['.txt', '.md']:
|
|
loader = TextLoader(str(path))
|
|
else:
|
|
print(f"Warning: Unsupported file type {path.suffix}, skipping")
|
|
continue
|
|
|
|
docs = loader.load()
|
|
chunks = self.text_splitter.split_documents(docs)
|
|
all_docs.extend(chunks)
|
|
print(f" - {path.name}: {len(chunks)} chunks")
|
|
|
|
if not all_docs:
|
|
print("No documents loaded!")
|
|
return
|
|
|
|
# Create or update vector store
|
|
print(f"\nCreating embeddings for {len(all_docs)} chunks...")
|
|
if self.vectorstore is None:
|
|
self.vectorstore = FAISS.from_documents(all_docs, self.embeddings)
|
|
else:
|
|
new_store = FAISS.from_documents(all_docs, self.embeddings)
|
|
self.vectorstore.merge_from(new_store)
|
|
|
|
# Save
|
|
os.makedirs(self.vectorstore_path, exist_ok=True)
|
|
self.vectorstore.save_local(self.vectorstore_path)
|
|
print(f"Vector store saved to {self.vectorstore_path}")
|
|
|
|
def list_documents(self):
|
|
"""List all documents in the vector store"""
|
|
if self.vectorstore is None:
|
|
print("No documents in vector store.")
|
|
return []
|
|
|
|
# Get all documents from the vector store
|
|
# We'll retrieve a large number to get all documents
|
|
all_docs = self.vectorstore.similarity_search("", k=10000) # Large k to get all
|
|
|
|
# Extract unique document sources from metadata
|
|
documents = {}
|
|
for doc in all_docs:
|
|
source = doc.metadata.get('source', 'Unknown')
|
|
if source not in documents:
|
|
documents[source] = {
|
|
'source': source,
|
|
'chunks': 0,
|
|
'page': doc.metadata.get('page', None)
|
|
}
|
|
documents[source]['chunks'] += 1
|
|
|
|
# Convert to list and sort
|
|
doc_list = list(documents.values())
|
|
doc_list.sort(key=lambda x: x['source'])
|
|
|
|
print(f"\nDocuments in vector store ({len(doc_list)} unique documents):")
|
|
print("-" * 60)
|
|
for doc_info in doc_list:
|
|
print(f" - {doc_info['source']}")
|
|
print(f" Chunks: {doc_info['chunks']}")
|
|
if doc_info['page'] is not None:
|
|
print(f" Page: {doc_info['page']}")
|
|
|
|
return doc_list
|
|
|
|
def _format_history(self, chat_history):
|
|
"""Format chat history as a string for prompts."""
|
|
lines = []
|
|
for turn in chat_history or []:
|
|
role = (turn.get("role") or "").lower()
|
|
content = (turn.get("content") or "").strip()
|
|
if role == "user":
|
|
lines.append(f"User: {content}")
|
|
elif role == "assistant":
|
|
lines.append(f"Assistant: {content}")
|
|
return "\n".join(lines) if lines else ""
|
|
|
|
def _docs_to_retrieved(self, docs):
|
|
"""Convert document list to retrieved chunks format for API."""
|
|
return [
|
|
{
|
|
"content": doc.page_content,
|
|
"source": doc.metadata.get("source", ""),
|
|
"page": doc.metadata.get("page"),
|
|
}
|
|
for doc in docs
|
|
]
|
|
|
|
def query(self, question, k=8):
|
|
"""Query the RAG system (no conversation history). Returns dict with 'answer' and 'retrieved'."""
|
|
return self.query_with_history(question, chat_history=[], k=k)
|
|
|
|
def query_with_history(self, question, chat_history=None, k=8):
|
|
"""Query the RAG with conversation history: rephrase question using history for retrieval,
|
|
then answer with full conversation + retrieved context in the prompt.
|
|
Returns dict with 'answer' and 'retrieved' (list of chunks with content, source, page).
|
|
"""
|
|
if self.vectorstore is None:
|
|
return {
|
|
"answer": "Error: No documents loaded. Please add documents first.",
|
|
"retrieved": [],
|
|
}
|
|
|
|
history_str = self._format_history(chat_history)
|
|
search_query = question
|
|
|
|
print(f"[RAG] User question: {question!r}")
|
|
|
|
# 1) If we have history, rephrase the question into a standalone query for better retrieval
|
|
if history_str.strip():
|
|
rephrase_prompt = f"""Given this chat history and the latest user question, write a single standalone question that captures what the user is asking. Do not answer it; only output the standalone question. If the latest question is already clear on its own, output it unchanged.
|
|
|
|
Chat history:
|
|
{history_str}
|
|
|
|
Latest user question: {question}
|
|
|
|
Standalone question:"""
|
|
rephrase_response = self.llm.invoke(rephrase_prompt)
|
|
search_query = (rephrase_response.content if hasattr(rephrase_response, "content") else str(rephrase_response)).strip() or question
|
|
print(f"[RAG] Standalone search query (rephrased): {search_query!r}")
|
|
|
|
print(f"[RAG] Query sent to vector store: {search_query!r}")
|
|
|
|
# 2) Retrieve documents using the (rephrased) query
|
|
docs = self.vectorstore.similarity_search(search_query, k=k)
|
|
retrieved = self._docs_to_retrieved(docs)
|
|
context = "\n\n".join([doc.page_content for doc in docs])
|
|
|
|
# 3) Answer using conversation history + retrieved context
|
|
history_block = f"Chat history:\n{history_str}\n\n" if history_str else ""
|
|
answer_prompt = f"""You are an assistant for question-answering. Use the chat history (if any) and the retrieved context below to answer the current question. If you don't know the answer, say so. Keep the conversation coherent.
|
|
|
|
{history_block}Relevant context from documents:
|
|
|
|
{context}
|
|
|
|
Current question: {question}
|
|
|
|
Answer:"""
|
|
response = self.llm.invoke(answer_prompt)
|
|
answer = response.content if hasattr(response, "content") else str(response)
|
|
|
|
return {"answer": answer, "retrieved": retrieved}
|
|
|
|
|
|
def main():
|
|
"""Example usage"""
|
|
print("=" * 60)
|
|
print("Local RAG with LangChain, Ollama, and FAISS")
|
|
print("=" * 60)
|
|
|
|
# Initialize
|
|
rag = LocalRAG(ollama_model="mistral:7b")
|
|
|
|
# Add documents (uncomment and add your file paths)
|
|
# rag.add_documents([
|
|
# "data/dok1.pdf",
|
|
# "data/dok2.pdf",
|
|
# "data/dok3.pdf"
|
|
# ])
|
|
|
|
# List documents
|
|
rag.list_documents()
|
|
|
|
# Query
|
|
question = "What do the documents say about modality for perceived message perception?"
|
|
result = rag.query(question)
|
|
print(f"\nQuestion: {question}")
|
|
print(f"Answer: {result['answer']}")
|
|
if result.get("retrieved"):
|
|
print(f"Retrieved {len(result['retrieved'])} chunks")
|
|
|
|
# print("\nSetup complete! Uncomment the code above to add documents and query.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|