Skip to content

Commit 1bbbb92

Browse files
committed
update cli
1 parent fca64ad commit 1bbbb92

5 files changed

Lines changed: 105 additions & 29 deletions

File tree

cli/cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ function-stream = { path = "../../" }
1616
protocol = { path = "../../protocol" }
1717
clap = { version = "4.5", features = ["derive"] }
1818
thiserror = "2"
19-
tokio = { version = "1.0", features = ["full"] }
19+
tokio = { version = "1.0", features = ["full", "signal"] }
2020
tonic = { version = "0.12", features = ["default"] }
2121
rustyline = { version = "14.0", features = ["with-dirs"] }
2222

cli/cli/src/main.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ mod repl;
1515
use clap::Parser;
1616
use repl::Repl;
1717
use std::process;
18+
use std::sync::Arc;
19+
use tokio::sync::Mutex;
1820

1921
#[derive(Parser, Debug)]
2022
#[command(name = "function-stream-cli")]
@@ -35,9 +37,9 @@ struct Args {
3537
async fn main() {
3638
let args = Args::parse();
3739

38-
let mut repl = Repl::new(args.host.clone(), args.port);
40+
let repl = Arc::new(Mutex::new(Repl::new(args.host.clone(), args.port)));
3941

40-
if let Err(e) = repl.run_async().await {
42+
if let Err(e) = Repl::run_async(repl).await {
4143
eprintln!("Error: {}", e);
4244
process::exit(1);
4345
}

cli/cli/src/repl.rs

Lines changed: 96 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use protocol::cli::{function_stream_service_client::FunctionStreamServiceClient,
2121
use rustyline::error::ReadlineError;
2222
use rustyline::{Config, DefaultEditor, EditMode};
2323
use std::io::{self, Cursor, Write};
24+
use std::sync::Arc;
25+
use tokio::sync::Mutex;
2426
use tonic::Request;
2527

2628
#[derive(Debug, thiserror::Error)]
@@ -269,22 +271,81 @@ impl Repl {
269271
}
270272
}
271273

272-
pub async fn run_async(&mut self) -> io::Result<()> {
273-
println!("Function Stream SQL Interface");
274-
println!("Server: {}\n", self.server_address());
275-
276-
if let Err(e) = self.connect().await {
277-
eprintln!("Error: {}", e);
278-
return Ok(());
274+
pub async fn run_async(repl: Arc<Mutex<Self>>) -> io::Result<()> {
275+
{
276+
let mut guard = repl.lock().await;
277+
println!("Function Stream SQL Interface");
278+
println!("Server: {}\n", guard.server_address());
279+
if let Err(e) = guard.connect().await {
280+
eprintln!("Error: {}", e);
281+
return Ok(());
282+
}
279283
}
280284

285+
#[cfg(unix)]
286+
let mut sigterm = {
287+
use tokio::signal::unix::{signal, SignalKind};
288+
signal(SignalKind::terminate()).expect("failed to register SIGTERM handler")
289+
};
290+
291+
let mut skip_save_history = false;
281292
loop {
282-
let input = match self.read_sql_input() {
283-
Ok(sql) => sql,
284-
Err(ReadlineError::Interrupted) => continue,
285-
Err(ReadlineError::Eof) => break,
286-
Err(e) => {
293+
let repl_clone = repl.clone();
294+
let read_result = {
295+
#[cfg(unix)]
296+
{
297+
tokio::select! {
298+
_ = tokio::signal::ctrl_c() => None,
299+
_ = sigterm.recv() => None,
300+
result = tokio::task::spawn_blocking(move || {
301+
let mut guard = repl_clone.blocking_lock();
302+
guard.read_sql_input()
303+
}) => match result {
304+
Ok(Ok(sql)) => Some(Ok(sql)),
305+
Ok(Err(e)) => Some(Err(e)),
306+
Err(e) => {
307+
eprintln!("Read Error: {}", e);
308+
None
309+
}
310+
},
311+
}
312+
}
313+
#[cfg(not(unix))]
314+
{
315+
tokio::select! {
316+
_ = tokio::signal::ctrl_c() => None,
317+
result = tokio::task::spawn_blocking(move || {
318+
let mut guard = repl_clone.blocking_lock();
319+
guard.read_sql_input()
320+
}) => match result {
321+
Ok(Ok(sql)) => Some(Ok(sql)),
322+
Ok(Err(e)) => Some(Err(e)),
323+
Err(e) => {
324+
eprintln!("Read Error: {}", e);
325+
None
326+
}
327+
},
328+
}
329+
}
330+
};
331+
332+
let input = match read_result {
333+
None => {
334+
skip_save_history = true;
335+
break;
336+
}
337+
Some(Ok(sql)) => sql,
338+
Some(Err(ReadlineError::Interrupted)) => {
339+
skip_save_history = true;
340+
break;
341+
}
342+
Some(Err(ReadlineError::Eof)) => {
343+
skip_save_history = true;
344+
break;
345+
}
346+
Some(Err(e)) => {
287347
eprintln!("Read Error: {}", e);
348+
skip_save_history = true;
288349
break;
289350
}
290351
};
@@ -293,28 +354,38 @@ impl Repl {
293354
continue;
294355
}
295356

357+
let mut guard = repl.lock().await;
296358
match input.trim().to_lowercase().as_str() {
297359
"exit" | "quit" | "q" => break,
298-
"help" | "h" => self.print_help(),
360+
"help" | "h" => guard.print_help(),
299361
_ => {
300-
if let Err(e) = self.execute_sql(&input).await {
362+
if let Err(e) = guard.execute_sql(&input).await {
301363
eprintln!("SQL Execution Error: {}", e);
364+
} else {
365+
guard.add_history_entry(&input);
302366
}
303367
}
304368
}
369+
drop(guard);
305370
println!();
306371
}
307372

308-
if let Some(ref mut ed) = self.editor {
309-
let _ = ed.save_history(".function-stream-cli-history");
373+
if !skip_save_history {
374+
if let Some(ref mut ed) = repl.lock().await.editor {
375+
let _ = ed.save_history(".function-stream-cli-history");
376+
}
310377
}
311378
Ok(())
312379
}
313380

314381
fn read_sql_input(&mut self) -> Result<String, ReadlineError> {
315382
let mut lines = Vec::new();
316383
loop {
317-
let prompt = if lines.is_empty() { "sql> " } else { " -> " };
384+
let prompt = if lines.is_empty() {
385+
"function-stream> "
386+
} else {
387+
" -> "
388+
};
318389

319390
let line = match self.editor.as_mut() {
320391
Some(ed) => ed.readline(prompt)?,
@@ -332,11 +403,6 @@ impl Repl {
332403
let trimmed = lines.last().map(|s| s.trim()).unwrap_or("");
333404

334405
if trimmed.ends_with(';') || self.is_balanced(&full_sql) {
335-
if let Some(ed) = self.editor.as_mut() {
336-
if !full_sql.is_empty() {
337-
let _ = ed.add_history_entry(full_sql.as_str());
338-
}
339-
}
340406
return Ok(full_sql);
341407
}
342408
}
@@ -348,6 +414,14 @@ impl Repl {
348414
open == close && !sql.trim().is_empty()
349415
}
350416

417+
fn add_history_entry(&mut self, entry: &str) {
418+
if let Some(ed) = self.editor.as_mut() {
419+
if !entry.trim().is_empty() {
420+
let _ = ed.add_history_entry(entry.trim());
421+
}
422+
}
423+
}
424+
351425
fn print_help(&self) {
352426
let mut table = Table::new();
353427
table

docs/sql-cli-guide-zh.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ SQL CLI 通过 gRPC 协议与远程 Server 通信。
5353
cargo run -p function-stream-cli -- -h <SERVER_HOST> -p <SERVER_PORT>
5454
```
5555

56-
- **终端提示符**:成功连接后显示 `sql>`
56+
- **终端提示符**:成功连接后显示 `function-stream>`
5757
- **输入规范**:支持多行输入,系统通过 `;`(分号)或括号平衡检测来判定语句结束并提交执行。
5858

5959
---
@@ -131,7 +131,7 @@ DROP FUNCTION go_processor_demo;
131131

132132
## 三、REPL 内建辅助指令
133133

134-
`sql>` 提示符下,除了标准 SQL 语句,还支持以下便捷指令
134+
`function-stream>` 提示符下,支持以下便捷指令
135135

136136
| 指令 | 简写 | 说明 |
137137
|-------|-----|---------------------------------|

docs/sql-cli-guide.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ SQL CLI communicates with the remote Server via the gRPC protocol.
5353
cargo run -p function-stream-cli -- -h <SERVER_HOST> -p <SERVER_PORT>
5454
```
5555

56-
- **Terminal Prompt**: Displays `sql>` after successful connection.
56+
- **Terminal Prompt**: Displays `function-stream>` after successful connection.
5757
- **Input Specification**: Supports multi-line input; the system detects statement completion and submits for execution via `;` (semicolon) or parenthesis balancing.
5858

5959
---
@@ -131,7 +131,7 @@ DROP FUNCTION go_processor_demo;
131131

132132
## 3. REPL Built-in Auxiliary Commands
133133

134-
At the `sql>` prompt, in addition to standard SQL statements, the following convenient commands are also supported:
134+
At the `function-stream>` prompt, the following convenient commands are supported:
135135

136136
| Command | Short | Description |
137137
|---------|-------|---------------------------------------------------------------------------|

0 commit comments

Comments
 (0)