Informer implementation with the NewSharedInformerFactoryWithOptions
Informer and websocket implementation for specific workload (deployments) * Implementation of watch workload (in this case deployment)
by Rahul Vishwakarma
See this Blog for more clarity
* Informer implementation with the NewSharedInformerFactoryWithOptions
* Implementation of watch workload (in this case deployment)
* Informer and websocket implementation for specific workload (deployments)
* Kubernetes custom controller from scratch in go
Pull request 👨🏻💻
package main
// websocket
router.GET("/ws", func(ctx *gin.Context) {
deployment.HandleDeploymentLogs(ctx.Writer, ctx.Request)
})
// controller.go
package deployment
import (
"context"
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"github.com/katamyra/kubestellarUI/wds"
v1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"log"
"net/http"
"time"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type DeploymentUpdate struct {
Timestamp string `json:"timestamp"`
Message string `json:"message"`
}
func HandleDeploymentLogs(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("WebSocket Upgrade Error:", err)
return
}
defer conn.Close()
namespace := r.URL.Query().Get("namespace")
deploymentName := r.URL.Query().Get("deployment")
if namespace == "" || deploymentName == "" {
if err := conn.WriteMessage(websocket.TextMessage, []byte("Error: Missing namespace or deployment name")); err != nil {
log.Printf("Failed to write message: %v", err)
}
return
}
// nothing but just getting the kubeclient
clientset, err := wds.GetClientSetKubeConfig()
if err != nil {
if err := conn.WriteMessage(websocket.TextMessage, []byte("Error: Failed to create Kubernetes clientset - "+err.Error())); err != nil {
log.Printf("Failed to send WebSocket message: %v", err)
}
return
}
sendInitialLogs(conn, clientset, namespace, deploymentName)
// Use an informer to watch the deployment
watchDeploymentWithInformer(conn, clientset, namespace, deploymentName)
// WE ARE USING INFORMER
//watchDeploymentChanges(conn, clientset, namespace, deploymentName)
}
func sendInitialLogs(conn *websocket.Conn, clientset *kubernetes.Clientset, namespace, deploymentName string) {
deployment, err := clientset.AppsV1().Deployments(namespace).Get(context.Background(), deploymentName, metav1.GetOptions{})
if err != nil {
if err := conn.WriteMessage(websocket.TextMessage, []byte("Error: Failed to fetch deployment - "+err.Error())); err != nil {
log.Printf("Failed to send WebSocket message: %v", err)
}
return
}
logs := getDeploymentLogs(deployment)
for _, logLine := range logs {
if err := conn.WriteMessage(websocket.TextMessage, []byte(logLine)); err != nil {
log.Println("Error writing to WebSocket:", err)
return
}
time.Sleep(200 * time.Millisecond)
}
}
func watchDeploymentWithInformer(conn *websocket.Conn, clientset *kubernetes.Clientset, namespace, deploymentName string) {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0,
informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fmt.Sprintf("metadata.name=%s", deploymentName)
}),
)
informer := factory.Apps().V1().Deployments().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
oldDeployment, ok1 := oldObj.(*v1.Deployment)
newDeployment, ok2 := newObj.(*v1.Deployment)
if !ok1 || !ok2 || newDeployment.Name != deploymentName {
return
}
updateHandler(conn, oldDeployment, newDeployment)
},
})
// make(chan struct{}) creates an empty struct channel.
// This channel is used to signal the informer to stop.
// struct{} is a zero-memory type, meaning it doesn’t allocate memory.
// We don’t need to send any actual data through this channel, just a signal.
stopCh := make(chan struct{})
defer close(stopCh)
go informer.Run(stopCh)
select {}
}
Explaination:
select {}
Keep the connection open
This is a blocking operation that:
Prevents the function from exiting.
Keeps the WebSocket connection open.
💡 Why does this work?
select {} waits indefinitely because there are no case statements.
The function never returns, so the informer keeps running.
stopCh := make(chan struct{})
defer close(stopCh)
go informer.Run(stopCh)
select {}
1️⃣ Create a channel (stopCh) for stopping the informer.
2️⃣ Run the informer in a goroutine (go informer.Run(stopCh)).
3️⃣ Block forever (select {}) to keep the function running.
4️⃣ If the function returns (not in this case), defer close(stopCh) stops the informer.
// controller.go
func updateHandler(conn *websocket.Conn, oldDeployment, newDeployment *v1.Deployment) {
var logs []DeploymentUpdate
if *oldDeployment.Spec.Replicas != *newDeployment.Spec.Replicas {
logs = append(logs, DeploymentUpdate{
Timestamp: time.Now().Format(time.RFC3339),
Message: fmt.Sprintf("Deployment %s updated - Replicas changed: %d", newDeployment.Name, *newDeployment.Spec.Replicas),
})
}
oldImage := oldDeployment.Spec.Template.Spec.Containers[0].Image
newImage := newDeployment.Spec.Template.Spec.Containers[0].Image
if oldImage != newImage {
logs = append(logs, DeploymentUpdate{
Timestamp: time.Now().Format(time.RFC3339),
Message: fmt.Sprintf("Deployment %s updated - Image changed: %s", newDeployment.Name, newImage),
})
}
for _, logLine := range logs {
jsonMessage, _ := json.Marshal(logLine)
conn.WriteMessage(websocket.TextMessage, jsonMessage)
}
}
// Watches deployment changes and sends updates
// Keeping it for reference - NOT USEFUL
func watchDeploymentChanges(conn *websocket.Conn, clientset *kubernetes.Clientset, namespace, deploymentName string) {
options := metav1.ListOptions{
// remove this line it will become universal for all the deployment
// it will listen for all deployment inside namespace
FieldSelector: fmt.Sprintf("metadata.name=%s", deploymentName),
}
watcher, err := clientset.AppsV1().Deployments(namespace).Watch(context.Background(), options)
if err != nil {
if err := conn.WriteMessage(websocket.TextMessage, []byte("Error: Failed to watch deployment - "+err.Error())); err != nil {
log.Printf("Failed to send WebSocket message: %v", err)
}
return
}
defer watcher.Stop()
// preserving the replicas and image for next call
var lastReplicas *int32
var lastImage string
for event := range watcher.ResultChan() {
deployment, ok := event.Object.(*v1.Deployment)
if !ok {
continue
}
var logs []DeploymentUpdate
message := fmt.Sprintf("Deployment %s changed: %s", deployment.Name, event.Type)
log.Println(message)
if lastReplicas == nil || *lastReplicas != *deployment.Spec.Replicas {
message = fmt.Sprintf("Deployment %s updated - Replicas changed: %d", deployment.Name, *deployment.Spec.Replicas)
lastReplicas = deployment.Spec.Replicas
logs = append(logs, DeploymentUpdate{
Timestamp: time.Now().Format(time.RFC3339),
Message: message,
})
}
if len(deployment.Spec.Template.Spec.Containers) > 0 {
currentImage := deployment.Spec.Template.Spec.Containers[0].Image
if lastImage == "" || lastImage != currentImage {
message = fmt.Sprintf("Deployment %s updated - Image changed: %s", deployment.Name, currentImage)
logs = append(logs, DeploymentUpdate{
Timestamp: time.Now().Format(time.RFC3339),
Message: message,
})
lastImage = currentImage
}
}
for _, logLine := range logs {
jsonMessage, _ := json.Marshal(logLine)
if err := conn.WriteMessage(websocket.TextMessage, jsonMessage); err != nil {
log.Println("Error writing to WebSocket:", err)
return
}
}
}
}
func getDeploymentLogs(deployment *v1.Deployment) []string {
baseTime := time.Now().Format(time.RFC3339)
replicas := int32(1)
if deployment.Spec.Replicas != nil {
replicas = *deployment.Spec.Replicas
}
logs := []string{
fmt.Sprintf("[%v] INFO: Deployment workload %v initiated ", baseTime, deployment.Name),
fmt.Sprintf("[%v] INFO: Workload created with replicas: %d, image: %v ", baseTime, replicas, deployment.Spec.Template.Spec.Containers[0].Image),
fmt.Sprintf("[%v] INFO: Namespace %v successfully updated ", baseTime, deployment.Namespace),
fmt.Sprintf("[%v] INFO: Available Replicas: %d ", baseTime, deployment.Status.AvailableReplicas),
}
// Check if Conditions slice has elements before accessing it
if len(deployment.Status.Conditions) > 0 {
condition := deployment.Status.Conditions[0]
logs = append(logs,
fmt.Sprintf("[%v] INFO: Conditions: %s ", baseTime, condition.Type),
fmt.Sprintf("[%v] INFO: LastUpdateTime : %s ", baseTime, condition.LastUpdateTime.Time),
fmt.Sprintf("[%v] INFO: LastTransitionTime : %s ", baseTime, condition.LastTransitionTime.Time),
fmt.Sprintf("[%v] INFO: Message: %s ", baseTime, condition.Message),
)
} else {
logs = append(logs, fmt.Sprintf("[%v] INFO: No conditions available", baseTime))
}
return logs
}
package wds
import (
"fmt"
"os"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
/*
Load the KubeConfig file and return the kubernetes clientset which gives you access to play with the k8s api
*/
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // windows
}
func GetClientSetKubeConfig() (*kubernetes.Clientset, error) {
kubeconfig := os.Getenv("KUBECONFIG")
if kubeconfig == "" {
if home := homeDir(); home != "" {
kubeconfig = fmt.Sprintf("%s/.kube/config", home)
}
}
// Load the kubeconfig file
config, err := clientcmd.LoadFromFile(kubeconfig)
if err != nil {
// c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to load kubeconfig"})
return nil, fmt.Errorf("failed to load kubeconfig")
}
// Use WDS1 context specifically
ctxContext := config.Contexts["wds1"]
if ctxContext == nil {
// c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create ctxConfig"})
return nil, fmt.Errorf("failed to create ctxConfig")
}
// Create config for WDS cluster
clientConfig := clientcmd.NewDefaultClientConfig(
*config,
&clientcmd.ConfigOverrides{
CurrentContext: "wds1",
},
)
restConfig, err := clientConfig.ClientConfig()
if err != nil {
return nil, fmt.Errorf("failed to create restconfig")
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to create Kubernetes client")
}
return clientset, nil
}
Reference:
- https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
- (Imp) https://github.com/kubernetes/sample-controller/blob/master/controller.go#L110-L114
- (Imp) https://medium.com/speechmatics/how-to-write-kubernetes-custom-controllers-in-go-8014c4a04235
- (Youtube) https://www.infracloud.io/kubernetes-school/writing-custom-k8s-controller/writing-a-kubernetes-custom-controller/
- https://chishengliu.com/posts/kubernetes-operator-sample-controller/
- https://www.nakamasato.com/kubernetes-training/kubernetes-operator/client-go/informer/#implementation-sharedinformerfactory
- https://buraksekili.github.io/articles/controller-runtime-1/