kubelet Code Implementation
func (k KubeletStub) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
In the kubelet implementation code below, a long statement that must stay on a single line, such as the Register function signature shown above, may be wrapped automatically because of the page-width restriction of the PDF format. Code copied with such a line break fails to compile. If this happens, manually merge the wrapped lines back into one line.
// k8s test client package main import ( "flag" "fmt" "io/ioutil" "log" "net" "os" "os/signal" "path" "sync" "syscall" "time" "golang.org/x/net/context" "google.golang.org/grpc" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" ) const ( debug = false sleepTime = 5 grpcTimeOut = 10 testNum = 4 ) // KubeletStub for test type KubeletStub struct { sync.Mutex server *grpc.Server socket string pluginEndpoint string testDir string } // NewKubeletStub returns an initialized KubeletStub for testing purpose. func NewKubeletStub(socket string) *KubeletStub { return &KubeletStub{ socket: socket, } } // Register from Device Plugin func (k KubeletStub) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) { k.Lock() defer k.Unlock() k.pluginEndpoint = r.Endpoint go testTask(k.testDir, k.pluginEndpoint) return &pluginapi.Empty{}, nil } // Start Server and Register to Deivce Plugin func (k *KubeletStub) Start() error { os.Remove(k.socket) s, err := net.Listen("unix", k.socket) if err != nil { fmt.Printf("Can't listen at the socket: %+v", err) return err } k.server = grpc.NewServer([]grpc.ServerOption{}...) 
pluginapi.RegisterRegistrationServer(k.server, k) go k.server.Serve(s) return nil } func main() { flag.Parse() testDir := os.TempDir() + "/hiai_plugin_test" if _, err := os.Stat(testDir); err != nil { log.Printf("%s not exist\n", testDir) return } log.Println(testDir) log.Println("Starting OS watcher.") sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) kubeletEndpoint := path.Join(testDir, "kubelet.sock") kubeletStub := NewKubeletStub(kubeletEndpoint) kubeletStub.testDir = testDir err := kubeletStub.Start() if err != nil { log.Println("KubeletStub Server Start Error") return } defer kubeletStub.server.Stop() log.Println("KubeletStub Server Start") time.Sleep(sleepTime * time.Second) select { case s := <-sigs: switch s { case syscall.SIGHUP: log.Println("Received SIGHUP, restarting.") removeAllSocks(testDir) os.Remove(kubeletEndpoint) default: log.Printf("Received signal \"%v\", shutting down.", s) os.Remove(kubeletEndpoint) removeAllSocks(testDir) break } } } func newOSWatcher(sigs ...os.Signal) chan os.Signal { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, sigs...) return sigChan } // Remove all sock file on the local node by walking `/testDir` directory. func removeAllSocks(testDir string) error { files, err := ioutil.ReadDir(testDir) if err != nil { return err } for _, f := range files { if f.IsDir() { continue } os.Remove(testDir + "/" + f.Name()) } return nil } func testTask(testDir, pluginEndpoint string) { devicePluginSock := path.Join(testDir, pluginEndpoint) // Verifies the grpcServer is ready to serve services. 
conn, err := grpc.Dial(devicePluginSock, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(grpcTimeOut * time.Second), grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("unix", addr, timeout) })) if err != nil { log.Printf("Connect device plugin fail, %+v\n", err) return } client := pluginapi.NewDevicePluginClient(conn) // Tests ListAndWatch fmt.Printf("\n---> ListAndWatch\n") stream, err := client.ListAndWatch(context.Background(), &pluginapi.Empty{}) devs, err := stream.Recv() fmt.Printf("%+v\n\n", devs) time.Sleep(1 * time.Second) devices := make([]pluginapi.Device, 0) for _, d := range devs.Devices { dev := pluginapi.Device{ ID: d.ID, Health: d.Health, } devices = append(devices, dev) } allocateID := make([]string, 0) if len(devices) > testNum { allocateID = append(allocateID, devices[0].ID, devices[1].ID, devices[2].ID) } else if len(devices) == testNum { allocateID = append(allocateID, devices[0].ID, devices[1].ID) } else { allocateID = append(allocateID, devices[0].ID) } // Tests Allocate fmt.Printf("\n---> Allocate\n") resp, err := client.Allocate(context.Background(), &pluginapi.AllocateRequest{ ContainerRequests: []*pluginapi.ContainerAllocateRequest{ {DevicesIDs: allocateID}}}) fmt.Printf("%+v\n\n", resp) time.Sleep(1 * time.Second) conn.Close() }