import json

from fastapi import APIRouter
from fastapi import FastAPI
from fastapi import HTTPException
from fastapi import Path
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
# Complete todo entry: a caller-supplied numeric ID plus the item text.
# This is the request body accepted when a new item is created.
class TodoItem(BaseModel):
    id: int
    item: str

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "id": 1,
                "item": "Example Schema!",
            },
        }
# Partial payload for editing an existing entry: only the item text is
# supplied here; the ID of the entry to edit comes from the URL path.
class TodoItemUpdate(BaseModel):
    item: str

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "item": "Read the next chapter of the book",
            },
        }
# In-memory store shared by every todo endpoint; it is a plain module-level
# list, so its contents exist only inside this process.
todoItemList = []
# Router on which the /todo and /todos endpoints below are registered.
apiRouter = APIRouter()
@apiRouter.post("/todo")
async def addTodoItem(todoItem: TodoItem) -> dict:
    # Store the validated item at the end of the in-memory list.
    # NOTE(review): IDs are not checked for uniqueness here, while the
    # ID-based endpoints act on the first match only -- confirm intended.
    todoItemList.append(todoItem)
    confirmation = {"message": "Todo item added successfully"}
    return confirmation
@apiRouter.get("/todo")
async def getTodoItemList() -> dict:
    # The stored list object itself (not a copy) is wrapped under one key.
    payload = {"todoItemList": todoItemList}
    return payload
async def enumerateTodoItemListInternal():
    """Yield every stored todo item as one JSON object per line.

    Yields:
        str: The JSON encoding of one item followed by a newline.
    """
    # Iterate over a snapshot: this generator is suspended between yields,
    # and another handler may append to or delete from the shared list in
    # the meantime, which would make a live iteration skip or repeat items.
    for todoItem in tuple(todoItemList):
        # Prefer the Pydantic v2 serializer when the model provides it and
        # fall back to the older ``dict()`` method otherwise.
        toPlainDict = getattr(todoItem, "model_dump", None) or todoItem.dict
        yield json.dumps(toPlainDict()) + "\n"
@apiRouter.get("/todos")
async def enumerateTodoItemList():
    # Stream the stored items instead of building the whole body up front;
    # the generator emits one JSON object per line.
    # NOTE(review): the body is newline-delimited JSON objects rather than a
    # single JSON document, yet it is labelled application/json -- confirm
    # that clients expect this before changing the media type.
    lineStream = enumerateTodoItemListInternal()
    return StreamingResponse(lineStream, media_type="application/json")
@apiRouter.get("/todo/{todoItemID}")
async def getTodoItem(
    todoItemID: int = Path(..., title="The ID of the todo item to retrieve."),
) -> dict:
    """Return the stored todo item whose ID matches the path parameter.

    Args:
        todoItemID: ID of the item to look up, taken from the URL path.

    Returns:
        ``{"todoItem": <item>}`` for the first stored item with that ID.

    Raises:
        HTTPException: 404 when no stored item has the supplied ID.
    """
    found = next((entry for entry in todoItemList if entry.id == todoItemID), None)
    if found is None:
        # A miss is an error for the client, so report it with a 404 status
        # instead of a 200 response that merely carries a message.
        raise HTTPException(
            status_code=404,
            detail="Todo item with supplied ID doesn't exist.",
        )
    return {"todoItem": found}
@apiRouter.put("/todo/{todoItemID}")
async def updateTodo(
    todoItemUpdate: TodoItemUpdate,
    todoItemID: int = Path(..., title="The ID of the todo item to be updated."),
) -> dict:
    """Replace the text of the stored todo item with the given ID.

    Args:
        todoItemUpdate: Request body carrying the new item text.
        todoItemID: ID of the item to change, taken from the URL path.

    Returns:
        A confirmation message once the first matching item is updated.

    Raises:
        HTTPException: 404 when no stored item has the supplied ID.
    """
    for todoItem in todoItemList:
        if todoItem.id == todoItemID:
            # Mutate the stored model in place; only the text changes.
            todoItem.item = todoItemUpdate.item
            return {"message": "Todo item updated successfully."}
    # Nothing matched: signal the failure with a 404 status rather than a
    # 200 response that merely carries a message.
    raise HTTPException(
        status_code=404,
        detail="Todo item with supplied ID doesn't exist.",
    )
@apiRouter.delete("/todo/{todoItemID}")
async def deleteTodo(
    todoItemID: int = Path(..., title="The ID of the todo item to be deleted."),
) -> dict:
    """Remove the stored todo item with the given ID.

    Args:
        todoItemID: ID of the item to remove, taken from the URL path.

    Returns:
        A confirmation message once the first matching item is removed.

    Raises:
        HTTPException: 404 when no stored item has the supplied ID.
    """
    for index, todoItem in enumerate(todoItemList):
        if todoItem.id == todoItemID:
            # Returning right after the removal keeps the loop from
            # continuing over a list that has just been modified.
            del todoItemList[index]
            return {"message": "Todo item deleted successfully."}
    # Nothing matched: signal the failure with a 404 status rather than a
    # 200 response that merely carries a message.
    raise HTTPException(
        status_code=404,
        detail="Todo item with supplied ID doesn't exist.",
    )
@apiRouter.delete("/todo")
async def deleteAllTodoItems() -> dict:
    # Empty the shared list in place so every reference to it sees the
    # change; the list object itself is kept.
    del todoItemList[:]
    confirmation = {"message": "All Todo items deleted successfully."}
    return confirmation
# Application object; the "/" and "/home" routes below are registered on it
# directly, and the todo router is attached to it at the end of the file.
fastAPI = FastAPI()
@fastAPI.get("/")
def root():
    # Fixed greeting served at the application root.
    greeting = {"message": "Hello World"}
    return greeting
@fastAPI.get("/home")
def home():
    # Fixed payload served at /home.
    payload = {"message": "home"}
    return payload
# Attach the todo endpoints registered on apiRouter to the application.
fastAPI.include_router(apiRouter)