Back_Marco / main.py
BuildTools
nouveau Back
1c05df2
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from bson import ObjectId
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
app = FastAPI()

# MongoDB database configuration.
# The connection string may be overridden with the MONGODB_URI environment
# variable; the local default is kept for backward compatibility.
client = MongoClient(os.environ.get("MONGODB_URI", "mongodb://localhost:27017/"))
db = client["user_db"]
users_collection = db["users"]

# Security configuration.
# The JWT signing key is read from the SECRET_KEY environment variable so the
# real key never has to live in source control. The fallback is the original
# placeholder and must not be used outside local development.
SECRET_KEY = os.environ.get("SECRET_KEY", "your_secret_key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Check a plain-text password against a stored hash
def verify_password(plain_password, hashed_password):
    """Return the outcome of checking ``plain_password`` against ``hashed_password``.

    The comparison is delegated to the module-level ``pwd_context``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
# Hash a password
def get_password_hash(password):
    """Return the hash of ``password`` produced by the module-level ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest
# Create a JWT access token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token. When ``None``, the token
            expires 15 minutes after creation.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise be silently replaced by the 15-minute default.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)

    # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
    # returns a naive value.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# Authenticate a user
def authenticate_user(email: str, password: str):
    """Look up the user by email and check the supplied password.

    Returns the user document found in ``users_collection`` when the
    password verifies against its ``hashed_password`` field, and ``False``
    when no user matches or the password check fails.
    """
    record = users_collection.find_one({"email": email})
    if record and verify_password(password, record["hashed_password"]):
        return record
    return False
# Registration route
@app.post("/register")
async def register(email: str, password: str):
    """Create a new user with the given email and password.

    Returns:
        A confirmation message once the user document has been inserted.

    Raises:
        HTTPException: 400 when a user with this email already exists.
    """
    # NOTE(review): FastAPI treats plain ``str`` parameters as query
    # parameters, so the password presumably travels in the URL. Moving it to
    # a request body would change the client-facing interface, so it is left
    # as is here; confirm and consider a body model.

    # Explicit lookup: DuplicateKeyError is only raised when a unique index
    # exists on "email", and nothing in this module creates one.
    if users_collection.find_one({"email": email}) is not None:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Hash only after the duplicate check so rejected requests skip the
    # hashing cost.
    hashed_password = get_password_hash(password)
    try:
        users_collection.insert_one({"email": email, "hashed_password": hashed_password})
    except DuplicateKeyError as exc:
        # Still handled for deployments that do have a unique index, where a
        # concurrent request can win the race between the lookup and insert.
        raise HTTPException(status_code=400, detail="Email already registered") from exc
    return {"message": "User created successfully"}
# Login route
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange form credentials for a bearer access token.

    The form's ``username`` field is passed to ``authenticate_user`` as the
    email. On success the response carries a JWT whose ``sub`` claim is the
    user's email and whose lifetime is ``ACCESS_TOKEN_EXPIRE_MINUTES``.

    Raises:
        HTTPException: 401 when authentication fails.
    """
    account = authenticate_user(form_data.username, form_data.password)
    if account:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        claims = {"sub": account["email"]}
        token = create_access_token(data=claims, expires_delta=lifetime)
        return {"access_token": token, "token_type": "bearer"}

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect email or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
# Protected route
@app.get("/protected")
async def protected_route(token: str = Depends(oauth2_scheme)):
    """Return a confirmation for a caller presenting a valid bearer token.

    The token is decoded with ``SECRET_KEY`` and ``ALGORITHM``; its ``sub``
    claim is used as the email to look up in ``users_collection``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, 400 when it has
            no ``sub`` claim, 404 when no user matches the email.
    """
    # Only the decode call sits inside the try, so the handler cannot catch
    # anything other than token errors.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError as exc:
        # Send the Bearer challenge header, consistent with the 401 returned
        # by the /token route.
        raise HTTPException(
            status_code=401,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    email = payload.get("sub")
    if email is None:
        raise HTTPException(status_code=400, detail="Invalid token")

    if users_collection.find_one({"email": email}) is None:
        raise HTTPException(status_code=404, detail="User not found")

    return {"message": "You are authenticated", "user": email}