"""All of the endpoints related to Users"""
import json
import os
from flask import g, request
from google.auth.transport import requests as g_requests
from google.oauth2 import id_token
import plynx.db.node_collection_manager
from plynx.constants import Collections, IAMPolicies, TokenType, UserPostAction
from plynx.db.demo_user_manager import DemoUserManager
from plynx.db.user import User, UserCollectionManager
from plynx.utils.exceptions import RegisterUserException
from plynx.web.common import app, handle_errors, logger, make_fail_response, make_success_response, register_user, requires_auth
[docs]demo_user_manager = DemoUserManager()
[docs]template_collection_manager = plynx.db.node_collection_manager.NodeCollectionManager(collection=Collections.TEMPLATES)
[docs]GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", None)
@app.route('/plynx/api/v0/token', strict_slashes=False)
@requires_auth
@handle_errors
[docs]def get_auth_token():
"""Generate access and refresh tokens"""
access_token = g.user.generate_token(TokenType.ACCESS_TOKEN)
refresh_token = g.user.generate_token(TokenType.REFRESH_TOKEN)
user_obj = g.user.to_dict()
user_obj['hash_password'] = ''
return make_success_response({
'access_token': access_token,
'refresh_token': refresh_token,
'user': user_obj,
})
@app.route('/plynx/api/v0/users/<username>', methods=['GET'])
@handle_errors
@requires_auth
[docs]def get_user(username: str):
"""Get user info by username"""
user = UserCollectionManager.find_user_by_name(username)
if not user:
return make_fail_response('User not found'), 404
user_obj = user.to_dict()
is_admin = IAMPolicies.IS_ADMIN in g.user.policies
user_obj['_is_admin'] = is_admin
user_obj['_readonly'] = user._id != g.user._id and not is_admin
del user_obj['password_hash']
return make_success_response({
'user': user_obj,
})
@app.route('/plynx/api/v0/users', methods=['POST'])
@handle_errors
@requires_auth
[docs]def post_user():
"""Update user info"""
data = json.loads(request.data)
logger.info(data)
action = data.get('action', '')
old_password = data.get('old_password', '')
new_password = data.get('new_password', '')
if action == UserPostAction.MODIFY:
posted_user = User.from_dict(data['user'])
existing_user = UserCollectionManager.find_user_by_name(posted_user.username)
if not existing_user:
return make_fail_response('User not found'), 404
if g.user.username != posted_user.username and IAMPolicies.IS_ADMIN not in g.user.policies:
return make_fail_response('You don`t have permission to modify this user'), 401
if set(posted_user.policies) != set(existing_user.policies):
if IAMPolicies.IS_ADMIN not in g.user.policies:
return make_fail_response('You don`t have permission to modify policies'), 401
existing_user.policies = posted_user.policies
if new_password:
if not existing_user.verify_password(old_password):
return make_fail_response('Incorrect password'), 401
existing_user.hash_password(new_password)
existing_user.settings = posted_user.settings
existing_user.save()
if g.user.username == posted_user.username:
g.user = posted_user # pylint: disable=assigning-non-slot
is_admin = IAMPolicies.IS_ADMIN in g.user.policies
user_obj = existing_user.to_dict()
user_obj['_is_admin'] = is_admin
user_obj['_readonly'] = existing_user._id != g.user._id and not is_admin
del user_obj['password_hash']
return make_success_response({
'user': user_obj,
})
else:
raise Exception(f"Unknown action: `{action}`")
raise NotImplementedError("Nothing is to return")
@app.route('/plynx/api/v0/register', methods=['POST'])
@handle_errors
[docs]def post_register():
"""Register a new user"""
query = json.loads(request.data)
email = query['email'].lower()
username = query['username']
password = query['password']
try:
user = register_user(
username=username,
password=password,
email=email,
)
except RegisterUserException as ex:
return make_fail_response(
ex.message,
error_code=ex.error_code,
), 400
g.user = user # pylint: disable=assigning-non-slot
access_token = user.generate_token(TokenType.ACCESS_TOKEN)
refresh_token = user.generate_token(TokenType.REFRESH_TOKEN)
user_obj = user.to_dict()
user_obj['hash_password'] = ''
return make_success_response({
'access_token': access_token,
'refresh_token': refresh_token,
'user': user_obj,
})
@app.route('/plynx/api/v0/register_with_oauth2', methods=['POST'])
@handle_errors
[docs]def post_register_with_oauth2():
"""Register a new user"""
query = json.loads(request.data)
token = query["token"]
try:
idinfo = id_token.verify_oauth2_token(token, g_requests.Request(), GOOGLE_CLIENT_ID)
except ValueError:
# Invalid token
return make_fail_response("Invalid request"), 400
if not idinfo["email_verified"]:
return make_fail_response("Unable to verify the email"), 401
username = idinfo['sub']
user = UserCollectionManager.find_user_by_name(username)
if not user:
logger.info("The user does not exist. Creating a new one.")
user = register_user(
username=username,
password="",
email=idinfo["email"],
picture=idinfo["picture"],
is_oauth=True,
display_name=idinfo["name"],
)
demo_user_manager.create_demo_templates(user)
if user.settings.picture != idinfo["picture"]:
user.settings.picture = idinfo["picture"]
user.save()
g.user = user # pylint: disable=assigning-non-slot
access_token = user.generate_token(TokenType.ACCESS_TOKEN)
refresh_token = user.generate_token(TokenType.REFRESH_TOKEN)
user_obj = user.to_dict()
user_obj['hash_password'] = ''
return make_success_response({
'access_token': access_token,
'refresh_token': refresh_token,
'user': user_obj,
})