rhythmic-stone-34843
02/02/2025, 7:48 PM@app.get("/login")
async def show_login_form(request: Request):
"""
Handles GET requests to display the login form.
Fetches login flow data from Kratos.
"""
# Check if the session cookie is present and valid
if "ory_kratos_session" in request.cookies:
session_response = requests.get(
f"{KRATOS_EXTERNAL_API_URL}/sessions/whoami",
cookies=request.cookies
)
if session_response.status_code == 200 and session_response.json().get("active"):
return RedirectResponse(url="/home")
else:
RedirectResponse(url=f"{KRATOS_EXTERNAL_API_URL}/self-service/login/browser")
try:
async with httpx.AsyncClient() as client:
cookies = request.cookies
print(f"i am in start login flow cookie is :{cookies}")
print("=====================================================================================")
headers = {"Accept": "application/json"}
response = await client.get(
f"{KRATOS_EXTERNAL_API_URL}/self-service/login/browser",
cookies=cookies,
headers=headers,
)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Failed to start login flow")
print(f"response login: {response.json()}")
flow_data = response.json()
csrf_token = None
# Extract CSRF token
for node in flow_data["ui"]["nodes"]:
attributes = node["attributes"]
if attributes.get("name") == "csrf_token":
csrf_token = attributes["value"]
break
if not csrf_token or not flow_data["id"]:
raise HTTPException(status_code=400, detail="CSRF token or Flow ID is missing.")
print(f"i am in start login flow csrf_token is :{csrf_token}")
print("=====================================================================================")
return templates.TemplateResponse(
"login.html",
{
"request": request,
"flow_id": flow_data["id"],
"csrf_token": csrf_token,
},
)
except Exception as e:
print(f"Error fetching login flow: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/login")
async def handle_login(
request: Request,
email: str = Form(...),
password: str = Form(...),
flow: str = Form(...),
csrf_token: str = Form(...),
):
"""
Handles POST requests for user login.
"""
try:
print(f"i am in update login flow csrf_token is :{csrf_token}")
print("=====================================================================================")
# Prepare the login data
login_data = UpdateLoginFlowBody(
actual_instance=UpdateLoginFlowWithPasswordMethod(
csrf_token=csrf_token,
password=password,
method="password",
identifier=email,
transient_payload={},
)
)
# Convert the login data to a dictionary
login_data_dict = login_data.dict()
# Convert cookies dictionary to a string
cookies_str = "; ".join([f"{key}={value}" for key, value in request.cookies.items()])
print(f"Cookies as string: {cookies_str}")
# Send the login request to Kratos
async with httpx.AsyncClient() as client:
response = await client.post(
f"{KRATOS_EXTERNAL_API_URL}/self-service/login?flow={flow}",
json=login_data_dict["actual_instance"],
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"Cookie": cookies_str,
"X-CSRF-Token": csrf_token,
},
)
print(f"Raw Kratos response: {response.json()}")
# Handle the response
if response.status_code == 200:
# Login successful
print("Login successful")
# Set the session cookie in the response
session_cookie = response.cookies.get("ory_kratos_session")
if session_cookie:
response = RedirectResponse(url="/home", status_code=303)
response.set_cookie(
key="ory_kratos_session",
value=session_cookie,
httponly=True,
secure=True, # Set to False if not using HTTPS
samesite="lax",
)
return response
else:
raise HTTPException(status_code=500, detail="Session cookie not found in Kratos response")
elif response.status_code == 400:
# Handle form validation errors
error_detail = response.json().get("error", {}).get("message", "Login failed")
print(f"Login failed: {error_detail}")
raise HTTPException(status_code=400, detail=error_detail)
elif response.status_code == 410:
# Handle expired flow
error_detail = response.json().get("error", {}).get("message", "The login flow has expired")
print(f"Login flow expired: {error_detail}")
raise HTTPException(status_code=410, detail=error_detail)
else:
# Handle unexpected response types
print(f"Unexpected response: {response.json()}")
raise HTTPException(status_code=500, detail="Unexpected response from Kratos")
except ValueError as ve:
print(f"ValueError: {ve}")
raise HTTPException(status_code=400, detail=f"Value Error: {ve}")
except AttributeError as ae:
print(f"AttributeError: {ae}")
raise HTTPException(status_code=400, detail=f"Attribute Error: {ae}")
except Exception as e:
print(f"Error: {e}")
raise HTTPException(status_code=400, detail=f"Data serialization error: {e}")