A minimal statuspage to show uptime and latency for HTTP endpoints https://status.demo.j-serv.de/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

93 lines
2.7 KiB

import collections
import logging
from pathlib import Path
import aiohttp_jinja2
import jinja2
from aiohttp import web
from statuspage import __version__
# Key under which the configuration object is stored on (and read back from)
# the aiohttp application mapping.
CONFIG = "config"

# URL path constant; presumably the path of the health-check endpoint — confirm
# against the route registration.
HEALTH_PATH = "/health"

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Route table for collecting the request handlers of this module.
routes = web.RouteTableDef()
# NOTE(review): the route path "/" and the template name "index.html" are
# assumptions — the handler returns a template context dict, which needs a
# rendering decorator; confirm both against the templates directory.
@routes.get("/")
@aiohttp_jinja2.template("index.html")
async def get_index(request):
    """Collect the probe results used to render the status page.

    Reads ``probe-results.tsv`` from ``config.data_path`` (the configuration
    is taken from ``request.app[CONFIG]``). Each line is split on tabs and
    interpreted as: column 0 time (kept as a string), column 1 the key the
    results are grouped by, column 2 result, column 3 status code (int),
    column 4 duration (float), column 5 message.

    Reading is best-effort: a malformed line is logged and skipped, and a
    file that cannot be read is logged and leaves the results empty.

    Returns:
        A dict with the keys ``sites`` (``config.sites``), ``last_results``
        (latest result per key), ``previous_results`` (up to the 100 most
        recent results per key) and ``max_durations`` (largest duration among
        those recent results, per key).
    """
    config = request.app[CONFIG]
    probe_results_path = Path(config.data_path) / "probe-results.tsv"

    last_results = {}
    # Bounded history: only the 100 most recent results are kept per key.
    previous_results = collections.defaultdict(lambda: collections.deque(maxlen=100))
    max_durations = collections.defaultdict(float)

    # NOTE(review): the file is read synchronously inside the coroutine.
    try:
        with probe_results_path.open() as fd:
            for line in fd:
                if not line.strip():
                    # Skip blank lines instead of logging a traceback for them.
                    continue
                try:
                    # Drop the line terminator so it does not end up in the last column.
                    cols = line.rstrip("\r\n").split("\t")
                    result = {
                        "time": cols[0],
                        "result": cols[2],
                        "status_code": int(cols[3]),
                        "duration": float(cols[4]),
                        "message": cols[5],
                    }
                except (IndexError, ValueError) as e:
                    # Too few columns or a non-numeric status code/duration.
                    logger.exception("Failed to read probe result: %s", e)
                    continue
                key = cols[1]
                last_results[key] = result
                # Record the history as well; max_durations is derived from it below.
                previous_results[key].append(result)
    except (OSError, UnicodeError) as e:
        logger.exception("Failed to read probe results file %s: %s", probe_results_path, e)

    for key, results in previous_results.items():
        for result in results:
            if result["duration"] > max_durations[key]:
                max_durations[key] = result["duration"]

    return {
        "sites": config.sites,
        "last_results": last_results,
        "previous_results": previous_results,
        "max_durations": max_durations,
    }
# Register the handler on the module's route table under HEALTH_PATH;
# without this neither the table nor the constant is used anywhere in the file.
@routes.get(HEALTH_PATH)
async def get_health(request):
    """Health-check handler: respond with the text ``OK``.

    The request is not inspected.
    """
    return web.Response(text="OK")
def get_app(config):
    """Build and return the aiohttp application.

    Args:
        config: configuration object; this function reads its
            ``templates_path`` and ``static_assets_path`` attributes (both
            optional overrides) and stores the object on the application
            under the ``CONFIG`` key.

    Returns:
        The ``web.Application`` with Jinja2 templating set up, the module's
        route table added and static assets served under ``/assets``.
    """
    # Default templates live in the "templates" directory next to this module.
    templates_paths = [str(Path(__file__).parent / "templates")]
    if config.templates_path:
        # prepend the custom template path so custom templates will overwrite any default ones
        templates_paths.insert(0, config.templates_path)

    static_assets_path = Path(__file__).parent / "templates" / "assets"
    if config.static_assets_path:
        # overwrite assets path
        static_assets_path = Path(config.static_assets_path)

    app = web.Application()

    # The Jinja2 environment has to be set up on the app before it can be
    # used; setup() returns the environment it creates.
    env = aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(templates_paths))
    env.globals["version"] = __version__

    # Attach whatever handlers were collected on the module-level route table.
    app.add_routes(routes)
    app.router.add_static("/assets", static_assets_path)

    app[CONFIG] = config
    return app