Skip to content

Commit ff44591

Browse files
Add parameters on 'sparkctl create' to delete SparkApplication if exists and to follow logs (#1506)
1 parent 0e919bf commit ff44591

File tree

3 files changed

+82
-25
lines changed

3 files changed

+82
-25
lines changed

sparkctl/cmd/create.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ import (
4141

4242
const bufferSize = 1024
4343

44+
var DeleteIfExists bool
45+
var LogsEnabled bool
4446
var RootPath string
4547
var UploadToPath string
4648
var UploadToEndpoint string
@@ -90,6 +92,10 @@ var createCmd = &cobra.Command{
9092
}
9193

9294
func init() {
95+
createCmd.Flags().BoolVarP(&DeleteIfExists, "delete", "d", false,
96+
"delete the SparkApplication if already exists")
97+
createCmd.Flags().BoolVarP(&LogsEnabled, "logs", "l", false,
98+
"watch the SparkApplication logs")
9399
createCmd.Flags().StringVarP(&UploadToPath, "upload-to", "u", "",
94100
"the name of the bucket where local application dependencies are to be uploaded")
95101
createCmd.Flags().StringVarP(&RootPath, "upload-prefix", "p", "",
@@ -151,6 +157,10 @@ func createFromScheduledSparkApplication(name string, kubeClient clientset.Inter
151157
}
152158

153159
func createSparkApplication(app *v1beta2.SparkApplication, kubeClient clientset.Interface, crdClient crdclientset.Interface) error {
160+
if DeleteIfExists {
161+
deleteSparkApplication(app.Name, crdClient)
162+
}
163+
154164
v1beta2.SetSparkApplicationDefaults(app)
155165
if err := validateSpec(app.Spec); err != nil {
156166
return err
@@ -177,6 +187,10 @@ func createSparkApplication(app *v1beta2.SparkApplication, kubeClient clientset.
177187

178188
fmt.Printf("SparkApplication \"%s\" created\n", app.Name)
179189

190+
if LogsEnabled {
191+
doLog(app.Name, true, kubeClient, crdClient)
192+
}
193+
180194
return nil
181195
}
182196

sparkctl/cmd/delete.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,15 @@ var deleteCmd = &cobra.Command{
5151
}
5252

5353
func doDelete(name string, crdClientset crdclientset.Interface) error {
54-
err := crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
55-
if err != nil {
54+
if err := deleteSparkApplication(name, crdClientset); err != nil {
5655
return err
5756
}
5857

5958
fmt.Printf("SparkApplication \"%s\" deleted\n", name)
6059

6160
return nil
6261
}
62+
63+
func deleteSparkApplication(name string, crdClientset crdclientset.Interface) error {
64+
return crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
65+
}

sparkctl/cmd/log.go

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"fmt"
2222
"io"
2323
"os"
24-
"strings"
24+
"time"
2525

2626
"github.com/spf13/cobra"
2727

@@ -57,7 +57,7 @@ var logCommand = &cobra.Command{
5757
return
5858
}
5959

60-
if err := doLog(args[0], kubeClientset, crdClientset); err != nil {
60+
if err := doLog(args[0], FollowLogs, kubeClientset, crdClientset); err != nil {
6161
fmt.Fprintf(os.Stderr, "failed to get driver logs of SparkApplication %s: %v\n", args[0], err)
6262
}
6363
},
@@ -69,36 +69,76 @@ func init() {
6969
logCommand.Flags().BoolVarP(&FollowLogs, "follow", "f", false, "whether to stream the logs")
7070
}
7171

72-
func doLog(name string, kubeClientset clientset.Interface, crdClientset crdclientset.Interface) error {
73-
app, err := crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Get(context.TODO(), name, metav1.GetOptions{})
74-
if err != nil {
75-
return fmt.Errorf("failed to get SparkApplication %s: %v", name, err)
76-
}
72+
func doLog(
73+
name string,
74+
followLogs bool,
75+
kubeClient clientset.Interface,
76+
crdClient crdclientset.Interface) error {
7777

78+
timeout := 30 * time.Second
79+
80+
podNameChannel := getPodNameChannel(name, crdClient)
7881
var podName string
79-
if ExecutorId < 0 {
80-
podName = app.Status.DriverInfo.PodName
81-
} else {
82-
podName = strings.NewReplacer("driver", fmt.Sprintf("exec-%d", ExecutorId)).
83-
Replace(app.Status.DriverInfo.PodName)
82+
83+
select {
84+
case podName = <-podNameChannel:
85+
case <-time.After(timeout):
86+
return fmt.Errorf("not found pod name")
8487
}
8588

86-
if podName == "" {
87-
return fmt.Errorf("unable to fetch logs as the name of the target pod is empty")
89+
waitLogsChannel := waitForLogsFromPodChannel(podName, kubeClient, crdClient)
90+
91+
select {
92+
case <-waitLogsChannel:
93+
case <-time.After(timeout):
94+
return fmt.Errorf("timeout to fetch logs from pod \"%s\"", podName)
8895
}
8996

90-
out := os.Stdout
91-
if FollowLogs {
92-
if err := streamLogs(out, kubeClientset, podName); err != nil {
93-
return err
94-
}
97+
if followLogs {
98+
return streamLogs(os.Stdout, kubeClient, podName)
9599
} else {
96-
if err := printLogs(out, kubeClientset, podName); err != nil {
97-
return err
98-
}
100+
return printLogs(os.Stdout, kubeClient, podName)
99101
}
102+
}
100103

101-
return nil
104+
func getPodNameChannel(
105+
sparkApplicationName string,
106+
crdClient crdclientset.Interface) chan string {
107+
108+
channel := make(chan string, 1)
109+
go func() {
110+
for true {
111+
app, _ := crdClient.SparkoperatorV1beta2().SparkApplications(Namespace).Get(
112+
context.TODO(),
113+
sparkApplicationName,
114+
metav1.GetOptions{})
115+
116+
if app.Status.DriverInfo.PodName != "" {
117+
channel <- app.Status.DriverInfo.PodName
118+
break
119+
}
120+
}
121+
}()
122+
return channel
123+
}
124+
125+
func waitForLogsFromPodChannel(
126+
podName string,
127+
kubeClient clientset.Interface,
128+
crdClient crdclientset.Interface) chan bool {
129+
130+
channel := make(chan bool, 1)
131+
go func() {
132+
for true {
133+
_, err := kubeClient.CoreV1().Pods(Namespace).GetLogs(podName, &apiv1.PodLogOptions{}).Do(context.TODO()).Raw()
134+
135+
if err == nil {
136+
channel <- true
137+
break
138+
}
139+
}
140+
}()
141+
return channel
102142
}
103143

104144
// printLogs is a one time operation that prints the fetched logs of the given pod.

0 commit comments

Comments
 (0)