A Democratic Social Network. Currently available at https://democraticnet.de The Test Server is available at https://test.democraticnet.de
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
DemNet/main.py

754 lines
26 KiB

"""
This module implements
the Flask app serving the
dynamic HTML Files.
"""
from functools import wraps
import json
import subprocess
import requests
import re
from pygit2 import Repository
from flask import Flask, url_for, redirect, request, g, render_template, session, Markup
import flask
from peewee import *
from Server.Markdown import markdown
from Server.Database import *
from Server.API import api_app
from Server.Files import ALLOWED_FILES, accepted_file_types
from clock import init
SECRET_KEY = os.environ["SECRET_KEY"]
DEBUG = "DEBUG" in os.environ
GITLAB_URI = os.environ["GITLAB_URI"]
GITLAB_TOKEN = os.environ["GITLAB_TOKEN"]
DEMNET_ID = os.environ["DEMNET_ID"]
DEMNET_LOCATION = os.environ["DEMNET_LOCATION"]
USERS_WITH_ADMIN_ACCESS = os.environ["DEMNET_ADMINS"].split(";")
REGISTRATION = os.environ.get("REGISTRATION", False) == "enabled"
app = Flask ( __name__
, static_folder = "static"
, static_url_path = "/static"
, template_folder = "output"
)
app.register_blueprint(api_app, url_prefix = "/api/v0")
app.config.from_object(__name__)
app.add_template_global(User, "User")
app.add_template_global(USERS_WITH_ADMIN_ACCESS, "ADMIN_USERS")
repo = Repository(os.getcwd())
app.add_template_global(repo.head.name.rsplit("/", 1)[1], "branch_name")
@app.template_filter("markdown")
def markdown_filter(text):
"""Markdown filter
"""
return Markup(markdown(text))
app.add_template_global(User, "User")
@app.before_request
def before_request():
"""Connect to the Database
"""
g.db = database
if g.db.is_closed():
g.db.connect()
@app.after_request
def after_request(response):
"""Disconnect from the databse, if the connection exists.
"""
if not g.db.is_closed():
g.db.close()
return response
# Routes used by the average users:
def login_required(endpoint):
"""Decorate any endpoint with this, that requires
an authenticated user to use.
If the user is not authenticated, they are redirected to /login.
"""
@wraps(endpoint)
def inner(*args, **kwargs):
if not session.get("authenticated", default = False):
return redirect(url_for("login", refer = request.full_path))
elif has_unaccepted_books():
return redirect(url_for("agreements", refer = request.full_path))
else:
return endpoint(*args, **kwargs)
return inner
def admin_required(endpoint):
"""Decorate any endpoint with this, that requires
an admin user.
Requires endpoint to be decorated by login_required
before.
"""
@wraps(endpoint)
def inner(*args, **kwargs):
if not session["username"] in USERS_WITH_ADMIN_ACCESS:
return redirect(url_for("index"))
else:
return endpoint(*args, **kwargs)
return inner
def has_unaccepted_books() -> bool:
if session.get("authenticated", default = False):
return User.get_by_id(session["username"]).agreements.where(AcceptedLastChanges.accepted == False).exists()
else:
return False
app.add_template_global(has_unaccepted_books, "has_unaccepted_books")
@app.route("/login", methods=["POST", "GET"])
def login():
"""Endpoint to login a user.
A user needs the username and password to login.
This is also the page anyone sees, who is not
logged in.
"""
try:
refer = request.values.get("refer", default = None)
if refer is None or not refer.startswith("/"):
refer = "/"
if request.method == "GET":
failed_already = request.values.get("failed_already") == "true"
response = render_template( "login.html"
, failed_already = failed_already
, refer = refer
, registration = REGISTRATION
)
else:
username = request.values["username"]
password = request.values["password"]
user = User.get(User.name == username)
if user.can_login(password):
session["authenticated"] = True
session["username"] = user.name
response = ""
else:
raise DoesNotExist()
except DoesNotExist:
return "", 403
except KeyError:
return "data not provided", 400
except Exception as error:
after_request("")
raise error
else:
return response, 200
@app.route("/agreements", methods=["GET"])
def agreements():
"""Prompt the user to accept
yet unaccepted books.
"""
try:
if session.get("authenticated", default = False):
response = render_template("agreements.html"
, refer = request.values.get("refer", default = "")
)
else:
response = redirect(url_for("login"))
except Exception as error:
raise error
else:
return response
@app.route("/agreements/<name>", methods=["GET"])
def agreement(name):
"""
View a single agreement
"""
try:
book = Book.get_by_id(name)
response = render_template("agreement.html", book = book)
except Exception as e:
raise e
return response
@app.route("/logout", methods=["GET"])
@login_required
def logout():
"""Remove all login details from the session.
This does the opposite of /login.
"""
try:
session["authenticated"] = False
session["username"] = None
except Exception as error:
after_request("")
raise error
else:
return redirect("/login")
### USER Resources
@app.route("/settings", methods=["GET", "POST"])
@login_required
def settings():
"""Settings of the user.
"""
try:
user = User.get_by_id(session["username"])
if request.method == "POST":
user.public = request.values.get("public", "off") == "on"
user.save()
response = redirect("/settings")
else:
response = render_template("settings.html", user = user)
except Exception as error:
raise error
else:
return response
@app.route("/change_password", methods=["GET", "POST"])
@login_required
def change_password():
"""Change the password as an authenticated user.
This requires the user to provide their old password
and repeat their new one twice.
After that, the password is hashed with a new salt
and stored in the database to replace the old password.
"""
try:
if request.method == "GET":
response = render_template("change_password.html")
else:
user = User.get_by_id(session["username"])
password = request.form["password"]
password = hash_passwords(password, user.salt)
new_passsword = request.form["new_passsword"]
new_repeated_password = request.form["new_repeated_password"]
if password == user.password:
if new_passsword == new_repeated_password:
new_salt = SHA256.new(data = get_random_bytes(2**3)).hexdigest()
new_passsword = hash_passwords(new_passsword, new_salt)
user.password = new_passsword
user.salt = new_salt
user.save()
response = "Done"
else:
response = "Passwords don't match"
else:
response = "Invalid current password"
except KeyError:
return "Data not provided"
except Exception as error:
after_request("")
raise error
else:
return response
def fetch_messages(messages, page = 1, search = ""):
"""Filter all the messages,
that contain the search in their title, content or keywords.
Then paginate them by 20 messages per page starting at page 1
and order them from most recent downward.
"""
messages = messages.where(Message.response_to >> None)
messages = list(messages)
if search != "":
matches = { m.id : m.match_index(search) for m in messages }
messages = list(filter(lambda m: matches[m.id] != 0, messages))
messages = sorted(messages, key = lambda m: matches[m.id], reverse = True)
else:
messages = sorted(messages, key = lambda m: m.id, reverse = True)
if not session.get("authenticated", False):
messages = list(filter(lambda m: m.author.public is None or m.author.public, messages))
return messages[(page-1)*20:(page-1)*20+20]
@app.route("/", methods=["GET"])
def index():
"""Endpoint for the Feed.
It serves index.html.
The client can provide these arguments:
- page : int, the page by which to paginate
with a page size of 20 Messages
- search : str, filter out all Messages, that don't match this search
in their title, content or keywords.
"""
try:
page = int(request.values.get("page", 1))
search = request.values.get("search", "")
feed = list(fetch_messages(Message.select(), page = page, search = search ))
message = request.values.get("message")
response = render_template("index.html", feed = feed, message = message, page = page)
except ValueError:
return "data format invalid"
except KeyError:
return "data not provided"
except Exception as error:
after_request("")
raise error
else:
return response
@app.route("/read/<message_id>", methods=["GET"])
def read(message_id : str):
"""Renders the post with message_id
in read.html and returns that.
"""
try:
try:
message = Message.get_by_id(int(message_id))
except ValueError:
message = Message.get(Message.hash == message_id)
logged_in = session.get("authenticated", default = False)
username = session.get("username", default = None)
is_author = logged_in and username == message.author.name
if message.views != None:
message.views += 1
else:
message.views = 0
message.save()
response = render_template ( "read.html"
, message = message
, is_author = is_author
)
except DoesNotExist:
return "Message id doesn't exists"
except Exception as error:
after_request("")
raise error
else:
return response
@app.route("/profile/<username>", methods=["GET"])
def profile(username):
"""This endpoint responds
with all the messages by a user
and possibly some settings for that user.
"""
try:
page = int(request.values.get("page", 1))
user = User.get_by_id(username)
if session.get("authenticated", False) or user.public is None or user.public:
messages = fetch_messages(user.messages, page = page)
response = render_template("profile.html", user = user, messages = messages)
else:
response = redirect("/login")
except DoesNotExist:
return redirect(url_for("index", message = "User doesn't exist"))
except ValueError:
return "data format invalid"
except Exception as error:
response = redirect(url_for("index", message = "An unknown error occured. Please contact the maintainers."))
after_request("")
raise error
return response
@app.route("/follow/<username>", methods=["POST"])
@login_required
def follow(username : str):
"""Create a new Follower entry into the database
between the authenticated and the provided user.
"""
try:
assert username != session["username"], "same"
follower = User.get_by_id(session["username"])
following = User.get_by_id(username)
assert not follower.is_following(following), "already"
follower.follow(following)
except AssertionError as error:
if error.args[0] == "same":
response = redirect(url_for( "index", message = "You can't follow yourself" ))
else:
response = redirect(url_for( "index", message = "You are already following this user"))
return response
except DoesNotExist:
response = redirect(
url_for ( "index"
, message = "You want to follow a user, that doesn't exist." )
)
return response
except Exception as error:
after_request("")
raise error
else:
response = redirect(url_for( "index", message = f"You are following {username}" ))
return response
@app.route("/unfollow/<username>", methods=["POST"])
@login_required
def unfollow(username : str):
"""Undo follow endpoint.
Delete any Follower entry between
<username> and the logged in user,
if such exists.
Otherwise do nothing.
"""
try:
follower = User.get_by_id(session["username"])
followed = User.get_by_id(username)
assert follower.is_following(followed)
assert follower.unfollow(followed)
except AssertionError:
return redirect(url_for("index", message = "Can't unfollow, whom you didn't follow"))
except DoesNotExist:
return redirect(url_for("index", message = "Can't unfollow someone, who doesn't exist"))
except Exception as error:
after_request("")
raise error
else:
return redirect(url_for("index", message = f"You no longer follow {username}"))
@app.route("/publish", methods=["GET", "POST"])
@login_required
def publish():
"""Create a new post.
Renders publish.html if method == GET.
Otherwise a new post is created:
Mandatory arguments for that:
- title : str
- content : str Markdown Content
Possible other arguments:
- response_to : int ID of the post to respond to.
- keywords : str Keywords string, seperated by #.
"""
try:
response_to = request.values.get("response_to", None)
message = request.values.get("message", None)
if message is not None:
message = Message.get_by_id(int(message))
if request.method == "GET":
response = render_template( "publish.html"
, response_to = Message.get_by_id(int(response_to)) if response_to != None else None
, accepted_file_types = ",".join(accepted_file_types)
, message = message
)
else:
if response_to:
response_to = int(response_to)
title = request.form["title"]
content = request.form["content"]
keywords = request.form.get("keywords", "").upper()
author = User.get_by_id(session["username"])
author.publish(title, content, keywords = keywords, response_to = response_to )
# Redirect to the thread, to which one responded or feed.
if type(response_to) == int:
response_message = Message.get_by_id(response_to)
while response_message.response_to != None:
response_message = Message.get_by_id(response_message.response_to)
response = redirect(f"/read/{response_message.id}?message=\"You have responded\"")
else:
response = redirect(url_for( "index", message = "You have posted" ))
except DoesNotExist:
message = "You can't respond to a message, that doesn't exist"
return redirect(url_for("index", message = message))
except KeyError:
return "data not provided"
except ValueError:
return "data format invalid"
except Exception as error:
after_request("")
raise error
else:
return response
@app.route("/files_dashboard", methods=["GET"])
@login_required
def files_dashboard():
"""Serve the static file at output/files.html.
This file seperatly communicates with the Files API.
"""
try:
response = render_template("files.html", ALLOWED_FILES=ALLOWED_FILES)
except Exception as error:
after_request("")
raise error
else:
return response
@app.route("/unpublish/<int:message_id>", methods=["POST"])
@login_required
def unpublish(message_id : int):
"""Delete a post if there are now responses to the post.
"""
try:
message = Message.get_by_id(message_id)
if session["username"] == message.author.name:
message.delete_instance()
response = "Done"
else:
response = "You don't have the right to do this"
except DoesNotExist:
return "Doesn't exist"
except IntegrityError:
return "You can't delete an article, that is subject of debate."
except Exception as error:
after_request("")
raise error
else:
return redirect(url_for("index", message = response))
### VOTE Resource
@app.route("/vote", methods=["GET", "POST"])
@login_required
def vote():
"""Render vote.html if GET method.
Depending on the state of the current election,
it will be rendered differently.
If the method is POST,
there is one required argument:
- choice : JSON List of proposal IDs.
The List is cut off at the NoneOfTheOtherOptions
at the end.
Example:
["1","2","3"]
is a vote for:
1. the proposal of ID 1
2. the proposal for ID 2
3. the proposal for ID 3
4. NoneOfTheOtherOptions
"""
try:
user = User.get_by_id(session["username"])
election = current_election()
voting_phase_start = election.proposal_phase_start + length_proposal_phase
voting_phase_end = voting_phase_start + length_voting_phase
can_vote = election.in_voting_phase() and not user.has_voted(election)
if request.method == "GET":
can_propose = election.in_proposal_phase()
response = render_template( "vote.html"
, can_vote = can_vote
, can_propose = can_propose
, voting_phase_start = voting_phase_start
, voting_phase_end = voting_phase_end
, election = election
)
elif request.method == "POST":
choice = json.loads(request.values["vote"])
if user.vote(election, choice):
response = redirect(url_for("index", message = "Voted"))
else:
message = "You have already voted or do not have the right to vote."
response = redirect(url_for("index", message = message))
except AssertionError:
message = "Invalid Merge Request selected. Resolve all conflicts please."
return redirect(url_for("index", message = message))
except DoesNotExist:
return redirect(url_for("index", message = "Election not found"))
except ValueError:
return redirect(url_for("index", message = "Data format invalid"))
except KeyError:
return redirect(url_for("index", messsage = "Data not provided"))
except Exception as error:
after_request("")
raise error
else:
return response
@app.route("/vote/propose", methods = ["POST"])
def propose():
"""POST only endpoint
to create a new proposal for the current election
from a Merge Request on GitLab.
Mandatory arguments:
- merge_request_iid : int the internal id of the merge request
The merge request is used to get the title,
content and the patch for the proposal.
The title and content are used to create a new
post by the logged in user, while also
to create the new Proposal entry.
The patch is used to apply this proposal
later, if it wins the election.
The patch is verified with git apply --check
and then the proposal entry is created.
"""
try:
user = User.get_by_id(session["username"])
election = current_election()
mr_iid = request.values["merge_request_iid"]
merge_request = requests.get( f"{GITLAB_URI}/projects/{DEMNET_ID}/merge_requests/{mr_iid}"
, headers = { "Private-Token" : GITLAB_TOKEN }
).json()
merge_message = "Unable to merge because GitLab cannot merge"
branch_message = "Target Branch isn't master"
assert merge_request["merge_status"] == "can_be_merged", merge_message
assert merge_request["target_branch"] == "master", branch_message
title = merge_request["title"]
description = merge_request["description"]
patch = requests.get(f"{DEMNET_LOCATION}/-/merge_requests/{mr_iid}.patch")
patch.encoding = "utf-8"
patch = patch.text
init()
open(f"/tmp/{mr_iid}.patch", "w+").write(patch)
assert not election.proposals.where(Proposal.patch == patch).exists(), "Exact same proposal was already made"
assert subprocess.run ( [f"git apply --check --whitespace=fix /tmp/{mr_iid}.patch"]
, shell = True
, cwd = os.environ["GIT_REPOSITORY"]
).returncode == 0, "Invalid Merge Request selected. Resolve all conflicts please. (git apply --check --whitespace=fix failed)"
if election.proposals.where(Proposal.title == title).exists():
same_title_proposals = election.proposals.where(Proposal.title.regexp(f"{title}( [0-9])?"))
index = map(lambda p: re.match(f"{title}( [0-9])?", p.title), same_title_proposals)
index = map(lambda m: 0 if m[1] is None else int(m[1]), index)
index = max(index) + 1
title = f"{title} {index}"
election.propose( title = title
, description = description
, patch = patch
)
user.publish( title
, description + f"\n\nThis is a propsal for the #{election.id} election "
, keywords = f"#election{election.id}" + '#'.join(merge_request["labels"])
)
response = redirect(url_for("index", message = "Your proposal was made"))
except AssertionError as error:
message = error.args[0]
return redirect(url_for("index", message = message))
except subprocess.CalledProcessError:
message = "Cannot propose. git apply --check --whitespace=fix failed"
return redirect(url_for("index", message = message))
except Exception as error:
after_request("")
raise error
else:
return response
@app.route("/vote/patch/<int:id>", methods = ["GET"])
def patch(id):
"""Return the patch for a proposal by the id
"""
try:
response = flask.make_response(Proposal.get_by_id(id).patch)
response.headers.set("Content-Type", "text/plain")
except DoesNotExist:
return "Proposal does not exist", 404
else:
return response
@app.route("/grant_vote", methods = ["GET","POST"])
@login_required
@admin_required
def grant_vote():
try:
if request.method == "POST":
user = User.get_by_id(session["username"])
has_vote = user.has_vote or user.has_vote is None
user.has_vote = not has_vote
user.save()
response = redirect("/grant_vote")
else:
users = User.select()
users = sorted(users, reverse = True, key = lambda user: int(user.has_vote is None or user.has_vote))
response = render_template("authorize.html", users = users)
except Exception as e:
raise e
else:
return response
@app.route("/register", methods=["POST"])
def register_route():
"""Register a new user.
Every user has to fill out these fields (mandatory arguments):
- username : str unique string to identify a user
- first_name : str legal first name of the user
- last_name : str legal last name of the user
- email : str email
- accept : str "1" if the terms of registration are accepted.
- password : str the first password.
"""
try:
assert REGISTRATION, "Registration not enabled"
username = request.values["username"]
first_name = request.values["first_name"]
last_name = request.values["last_name"]
email = request.values["email"]
password = request.values["password"]
assert request.values["accept"] == "1", "Terms not accepted"
response = register( username
, first_name
, last_name
, email
, None
, password
, connected = True
)
session["authenticated"] = response
session["username"] = username
response = "1", 200
except AssertionError as e:
return e.args[0], 400
except KeyError:
return "data not provided", 400
except Exception as error:
after_request("")
raise error
else:
return response