Kubernetes Web View: a tool to list and view all Kubernetes resources (https://kube-web-view.readthedocs.io/).
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

156 lines
4.9 KiB

import logging
import time
from pathlib import Path
from typing import List
from urllib.parse import urljoin
import requests
from pykube import HTTPClient
from pykube import KubeConfig
from requests.auth import AuthBase
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class OAuth2BearerTokenAuth(AuthBase):
    """Requests auth hook that adds an OAuth Bearer token read from a file.

    The token file is read again for every request that needs a header, so a
    token replaced on disk (e.g. a file mounted from a Kubernetes secret) is
    picked up without recreating this object.
    """

    def __init__(self, token_path: Path):
        """Remember the location of the token file; nothing is read yet."""
        self.token_path = token_path

    def __call__(self, request):
        """Set ``Authorization: Bearer <token>`` unless a header already exists."""
        headers = request.headers
        # An Authorization header set by the caller always wins.
        if "Authorization" not in headers:
            with self.token_path.open() as token_file:
                bearer = token_file.read().strip()
            headers["Authorization"] = f"Bearer {bearer}"
        return request
class Cluster:
    """A named Kubernetes cluster together with the API client used to reach it."""

    def __init__(
        self, name: str, api: "HTTPClient", labels: dict = None, spec: dict = None
    ):
        """Store the cluster's name, API client and optional metadata.

        Args:
            name: Name of the cluster.
            api: Client object for the cluster's API server.
            labels: Optional labels; ``None`` (or any falsy value) becomes ``{}``.
            spec: Optional spec dict; ``None`` (or any falsy value) becomes ``{}``.
        """
        self.name = name
        self.api = api
        # Fall back to a fresh dict per instance so instances never share state.
        self.labels = labels or {}
        self.spec = spec or {}
class StaticClusterDiscoverer:
    """Discoverer for a fixed mapping of cluster names to API server URLs."""

    def __init__(self, clusters: dict):
        """Build one Cluster per entry of *clusters*.

        Args:
            clusters: Mapping of cluster name to API server URL.
        """
        self._clusters = []
        for cluster_name, url in clusters.items():
            config = KubeConfig.from_url(url)
            client = HTTPClient(config)
            # Keep the cluster; without this the list would always stay empty.
            self._clusters.append(Cluster(cluster_name, client))

    def get_clusters(self):
        """Return the list of clusters built at construction time."""
        return self._clusters
class ServiceAccountNotFound(Exception):
    """Raised when no service account configuration is found (not running in a cluster)."""
class ServiceAccountClusterDiscoverer:
    """Discoverer exposing the single cluster reachable via the service account."""

    def __init__(self):
        """Create the "local" cluster from the service account configuration.

        Raises:
            ServiceAccountNotFound: If loading the service account
                configuration fails with ``FileNotFoundError``.
        """
        self._clusters = []
        try:
            config = KubeConfig.from_service_account()
        except FileNotFoundError as exc:
            # we are not running inside a cluster
            raise ServiceAccountNotFound() from exc
        client = HTTPClient(config)
        # Register the cluster so get_clusters() actually returns it.
        self._clusters.append(Cluster("local", client))

    def get_clusters(self):
        """Return the list holding the single "local" cluster."""
        return self._clusters
class ClusterRegistryDiscoverer:
    """Discoverer that loads clusters from a cluster registry over HTTP and caches them."""

    # Registry row fields copied into the cluster labels ("_" is replaced by "-").
    # NOTE(review): the original tuple was missing from the copy under review and
    # has been reconstructed — confirm the field names against the registry schema.
    _LABEL_KEYS = (
        "id",
        "channel",
        "environment",
        "infrastructure_account",
        "region",
    )

    def __init__(
        self,
        cluster_registry_url: str,
        oauth2_bearer_token_path: Path,
        cache_lifetime: int = 60,
    ):
        """Prepare the HTTP session; no request is made until clusters are requested.

        Args:
            cluster_registry_url: Base URL of the cluster registry.
            oauth2_bearer_token_path: File holding the OAuth Bearer token; when
                falsy, requests are sent without token authentication.
            cache_lifetime: Seconds a fetched cluster list stays valid.
                NOTE(review): the default of 60 was reconstructed — confirm.
        """
        self._url = cluster_registry_url
        self._oauth2_bearer_token_path = oauth2_bearer_token_path
        self._cache_lifetime = cache_lifetime
        # 0 means "never refreshed", so the first get_clusters() call refreshes.
        self._last_cache_refresh = 0
        self._clusters: List[Cluster] = []
        self._session = requests.Session()
        if self._oauth2_bearer_token_path:
            self._session.auth = OAuth2BearerTokenAuth(self._oauth2_bearer_token_path)

    def refresh(self):
        """Reload the cluster list from the registry.

        Any failure is logged and the previously cached clusters are kept. The
        cache timestamp is only updated on success, so a failed refresh is
        retried on the next ``get_clusters()`` call.
        """
        try:
            response = self._session.get(
                urljoin(self._url, "/kubernetes-clusters"), timeout=10
            )
            # Surface HTTP errors as such instead of failing later on the payload.
            response.raise_for_status()
            clusters = []
            for row in response.json()["items"]:
                # only consider "ready" clusters
                if row.get("lifecycle_status", "ready") != "ready":
                    continue
                config = KubeConfig.from_url(row["api_server_url"])
                client = HTTPClient(config)
                # Same guard as in __init__: without a token path there is
                # nothing to read the Bearer token from.
                if self._oauth2_bearer_token_path:
                    client.session.auth = OAuth2BearerTokenAuth(
                        self._oauth2_bearer_token_path
                    )
                labels = {
                    key.replace("_", "-"): row[key]
                    for key in self._LABEL_KEYS
                    if key in row
                }
                clusters.append(Cluster(row["alias"], client, labels, row))
            self._clusters = clusters
            self._last_cache_refresh = time.time()
        except Exception:
            logger.exception(f"Failed to refresh from cluster registry {self._url}")

    def get_clusters(self):
        """Return the cached clusters, refreshing first when the cache has expired."""
        now = time.time()
        if now - self._last_cache_refresh > self._cache_lifetime:
            self.refresh()
        return self._clusters
class KubeconfigDiscoverer:
    """Discoverer yielding one cluster per context of a kubeconfig file."""

    def __init__(self, kubeconfig_path: Path, contexts: set):
        """Store the kubeconfig location and the context filter.

        Args:
            kubeconfig_path: Path of the kubeconfig file; when falsy,
                ``KubeConfig.from_file`` is called with ``None``.
            contexts: Names of the contexts to include; an empty (falsy) value
                disables filtering and includes every context.
        """
        self._path = kubeconfig_path
        self._contexts = contexts

    def get_clusters(self):
        """Yield a Cluster for every selected kubeconfig context.

        The kubeconfig file is loaded again on every call.
        """
        # Kubernetes Python client expects "vintage" string path
        config_file = str(self._path) if self._path else None
        config = KubeConfig.from_file(config_file)
        for context in config.contexts:
            if self._contexts and context not in self._contexts:
                # filter out
                continue
            # create a new KubeConfig with new "current context"
            context_config = KubeConfig(config.doc, context)
            client = HTTPClient(context_config)
            yield Cluster(context, client)
class MockDiscoverer:
    """Discoverer that yields three placeholder clusters without an API client."""

    def get_clusters(self):
        """Yield ``mock-cluster-0`` to ``mock-cluster-2``, each with ``api=None``."""
        for i in range(3):
            # Cluster's parameter is named ``api``; passing ``client=`` raised
            # TypeError (unexpected keyword, missing required argument).
            yield Cluster(f"mock-cluster-{i}", api=None)