LCEL (LangChain Expression Language) 2
Runnable의 실행 방법
Runnable의 실행 방법으로는 3가지가 있습니다:
- invoke: 동기 실행
- stream: 스트리밍 실행
- batch: 배치 실행
비동기로 실행할 수 있는 방법으로도 3가지가 제공됩니다:
- ainvoke: 비동기 실행
- astream: 비동기 스트리밍
- abatch: 비동기 배치
Stream 예제
chain = prompt | model | output_parser
for chunk in chain.stream({"input": "..."}):
print(chunk, end = "", flush = True)
Batch 예제
chain = prompt | model | output_parser
output = chain.batch([{"input":"..."}, {"input":"..."}])
print(output)
다양한 러너블 연결하기
1. 두개의 체인 연결하기
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model = "gpt-4o-mini", temperature = 0)
output_parser = StrOutputParser()
## 체인 1 : 답변을 단계적으로 생성하기
cot_prompt = ChatPromptTemplate.from_messages([
("system", "사용자의 질문에 단계적으로 답변하세요"),
("user", "{input}"),
])
cot_chain = cot_prompt | model | output_parser
## 체인 2: 결론 추출하기
summary_prompt = ChatPromptTemplate.from_messages([
("system", "단계적으로 생각한 답변에서 결론을 추출하세요"),
("user", "{text}"),
])
summary_chain = summary_prompt | model | output_parser
## 체인 3: 체인 연결하기
chain = cot_chain | summary_chain
## 체인 실행하기
output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)
주의: 입력 타입과 출력 타입의 일관성을 유지해야 합니다!
임의의 함수를 러너블로 만들기: RunnableLambda
어떤 변환을 적용하거나 추가 처리를 하는 함수를 러너블로 만들면 체인간 변환을 수행하는 기능을 구현할 수 있습니다.
1. 기본 구현 방식
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_messages([
("system", "사용자의 질문에 단계적으로 답변하세요"),
("user", "{input}"),
])
model = ChatOpenAI(model = "gpt-4o-mini", temperature = 0)
output_parser = StrOutputParser()
# 임의의 함수를 러너블로 만들기
def add_prefix(text: str) -> str:
return "결론: " + text
add_prefix_runnable = RunnableLambda(add_prefix)
# 체인 구성
chain = prompt | model | output_parser | add_prefix_runnable
# 체인 실행
output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)
2. 데코레이터를 사용한 구현 방식
from langchain_core.runnables import chain
@chain
def add_prefix(text: str) -> str:
return "결론: " + text
chain = prompt | model | output_parser | add_prefix
output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)
3. 암시적으로 생성하기
def add_prefix(text: str) -> str:
return "결론: " + text
chain = prompt | model | output_parser | add_prefix
output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)
Stream 처리 시 주의사항
사용자 함수를 사용할때 stream으로 호출하면 점진적으로 출력되지 않고 처리가 완전히 끝난 시점에 한꺼번에 출력됩니다. 사용자 정의함수는 입력을 한꺼번에 처리하도록 되어있기 때문입니다.
점진적으로 처리하기 위해 제너레이터 함수를 사용해야 합니다:
def upper(input: Iterator[str]) -> Iterator[str]:
for text in input:
yield text.upper()
여러 러너블을 병렬로 연결하기
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model = "gpt-4o-mini", temperature = 0)
output_parser = StrOutputParser()
pros_prompt = ChatPromptTemplate.from_messages([
("system", "사용자의 진술에서 장점을 찾아서 긍정적으로 답변하세요."),
("user", "{input}"),
])
pros_chain = pros_prompt | model | output_parser
cons_prompt = ChatPromptTemplate.from_messages([
("system", "사용자의 진술에서 단점을 찾아서 부정적으로 답변하세요."),
("user", "{input}"),
])
cons_chain = cons_prompt | model | output_parser
## 병렬 체인 구성
chain = RunnableParallel(
{"장점": pros_chain},
{"단점": cons_chain},
)
## 병렬 체인 실행
output = chain.invoke({"input": "오늘 날씨가 어때?"})
print(output)
# 요약 체인 구성
summary_prompt = ChatPromptTemplate.from_messages([
("system", "장점과 단점을 요약하세요."),
("user", "{장점} {단점}"),
])
summary_chain = (RunnableParallel(
{"장점": pros_chain},
{"단점": cons_chain},
) | summary_prompt | model | output_parser
)
## 암시적으로 병렬 체인 구성하기
summary_chain2 = (
{"장점": pros_chain, "단점": cons_chain} | summary_prompt | model | output_parser
)
Item Getter 사용
딕셔너리에서 값을 추출하는 함수를 만들 수 있습니다:
from operator import itemgetter
topic_getter = itemgetter("장점")
topic = topic_getter({"장점":"맛있다."})
print(topic)
# > 맛있다.
# 체인 예시
synt_prompt = ChatPromptTemplate.from_messages([
("system", "사용자의 진술에서 {topic}에 대한 장점과 단점을 찾아서 긍정적으로 답변하세요."),
("user", "장점 :{장점}\n단점 :{단점}"),
])
synt_chain = {
"장점": pros_chain,
"단점": cons_chain,
"topic": itemgetter("topic"),
} | synt_prompt | model | output_parser
output = synt_chain.invoke({"topic": "회사를 다니고 있습니다."})
RunnablePassthrough 활용
RAG 처리 구현
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_template('''
다음 문맥만을 고려하여 질문에 대답하세요.
문맥: "{context}"
질문: "{question}"
''')
model = ChatOpenAI(model = "gpt-4o-mini", temperature = 0)
from langchain_community.retrievers import TravilySearchAPIRetriever
retriever = TravilySearchAPIRetriever(
api_key = os.getenv("TRAVILY_API_KEY"),
max_results = 2,
k=3)
from langchain_core.runnables import RunnablePassthrough
chain = (
{"context": retriever, "question": RunnablePassthrough()} | prompt | model | output_parser
)
output = chain.invoke({"question": "오늘 날씨가 어때?"})
print(output)
러너블 체인에 값 추가하기: assign
import pprint
chain = {
"question": RunnablePassthrough(),
"context": retriever,
} | RunnablePassthrough.assign(
answer=prompt | model | StrOutputParser()
)
output = chain.invoke({"question": "오늘 날씨가 어때?"})
pprint(output)
## assign은 인스턴스 메서드로도 제공됩니다
chain = RunnableParallel({
"question": RunnablePassthrough(),
"context": retriever,
}).assign(answer=prompt | model | StrOutputParser())
output = chain.invoke({"question": "오늘 날씨가 어때?"})
pprint(output)
Pick으로 일부 값만 선택
chain = RunnableParallel({
"question": RunnablePassthrough(),
"context": retriever,
}).assign(answer=prompt | model | StrOutputParser()
).pick("context", "answer")
output = chain.invoke({"question": "오늘 날씨가 어때?"})
pprint(output)
중간값 출력: astream_events
chain = (
{"context": retriever, "question": RunnablePassthrough()} | prompt | model | StrOutputParser()
)
async for event in chain.astream_events("서울의 현재 날씨는?", version = "v2"):
event_kind = event["event"]
if event_kind == "on_retriever_end":
print("검색 결과:")
documents = event["data"]["output"]
for document in documents:
print(document)
elif event_kind == "on_parser_start":
print("출력 파서 시작")
elif event_kind == "on_parser_stream":
chunk = event["data"]["chunk"]
print(chunk, end = "", flush = True)