LCEL (LangChain Expression Language) 2

Runnable의 실행 방법

Runnable의 실행 방법으로는 3가지가 있습니다:

  • invoke: 동기 실행
  • stream: 스트리밍 실행
  • batch: 배치 실행

비동기로 실행할 수 있는 방법으로도 3가지가 제공됩니다:

  • ainvoke: 비동기 실행
  • astream: 비동기 스트리밍
  • abatch: 비동기 배치

Stream 예제

chain = prompt | model | output_parser

for chunk in chain.stream({"input": "..."}):
    print(chunk, end = "", flush = True)

Batch 예제

chain = prompt | model | output_parser

output = chain.batch([{"input":"..."}, {"input":"..."}])
print(output)

다양한 러너블 연결하기

1. 두개의 체인 연결하기

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model = "gpt-4o-mini", temperature = 0)
output_parser = StrOutputParser()

## 체인 1 : 답변을 단계적으로 생성하기
cot_prompt = ChatPromptTemplate.from_messages([
    ("system", "사용자의 질문에 단계적으로 답변하세요"),
    ("user", "{input}"),
])

cot_chain = cot_prompt | model | output_parser

## 체인 2: 결론 추출하기
summary_prompt = ChatPromptTemplate.from_messages([
    ("system", "단계적으로 생각한 답변에서 결론을 추출하세요"),
    ("user", "{text}"),
])

summary_chain = summary_prompt | model | output_parser

## 체인 3: 체인 연결하기
chain = cot_chain | summary_chain

## 체인 실행하기
output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)

주의: 입력 타입과 출력 타입의 일관성을 유지해야 합니다!

임의의 함수를 러너블로 만들기: RunnableLambda

어떤 변환을 적용하거나 추가 처리를 하는 함수를 러너블로 만들면 체인간 변환을 수행하는 기능을 구현할 수 있습니다.

1. 기본 구현 방식

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_messages([
    ("system", "사용자의 질문에 단계적으로 답변하세요"),
    ("user", "{input}"),
])
model = ChatOpenAI(model = "gpt-4o-mini", temperature = 0)
output_parser = StrOutputParser()

# 임의의 함수를 러너블로 만들기
def add_prefix(text: str) -> str:
    return "결론: " + text

add_prefix_runnable = RunnableLambda(add_prefix)

# 체인 구성
chain = prompt | model | output_parser | add_prefix_runnable

# 체인 실행
output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)

2. 데코레이터를 사용한 구현 방식

from langchain_core.runnables import chain

@chain
def add_prefix(text: str) -> str:
    return "결론: " + text

chain = prompt | model | output_parser | add_prefix

output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)

3. 암시적으로 생성하기

def add_prefix(text: str) -> str:
    return "결론: " + text

chain = prompt | model | output_parser | add_prefix

output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)

Stream 처리 시 주의사항

사용자 함수를 사용할때 stream으로 호출하면 점진적으로 출력되지 않고 처리가 완전히 끝난 시점에 한꺼번에 출력됩니다. 사용자 정의함수는 입력을 한꺼번에 처리하도록 되어있기 때문입니다.

점진적으로 처리하기 위해 제너레이터 함수를 사용해야 합니다:

def upper(input: Iterator[str]) -> Iterator[str]:
    for text in input:
        yield text.upper()

여러 러너블을 병렬로 연결하기

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model = "gpt-4o-mini", temperature = 0)
output_parser = StrOutputParser()

pros_prompt = ChatPromptTemplate.from_messages([
    ("system", "사용자의 진술에서 장점을 찾아서 긍정적으로 답변하세요."),
    ("user", "{input}"),
])

pros_chain = pros_prompt | model | output_parser

cons_prompt = ChatPromptTemplate.from_messages([
    ("system", "사용자의 진술에서 단점을 찾아서 부정적으로 답변하세요."),
    ("user", "{input}"),
])

cons_chain = cons_prompt | model | output_parser

## 병렬 체인 구성
chain = RunnableParallel(
    {"장점": pros_chain},
    {"단점": cons_chain},
)

## 병렬 체인 실행
output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)

# 요약 체인 구성
summary_prompt = ChatPromptTemplate.from_messages([
    ("system", "장점과 단점을 요약하세요."),
    ("user", "{장점} {단점}"),
])

summary_chain = (RunnableParallel(
    {"장점": pros_chain},
    {"단점": cons_chain},
) | summary_prompt | model | output_parser
)

## 암시적으로 병렬 체인 구성하기
summary_chain2 = (
    {"장점": pros_chain, "단점": cons_chain} | summary_prompt | model | output_parser
)

Item Getter 사용

딕셔너리에서 값을 추출하는 함수를 만들 수 있습니다:

from operator import itemgetter

topic_getter = itemgetter("장점")
topic = topic_getter({"장점":"맛있다."})
print(topic)
# > 맛있다.

# 체인 예시
synt_prompt = ChatPromptTemplate.from_messages([
    ("system", "사용자의 진술에서 {topic}에 대한 장점과 단점을 찾아서 긍정적으로 답변하세요."),
    ("user", "장점 :{장점}\n단점 :{단점}"),
])

synt_chain = {
    "장점": pros_chain,
    "단점": cons_chain,
    "topic": itemgetter("topic"),
} | synt_prompt | model | output_parser

output = synt_chain.invoke({"topic": "회사를 다니고 있습니다."})

RunnablePassthrough 활용

RAG 처리 구현

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template('''
                                           다음 문맥만을 고려하여 질문에 대답하세요.
                                           문맥: "{context}"
                                           질문: "{question}"
                                           ''')

model = ChatOpenAI(model = "gpt-4o-mini", temperature = 0)

from langchain_community.retrievers import TravilySearchAPIRetriever

retriever = TravilySearchAPIRetriever(
    api_key = os.getenv("TRAVILY_API_KEY"),
    max_results = 2,
    k=3)

from langchain_core.runnables import RunnablePassthrough

chain = (
    {"context": retriever, "question": RunnablePassthrough()} | prompt | model | output_parser
)

output = chain.invoke({"question": "오늘 날씨가 어때?"})
print(output)

러너블 체인에 값 추가하기: assign

import pprint

chain = {
    "question": RunnablePassthrough(),
    "context": retriever,
} | RunnablePassthrough.assign(
    answer=prompt | model | StrOutputParser()
)

output = chain.invoke({"question": "오늘 날씨가 어때?"})
pprint(output)

## assign은 인스턴스 메서드로도 제공됩니다
chain = RunnableParallel({
    "question": RunnablePassthrough(),
    "context": retriever,
}).assign(answer=prompt | model | StrOutputParser())

output = chain.invoke({"question": "오늘 날씨가 어때?"})
pprint(output)

Pick으로 일부 값만 선택

chain = RunnableParallel({
    "question": RunnablePassthrough(),
    "context": retriever,
}).assign(answer=prompt | model | StrOutputParser()
          ).pick("context", "answer")

output = chain.invoke({"question": "오늘 날씨가 어때?"})
pprint(output)

중간값 출력: astream_events

chain = (
    {"context": retriever, "question": RunnablePassthrough()} | prompt | model | StrOutputParser()
)

async for event in chain.astream_events("서울의 현재 날씨는?", version = "v2"):
    event_kind = event["event"]
    if event_kind == "on_retriever_end":
        print("검색 결과:")
        documents = event["data"]["output"]
        for document in documents:
            print(document)

    elif event_kind == "on_parser_start":
        print("출력 파서 시작")

    elif event_kind == "on_parser_stream":
        chunk = event["data"]["chunk"]
        print(chunk, end = "", flush = True)