■ RunnableSequence 클래스의 astream_events 메소드에서 마지막 비스트리밍 단계 이후 비동기 스트리밍 이벤트를 수신하는 방법을 보여준다.
▶ main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
"""Demonstrate RunnableSequence.astream_events when the last step cannot stream.

The chain is ``chat model | JSON output parser | plain function``. The plain
function at the end does not operate on an input stream, yet the stream events
emitted by the chat model and the parser can still be observed through
``astream_events``.
"""
import asyncio
import os

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser


def getCountryNameList(inputDictionary):
    """Return the ``name`` of each country found in the parsed model output.

    This is an ordinary function, not a generator: it does not operate on an
    input stream and therefore breaks streaming for the chain it terminates.

    Args:
        inputDictionary: Parsed JSON output, expected to be a dict whose
            ``"countries"`` key holds a list of dicts.

    Returns:
        A list holding ``country.get("name")`` for every dict entry of the
        ``"countries"`` list (non-dict entries are skipped), or the empty
        string ``""`` when the input is not a dict, has no ``"countries"`` key,
        or that key does not hold a list.
    """
    if not isinstance(inputDictionary, dict):
        return ""

    # A missing key yields None, which fails the list check exactly like the
    # explicit membership test did.
    countryList = inputDictionary.get("countries")
    if not isinstance(countryList, list):
        return ""

    return [country.get("name") for country in countryList if isinstance(country, dict)]


async def main():
    """Run the chain with ``astream`` and then print its first stream events."""
    # setdefault keeps a real key that is already exported in the environment;
    # the placeholder is only used when nothing is set (replace it with your key).
    os.environ.setdefault("OPENAI_API_KEY", "<OPENAI_API_KEY>")

    chatOpenAI = ChatOpenAI(model = "gpt-3.5-turbo-0125")
    jsonOutputParser = JsonOutputParser()

    # The plain function is piped in as the final, non-streaming step.
    runnableSequence = chatOpenAI | jsonOutputParser | getCountryNameList

    requestString = (
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`"
    )

    # Streaming the chain output itself: chunks are whatever the final step yields.
    async for chunkList in runnableSequence.astream(requestString):
        print(chunkList, flush = True)

    # Streaming events instead exposes the intermediate model and parser chunks.
    eventCount = 0
    async for eventDictionary in runnableSequence.astream_events(requestString, version = "v2"):
        kind = eventDictionary["event"]

        if kind == "on_chat_model_stream":
            content = eventDictionary["data"]["chunk"].content
            print(f"Chat model chunk : {content}", flush = True)

        if kind == "on_parser_stream":
            text = eventDictionary["data"]["chunk"]
            print(f"Parser chunk : {text}", flush = True)

        # Every event counts toward the limit, not only the printed ones.
        eventCount += 1
        if eventCount > 30:
            # Truncate the output.
            print("...")
            break


if __name__ == "__main__":
    asyncio.run(main())

# Example output of the event loop (kept as a bare string, as in the original):
"""
Chat model chunk : 
Chat model chunk : {
Parser chunk : {}
Chat model chunk : 
Chat model chunk : "
Chat model chunk : countries
Chat model chunk : ":
Chat model chunk : [
Parser chunk : {'countries': []}
Chat model chunk : 
Chat model chunk : {
Parser chunk : {'countries': [{}]}
Chat model chunk : 
Chat model chunk : "
Chat model chunk : name
Chat model chunk : ":
Chat model chunk : "
Parser chunk : {'countries': [{'name': ''}]}
Chat model chunk : France
Parser chunk : {'countries': [{'name': 'France'}]}
Chat model chunk : ",
Chat model chunk : 
Chat model chunk : "
Chat model chunk : population
Chat model chunk : ":
Chat model chunk : 
Chat model chunk : 652
...
"""
▶ requirements.txt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
aiohttp==3.9.5
aiosignal==1.3.1
annotated-types==0.7.0
anyio==4.4.0
async-timeout==4.0.3
attrs==23.2.0
certifi==2024.6.2
charset-normalizer==3.3.2
dataclasses-json==0.6.7
distro==1.9.0
exceptiongroup==1.2.1
faiss-gpu==1.7.2
frozenlist==1.4.1
greenlet==3.0.3
h11==0.14.0
httpcore==1.0.5
httpx==0.27.0
idna==3.7
jsonpatch==1.33
jsonpointer==3.0.0
langchain==0.2.5
langchain-community==0.2.5
langchain-core==0.2.8
langchain-openai==0.1.8
langchain-text-splitters==0.2.1
langsmith==0.1.79
marshmallow==3.21.3
multidict==6.0.5
mypy-extensions==1.0.0
numpy==1.26.4
openai==1.34.0
orjson==3.10.5
packaging==24.1
pydantic==2.7.4
pydantic_core==2.18.4
PyYAML==6.0.1
regex==2024.5.15
requests==2.32.3
sniffio==1.3.1
SQLAlchemy==2.0.30
tenacity==8.4.1
tiktoken==0.7.0
tqdm==4.66.4
typing-inspect==0.9.0
typing_extensions==4.12.2
urllib3==2.2.2
yarl==1.9.4
※ pip install langchain langchain-community langchain-openai faiss-gpu 명령을 실행했다.