langchain / / 2024. 6. 27. 06:51

[langchain] 앙상블 검색기 elasticsearch 7에 적용

Elasticsearch에서 사용

elasticsearch에서 한국어 형태소 분석기인 nori analyzer와 함께 EnsembleRetriever를 사용해보자.

여기서도 동일하게 bm25 검색과 vector store 검색을 동시에 사용하는 EnsembleRetriever를 사용할 것이다.

현재 테스트하고 있는 elasticsearch 버전은 7.17.5이다. 최신 버전인 8.x에서는 특별한 문제없이 잘 동작하는 것을 확인했는데, 현재는 7.x에서 사용해야 하는 상황이라 7.x 버전으로 테스트를 해본다.

langchain-elasticsearch, elasticsearch 패키지 설치

pipenv install langchain-elasticsearch elasticsearch

bm25 검색

Embedding

아래 데이터를 elasticsearch에 넣어보자.

import os

from dotenv import load_dotenv
from langchain.schema import Document
from langchain_community.retrievers import ElasticSearchBM25Retriever
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai.embeddings import OpenAIEmbeddings

load_dotenv()

text_list = [
    "Galaxy S9의 특징은 저렴하다는 것이다",
    "Galaxy S9의 배터리는 3000 mAh이다",
    "Galaxy S10의 카메라는 Triple rear cameras이다. ",
    "Galaxy S20의 Display는 6.2-inch Dynamic AMOLED이다.",
    "Galaxy S20의 저장공간은 128G이다",
    "Galaxy S21의 Ram은 8GB이다",
]

index_name = "test_docs"
embeddings = OpenAIEmbeddings()
vector_store = ElasticsearchStore(
    embedding=embeddings,
    index_name=index_name,
    es_url=os.getenv("ELASTICSEARCH_URL"),
)

위의 코드를 실행하면 test_docs에 text_list가 임베딩 되어 들어간다. elasticsearch에 잘 들어갔는지 확인해보자.

GET test_docs/_search
{
  "_source": ["text"]
}

검색결과

{
  "took" : 28,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test_docs",
        "_type" : "_doc",
        "_id" : "5a758682-e024-457b-8158-3c5f4c2c45de",
        "_score" : 1.0,
        "_source" : {
          "text" : "Galaxy S9의 특징은 저렴하다는 것이다"
        }
      },
      {
        "_index" : "test_docs",
        "_type" : "_doc",
        "_id" : "f76d5d44-5e14-4e17-8f31-63cfb2a3666a",
        "_score" : 1.0,
        "_source" : {
          "text" : "Galaxy S9의 배터리는 3000 mAh이다"
        }
      },
      {
        "_index" : "test_docs",
        "_type" : "_doc",
        "_id" : "81127cbc-1ede-4277-865c-ef0dbc09d105",
        "_score" : 1.0,
        "_source" : {
          "text" : "Galaxy S10의 카메라는 Triple rear cameras이다. "
        }
      },
      {
        "_index" : "test_docs",
        "_type" : "_doc",
        "_id" : "f4024027-ca39-440d-aa94-9af0558d2565",
        "_score" : 1.0,
        "_source" : {
          "text" : "Galaxy S20의 Display는 6.2-inch Dynamic AMOLED이다."
        }
      },
      {
        "_index" : "test_docs",
        "_type" : "_doc",
        "_id" : "b34e3186-9f06-4133-bb79-25ae6bef461c",
        "_score" : 1.0,
        "_source" : {
          "text" : "Galaxy S20의 저장공간은 128G이다"
        }
      },
      {
        "_index" : "test_docs",
        "_type" : "_doc",
        "_id" : "54955e90-e326-4f5d-988d-a04ce69df337",
        "_score" : 1.0,
        "_source" : {
          "text" : "Galaxy S21의 Ram은 8GB이다"
        }
      }
    ]
  }
}

검색

이제 bm25로 검색을 해보자.

from langchain_community.retrievers import ElasticSearchBM25Retriever

elasticsearch_client = vector_store.client
bm25_retriever = ElasticSearchBM25Retriever(
    client=elasticsearch_client, index_name=index_name
)
bm25_docs = bm25_retriever.invoke("갤럭시 S21의 특징은?")

print(f"####################### bm25 검색 결과 #################")
for doc in bm25_docs:
    print(doc.page_content)

실행해보면 아무 결과도 나오지 않는다. 왜 안나오는 걸까? 원인 파악을 위해 elastic_search_bm25.py 소스를 확인해보았다.

[elastic_search_bm25.py 소스]

def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        query_dict = {"query": {"match": {"content": query}}}
        res = self.client.search(index=self.index_name, body=query_dict)

        docs = []
        for r in res["hits"]["hits"]:
            docs.append(Document(page_content=r["_source"]["content"]))
        return docs

elasticsearch에 검색을 할 때 content 필드에 query로 검색을 하는 것을 알 수 있다. 하지만 위에서 만든 test_docs 인덱스에는 content 필드가 없다. GET test_docs/_mapping으로 확인해보면 text 필드밖에 보이지 않는다.

그래서 생각한 방법이 text 필드에 데이터가 들아갈 때 content 필드에 동일하게 복사를 하면 된다.
그래서 아래와 같이 elasticsearch pipeline을 적용했다. 즉, text 필드의 값을 content 필드에 복사하는 방식이다.

참고)

아래의 pipeline을 생성하지 않고 _get_relevant_documents를 할 때 content가 아닌 text 필드를 검색하게 수정해도 된다.

PUT /_ingest/pipeline/copy_text_to_content_pipeline
{
  "description": "text 필드를 content에 복사한다",
  "processors": [
    {
      "set": {
        "field": "content",
        "value": "{{text}}"
      }
    }
  ]
}

그리고 test_docs 인덱스에 nori analyzer가 적용될 수 있도록 아래와 같이 인덱스(test_docs)를 재생성했다. (기존 test_docs 삭제 후 재생성)

PUT test_docs
{
  "settings": {
    "analysis": {
      "analyzer": {
        "nori" : {
          "tokenizer" : "nori_tokenizer"
        }
      }
    },
    "default_pipeline": "copy_text_to_content_pipeline"
  },
  "mappings": {
    "properties": {
        "text" : {
          "type" : "text",
          "analyzer" : "nori",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "content": {
          "type": "text",
          "analyzer": "nori"
        },
        "vector" : {
          "type" : "dense_vector",
          "dims" : 1536
        }
      }
    }
  }
}

그리고 임베딩을 새로 해주자.

# embedding
for text in text_list:
    doc = [Document(page_content=text)]
    vector_store.add_documents(doc, add_to_docstore=True)

elasticsearch를 조회해보면 6건이 새로 들어가 있는 것을 확인할 수 있다. 그리고 content에도 text와 동일한 데이터가 들어가 있는 것을 확인할 수 있다.

아래 쿼리로 조회해볼 수 있다.

GET test_docs/_search
{
  "_source": ["text", "content"]
}

다시 검색

다시 검색을 해보자.

# bm25 검색
elasticsearch_client = vector_store.client
bm25_retriever = ElasticSearchBM25Retriever(
    client=elasticsearch_client, index_name=index_name
)

bm25_docs = bm25_retriever.invoke("갤럭시 S21의 특징은?")
print(f"####################### bm25 검색 결과 #################")
for doc in bm25_docs:
    print(doc.page_content)

검색결과

####################### bm25 검색 결과 #################
Galaxy S21의 Ram은 8GB이다
Galaxy S9의 특징은 저렴하다는 것이다
Galaxy S20의 저장공간은 128G이다
Galaxy S9의 배터리는 3000 mAh이다
Galaxy S10의 카메라는 Triple rear cameras이다. 
Galaxy S20의 Display는 6.2-inch Dynamic AMOLED이다.

검색이 잘 되는 것을 확인할 수 있다. 하지만 한가지 문제가 더 있다. kwargs={"k": 1} 으로 1건만 추출하는 방법이 적용되지 않는다. 예를 들면 3건만 검색하고 싶은데 항상 10건이 검색이 된다. (elasticsearch의 기본값: 10건)

그래서 아래와 같이 custom_elastic_search_bm25.py 클래스를 생성한다.

from typing import List

from langchain_community.retrievers import ElasticSearchBM25Retriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document


class CustomElasticSearchBM25Retriever(ElasticSearchBM25Retriever):
    search_args = {}

    def __init__(self, **search_args):
        super().__init__(**search_args)
        self.search_args = search_args

    def _get_relevant_documents(
        self, query: str, run_manager: CallbackManagerForRetrieverRun, **kwargs
    ) -> List[Document]:
        query_dict = {"query": {"match": {"content": query}}}

        size = -1
        if "search_args" in self.search_args and "k" in self.search_args["search_args"]:
            size = self.search_args["search_args"]["k"]

        res = self.client.search(index=self.index_name, body=query_dict)

        docs = []
        for i, r in enumerate(res["hits"]["hits"]):
            docs.append(Document(page_content=r["_source"]["content"]))
            if -1 < size <= i + 1:
                break
        return docs

그리고 호출할 때 아래와 같이 위에서 생성한 클래스로 생성을 한다.

bm25_retriever = CustomElasticSearchBM25Retriever(
    client=elasticsearch_client, index_name=index_name, search_args={"k": 3}
)
bm25_docs = bm25_retriever.invoke(question)

print(f"####################### custom bm25 검색 결과 #################")
for doc in bm25_docs:
    print(doc.page_content)

이를 실행해 보면 아래와 같이 3건만 출력이 된다.

####################### custom bm25 검색 결과 #################
Galaxy S21의 Ram은 8GB이다
Galaxy S9의 특징은 저렴하다는 것이다
Galaxy S20의 저장공간은 128G이다

vector store 검색

다음으로 vector store 검색을 해보자.

# vector store 검색
retriever = vector_store.as_retriever(search_kwargs={"k": 1})
vector_result = retriever.invoke(question)
print("====== vector store result ========")
for i, doc in enumerate(vector_result):
    print(f"[문서 {i}] {doc.page_content.replace('\n', ' ')}")

실행하면 오류가 난다. 이것은 elasticsearch 7.x에서 발생하는 오류이며 8.x에서는 잘 실행이 된다.

오류 내용

elasticsearch.BadRequestError: BadRequestError(400, 'parsing_exception', 'Unknown key for a START_OBJECT in [knn].')

langchain에서 기본적으로 knn 검색을 하려고 시도한다. 여기서 사용하는 elasticsearch 버전은 7.x이며 vector store 검색을 사용하려면 knn 플러그인을 설치하거나 vector 검색 쿼리를 변경하는 방법이 있다.

여기서는 후자인 쿼리를 변경해보겠다.

일단 custom_elastic_search_store.py를 생성하고 similarity_search를 수정한다.

from typing import List, Optional, Dict, Any, Callable

from langchain_core.documents import Document
from langchain_elasticsearch.vectorstores import (
    ElasticsearchStore,
    _hits_to_docs_scores,
)


class CustomElasticSearchStore(ElasticsearchStore):
    def custom_script_query(self, query_body: dict, query: str):
        query_vector = query_body["knn"]["query_vector"]
        _filter = query_body["knn"]["filter"]
        must_clauses = []

        # metadata가 있을 경우 추가한다.
        if _filter:
            for key, value in _filter.items():
                must_clauses.append({"match": {f"metadata.{key}.keyword": f"{value}"}})
        else:
            must_clauses.append({"match_all": {}})

        return {
            "query": {
                "script_score": {
                    "query": {
                        "bool": {"must": must_clauses},
                    },
                    "script": {
                        "source": "cosineSimilarity(params.query_vector, 'vector') + 1.0",
                        "params": {"query_vector": query_vector},
                    },
                }
            }
        }

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 50,
        filter: Optional[List[dict]] = None,
        *,
        custom_query: Optional[
            Callable[[Dict[str, Any], Optional[str]], Dict[str, Any]]
        ] = None,
        doc_builder: Optional[Callable[[Dict], Document]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        hits = self._store.search(
            query=query,
            k=k,
            num_candidates=fetch_k,
            filter=filter,
            custom_query=self.custom_script_query,
        )
        docs = _hits_to_docs_scores(
            hits=hits,
            content_field=self.query_field,
        )
        return [doc for doc, _score in docs]

그리고 호출할 때 아래와 같이 사용한다. 검색결과는 1건을 가져오도록 한다.

search_kwargs = {"k": 1, "filter": {}}
# search_kwargs["filter"] = {"project_id": "PROJECT_TEST"} # metadata를 넘길 경우는 이렇게 사용한다.
retriever = vector_store.as_retriever(search_kwargs=search_kwargs)
vector_result = retriever.invoke(question)
print("====== custom vector store result ========")
for i, doc in enumerate(vector_result):
    print(f"[문서 {i}] {doc.page_content.replace('\n', ' ')}")

실행결과는 아래와 같다.

====== vector store result ========
[문서 0] Galaxy S20의 Display는 6.2-inch Dynamic AMOLED이다.

참고

반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유