■ Collection 클래스의 update_one 메소드에서 upsert 인자를 사용해 문서를 수정하거나 추가하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
from pymongo import MongoClient, ASCENDING databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) userList = [] for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userList.append(user) userCollection.insert_many(userList) updateResult = userCollection.update_one( {"email" : "user6@sample.com"}, # 수정할 문서의 필터 {"$set" : {"password": "password6"}}, # 수정할 내용 upsert = True # 문서가 없는 경우 새 문서를 추가한다. ) print(updateResult) for user in userCollection.find(): print(user) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 replace_one 메소드를 사용해 특정 조건을 만족하는 1개 문서를 대체하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
from pymongo import MongoClient, ASCENDING databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) userList = [] for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userList.append(user) userCollection.insert_many(userList) updateResult = userCollection.replace_one( {"password" : "password2"}, # 대체할 문서의 필터 {"email" : "user6@sample.com", "password": "password6"} # 새 문서 ) print(updateResult) for user in userCollection.find(): print(user) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 update_many 메소드를 사용해 특정 조건을 만족하는 문서들을 수정하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
from pymongo import MongoClient, ASCENDING databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) userList = [] for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userList.append(user) userCollection.insert_many(userList) updateResult = userCollection.update_many( {"password" : {"$gt" : "password2"}}, # 수정할 문서의 필터 {"$set" : {"password" : "password*"}} # 수정할 내용 ) print(updateResult) for user in userCollection.find(): print(user) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 update_one 메소드를 사용해 조건을 만족하는 1개 문서를 수정하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
from pymongo import MongoClient, ASCENDING databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) userList = [] for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userList.append(user) userCollection.insert_many(userList) updateResult = userCollection.update_one( {"email" : "user1@sample.com"}, # 수정할 문서의 필터 {"$set" : {"password": "password*"}} # 수정할 내용 ) print(updateResult) for user in userCollection.find(): print(user) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 index_information 메소드를 사용해 인덱스 정보를 구하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
from pymongo import MongoClient, ASCENDING databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) userList = [] for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userList.append(user) userCollection.insert_many(userList) indexName = testDatabase.userCollection.create_index([("email", ASCENDING)], unique = True) indexInformationDictionary = testDatabase.userCollection.index_information() print(indexInformationDictionary) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 create_index 메소드를 사용해 인덱스를 생성하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
from pymongo import MongoClient, ASCENDING databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) userList = [] for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userList.append(user) userCollection.insert_many(userList) indexName = testDatabase.userCollection.create_index([("email", ASCENDING)], unique = True) print(indexName) # 인덱스명 |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 count_documents 메소드를 사용해 컬렉션의 문서 수를 구하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) userList = [] for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userList.append(user) userCollection.insert_many(userList) userCount1 = userCollection.count_documents({}) userCount2 = userCollection.count_documents({"password" : {"$gt" : "password2"}}) print(userCount1) print(userCount2) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 find 메소드를 사용해 특정 조건을 만족하는 문서들을 구하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) userList = [] for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userList.append(user) userCollection.insert_many(userList) for user in userCollection.find({"password" : {"$gt" : "password2"}}).sort("email"): print(user) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 insert_many 메소드를 사용해 복수 문서들을 추가하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) userList = [] for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userList.append(user) userCollection.insert_many(userList) for user in userCollection.find(): print(user) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 find_one 메소드를 사용해 특정 _id 값과 일치하는 1개 문서를 구하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
from pymongo import MongoClient from bson.objectid import ObjectId databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] user = userCollection.find_one({"_id": ObjectId("6655d73a919710ce0a3ce0b1")}) if user: print(user) |
▶ requirements.txt
더 읽기
■ Collection 클래스의 find_one 메소드를 사용해 특정 조건을 만족하는 1개 문서를 구하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userCollection.insert_one(user) user = userCollection.find_one({"email": "user3@sample.com"}) if user: print(user) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 find 메소드를 사용해 모든 문서를 조회하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) for i in range(1, 6): user = { "email" : f"user{i}@sample.com", "password" : f"password{i}" } userCollection.insert_one(user) for user in userCollection.find(): print(user) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 delete_many 메소드를 사용해 해당 컬렉션의 모든 문서를 삭제하는 방법을 보여준다. ▶ 예제 코드 (PY)
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userCollection.delete_many({}) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Collection 클래스의 insert_one 메소드를 사용해 문서를 추가하는 방법을 보여준다. ▶ 예제 코드 (PY)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] userDocument = { "email" : "user@sample.com", "password" : "1234" } insertOneResult = userCollection.insert_one(userDocument) # 사용자 문서를 추가한다. print(insertOneResult.inserted_id) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Database 클래스의 drop_collection 메소드를 사용해 컬렉션을 삭제하는 방법을 보여준다. ▶ 예제 코드 (PY)
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] testDatabase.drop_collection("user") |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Database 클래스의 list_collection_names 메소드를 사용해 컬렉션명 리스트를 구하는 방법을 보여준다. ▶ 예제 코드 (PY)
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) testDatabase = mongoClient["testdb"] collectionList = testDatabase.list_collection_names() print(collectionList) |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ Database 클래스를 사용해 컬렉션 객체를 구하는 방법을 보여준다. (필요시 컬렉션 생성) ▶ 예제 코드 (PY)
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) databaseNameList = mongoClient.list_database_names() print(databaseNameList) testDatabase = mongoClient["testdb"] userCollection = testDatabase["user"] # user 컬렉션이 없는 경우 컬렉션을 생성한다. |
※ 실제 컬렉션 생성 시점은
더 읽기
■ MongoClient 클래스를 사용해 데이터베이스 객체를 구하는 방법을 보여준다. (필요시 데이터베이스 생성) ▶ 예제 코드 (PY)
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) databaseNameList = mongoClient.list_database_names() testDatabase = mongoClient["testdb"] # testdb 데이터베이스가 없는 경우 데이터베이스를 생성한다. print(testDatabase) |
※ 실제 데이터베이스 생성 시점은
더 읽기
■ MongoClient 클래스의 list_database_names 메소드를 사용해 데이터베이스명 리스트를 구하는 방법을 보여준다. ▶ 예제 코드 (PY)
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) databaseNameList = mongoClient.list_database_names() print(databaseNameList) """ ['admin', 'config', 'local'] """ |
▶ requirements.txt
|
dnspython==2.6.1 pymongo==4.7.2 |
■ MongoClient 클래스를 사용해 MongoDB 데이터베이스에 접속하는 방법을 보여준다. ▶ 예제 코드 1 (PY)
|
from pymongo import MongoClient databaseURL = "mongodb://localhost:27017/" mongoClient = MongoClient(databaseURL) |
▶ 예제 코드 2 (PY)
|
from pymongo import MongoClient mongoClient = MongoClient(host = "localhost", port = 27017) |
▶
더 읽기
■ pymongo 패키지를 설치하는 방법을 보여준다. 1. 명령 프롬프트를 실행한다. 2. 명령 프롬프트에서 아래 명령을 실행한다. ▶ 실행 명령
■ Document 클래스에서 Collection 서브 클래스와 name 변수를 사용해 MongoDB 컬렉션명을 설정하는 방법을 보여준다. ▶ 예제 코드 (PY)
|
from typing import Optional, List from beanie import Document class Event(Document): creator : Optional[str] title : str image : str description : str tagList : List[str] location : str class Collection: name = "event" |
■ FastAPI 클래스를 사용해 JWT 인증 애플리케이션을 만드는 방법을 보여준다. (MongoDB 연동) ▶ .env
|
DATABASE_URL=mongodb://localhost:27017/testdb SECRET_KEY=pass1234567 |
※ testdb : MongoDB 데이터베이스명 ※ pass1234567
더 읽기
■ Document 클래스를 사용해 MongoDB 데이터베이스에서 단순 CRUD 애플리케이션을 만드는 방법을 보여준다. ▶ event.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
from typing import Optional, List from pydantic import BaseModel from beanie import Document class Event(Document): title : str image : str description : str tagList : List[str] location : str class Settings: name = "event" class EventUpdate(BaseModel): title : Optional[str] image : Optional[str] description : Optional[str] tagList : Optional[List[str]] location : Optional[str] |
▶ db_helper.py
|
from motor.motor_asyncio import AsyncIOMotorClient from beanie import init_beanie class DBHelper: def __init__(self): pass async def initialize(self, databaseURL, documentModelList): self.databaseURL = databaseURL self.documentModelList = documentModelList client = AsyncIOMotorClient(self.databaseURL) await init_beanie(database = client.get_default_database(), document_models = self.documentModelList) |
▶ document_helper.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
from typing import Any, List from beanie import PydanticObjectId from pydantic import BaseModel class DocumentHelper: def __init__(self, modelMetaClass): self.modelMetaClass = modelMetaClass async def create(self, document) -> str: document = await document.create() return document.id async def get(self, id : PydanticObjectId) -> Any: document = await self.modelMetaClass.get(id) if document: return document return False async def getList(self) -> List[Any]: documentList = await self.modelMetaClass.find_all().to_list() return documentList async def update(self, id : PydanticObjectId, updateModel : BaseModel) -> Any: documentID = id updateModelDictionary = updateModel.dict() updateModelDictionary = {k : v for k, v in baseModelDictionary.items() if v is not None} updateQuery = {"$set" : {field: value for field, value in updateModelDictionary.items()}} document = await self.get(documentID) if not document: return False await document.update(updateQuery) return document async def delete(self, id : PydanticObjectId) -> bool: document = await self.get(id) if not document: return False await document.delete() return True async def deleteAll(self): await self.modelMetaClass.find_all().delete() |
▶
더 읽기
■ beanie 패키지를 설치하는 방법을 보여준다. 비동기 객체 문서 매퍼(Object Document Mapper, ODM)로 몽고DB 처리를 담당한다. 1. 명령 프롬프트를 실행한다. 2. 명령
더 읽기