Lightweight Python 3+ client library for Kubernetes (pykube-ng) https://pykube.readthedocs.io/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

261 lines
8.4 KiB

import json
from collections import namedtuple
from typing import Union
from urllib.parse import urlencode
from .exceptions import ObjectDoesNotExist
from .http import HTTPClient
all_ = object()
everything = object()
now = object()
class Table:
"""
Tabular resource representation
See https://kubernetes.io/docs/reference/using-api/api-concepts/#receiving-resources-as-tables
"""
def __init__(self, api_obj_class, obj: dict):
assert obj["kind"] == "Table"
self.api_obj_class = api_obj_class
self.obj = obj
def __repr__(self) -> str:
return "<Table of {kind} at {address}>".format(
kind=self.api_obj_class.kind, address=hex(id(self))
)
@property
def columns(self):
return self.obj["columnDefinitions"]
@property
def rows(self):
return self.obj["rows"]
class BaseQuery:
def __init__(self, api: HTTPClient, api_obj_class, namespace: str = None):
self.api = api
self.api_obj_class = api_obj_class
self.namespace = namespace
self.selector = everything
self.field_selector = everything
def __repr__(self) -> str:
return "<Query of {kind} at {address}>".format(
kind=self.api_obj_class.kind, address=hex(id(self))
)
def all(self) -> "BaseQuery":
return self._clone()
def filter(
self,
namespace: str = None,
selector: Union[str, dict] = None,
field_selector: Union[str, dict] = None,
) -> "BaseQuery":
"""
Filter objects by namespace, labels, or fields
:param namespace: Namespace to filter by (pass pykube.all to get objects in all namespaces)
:param selector: Label selector, can be a dictionary of label names/values
"""
clone = self._clone()
if namespace is not None:
clone.namespace = namespace
if selector is not None:
clone.selector = selector
if field_selector is not None:
clone.field_selector = field_selector
return clone
def _clone(self, cls=None):
if cls is None:
cls = self.__class__
clone = cls(self.api, self.api_obj_class, namespace=self.namespace)
clone.selector = self.selector
clone.field_selector = self.field_selector
return clone
def _build_api_url(self, params: dict = None):
if params is None:
params = {}
if self.selector is not everything:
params["labelSelector"] = as_selector(self.selector) # type: ignore
if self.field_selector is not everything:
params["fieldSelector"] = as_selector(self.field_selector) # type: ignore
query_string = urlencode(params)
return "{}{}".format(
self.api_obj_class.endpoint,
f"?{query_string}" if query_string else "",
)
class Query(BaseQuery):
def get_by_name(self, name: str):
"""
Get object by name, raises ObjectDoesNotExist if not found
"""
kwargs = {
"url": f"{self.api_obj_class.endpoint}/{name}",
"namespace": self.namespace,
}
if self.api_obj_class.base:
kwargs["base"] = self.api_obj_class.base
if self.api_obj_class.version:
kwargs["version"] = self.api_obj_class.version
r = self.api.get(**kwargs)
if not r.ok:
if r.status_code == 404:
raise ObjectDoesNotExist(f"{name} does not exist.")
self.api.raise_for_status(r)
return self.api_obj_class(self.api, r.json())
def get(self, *args, **kwargs):
"""
Get a single object by name, namespace, label, ..
"""
if "name" in kwargs:
return self.get_by_name(kwargs["name"])
clone = self.filter(*args, **kwargs)
num = len(clone)
if num == 1:
return clone.query_cache["objects"][0]
if not num:
raise ObjectDoesNotExist("get() returned zero objects")
raise ValueError("get() more than one object; use filter")
def get_or_none(self, *args, **kwargs):
"""
Get object by name, return None if not found
"""
try:
return self.get(*args, **kwargs)
except ObjectDoesNotExist:
return None
def watch(self, since=None, *, params=None):
query = self._clone(WatchQuery)
query.params = params
if since is now:
query.resource_version = self.response["metadata"]["resourceVersion"]
elif since is not None:
query.resource_version = since
return query
def execute(self, **kwargs):
kwargs["url"] = self._build_api_url()
if self.api_obj_class.base:
kwargs["base"] = self.api_obj_class.base
if self.api_obj_class.version:
kwargs["version"] = self.api_obj_class.version
if self.namespace is not None and self.namespace is not all_:
kwargs["namespace"] = self.namespace
r = self.api.get(**kwargs)
r.raise_for_status()
return r
def as_table(self) -> Table:
"""
Execute query and return result as Table (similar to what kubectl does)
See https://kubernetes.io/docs/reference/using-api/api-concepts/#receiving-resources-as-tables
"""
response = self.execute(
headers={"Accept": "application/json;as=Table;v=v1beta1;g=meta.k8s.io"}
)
return Table(self.api_obj_class, response.json())
def iterator(self):
"""
Execute the API request and return an iterator over the objects. This
method does not use the query cache.
"""
for obj in self.execute().json().get("items") or []:
yield self.api_obj_class(self.api, obj)
@property
def query_cache(self):
if not hasattr(self, "_query_cache"):
cache = {"objects": []}
cache["response"] = self.execute().json()
for obj in cache["response"].get("items") or []:
cache["objects"].append(self.api_obj_class(self.api, obj))
self._query_cache = cache
return self._query_cache
def __len__(self):
return len(self.query_cache["objects"])
def __iter__(self):
return iter(self.query_cache["objects"])
@property
def response(self):
return self.query_cache["response"]
class WatchQuery(BaseQuery):
def __init__(self, *args, **kwargs):
self.resource_version = kwargs.pop("resource_version", None)
self.params = None
super(WatchQuery, self).__init__(*args, **kwargs)
self._response = None
def object_stream(self):
params = dict(self.params or {}) # shallow clone for local use
params["watch"] = "true"
if self.resource_version is not None:
params["resourceVersion"] = self.resource_version
kwargs = {"url": self._build_api_url(params=params), "stream": True}
if self.namespace is not all_:
kwargs["namespace"] = self.namespace
if self.api_obj_class.version:
kwargs["version"] = self.api_obj_class.version
r = self.api.get(**kwargs)
self.api.raise_for_status(r)
self._response = r
WatchEvent = namedtuple("WatchEvent", "type object")
for line in r.iter_lines():
we = json.loads(line.decode("utf-8"))
yield WatchEvent(
type=we["type"], object=self.api_obj_class(self.api, we["object"])
)
def __iter__(self):
return iter(self.object_stream())
@property
def response(self):
return self._response
def as_selector(value: Union[str, dict]) -> str:
if isinstance(value, str):
return value
s = []
for k, v in value.items():
bits = k.split("__")
assert len(bits) <= 2, "too many __ in selector"
if len(bits) == 1:
label = bits[0]
op = "eq"
else:
label = bits[0]
op = bits[1]
# map operator to selector
if op == "eq":
s.append(f"{label}={v}")
elif op == "neq":
s.append(f"{label} != {v}")
elif op == "in":
s.append("{} in ({})".format(label, ",".join(sorted(v))))
elif op == "notin":
s.append("{} notin ({})".format(label, ",".join(sorted(v))))
else:
raise ValueError(f"{op} is not a valid comparison operator")
return ",".join(s)