kopf package

The main Kopf module for all the exported functions & classes.

kopf.register(fn, *, id=None, param=None, errors=None, timeout=None, retries=None, backoff=None, labels=None, annotations=None, when=None)[source]

Register a function as a sub-handler of the currently executed handler.

Example:

@kopf.on.create('kopfexamples')
def create_it(spec, **kwargs):
    for task in spec.get('tasks', []):

        def create_single_task(task=task, **_):
            pass

        kopf.register(id=task, fn=create_single_task)

This is effectively equivalent to:

@kopf.on.create('kopfexamples')
def create_it(spec, **kwargs):
    for task in spec.get('tasks', []):

        @kopf.subhandler(id=task)
        def create_single_task(task=task, **_):
            pass
Return type:

Callable[..., Union[object, None, Coroutine[None, None, Optional[object]]]]

Parameters:
async kopf.execute(*, fns=None, handlers=None, registry=None, lifecycle=None, cause=None)[source]

Execute the handlers in an isolated lifecycle.

This function is just a public wrapper for execute with multiple ways to specify the handlers: either as the raw functions, or as the pre-created handlers, or as a registry (as used in the object handling).

If no explicit functions or handlers or registry are passed, the sub-handlers of the current handler are assumed, as accumulated in the per-handler registry with @kopf.subhandler.

If this method is not called for the sub-handlers explicitly inside the handler, it is called implicitly after the handler exits. One way or another, it is executed for the sub-handlers.

Return type:

None

Parameters:
  • fns (Iterable[Callable[[...], object | None | Coroutine[None, None, object | None]]] | None) –

  • handlers (Iterable[ChangingHandler] | None) –

  • registry (ChangingRegistry | None) –

  • lifecycle (LifeCycleFn | None) –

  • cause (Cause | None) –

kopf.daemon(__group_or_groupversion_or_name=None, __version_or_name=None, __name=None, *, group=None, version=None, kind=None, plural=None, singular=None, shortcut=None, category=None, id=None, param=None, errors=None, timeout=None, retries=None, backoff=None, initial_delay=None, cancellation_backoff=None, cancellation_timeout=None, cancellation_polling=None, labels=None, annotations=None, when=None, field=None, value=None, registry=None)[source]

@kopf.daemon() decorator for the background threads/tasks.

Return type:

Callable[[Callable[..., Union[object, None, Coroutine[None, None, Optional[object]]]]], Callable[..., Union[object, None, Coroutine[None, None, Optional[object]]]]]

Parameters:
  • __group_or_groupversion_or_name (str | Marker | None) –

  • __version_or_name (str | Marker | None) –

  • __name (str | Marker | None) –

  • group (str | None) –

  • version (str | None) –

  • kind (str | None) –

  • plural (str | None) –

  • singular (str | None) –

  • shortcut (str | None) –

  • category (str | None) –

  • id (str | None) –

  • param (Any | None) –

  • errors (ErrorsMode | None) –

  • timeout (float | None) –

  • retries (int | None) –

  • backoff (float | None) –

  • initial_delay (float | None) –

  • cancellation_backoff (float | None) –

  • cancellation_timeout (float | None) –

  • cancellation_polling (float | None) –

  • labels (Mapping[str, str | MetaFilterToken | Callable[[...], bool]] | None) –

  • annotations (Mapping[str, str | MetaFilterToken | Callable[[...], bool]] | None) –

  • when (Callable[[...], bool] | None) –

  • field (None | str | Tuple[str, ...] | List[str]) –

  • value (None | Any | MetaFilterToken | Callable[[...], bool]) –

  • registry (OperatorRegistry | None) –

kopf.timer(__group_or_groupversion_or_name=None, __version_or_name=None, __name=None, *, group=None, version=None, kind=None, plural=None, singular=None, shortcut=None, category=None, id=None, param=None, errors=None, timeout=None, retries=None, backoff=None, interval=None, initial_delay=None, sharp=None, idle=None, labels=None, annotations=None, when=None, field=None, value=None, registry=None)[source]

@kopf.timer() handler for regular, timer-based invocations.

Return type:

Callable[[Callable[..., Union[object, None, Coroutine[None, None, Optional[object]]]]], Callable[..., Union[object, None, Coroutine[None, None, Optional[object]]]]]

Parameters:
kopf.index(__group_or_groupversion_or_name=None, __version_or_name=None, __name=None, *, group=None, version=None, kind=None, plural=None, singular=None, shortcut=None, category=None, id=None, param=None, errors=None, timeout=None, retries=None, backoff=None, labels=None, annotations=None, when=None, field=None, value=None, registry=None)[source]

@kopf.index() handler for the indexing callbacks.

Return type:

Callable[[Callable[..., Union[object, None, Coroutine[None, None, Optional[object]]]]], Callable[..., Union[object, None, Coroutine[None, None, Optional[object]]]]]

Parameters:
kopf.configure(debug=None, verbose=None, quiet=None, log_format=LogFormat.FULL, log_prefix=False, log_refkey=None)[source]
Return type:

None

Parameters:
  • debug (bool | None) –

  • verbose (bool | None) –

  • quiet (bool | None) –

  • log_format (LogFormat) –

  • log_prefix (bool | None) –

  • log_refkey (str | None) –

class kopf.LogFormat(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Log formats, as specified on CLI.

PLAIN = '%(message)s'[source]
FULL = '[%(asctime)s] %(name)-20.20s [%(levelname)-8.8s] %(message)s'[source]
JSON = '-json-'[source]
kopf.login_via_pykube(*, logger, **_)[source]
Return type:

Optional[ConnectionInfo]

Parameters:
kopf.login_via_client(*, logger, **_)[source]
Return type:

Optional[ConnectionInfo]

Parameters:
kopf.login_with_kubeconfig(**_)[source]

A minimalistic login handler that can get raw data from a kubeconfig file.

Authentication capabilities can be limited to keep the code short & simple. No parsing or sophisticated multi-step token retrieval is performed.

This login function is intended to make Kopf runnable in trivial cases when neither pykube-ng nor the official client library are installed.

Return type:

Optional[ConnectionInfo]

Parameters:

_ (Any) –

kopf.login_with_service_account(**_)[source]

A minimalistic login handler that can get raw data from a service account.

Authentication capabilities can be limited to keep the code short & simple. No parsing or sophisticated multi-step token retrieval is performed.

This login function is intended to make Kopf runnable in trivial cases when neither pykube-ng nor the official client library are installed.

Return type:

Optional[ConnectionInfo]

Parameters:

_ (Any) –

exception kopf.LoginError[source]

Bases: Exception

Raised when the operator cannot login to the API.

class kopf.ConnectionInfo(server, ca_path=None, ca_data=None, insecure=None, username=None, password=None, scheme=None, token=None, certificate_path=None, certificate_data=None, private_key_path=None, private_key_data=None, default_namespace=None, priority=0, expiration=None)[source]

Bases: object

A single endpoint with specific credentials and connection flags to use.

Parameters:
  • server (str) –

  • ca_path (str | None) –

  • ca_data (bytes | None) –

  • insecure (bool | None) –

  • username (str | None) –

  • password (str | None) –

  • scheme (str | None) –

  • token (str | None) –

  • certificate_path (str | None) –

  • certificate_data (bytes | None) –

  • private_key_path (str | None) –

  • private_key_data (bytes | None) –

  • default_namespace (str | None) –

  • priority (int) –

  • expiration (datetime | None) –

server: str[source]
ca_path: Optional[str] = None[source]
ca_data: Optional[bytes] = None[source]
insecure: Optional[bool] = None[source]
username: Optional[str] = None[source]
password: Optional[str] = None[source]
scheme: Optional[str] = None[source]
token: Optional[str] = None[source]
certificate_path: Optional[str] = None[source]
certificate_data: Optional[bytes] = None[source]
private_key_path: Optional[str] = None[source]
private_key_data: Optional[bytes] = None[source]
default_namespace: Optional[str] = None[source]
priority: int = 0[source]
expiration: Optional[datetime] = None[source]
kopf.event(objs, *, type, reason, message='')[source]
Return type:

None

Parameters:
kopf.info(objs, *, reason, message='')[source]
Return type:

None

Parameters:
kopf.warn(objs, *, reason, message='')[source]
Return type:

None

Parameters:
kopf.exception(objs, *, reason='', message='', exc=None)[source]
Return type:

None

Parameters:
async kopf.spawn_tasks(*, lifecycle=None, indexers=None, registry=None, settings=None, memories=None, insights=None, identity=None, standalone=None, priority=None, peering_name=None, liveness_endpoint=None, clusterwide=False, namespaces=(), namespace=None, stop_flag=None, ready_flag=None, vault=None, memo=None, _command=None)[source]

Spawn all the tasks needed to run the operator.

The tasks are properly inter-connected with the synchronisation primitives.

Return type:

Collection[Task]

Parameters:
  • lifecycle (LifeCycleFn | None) –

  • indexers (OperatorIndexers | None) –

  • registry (OperatorRegistry | None) –

  • settings (OperatorSettings | None) –

  • memories (ResourceMemories | None) –

  • insights (Insights | None) –

  • identity (Identity | None) –

  • standalone (bool | None) –

  • priority (int | None) –

  • peering_name (str | None) –

  • liveness_endpoint (str | None) –

  • clusterwide (bool) –

  • namespaces (Collection[str | Pattern[str]]) –

  • namespace (str | Pattern[str] | None) –

  • stop_flag (Future | Event | Future | Event | None) –

  • ready_flag (Future | Event | Future | Event | None) –

  • vault (Vault | None) –

  • memo (object | None) –

  • _command (Coroutine[None, None, None] | None) –

async kopf.run_tasks(root_tasks, *, ignored=frozenset({}))[source]

Orchestrate the tasks and terminate them gracefully when needed.

The root tasks are expected to run forever. Their number is limited. Once any of them exits, the whole operator and all other root tasks should exit.

The root tasks, in turn, can spawn multiple sub-tasks of various purposes. They can be awaited, monitored, or fired-and-forgotten.

The hung tasks are those that were spawned during the operator runtime and were not cancelled/exited on the root tasks' termination. They are given some extra time to finish, after which they are forcibly terminated too.

Note

Due to implementation details, every task created after the operator’s startup is assumed to be a task or a sub-task of the operator. In the end, all tasks are forcibly cancelled, even if they were created by other means. There is no way to trace who spawned what. Only the tasks that existed before the operator startup are ignored (for example, those that spawned the operator itself).

Parameters:
Return type:

None

async kopf.operator(*, lifecycle=None, indexers=None, registry=None, settings=None, memories=None, insights=None, identity=None, standalone=None, priority=None, peering_name=None, liveness_endpoint=None, clusterwide=False, namespaces=(), namespace=None, stop_flag=None, ready_flag=None, vault=None, memo=None, _command=None)[source]

Run the whole operator asynchronously.

This function should be used to run an operator in an asyncio event-loop if the operator is orchestrated explicitly and manually.

It is effectively spawn_tasks + run_tasks with some extra safety.

Return type:

None

Parameters:
  • lifecycle (LifeCycleFn | None) –

  • indexers (OperatorIndexers | None) –

  • registry (OperatorRegistry | None) –

  • settings (OperatorSettings | None) –

  • memories (ResourceMemories | None) –

  • insights (Insights | None) –

  • identity (Identity | None) –

  • standalone (bool | None) –

  • priority (int | None) –

  • peering_name (str | None) –

  • liveness_endpoint (str | None) –

  • clusterwide (bool) –

  • namespaces (Collection[str | Pattern[str]]) –

  • namespace (str | Pattern[str] | None) –

  • stop_flag (Future | Event | Future | Event | None) –

  • ready_flag (Future | Event | Future | Event | None) –

  • vault (Vault | None) –

  • memo (object | None) –

  • _command (Coroutine[None, None, None] | None) –

kopf.run(*, loop=None, lifecycle=None, indexers=None, registry=None, settings=None, memories=None, insights=None, identity=None, standalone=None, priority=None, peering_name=None, liveness_endpoint=None, clusterwide=False, namespaces=(), namespace=None, stop_flag=None, ready_flag=None, vault=None, memo=None, _command=None)[source]

Run the whole operator synchronously.

If the loop is not specified, the operator runs in the event loop of the current context (by asyncio’s default, the current thread). See: https://docs.python.org/3/library/asyncio-policy.html for details.

Alternatively, use asyncio.run(kopf.operator(...)) with the same options. It will take care of a new event loop’s creation and finalization for this call. See: asyncio.run().

Return type:

None

Parameters:
  • loop (AbstractEventLoop | None) –

  • lifecycle (LifeCycleFn | None) –

  • indexers (OperatorIndexers | None) –

  • registry (OperatorRegistry | None) –

  • settings (OperatorSettings | None) –

  • memories (ResourceMemories | None) –

  • insights (Insights | None) –

  • identity (Identity | None) –

  • standalone (bool | None) –

  • priority (int | None) –

  • peering_name (str | None) –

  • liveness_endpoint (str | None) –

  • clusterwide (bool) –

  • namespaces (Collection[str | Pattern[str]]) –

  • namespace (str | Pattern[str] | None) –

  • stop_flag (Future | Event | Future | Event | None) –

  • ready_flag (Future | Event | Future | Event | None) –

  • vault (Vault | None) –

  • memo (object | None) –

  • _command (Coroutine[None, None, None] | None) –

kopf.adopt(objs, owner=None, *, forced=False, strict=False, nested=None)[source]

Make the object(s) children of the owner: in the same namespace, named after their parent, and owned by it.

Return type:

None

Parameters:
kopf.label(objs, labels=_UNSET.token, *, forced=False, nested=None, force=None)[source]

Apply the labels to the object(s).

Return type:

None

Parameters:
kopf.not_(fn)[source]
Return type:

TypeVar(_FnT, Callable[..., bool], Callable[..., bool])

Parameters:

fn (_FnT) –

kopf.all_(fns)[source]
Return type:

TypeVar(_FnT, Callable[..., bool], Callable[..., bool])

Parameters:

fns (Collection[_FnT]) –

kopf.any_(fns)[source]
Return type:

TypeVar(_FnT, Callable[..., bool], Callable[..., bool])

Parameters:

fns (Collection[_FnT]) –

kopf.none_(fns)[source]
Return type:

TypeVar(_FnT, Callable[..., bool], Callable[..., bool])

Parameters:

fns (Collection[_FnT]) –

kopf.get_default_lifecycle()[source]
Return type:

LifeCycleFn

kopf.set_default_lifecycle(lifecycle)[source]
Return type:

None

Parameters:

lifecycle (LifeCycleFn | None) –

kopf.build_object_reference(body)[source]

Construct an object reference for the events.

Keep in mind that some fields can be absent: e.g. namespace for cluster resources, or e.g. apiVersion for kind: Node, etc.

Return type:

ObjectReference

Parameters:

body (Body) –

kopf.build_owner_reference(body, *, controller=True, block_owner_deletion=True)[source]

Construct an owner reference object for the parent-children relationships.

The structure needed to link the children objects to the current object as a parent. See https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/

Keep in mind that some fields can be absent: e.g. namespace for cluster resources, or e.g. apiVersion for kind: Node, etc.

Return type:

OwnerReference

Parameters:
  • body (Body) –

  • controller (bool | None) –

  • block_owner_deletion (bool | None) –

kopf.append_owner_reference(objs, owner=None, *, controller=True, block_owner_deletion=True)[source]

Append an owner reference to the resource(s), if it is not yet there.

Note: the owned objects are usually not the ones being processed, so their whole bodies can be modified directly; no patches are needed.

Return type:

None

Parameters:
kopf.remove_owner_reference(objs, owner=None)[source]

Remove an owner reference to the resource(s), if it is there.

Note: the owned objects are usually not the ones being processed, so their whole bodies can be modified directly; no patches are needed.

Return type:

None

Parameters:
class kopf.ErrorsMode(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

How arbitrary (non-temporary/non-permanent) exceptions are treated.

IGNORED = 1[source]
TEMPORARY = 2[source]
PERMANENT = 3[source]
exception kopf.AdmissionError(message='', code=500)[source]

Bases: PermanentError

Raised by admission handlers when an API operation under check is bad.

An admission error behaves the same as kopf.PermanentError, but provides admission-specific payload for the response: a message & a numeric code.

This error type is preferred when selecting only one error to report back to apiservers as the admission review result – in case multiple handlers are called in one admission request, i.e. when the webhook endpoints are not mapped to the handler ids (e.g. when configured manually).

Parameters:
  • message (str | None) –

  • code (int | None) –

Return type:

None

class kopf.WebhookClientConfigService[source]

Bases: TypedDict

namespace: Optional[str][source]
name: Optional[str][source]
path: Optional[str][source]
port: Optional[int][source]
class kopf.WebhookClientConfig[source]

Bases: TypedDict

A config of clients (apiservers) to access the webhooks’ server (operators).

This dictionary is put into managed webhook configurations “as is”. The fields & type annotations are only for hinting.

Kopf additionally modifies the url and the service’s path to inject handler ids as the last path component. This must be taken into account by custom webhook servers.

caBundle: Optional[str][source]
url: Optional[str][source]
service: Optional[WebhookClientConfigService][source]
class kopf.UserInfo[source]

Bases: TypedDict

username: str[source]
uid: str[source]
groups: List[str][source]
class kopf.WebhookFn(*args, **kwargs)[source]

Bases: Protocol

A framework-provided function to call when an admission request is received.

The framework provides the actual function. Custom webhook servers must accept the function, invoke it accordingly on admission requests, wait for the admission response, serialise it and send it back. They do not implement this function. This protocol only declares the exact signature.

class kopf.WebhookServer(*, addr=None, port=None, path=None, host=None, cadata=None, cafile=None, cadump=None, context=None, insecure=False, certfile=None, pkeyfile=None, password=None, extra_sans=(), verify_mode=None, verify_cafile=None, verify_capath=None, verify_cadata=None)[source]

Bases: WebhookContextManager

A local HTTP/HTTPS endpoint.

Currently, the server is based on aiohttp, but the implementation can change in the future without warning.

This server is also used by specialised tunnels when they need a local endpoint to be tunneled.

  • addr, port is where to listen for connections (defaults to localhost and 9443).

  • path is the root path for a webhook server (defaults to no root path).

  • host is an optional override of the hostname for webhook URLs; if not specified, the addr will be used.

Kubernetes requires HTTPS, so HTTPS is the default mode of the server. This webhook server supports SSL both for the server certificates and for client certificates (e.g., for authentication) at the same time:

  • cadata, cafile is the CA bundle to be passed as a “client config” to the webhook configuration objects, to be used by clients/apiservers when talking to the webhook server; it is not used in the server itself.

  • cadump is a path to save the resulting CA bundle to be used by clients, i.e. apiservers; it can be passed to curl --cacert ...; if cafile is provided, it contains the same content.

  • certfile, pkeyfile define the server’s endpoint certificate; if not specified, a self-signed certificate and CA will be generated for both addr & host as SANs (but only host for CommonName).

  • password is either for decrypting the provided pkeyfile, or for encrypting and decrypting the generated private key.

  • extra_sans are put into the self-signed certificate as SANs (DNS/IP) in addition to the host & addr (in case some other endpoints exist).

  • verify_mode, verify_cafile, verify_capath, verify_cadata will be loaded into the SSL context for verifying the client certificates when provided and if provided by the clients, i.e. apiservers or curl; (ssl.SSLContext.verify_mode, ssl.SSLContext.load_verify_locations).

  • insecure flag disables HTTPS and runs an HTTP webhook server. This is used in ngrok for a local endpoint, but can be used for debugging or when the certificate-generating dependencies/extras are not installed.

Parameters:
DEFAULT_HOST: Optional[str] = None[source]
addr: Optional[str][source]
port: Optional[int][source]
path: Optional[str][source]
host: Optional[str][source]
cadata: Optional[bytes][source]
cafile: Union[str, PathLike, None][source]
cadump: Union[str, PathLike, None][source]
context: Optional[SSLContext][source]
insecure: bool[source]
certfile: Union[str, PathLike, None][source]
pkeyfile: Union[str, PathLike, None][source]
password: Optional[str][source]
extra_sans: Iterable[str][source]
verify_mode: Optional[VerifyMode][source]
verify_cafile: Union[str, PathLike, None][source]
verify_capath: Union[str, PathLike, None][source]
verify_cadata: Union[str, bytes, None][source]
static build_certificate(hostnames, password=None)[source]

Build a self-signed certificate with SANs (subject alternative names).

Returns a tuple of the certificate and its private key (PEM-formatted).

The certificate is “minimally sufficient”, without much of the extra information on the subject besides its common and alternative names. However, IP addresses are properly recognised and normalised for better compatibility with strict SSL clients (like apiservers of Kubernetes). The first non-IP hostname becomes the certificate’s common name – by convention, non-configurable. If no hostnames are found, the first IP address is used as a fallback. Magic IPs like 0.0.0.0 are excluded.

certbuilder is used as an implementation because it is lightweight: 2.9 MB vs. 8.7 MB for cryptography. Still, it is too heavy to include as a normal runtime dependency (for 8.8 MB of Kopf itself), so it is only available as the kopf[dev] extra for development-mode dependencies. This can change in the future if self-signed certificates become used at runtime (e.g. in production/staging environments or other real clusters).

Return type:

Tuple[bytes, bytes]

Parameters:
class kopf.WebhookK3dServer(*, addr=None, port=None, path=None, host=None, cadata=None, cafile=None, cadump=None, context=None, insecure=False, certfile=None, pkeyfile=None, password=None, extra_sans=(), verify_mode=None, verify_cafile=None, verify_capath=None, verify_cadata=None)[source]

Bases: WebhookServer

A tunnel from inside of K3d/K3s to its host where the operator is running.

With this tunnel, a developer can develop the webhooks when fully offline, since all the traffic is local and never leaves the host machine.

The forwarding is maintained by K3d itself. This tunnel only replaces the endpoints for the Kubernetes webhook and injects an SSL certificate with proper CN/SANs — to match Kubernetes’s SSL validity expectations.

Parameters:
DEFAULT_HOST: Optional[str] = 'host.k3d.internal'[source]
class kopf.WebhookMinikubeServer(*, addr=None, port=None, path=None, host=None, cadata=None, cafile=None, cadump=None, context=None, insecure=False, certfile=None, pkeyfile=None, password=None, extra_sans=(), verify_mode=None, verify_cafile=None, verify_capath=None, verify_cadata=None)[source]

Bases: WebhookServer

A tunnel from inside of Minikube to its host where the operator is running.

With this tunnel, a developer can develop the webhooks when fully offline, since all the traffic is local and never leaves the host machine.

The forwarding is maintained by Minikube itself. This tunnel only replaces the endpoints for the Kubernetes webhook and injects an SSL certificate with proper CN/SANs — to match Kubernetes’s SSL validity expectations.

Parameters:
DEFAULT_HOST: Optional[str] = 'host.minikube.internal'[source]
class kopf.WebhookNgrokTunnel(*, addr=None, port=None, path=None, token=None, region=None, binary=None)[source]

Bases: WebhookContextManager

Tunnel admission webhook request via an external tunnel: ngrok.

addr, port, and path have the same meaning as in kopf.WebhookServer: where to listen for connections locally. Ngrok then tunnels this endpoint remotely.

Mind that the ngrok webhook tunnel runs the local webhook server in an insecure (HTTP) mode. For secure (HTTPS) mode, a paid subscription and properly issued certificates are needed. This goes beyond Kopf’s scope. If needed, implement your own ngrok tunnel.

Besides, ngrok tunnel does not report any CA to the webhook client configs. It is expected that the default trust chain is sufficient for ngrok’s certs.

token can be used for paid subscriptions, which lifts some limitations. Otherwise, the free plan has a limit of 40 requests per minute (this should be enough for local development).

binary, if set, will use the specified ngrok binary path; otherwise, pyngrok downloads the binary at runtime (not recommended).

Warning

The public URL is not properly protected and a malicious user can send requests to a locally running operator. If the handlers only process the data and make no side effects, this should be fine.

Although ngrok provides basic auth (“username:password”), Kubernetes does not permit this information in the URLs.

Ngrok partially “protects” the URLs by assigning them random hostnames. Additionally, you can add random paths. However, this is not “security”, only a bit of safety for a short time (enough for development runs).

Parameters:
  • addr (str | None) –

  • port (int | None) –

  • path (str | None) –

  • token (str | None) –

  • region (str | None) –

  • binary (str | PathLike | None) –

addr: Optional[str][source]
port: Optional[int][source]
path: Optional[str][source]
token: Optional[str][source]
region: Optional[str][source]
binary: Union[str, PathLike, None][source]
class kopf.WebhookAutoServer(*, addr=None, port=None, path=None, host=None, cadata=None, cafile=None, cadump=None, context=None, insecure=False, certfile=None, pkeyfile=None, password=None, extra_sans=(), verify_mode=None, verify_cafile=None, verify_capath=None, verify_cadata=None)[source]

Bases: ClusterDetector, WebhookServer

A locally listening webserver which attempts to guess its proper hostname.

The choice happens only among the supported webhook servers (K3d/K3s and Minikube at the moment). In all other cases, a regular local server is started without hostname overrides.

If automatic tunneling is possible, consider WebhookAutoTunnel instead.

Parameters:
class kopf.WebhookAutoTunnel(*, addr=None, port=None, path=None)[source]

Bases: ClusterDetector, WebhookContextManager

The same as WebhookAutoServer, but with possible tunneling.

Generally, tunneling gives more possibilities to run in any environment, but it must not happen without permission from the developers, and it is not possible when running in a completely isolated/local/CI/CD cluster. Therefore, developers should activate the automatic setup explicitly.

If automatic tunneling is prohibited or impossible, use WebhookAutoServer.

Note

Automatic server/tunnel detection is highly limited in configuration and provides only the most common options of all servers & tunnels: specifically, the listening addr:port/path. All other options are specific to their servers/tunnels, and the auto-guessing logic cannot use/accept/pass them.

Parameters:
  • addr (str | None) –

  • port (int | None) –

  • path (str | None) –

addr: Optional[str][source]
port: Optional[int][source]
path: Optional[str][source]
exception kopf.PermanentError[source]

Bases: Exception

A fatal handler error, the retries are useless.

exception kopf.TemporaryError(_TemporaryError__msg=None, delay=60)[source]

Bases: Exception

A potentially recoverable error, should be retried.

Parameters:
  • _TemporaryError__msg (str | None) –

  • delay (float | None) –

Return type:

None

exception kopf.HandlerTimeoutError[source]

Bases: PermanentError

An error for the handler’s timeout (if set).

exception kopf.HandlerRetriesError[source]

Bases: PermanentError

An error for the handler’s retries exceeded (if set).

class kopf.OperatorRegistry[source]

Bases: object

A global registry for handling multiple resources & activities.

It is usually populated by the @kopf.on... decorators, but can also be explicitly created and used in the embedded operators.

kopf.get_default_registry()[source]

Get the default registry to be used by the decorators and the reactor unless the explicit registry is provided to them.

Return type:

OperatorRegistry

kopf.set_default_registry(registry)[source]

Set the default registry to be used by the decorators and the reactor unless the explicit registry is provided to them.

Return type:

None

Parameters:

registry (OperatorRegistry) –

class kopf.OperatorSettings(process=<factory>, posting=<factory>, peering=<factory>, watching=<factory>, batching=<factory>, scanning=<factory>, admission=<factory>, execution=<factory>, background=<factory>, networking=<factory>, persistence=<factory>)[source]

Bases: object

Parameters:
  • process (ProcessSettings) –

  • posting (PostingSettings) –

  • peering (PeeringSettings) –

  • watching (WatchingSettings) –

  • batching (BatchingSettings) –

  • scanning (ScanningSettings) –

  • admission (AdmissionSettings) –

  • execution (ExecutionSettings) –

  • background (BackgroundSettings) –

  • networking (NetworkingSettings) –

  • persistence (PersistenceSettings) –

process: ProcessSettings[source]
posting: PostingSettings[source]
peering: PeeringSettings[source]
watching: WatchingSettings[source]
batching: BatchingSettings[source]
scanning: ScanningSettings[source]
admission: AdmissionSettings[source]
execution: ExecutionSettings[source]
background: BackgroundSettings[source]
networking: NetworkingSettings[source]
persistence: PersistenceSettings[source]
class kopf.DiffBaseStorage[source]

Bases: StorageKeyMarkingConvention, StorageStanzaCleaner

Store the base essence for diff calculations, i.e. last handled state.

The “essence” is a snapshot of meaningful fields, which must be tracked to identify the actual changes on the object (or absence of such).

Used in the handling routines to check if there were significant changes (i.e. not the internal and system changes, like the uids, links, etc), and to get the exact per-field diffs for the specific handler functions.

Conceptually similar to how kubectl apply stores the applied state on any object, and then uses that for the patch calculation: https://kubernetes.io/docs/concepts/overview/object-management-kubectl/declarative-config/

build(*, body, extra_fields=None)[source]

Extract only the relevant fields for the state comparisons.

The framework ignores all the system fields (mostly from metadata) and the status stanza completely, except for some well-known and useful metadata, such as labels and annotations (excluding known garbage keys).

A special set of fields can be provided even if they are supposed to be removed. This is used, for example, for handlers which react to changes in the specific fields in the status stanza, while the rest of the status stanza is removed.

It is generally not a good idea to override this method in custom stores, unless a different definition of an object’s essence is needed.

Return type:

BodyEssence

Parameters:
abstract fetch(*, body)[source]
Return type:

Optional[BodyEssence]

Parameters:

body (Body) –

abstract store(*, body, patch, essence)[source]
Return type:

None

Parameters:
class kopf.AnnotationsDiffBaseStorage(*, prefix='kopf.zalando.org', key='last-handled-configuration', v1=True)[source]

Bases: StorageKeyFormingConvention, DiffBaseStorage

Parameters:
build(*, body, extra_fields=None)[source]

Extract only the relevant fields for the state comparisons.

The framework ignores all the system fields (mostly from metadata) and the status stanza completely, except for some well-known and useful metadata, such as labels and annotations (excluding the known garbage).

A special set of fields can be provided even if they are supposed to be removed. This is used, for example, for handlers which react to changes in the specific fields in the status stanza, while the rest of the status stanza is removed.

It is generally not a good idea to override this method in custom stores, unless a different definition of an object’s essence is needed.

Return type:

BodyEssence

Parameters:
fetch(*, body)[source]
Return type:

Optional[BodyEssence]

Parameters:

body (Body) –

store(*, body, patch, essence)[source]
Return type:

None

Parameters:
class kopf.StatusDiffBaseStorage(*, name='kopf', field='status.{name}.last-handled-configuration')[source]

Bases: DiffBaseStorage

Parameters:
property field: Tuple[str, ...][source]
build(*, body, extra_fields=None)[source]

Extract only the relevant fields for the state comparisons.

The framework ignores all the system fields (mostly from metadata) and the status stanza completely, except for some well-known and useful metadata, such as labels and annotations (excluding the known garbage).

A special set of fields can be provided even if they are supposed to be removed. This is used, for example, for handlers which react to changes in the specific fields in the status stanza, while the rest of the status stanza is removed.

It is generally not a good idea to override this method in custom stores, unless a different definition of an object’s essence is needed.

Return type:

BodyEssence

Parameters:
fetch(*, body)[source]
Return type:

Optional[BodyEssence]

Parameters:

body (Body) –

store(*, body, patch, essence)[source]
Return type:

None

Parameters:
class kopf.MultiDiffBaseStorage(storages)[source]

Bases: DiffBaseStorage

Parameters:

storages (Collection[DiffBaseStorage]) –

build(*, body, extra_fields=None)[source]

Extract only the relevant fields for the state comparisons.

The framework ignores all the system fields (mostly from metadata) and the status stanza completely, except for some well-known and useful metadata, such as labels and annotations (excluding the known garbage).

A special set of fields can be provided even if they are supposed to be removed. This is used, for example, for handlers which react to changes in the specific fields in the status stanza, while the rest of the status stanza is removed.

It is generally not a good idea to override this method in custom stores, unless a different definition of an object’s essence is needed.

Return type:

BodyEssence

Parameters:
fetch(*, body)[source]
Return type:

Optional[BodyEssence]

Parameters:

body (Body) –

store(*, body, patch, essence)[source]
Return type:

None

Parameters:
class kopf.ProgressRecord[source]

Bases: TypedDict

A single record stored for persistence of a single handler.

started: Optional[str][source]
stopped: Optional[str][source]
delayed: Optional[str][source]
purpose: Optional[str][source]
retries: Optional[int][source]
success: Optional[bool][source]
failure: Optional[bool][source]
message: Optional[str][source]
subrefs: Optional[Collection[NewType(HandlerId, str)]][source]
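A record is a plain mapping of these optional fields. For illustration (all values invented), a handler that succeeded after two retries might be recorded as:

```python
# An illustrative ProgressRecord-shaped dict (all values are invented):
record = {
    'started': '2024-01-01T00:00:00.000000',
    'stopped': '2024-01-01T00:00:30.000000',
    'delayed': None,
    'purpose': 'create',
    'retries': 3,
    'success': True,
    'failure': False,
    'message': None,
    'subrefs': None,
}
```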
class kopf.ProgressStorage[source]

Bases: StorageStanzaCleaner

Base class and an interface for all persistent states.

The state is persisted strictly per-handler, not for all handlers at once: this supports overlapping operators (assuming different handler ids) storing their state on the same fields of the resource (e.g. state.kopf).

This also ensures that no extra logic for state merges is needed: the handler states are atomic (their fields are not used separately) yet independent: each handler’s state is persisted on its own, unrelated to other handlers, and never combined into other atomic structures.

If combining is still needed for performance reasons (e.g. for relational/transactional databases), the keys can be cached in memory for a short time, and flush() can be overridden to actually store them.
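The cache-then-flush idea can be sketched as follows (a simplification with invented names; Kopf’s real ProgressStorage methods are keyword-only and also take body/patch arguments):

```python
# A sketch of caching handler states in memory and flushing them in bulk.
class CachedProgressStorage:
    def __init__(self):
        self._cache = {}     # handler id -> record, kept in memory
        self._flushed = []   # stands in for the real backend (e.g. a database)

    def store(self, key, record):
        self._cache[key] = record            # cheap in-memory write

    def flush(self):
        # One bulk write instead of many small ones (e.g. one DB transaction).
        self._flushed.extend(self._cache.items())
        self._cache.clear()

storage = CachedProgressStorage()
storage.store('create_fn', {'retries': 1})
storage.store('update_fn', {'retries': 0})
storage.flush()
```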

abstract fetch(*, key, body)[source]
Return type:

Optional[ProgressRecord]

Parameters:
  • key (HandlerId) –

  • body (Body) –

abstract store(*, key, record, body, patch)[source]
Return type:

None

Parameters:
abstract purge(*, key, body, patch)[source]
Return type:

None

Parameters:
  • key (HandlerId) –

  • body (Body) –

  • patch (Patch) –

abstract touch(*, body, patch, value)[source]
Return type:

None

Parameters:
abstract clear(*, essence)[source]
Return type:

BodyEssence

Parameters:

essence (BodyEssence) –

flush()[source]
Return type:

None

class kopf.AnnotationsProgressStorage(*, prefix='kopf.zalando.org', verbose=False, touch_key='touch-dummy', v1=True)[source]

Bases: StorageKeyFormingConvention, StorageKeyMarkingConvention, ProgressStorage

State storage in .metadata.annotations with JSON-serialised content.


For the annotations’ naming conventions, hashing, and V1 & V2 differences, see AnnotationsNamingMixin.
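The stored annotations look roughly like this (illustrative handler ids and values): without a prefix, the handler id alone is the annotation key; with the default prefix, the key becomes kopf.zalando.org/<handler-id>:

```yaml
metadata:
  annotations:
    # Without a prefix (illustrative):
    create_fn: '{"started": "2024-01-01T00:00:00", "retries": 1, "success": true}'
    # With the default prefix (illustrative):
    kopf.zalando.org/create_fn: '{"started": "2024-01-01T00:00:00", "retries": 1, "success": true}'
```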

Parameters:
  • prefix (str) –

  • verbose (bool) –

  • touch_key (str) –

  • v1 (bool) –

fetch(*, key, body)[source]
Return type:

Optional[ProgressRecord]

Parameters:
  • key (HandlerId) –

  • body (Body) –

store(*, key, record, body, patch)[source]
Return type:

None

Parameters:
purge(*, key, body, patch)[source]
Return type:

None

Parameters:
  • key (HandlerId) –

  • body (Body) –

  • patch (Patch) –

touch(*, body, patch, value)[source]
Return type:

None

Parameters:
clear(*, essence)[source]
Return type:

BodyEssence

Parameters:

essence (BodyEssence) –

class kopf.StatusProgressStorage(*, name='kopf', field='status.{name}.progress', touch_field='status.{name}.dummy')[source]

Bases: ProgressStorage

State storage in .status stanza with deep structure.

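With the default field templates, the stored state looks roughly like this (illustrative handler ids and values):

```yaml
status:
  kopf:
    progress:
      create_fn:
        started: '2024-01-01T00:00:00'
        retries: 1
        success: true
    dummy: '2024-01-01T00:00:05'   # the touch field
```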

Parameters:
property field: Tuple[str, ...][source]
property touch_field: Tuple[str, ...][source]
fetch(*, key, body)[source]
Return type:

Optional[ProgressRecord]

Parameters:
  • key (HandlerId) –

  • body (Body) –

store(*, key, record, body, patch)[source]
Return type:

None

Parameters:
purge(*, key, body, patch)[source]
Return type:

None

Parameters:
  • key (HandlerId) –

  • body (Body) –

  • patch (Patch) –

touch(*, body, patch, value)[source]
Return type:

None

Parameters:
clear(*, essence)[source]
Return type:

BodyEssence

Parameters:

essence (BodyEssence) –

class kopf.MultiProgressStorage(storages)[source]

Bases: ProgressStorage

Parameters:

storages (Collection[ProgressStorage]) –

fetch(*, key, body)[source]
Return type:

Optional[ProgressRecord]

Parameters:
  • key (HandlerId) –

  • body (Body) –

store(*, key, record, body, patch)[source]
Return type:

None

Parameters:
purge(*, key, body, patch)[source]
Return type:

None

Parameters:
  • key (HandlerId) –

  • body (Body) –

  • patch (Patch) –

touch(*, body, patch, value)[source]
Return type:

None

Parameters:
clear(*, essence)[source]
Return type:

BodyEssence

Parameters:

essence (BodyEssence) –

class kopf.SmartProgressStorage(*, name='kopf', field='status.{name}.progress', touch_key='touch-dummy', touch_field='status.{name}.dummy', prefix='kopf.zalando.org', v1=True, verbose=False)[source]

Bases: MultiProgressStorage

Parameters:
class kopf.RawEvent[source]

Bases: TypedDict

type: Literal[None, 'ADDED', 'MODIFIED', 'DELETED'][source]
object: RawBody[source]
class kopf.RawBody[source]

Bases: TypedDict

apiVersion: str[source]
kind: str[source]
metadata: RawMeta[source]
spec: Mapping[str, Any][source]
status: Mapping[str, Any][source]
class kopf.Status(_Status__src)[source]

Bases: MappingView[str, Any]

Parameters:

_Status__src (Body) –

class kopf.Spec(_Spec__src)[source]

Bases: MappingView[str, Any]

Parameters:

_Spec__src (Body) –

class kopf.Meta(_Meta__src)[source]

Bases: MappingView[str, Any]

Parameters:

_Meta__src (Body) –

property labels: Mapping[str, str][source]
property annotations: Mapping[str, str][source]
property uid: str | None[source]
property name: str | None[source]
property namespace: NamespaceName | None[source]
property creation_timestamp: str | None[source]
property deletion_timestamp: str | None[source]
class kopf.Body(_Body__src)[source]

Bases: ReplaceableMappingView[str, Any]

Parameters:

_Body__src (Mapping[str, Any]) –

property metadata: Meta[source]
property meta: Meta[source]
property spec: Spec[source]
property status: Status[source]
class kopf.BodyEssence[source]

Bases: TypedDict

metadata: MetaEssence[source]
spec: Mapping[str, Any][source]
status: Mapping[str, Any][source]
class kopf.ObjectReference[source]

Bases: TypedDict

apiVersion: str[source]
kind: str[source]
namespace: Optional[str][source]
name: str[source]
uid: str[source]
class kopf.OwnerReference[source]

Bases: TypedDict

controller: bool[source]
blockOwnerDeletion: bool[source]
apiVersion: str[source]
kind: str[source]
name: str[source]
uid: str[source]
class kopf.Memo[source]

Bases: Dict[Any, Any]

A container to hold arbitrary key-value pairs assigned by operator developers.

It is used in the memo kwarg to all resource handlers, isolated per individual resource object (not the resource kind).

The values can be accessed either as dictionary keys (the memo is a dict under the hood) or as object attributes (except for methods of dict).

See more in In-memory containers.

>>> memo = Memo()
>>> memo.f1 = 100
>>> memo['f1']
100
>>> memo['f2'] = 200
>>> memo.f2
200
>>> set(memo.keys())
{'f1', 'f2'}
class kopf.Index[source]

Bases: Mapping[_K, Store[_V]], Generic[_K, _V]

A mapping of index keys to collections of values indexed under those keys.

A single index is identified by a handler id and is populated by values usually from a single indexing function (the @kopf.index() decorator).

Note

This class is only an abstract interface of an index. The actual implementation is in indexing.Index.

class kopf.Store[source]

Bases: Collection[_V], Generic[_V]

A collection of all values under a single unique index key.

Multiple objects can yield the same keys, so all their values are accumulated into collections. When an object is deleted or stops matching the filters, all associated values are discarded.

The order of values is not guaranteed.

The values are not deduplicated, so duplicates are possible if multiple objects return the same values from their indexing functions.

Note

This class is only an abstract interface of an indexed store. The actual implementation is in indexing.Store.

class kopf.ObjectLogger(*, body, settings)[source]

Bases: LoggerAdapter

A logger/adapter to carry the object identifiers for formatting.

The identifiers are then used both for formatting the per-object messages in ObjectPrefixingFormatter, and when posting the per-object k8s-events.

Constructed in event handling of each individual object.

The internal structure mirrors an object reference in the K8s API, but can change over time to whatever is needed for internal purposes. However, it should carry as little information as possible, and that information should be protected against object modification (e.g. in the case of background posting via the queue; see K8sPoster).

Parameters:
process(msg, kwargs)[source]

Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.

Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.

Return type:

Tuple[str, MutableMapping[str, Any]]

Parameters:
class kopf.LocalObjectLogger(*, body, settings)[source]

Bases: ObjectLogger

The same as ObjectLogger, but does not post the messages as k8s-events.

Used in the resource-watching handlers to log the handler’s invocation successes/failures without overloading K8s with excessively many k8s-events.

This class is used internally only and is not exposed publicly in any way.

Parameters:
log(*args, **kwargs)[source]

Delegate a log call to the underlying logger, after adding contextual information from this adapter instance.

Return type:

None

Parameters:
  • args (Any) –

  • kwargs (Any) –

class kopf.Diff(_Diff__items)[source]

Bases: Sequence[DiffItem]

Parameters:

_Diff__items (Iterable[DiffItem]) –

class kopf.DiffItem(operation, field, old, new)[source]

Bases: NamedTuple

Parameters:
operation: DiffOperation[source]

Alias for field number 0

field: Tuple[str, ...][source]

Alias for field number 1

old: Any[source]

Alias for field number 2

new: Any[source]

Alias for field number 3

property op: DiffOperation[source]
class kopf.DiffOperation(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

ADD = 'add'[source]
CHANGE = 'change'[source]
REMOVE = 'remove'[source]
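A diff is a flat sequence of (operation, field, old, new) items, with nested fields flattened into tuples of keys. A simplified stand-in (not the real classes) shows the shape:

```python
from typing import Any, NamedTuple, Tuple

# A simplified stand-in for kopf.DiffItem, only to show the tuple shape.
class DiffItem(NamedTuple):
    operation: str          # 'add' | 'change' | 'remove'
    field: Tuple[str, ...]  # nested field path, e.g. ('spec', 'replicas')
    old: Any
    new: Any

# What a computed diff might look like (illustrative values):
diff = [
    DiffItem('change', ('spec', 'replicas'), 1, 2),
    DiffItem('add', ('metadata', 'labels', 'team'), None, 'a'),
    DiffItem('remove', ('spec', 'obsolete'), 'x', None),
]
```

Handlers registered with field= filters receive exactly such per-field items for their field of interest.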
class kopf.Reason(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

CREATE = 'create'[source]
UPDATE = 'update'[source]
DELETE = 'delete'[source]
RESUME = 'resume'[source]
NOOP = 'noop'[source]
FREE = 'free'[source]
GONE = 'gone'[source]
class kopf.Patch(_Patch__src=None, body=None)[source]

Bases: Dict[str, Any]

Parameters:
property metadata: MetaPatch[source]
property meta: MetaPatch[source]
property spec: SpecPatch[source]
property status: StatusPatch[source]
as_json_patch()[source]
Return type:

List[JSONPatchItem]
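The conversion flattens the nested merge-patch dict into RFC 6902 (JSON Patch) operations. A rough sketch of the idea (not Kopf’s actual code; the real conversion also distinguishes add vs. replace and handles key escaping):

```python
# A sketch: flatten a nested merge-patch dict into JSON-Patch-like items.
# None values become removals; everything else becomes replacements.
def as_json_patch(patch: dict, path: tuple = ()) -> list:
    items = []
    for key, value in patch.items():
        keypath = path + (str(key),)
        if isinstance(value, dict):
            items.extend(as_json_patch(value, keypath))
        elif value is None:
            items.append({'op': 'remove', 'path': '/' + '/'.join(keypath)})
        else:
            items.append({'op': 'replace', 'path': '/' + '/'.join(keypath),
                          'value': value})
    return items

ops = as_json_patch({'spec': {'replicas': 2}, 'metadata': {'labels': {'old': None}}})
```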

class kopf.DaemonStoppingReason(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Flag

A reason or reasons of daemon being terminated.

Daemons are usually signalled to exit for one of two reasons: the operator itself is exiting or restarting, so all daemons of all resources must stop; or the individual resource was deleted, while the operator continues running.

No matter the reason, the daemons must exit, so one and only one stop-flag is used. Some daemons can check the reason of exiting if it is important.

There can be multiple reasons combined (in rare cases, all of them).

DONE = 1[source]
FILTERS_MISMATCH = 2[source]
RESOURCE_DELETED = 4[source]
OPERATOR_PAUSING = 8[source]
OPERATOR_EXITING = 16[source]
DAEMON_SIGNALLED = 32[source]
DAEMON_CANCELLED = 64[source]
DAEMON_ABANDONED = 128[source]
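Since the class is an enum.Flag, reasons combine with bitwise OR, and a daemon can test for a specific reason with the in operator. A standalone illustration with a reduced copy of the flags (not importing kopf):

```python
import enum

# A reduced copy of the flags, only to demonstrate the Flag semantics.
class DaemonStoppingReason(enum.Flag):
    DONE = 1
    RESOURCE_DELETED = 4
    OPERATOR_EXITING = 16

reason = (DaemonStoppingReason.RESOURCE_DELETED |
          DaemonStoppingReason.OPERATOR_EXITING)

# A daemon can check one specific reason within the combined flags:
deleted = DaemonStoppingReason.RESOURCE_DELETED in reason
```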
class kopf.Resource(group, version, plural, kind=None, singular=None, shortcuts=frozenset({}), categories=frozenset({}), subresources=frozenset({}), namespaced=None, preferred=True, verbs=frozenset({}))[source]

Bases: object

A reference to a very specific custom or built-in resource kind.

It is used to form the K8s API URLs. Generally, K8s API only needs an API group, an API version, and a plural name of the resource. All other names are remembered to match against resource selectors, for logging, and for informational purposes.

Parameters:
group: str[source]

The resource’s API group; e.g. "kopf.dev", "apps", "batch". For Core v1 API resources, an empty string: "".

version: str[source]

The resource’s API version; e.g. "v1", "v1beta1", etc.

plural: str[source]

The resource’s plural name; e.g. "pods", "kopfexamples". It is used as an API endpoint, together with API group & version.

kind: Optional[str] = None[source]

The resource’s kind (as in YAML files); e.g. "Pod", "KopfExample".

singular: Optional[str] = None[source]

The resource’s singular name; e.g. "pod", "kopfexample".

shortcuts: FrozenSet[str] = frozenset({})[source]

The resource’s short names; e.g. {"po"}, {"kex", "kexes"}.

categories: FrozenSet[str] = frozenset({})[source]

The resource’s categories, to which the resource belongs; e.g. {"all"}.

subresources: FrozenSet[str] = frozenset({})[source]

The resource’s subresources, if defined; e.g. {"status", "scale"}.

namespaced: Optional[bool] = None[source]

Whether the resource is namespaced (True) or cluster-scoped (False).

preferred: bool = True[source]

Whether the resource belongs to a “preferred” API version. Only “preferred” resources are served when the version is not specified.

verbs: FrozenSet[str] = frozenset({})[source]

All available verbs for the resource, as supported by K8s API; e.g., {"list", "watch", "create", "update", "delete", "patch"}. Note that it is not the same as all verbs permitted by RBAC.

get_url(*, server=None, namespace=None, name=None, subresource=None, params=None)[source]

Build a URL to be used with K8s API.

If the namespace is not set, a cluster-wide URL is returned. For cluster-scoped resources, the namespace is ignored.

If the name is not set, the URL for the resource list is returned. Otherwise (if set), the URL for the individual resource is returned.

If subresource is set, that subresource’s URL is returned, regardless of whether such a subresource is known or not.

Params go to the query parameters (?param1=value1&param2=value2...).

Return type:

str

Parameters:
  • server (str | None) –

  • namespace (NamespaceName | None) –

  • name (str | None) –

  • subresource (str | None) –

  • params (Mapping[str, str] | None) –
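The resulting URLs follow the standard K8s API layout. A rough sketch of the path logic (not Kopf’s actual code; the real method also handles the server prefix and query parameters):

```python
# A sketch of K8s API path formation: core-v1 resources (empty group) live
# under /api; everything else under /apis/<group>. Namespace, name, and
# subresource segments are appended when given.
def make_path(group, version, plural, namespace=None, name=None, subresource=None):
    parts = ['/api' if not group else f'/apis/{group}', version]
    if namespace is not None:
        parts += ['namespaces', namespace]
    parts.append(plural)
    if name is not None:
        parts.append(name)
        if subresource is not None:
            parts.append(subresource)
    return '/'.join(parts)

path = make_path('kopf.dev', 'v1', 'kopfexamples', namespace='default', name='kex-1')
```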

Submodules