| import os |
| from dotenv import load_dotenv |
|
|
| from .ConnectorStrategy import ConnectorStrategy |
|
|
| from pinecone import Pinecone, ServerlessSpec |
| from langchain_openai import OpenAIEmbeddings |
| from langchain_pinecone import PineconeVectorStore |
| from langchain_core.documents import Document |
|
|
| import unicodedata |
| import time |
|
|
class PineconeConnector(ConnectorStrategy):
    """Connector that stores and retrieves document chunks in a Pinecone index.

    Configuration is read from the environment after loading a ``.env``
    file: ``PINECONE_API_KEY``, ``PINECONE_INDEX_NAME`` and
    ``PINECONE_NAMESPACE``.
    """

    def __init__(self):
        """Connect to Pinecone and create the configured index if missing.

        Raises:
            ValueError: If ``PINECONE_INDEX_NAME`` is unset or empty.
        """
        load_dotenv()

        pinecone_api_key = os.environ.get("PINECONE_API_KEY")
        self.index_name = os.environ.get("PINECONE_INDEX_NAME")
        self.namespace = os.environ.get("PINECONE_NAMESPACE")

        # Fail early with a clear message instead of handing ``None`` to
        # the Pinecone client as an index name.
        if not self.index_name:
            raise ValueError(
                "PINECONE_INDEX_NAME environment variable is not set"
            )

        pc = Pinecone(api_key=pinecone_api_key)

        existing_indexes = [
            index_info["name"] for index_info in pc.list_indexes()
        ]

        if self.index_name not in existing_indexes:
            pc.create_index(
                name=self.index_name,
                # NOTE(review): 3072 presumably matches the output size of
                # the embedding model passed to addDoc/retriever - confirm.
                dimension=3072,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Poll once per second until the new index reports ready.
            while not pc.describe_index(self.index_name).status["ready"]:
                time.sleep(1)

        self.index = pc.Index(self.index_name)

    def getDocs(self):
        """Return the distinct document names found in the namespace.

        Each vector id is reduced to the part before its last underscore
        (ids written by ``addDoc`` look like ``<name>_<chunk index>``).
        An id without any underscore contributes an empty name.

        Returns:
            list[str]: Unique names in first-seen order.
        """
        # A dict keeps insertion order and gives O(1) de-duplication.
        docs_names = {}
        for ids in self.index.list(namespace=self.namespace):
            for vector_id in ids:
                docs_names.setdefault(vector_id.rpartition("_")[0], None)

        return list(docs_names)

    def addDoc(self, filename, text_chunks, embedding):
        """Embed and store the chunks of one document.

        Args:
            filename: Original file name; stored in each chunk's metadata
                and used to derive the vector ids.
            text_chunks: Iterable of chunk texts.
            embedding: Embedding object handed to ``PineconeVectorStore``.

        Returns:
            ``{"filename_id": <clean name>}`` on success, ``False`` when
            there are no chunks or when any error occurs (the error is
            printed).
        """
        try:
            # NOTE(review): only the text before the FIRST dot is kept, so
            # "report.v1.pdf" and "report.v2.pdf" share the same id prefix.
            base_name = filename.split(".")[0]
            base_name = base_name.translate(
                str.maketrans(" -/\\", "____")
            ).strip()
            # Computed once: the id prefix is identical for every chunk.
            clean_filename = remove_non_standard_ascii(base_name)

            documents = []
            uuids = []
            for i, chunk in enumerate(text_chunks):
                uuid = f"{clean_filename}_{i}"
                documents.append(
                    Document(
                        page_content=chunk,
                        metadata={"filename": filename, "chunk_id": uuid},
                    )
                )
                uuids.append(uuid)

            # Nothing to store: report failure explicitly rather than via
            # an incidental exception further down.
            if not documents:
                print(f"No text chunks to add for {filename}")
                return False

            vector_store = PineconeVectorStore(
                index=self.index,
                embedding=embedding,
                namespace=self.namespace,
            )
            vector_store.add_documents(documents=documents, ids=uuids)

            return {"filename_id": clean_filename}

        except Exception as e:
            print(e)
            return False

    def retriever(self, query, embedding):
        """Run a similarity search for ``query`` in the namespace.

        Uses the ``similarity_score_threshold`` search type with ``k=3``
        and ``score_threshold=0.6``.

        Args:
            query: Query text passed to the retriever.
            embedding: Embedding object handed to ``PineconeVectorStore``.

        Returns:
            Whatever ``retriever.invoke(query)`` returns.
        """
        vector_store = PineconeVectorStore(
            index=self.index,
            embedding=embedding,
            namespace=self.namespace,
        )

        retriever = vector_store.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"k": 3, "score_threshold": 0.6},
        )

        return retriever.invoke(query)
| |
|
|
def remove_non_standard_ascii(input_string: str) -> str:
    """Return ``input_string`` reduced to a small ASCII character set.

    The text is NFKD-normalised first, so accented letters decompose into
    a base letter plus combining marks and the base letter survives.
    Only ASCII letters, ASCII digits, spaces and ``. , ! ?`` are kept;
    everything else, including underscores, is dropped.

    Args:
        input_string: Text to sanitise.

    Returns:
        The filtered string (possibly empty).
    """
    normalized_string = unicodedata.normalize("NFKD", input_string)
    # ``isascii`` guards the alnum test: ``str.isdigit``/``isalnum`` alone
    # also accept non-ASCII characters such as Arabic-Indic digits.
    return "".join(
        char
        for char in normalized_string
        if char.isascii() and (char.isalnum() or char in " .,!?")
    )
|
|
|
|