diff --git a/README.md b/README.md index 34a9d01..207e48a 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,13 @@ Options: stopping the import --replace-tables Replace tables if they already exist + --update-tables Manages an extra table .csvs-meta that keeps + track of each CSV file and the checksum of the + file. On subsequent runs, the CSVs will be + compared against the checksum in the table to + see what has updated, and only those specific + tables will be replaced. + -t, --table TEXT Table to use (instead of using CSV filename) -c, --extract-column TEXT One or more columns to 'extract' into a separate lookup table. If you pass a simple diff --git a/csvs_to_sqlite/cli.py b/csvs_to_sqlite/cli.py index 310f0f3..ce094bd 100644 --- a/csvs_to_sqlite/cli.py +++ b/csvs_to_sqlite/cli.py @@ -14,6 +14,8 @@ load_csv, refactor_dataframes, table_exists, + table_outdated, + update_csv_meta, drop_table, to_sql_with_foreign_keys, ) @@ -39,6 +41,18 @@ @click.option( "--replace-tables", is_flag=True, help="Replace tables if they already exist" ) +@click.option( + "--update-tables", + is_flag=True, + help=( + "Manages an extra table .csvs-meta that keeps " + "track of each CSV file and the checksum of the " + "file. On subsequent runs, the CSVs will be " + "compared against the checksum in the table " + "to see what has updated, and only those " + "specific tables will be replaced." 
+ ), +) @click.option( "--table", "-t", help="Table to use (instead of using CSV filename)", default=None ) @@ -153,6 +167,7 @@ def cli( quoting, skip_errors, replace_tables, + update_tables, table, extract_column, date, @@ -195,32 +210,36 @@ def cli( sql_type_overrides = None for name, path in csvs.items(): try: - df = load_csv( - path, separator, skip_errors, quoting, shape, just_strings=just_strings - ) - df.table_name = table or name - if filename_column: - df[filename_column] = name - if shape: - shape += ",{}".format(filename_column) - if fixed_columns: - for colname, value in fixed_columns: - df[colname] = value + if not update_tables or table_outdated(conn, path): + df = load_csv( + path, separator, skip_errors, quoting, shape, just_strings=just_strings + ) + df.table_name = table or name + if filename_column: + df[filename_column] = name if shape: - shape += ",{}".format(colname) - if fixed_columns_int: - for colname, value in fixed_columns_int: - df[colname] = value - if shape: - shape += ",{}".format(colname) - if fixed_columns_float: - for colname, value in fixed_columns_float: - df[colname] = value - if shape: - shape += ",{}".format(colname) - sql_type_overrides = apply_shape(df, shape) - apply_dates_and_datetimes(df, date, datetime, datetime_format) - dataframes.append(df) + shape += ",{}".format(filename_column) + if fixed_columns: + for colname, value in fixed_columns: + df[colname] = value + if shape: + shape += ",{}".format(colname) + if fixed_columns_int: + for colname, value in fixed_columns_int: + df[colname] = value + if shape: + shape += ",{}".format(colname) + if fixed_columns_float: + for colname, value in fixed_columns_float: + df[colname] = value + if shape: + shape += ",{}".format(colname) + sql_type_overrides = apply_shape(df, shape) + apply_dates_and_datetimes(df, date, datetime, datetime_format) + dataframes.append(df) + + if update_tables: + update_csv_meta(conn, path) except LoadCsvError as e: click.echo("Could not load {}: 
{}".format(path, e), err=True)
@@ -245,7 +264,7 @@ def cli(
     for df in refactored:
         # This is a bit trickier because we need to
         # create the table with extra SQL for foreign keys
-        if replace_tables and table_exists(conn, df.table_name):
+        if (replace_tables or update_tables) and table_exists(conn, df.table_name):
             drop_table(conn, df.table_name)
         if table_exists(conn, df.table_name):
             df.to_sql(df.table_name, conn, if_exists="append", index=False)
@@ -284,12 +303,18 @@ def cli(
 
     conn.close()
 
-    if db_existed:
+    if db_existed and not update_tables:
         click.echo(
             "Added {} CSV file{} to {}".format(
                 len(csvs), "" if len(csvs) == 1 else "s", dbname
             )
         )
+    elif db_existed and update_tables:
+        click.echo(
+            "Updated {} CSV file{} in {}".format(
+                len(dataframes), "" if len(dataframes) == 1 else "s", dbname
+            )
+        )
     else:
         click.echo(
             "Created {} from {} CSV file{}".format(
diff --git a/csvs_to_sqlite/utils.py b/csvs_to_sqlite/utils.py
index 5a2bca2..8ca96cc 100644
--- a/csvs_to_sqlite/utils.py
+++ b/csvs_to_sqlite/utils.py
@@ -264,6 +264,36 @@ def table_exists(conn, table):
     ).fetchone()[0]
 
 
+def csv_md5_checksum(path):
+    with open(path, "rb") as f:
+        file_hash = hashlib.md5()
+        for chunk in iter(lambda: f.read(8192), b''):
+            file_hash.update(chunk)
+        return file_hash.hexdigest()
+
+
+def table_outdated(conn, path):
+    csv_cs = csv_md5_checksum(path)
+
+    if table_exists(conn, ".csvs-meta"):
+        csv_meta_entry = conn.execute("select * from [.csvs-meta] where csv_path=?", [path]).fetchone()
+        if csv_meta_entry is None:
+            return True
+        return csv_cs != csv_meta_entry[1]
+    return True
+
+
+def update_csv_meta(conn, path):
+    conn.execute("CREATE TABLE IF NOT EXISTS [.csvs-meta] (csv_path TEXT PRIMARY KEY, md5_checksum TEXT)")
+
+    csv_modified = csv_md5_checksum(path)
+    conn.execute(
+        """
+        INSERT INTO [.csvs-meta] VALUES (?, ?)
+ ON CONFLICT(csv_path) DO UPDATE SET md5_checksum=excluded.md5_checksum; + """, [path, csv_modified]) + + def drop_table(conn, table): conn.execute("DROP TABLE [{}]".format(table))