Skip to content

feat: add catalog API #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 319 additions & 0 deletions crates/paimon/src/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;

use async_trait::async_trait;
use chrono::Duration;

use crate::error::Result;
use crate::io::FileIO;
use crate::spec::{RowType, SchemaChange, TableSchema};

/// This interface is responsible for reading and writing metadata such as database/table from a paimon catalog.
///
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java#L42>
#[async_trait]
pub trait Catalog: Send + Sync {
fn default_database_info(&self) -> &str {
"default"
}

fn default_system_table_splitter_info(&self) -> &str {
"$"
}

fn default_system_database_name_info(&self) -> &str {
"sys"
}

/// Returns the warehouse root path containing all database directories in this catalog.
fn warehouse(&self) -> &str;

/// Returns the catalog options.
fn options(&self) -> &HashMap<String, String>;

/// Returns the FileIO instance.
fn file_io(&self) -> &FileIO;

/// Lists all databases in this catalog.
async fn list_databases(&self) -> Result<Vec<String>>;

/// Checks if a database exists in this catalog.
async fn database_exists(&self, database_name: &str) -> Result<bool>;

/// Creates a new database.
async fn create_database(
&self,
name: &str,
ignore_if_exists: bool,
properties: Option<HashMap<String, String>>,
) -> Result<()>;

/// Loads database properties.
async fn load_database_properties(&self, name: &str) -> Result<HashMap<String, String>>;

/// Drops a database.
async fn drop_database(
&self,
name: &str,
ignore_if_not_exists: bool,
cascade: bool,
) -> Result<()>;

/// Returns a Table instance for the specified identifier.
async fn get_table(&self, identifier: &Identifier) -> Result<Box<dyn Table>>;

/// Lists all tables in the specified database.
async fn list_tables(&self, database_name: &str) -> Result<Vec<String>>;

/// Checks if a table exists.
async fn table_exists(&self, identifier: &Identifier) -> Result<bool> {
match self.get_table(identifier).await {
Ok(_) => Ok(true),
Err(e) => match e {
crate::error::Error::TableNotExist { .. } => Ok(false),
_ => Err(e),
},
}
}

/// Drops a table.
async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()>;

/// Creates a new table.
async fn create_table(
&self,
identifier: &Identifier,
schema: TableSchema,
ignore_if_exists: bool,
) -> Result<()>;

/// Renames a table.
async fn rename_table(
&self,
from_table: &Identifier,
to_table: &Identifier,
ignore_if_not_exists: bool,
) -> Result<()>;

/// Alters an existing table.
async fn alter_table(
&self,
identifier: &Identifier,
changes: Vec<SchemaChange>,
ignore_if_not_exists: bool,
) -> Result<()>;

/// Drops a partition from the specified table.
async fn drop_partition(
&self,
identifier: &Identifier,
partitions: &HashMap<String, String>,
) -> Result<()>;

/// Returns whether this catalog is case-sensitive.
fn case_sensitive(&self) -> bool {
true
}
}

/// Identifies an object in a catalog.
///
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java#L35>
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Identifier {
database: String,
table: String,
}

impl Identifier {
pub const UNKNOWN_DATABASE: &'static str = "unknown";

/// Create a new identifier.
pub fn new(database: String, table: String) -> Self {
Self { database, table }
}

/// Get the table name.
pub fn database_name(&self) -> &str {
&self.database
}

/// Get the table name.
pub fn object_name(&self) -> &str {
&self.table
}

/// Get the full name of the identifier.
pub fn full_name(&self) -> String {
if self.database == Self::UNKNOWN_DATABASE {
self.table.clone()
} else {
format!("{}.{}", self.database, self.table)
}
}

/// Get the full name of the identifier with a specified character.
pub fn escaped_full_name(&self) -> String {
self.escaped_full_name_with_char('`')
}

/// Get the full name of the identifier with a specified character.
pub fn escaped_full_name_with_char(&self, escape_char: char) -> String {
format!(
"{0}{1}{0}.{0}{2}{0}",
escape_char, self.database, self.table
)
}

/// Create a new identifier.
pub fn create(db: &str, table: &str) -> Self {
Self::new(db.to_string(), table.to_string())
}
}

impl fmt::Display for Identifier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.full_name())
}
}

/// A table provides basic abstraction for a table type and table scan, and table read.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/table/Table.java#L41>
pub trait Table {
// ================== Table Metadata =====================

/// A name to identify this table.
fn name(&self) -> &str;

/// Returns the row type of this table.
fn row_type(&self) -> &RowType;

/// Partition keys of this table.
fn partition_keys(&self) -> Vec<String>;

/// Primary keys of this table.
fn primary_keys(&self) -> Vec<String>;

/// Options of this table.
fn options(&self) -> HashMap<String, String>;

/// Optional comment of this table.
fn comment(&self) -> Option<&String>;

// ================= Table Operations ====================

/// Copy this table with adding dynamic options.
fn copy(&self, dynamic_options: HashMap<String, String>) -> Box<dyn Table>;

/// Rollback table's state to a specific snapshot.
fn rollback_to(&mut self, snapshot_id: u64);

/// Create a tag from given snapshot.
fn create_tag(&mut self, tag_name: &str, from_snapshot_id: u64);

fn create_tag_with_retention(
&mut self,
tag_name: &str,
from_snapshot_id: u64,
time_retained: Duration,
);

/// Create a tag from the latest snapshot.
fn create_tag_from_latest(&mut self, tag_name: &str);

fn create_tag_from_latest_with_retention(&mut self, tag_name: &str, time_retained: Duration);

/// Delete a tag by name.
fn delete_tag(&mut self, tag_name: &str);

/// Rollback table's state to a specific tag.
fn rollback_to_tag(&mut self, tag_name: &str);

/// Create an empty branch.
fn create_branch(&mut self, branch_name: &str);

/// Create a branch from given snapshot.
fn create_branch_from_snapshot(&mut self, branch_name: &str, snapshot_id: u64);

/// Create a branch from given tag.
fn create_branch_from_tag(&mut self, branch_name: &str, tag_name: &str);

/// Delete a branch by branchName.
fn delete_branch(&mut self, branch_name: &str);
}
#[cfg(test)]
mod catalog_tests {
use super::*;

#[tokio::test]
async fn test_full_name_identifier() {
let database_name = "trade".to_string();
let table_name = "dwv_xxxxx".to_string();
let my_sign = "`".to_string();

let identifier = Identifier {
database: database_name.clone(),
table: table_name.clone(),
};

assert_eq!(identifier.database_name(), database_name);
assert_eq!(identifier.object_name(), table_name);
assert_eq!(
identifier.full_name(),
format!("{}.{}", database_name.clone(), table_name.clone())
);
assert_eq!(
identifier.escaped_full_name(),
format!(
"{0}{1}{0}.{0}{2}{0}",
my_sign,
database_name.clone(),
table_name.clone()
)
);
}

#[tokio::test]
async fn test_unkown_name_identifier() {
let database_name = "unknown".to_string();
let table_name = "dwv_xxxxx".to_string();
let my_sign = "`".to_string();

let identifier = Identifier {
database: database_name.to_string(),
table: table_name.clone(),
};

assert_eq!(identifier.database_name(), database_name);
assert_eq!(identifier.object_name(), table_name);
assert_eq!(identifier.full_name(), table_name.clone());
assert_eq!(
identifier.escaped_full_name(),
format!(
"{0}{1}{0}.{0}{2}{0}",
my_sign,
database_name.clone(),
table_name.clone()
)
);
}
}
32 changes: 31 additions & 1 deletion crates/paimon/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use crate::catalog::Identifier;
use snafu::prelude::*;

/// Result type used in paimon.
pub type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -65,6 +65,36 @@ pub enum Error {
display("Paimon hitting invalid file index format: {}", message)
)]
FileIndexFormatInvalid { message: String },

#[snafu(display("Database {} is not empty.", database))]
DatabaseNotEmpty { database: String },

#[snafu(display("Database {} already exists.", database))]
DatabaseAlreadyExist { database: String },

#[snafu(display("Database {} does not exist.", database))]
DatabaseNotExist { database: String },

#[snafu(display("Can't do operation on system database."))]
ProcessSystemDatabase,

#[snafu(display("Table {} already exists.", identifier.full_name()))]
TableAlreadyExist { identifier: Identifier },

#[snafu(display("Table {} does not exist.", identifier.full_name()))]
TableNotExist { identifier: Identifier },

#[snafu(display("Partition {} do not exist in the table {}.", identifier.full_name(), partitions))]
PartitionNotExist {
identifier: Identifier,
partitions: String,
},

#[snafu(display("Column {} already exists.", column_name))]
ColumnAlreadyExist { column_name: String },

#[snafu(display("Column {} does not exist.", column_name))]
ColumnNotExist { column_name: String },
}

impl From<opendal::Error> for Error {
Expand Down
2 changes: 1 addition & 1 deletion crates/paimon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
mod error;
pub use error::Error;
pub use error::Result;

pub mod catalog;
pub mod file_index;
pub mod io;
pub mod spec;
Loading