From 1b4c31cd3fb2c34910a07e1d820682b0dbaefedd Mon Sep 17 00:00:00 2001 From: Nikolay Karpov Date: Sun, 24 Nov 2024 03:23:49 -0800 Subject: [PATCH 1/4] add ReadCsv Signed-off-by: Nikolay Karpov --- sdp/processors/modify_manifest/common.py | 10 ++++- .../modify_manifest/create_manifest.py | 41 ++++++++++++++++++- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/sdp/processors/modify_manifest/common.py b/sdp/processors/modify_manifest/common.py index d4e8fad6..adb436df 100644 --- a/sdp/processors/modify_manifest/common.py +++ b/sdp/processors/modify_manifest/common.py @@ -15,11 +15,12 @@ import json import os from pathlib import Path -from typing import Dict, List, Union, Optional +from typing import Dict, List, Optional, Union import pandas as pd from tqdm import tqdm +from sdp.logging import logger from sdp.processors.base_processor import ( BaseParallelProcessor, BaseProcessor, @@ -27,6 +28,7 @@ ) from sdp.utils.common import load_manifest + class CombineSources(BaseParallelProcessor): """Can be used to create a single field from two alternative sources. @@ -368,18 +370,22 @@ def __init__( left_manifest_file: Optional[str], right_manifest_file: str, column_id: Union[str, List[str], None] = None, + how: str = "inner", **kwargs, ): super().__init__(**kwargs) self.left_manifest_file = left_manifest_file if left_manifest_file != None else self.input_manifest_file self.right_manifest_file = right_manifest_file self.column_id = column_id + self.how = how def process(self): m1 = pd.DataFrame.from_records(load_manifest(Path(self.left_manifest_file))) m2 = pd.DataFrame.from_records(load_manifest(Path(self.right_manifest_file))) - m3 = pd.merge(m1, m2, on=self.column_id, how="inner") + m3 = pd.merge(m1, m2, on=self.column_id, how=self.how) with open(self.output_manifest_file, "wt", encoding="utf8") as fout: for _, line in m3.iterrows(): fout.write(json.dumps(dict(line), ensure_ascii=False) + "\n") + + logger.info("Total number of entries after join: " + str(m3.shape[0])) diff --git a/sdp/processors/modify_manifest/create_manifest.py b/sdp/processors/modify_manifest/create_manifest.py index 77096b3e..df7bbf9b 100644 --- a/sdp/processors/modify_manifest/create_manifest.py +++ b/sdp/processors/modify_manifest/create_manifest.py @@ -12,9 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json from pathlib import Path -from sdp.processors.base_processor import BaseParallelProcessor, DataEntry +import pandas as pd + +from sdp.processors.base_processor import ( + BaseParallelProcessor, + BaseProcessor, + DataEntry, +) class CreateInitialManifestByExt(BaseParallelProcessor): @@ -48,3 +55,35 @@ def read_manifest(self): def process_dataset_entry(self, data_entry): data = {self.output_file_key: data_entry} return [DataEntry(data=data)] + + +class ReadCsv(BaseProcessor): + """ + Processor for reading a CSV file and converting its content into a JSON lines format. + + This class reads a CSV file using pandas, processes each row, and writes the output to a specified manifest file in JSON lines format. + + Args: + header (int, optional): Row number to use as the column names. Defaults to None, meaning no header. + sep (str, optional): Delimiter to use for separating values in the CSV file. Defaults to ','. + **kwargs: Additional keyword arguments to be passed to the base class `BaseProcessor`. + + Methods: + process(): Reads the input CSV file and writes its content as JSON lines to the output manifest file. + """ + + def __init__( + self, + header: int = None, + sep: str = ",", + **kwargs, + ): + super().__init__(**kwargs) + self.header = header + self.sep = sep + + def process(self): + df1 = pd.read_csv(self.input_manifest_file, header=self.header, sep=self.sep) + with open(self.output_manifest_file, "w") as out_file: + for j, row in df1.iterrows(): + out_file.write(json.dumps(dict(row), ensure_ascii=False) + "\n") From f0133ac82e1a14133476e4744357ea8ff913f7a2 Mon Sep 17 00:00:00 2001 From: Nikolay Karpov Date: Sun, 24 Nov 2024 04:21:32 -0800 Subject: [PATCH 2/4] rename to ApplyMerge Signed-off-by: Nikolay Karpov --- sdp/processors/modify_manifest/common.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/sdp/processors/modify_manifest/common.py b/sdp/processors/modify_manifest/common.py index adb436df..8d13dfbd 100644 --- a/sdp/processors/modify_manifest/common.py +++ b/sdp/processors/modify_manifest/common.py @@ -349,7 +349,18 @@ def process(self): class ApplyInnerJoin(BaseProcessor): - """Applies inner join to two manifests, i.e. creates a manifest from records that have matching values in both manifests. + def __init__( + self, + left_manifest_file: Optional[str], + right_manifest_file: str, + column_id: Union[str, List[str], None] = None, + **kwargs, + ): + raise DeprecationWarning("Use processor ApplyMerge instead") + + +class ApplyMerge(BaseProcessor): + """Applies merge to two manifests, i.e. creates a manifest from records that have matching values in both manifests. For more information, please refer to the Pandas merge function documentation: https://pandas.pydata.org/docs/reference/api/pandas.merge.html#pandas.merge @@ -358,11 +369,13 @@ class ApplyInnerJoin(BaseProcessor): column_id (Union[str, List[str], None]): Field names to join on. These must be found in both manifests. If `column_id` is None then this defaults to the intersection of the columns in both manifests. Defaults to None. + how (str): Similar to "how" parameter in DataFrame.merge(how=""), can be "left", "right", "outer", "inner", "cross", default "inner". + See more https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html left_manifest_file (Optional[str]): path to the left manifest. Defaults to `input_manifest_file`. right_manifest_file (str): path to the right manifest. Returns: - Inner join of two manifests. + Merge of two manifests. """ def __init__( From 39cc32f977d41761296f132c12929e97eb05a285 Mon Sep 17 00:00:00 2001 From: Nikolay Karpov Date: Fri, 13 Dec 2024 01:15:24 -0800 Subject: [PATCH 3/4] rename to ApplyMerge Signed-off-by: Nikolay Karpov --- docs/src/sdp/api.rst | 2 +- sdp/processors/__init__.py | 3 ++- tests/test_modify_manifest.py | 5 +++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/src/sdp/api.rst b/docs/src/sdp/api.rst index 6d85e83d..11e51f58 100644 --- a/docs/src/sdp/api.rst +++ b/docs/src/sdp/api.rst @@ -274,7 +274,7 @@ Miscellaneous .. autodata:: sdp.processors.CreateInitialManifestByExt :annotation: -.. autodata:: sdp.processors.ApplyInnerJoin +.. autodata:: sdp.processors.ApplyMerge :annotation: .. _sdp-base-classes: diff --git a/sdp/processors/__init__.py b/sdp/processors/__init__.py index fdafb521..82345697 100644 --- a/sdp/processors/__init__.py +++ b/sdp/processors/__init__.py @@ -58,6 +58,7 @@ from sdp.processors.modify_manifest.common import ( AddConstantFields, ApplyInnerJoin, + ApplyMerge, ChangeToRelativePath, CombineSources, DuplicateFields, @@ -95,8 +96,8 @@ DropLowWordMatchRate, DropNonAlphabet, DropOnAttribute, - PreserveByValue, DropRepeatedFields, + PreserveByValue, ) from sdp.processors.modify_manifest.make_letters_uppercase_after_period import ( MakeLettersUppercaseAfterPeriod, diff --git a/tests/test_modify_manifest.py b/tests/test_modify_manifest.py index 99583c26..0836bf94 100644 --- a/tests/test_modify_manifest.py +++ b/tests/test_modify_manifest.py @@ -19,7 +19,7 @@ import pytest -from sdp.processors import ApplyInnerJoin, DropNonAlphabet +from sdp.processors import ApplyMerge, DropNonAlphabet def _write_manifest(manifest: Path, entries: List[Dict[str, Union[str, float]]]): @@ -154,11 +154,12 @@ def test_apply_inner_join( _write_manifest(manifest1, input1) _write_manifest(manifest2, input2) - processor = ApplyInnerJoin( + processor = ApplyMerge( left_manifest_file=manifest1, right_manifest_file=manifest2, column_id=coloumn_id, output_manifest_file=manifest_out, + how="inner", ) processor.process() From f288950f99d62e7b127ae1f2851c12ee447b7998 Mon Sep 17 00:00:00 2001 From: Nikolay Karpov Date: Fri, 21 Feb 2025 06:58:21 -0800 Subject: [PATCH 4/4] rm ApplyInnerJoin Signed-off-by: Nikolay Karpov --- sdp/processors/__init__.py | 11 +++++++---- sdp/processors/modify_manifest/common.py | 14 +++----------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/sdp/processors/__init__.py b/sdp/processors/__init__.py index f6c7de05..f9ac7967 100644 --- a/sdp/processors/__init__.py +++ b/sdp/processors/__init__.py @@ -24,9 +24,6 @@ from sdp.processors.datasets.fleurs.create_initial_manifest import ( CreateInitialManifestFleurs, ) -from sdp.processors.datasets.uzbekvoice.create_initial_manifest import ( - CreateInitialManifestUzbekvoice, -) from sdp.processors.datasets.ksc2.create_initial_manifest import ( CreateInitialManifestKSC2, ) @@ -51,6 +48,9 @@ CreateInitialManifestSLR140, CustomDataSplitSLR140, ) +from sdp.processors.datasets.uzbekvoice.create_initial_manifest import ( + CreateInitialManifestUzbekvoice, +) from sdp.processors.datasets.voxpopuli.create_initial_manifest import ( CreateInitialManifestVoxpopuli, ) @@ -70,7 +70,10 @@ SortManifest, SplitOnFixedDuration, ) -from sdp.processors.modify_manifest.create_manifest import CreateInitialManifestByExt +from sdp.processors.modify_manifest.create_manifest import ( + CreateInitialManifestByExt, + ReadCsv, +) from sdp.processors.modify_manifest.data_to_data import ( CountNumWords, FfmpegConvert, diff --git a/sdp/processors/modify_manifest/common.py b/sdp/processors/modify_manifest/common.py index 8d13dfbd..6a3076a1 100644 --- a/sdp/processors/modify_manifest/common.py +++ b/sdp/processors/modify_manifest/common.py @@ -348,17 +348,6 @@ def process(self): fout.write(json.dumps(new_line, ensure_ascii=False) + "\n") -class ApplyInnerJoin(BaseProcessor): - def __init__( - self, - left_manifest_file: Optional[str], - right_manifest_file: str, - column_id: Union[str, List[str], None] = None, - **kwargs, - ): - raise DeprecationWarning("Use processor ApplyMerge instead") - - class ApplyMerge(BaseProcessor): """Applies merge to two manifests, i.e. creates a manifest from records that have matching values in both manifests. For more information, please refer to the Pandas merge function documentation: @@ -402,3 +391,6 @@ def process(self): fout.write(json.dumps(dict(line), ensure_ascii=False) + "\n") logger.info("Total number of entries after join: " + str(m3.shape[0])) + + +ApplyInnerJoin = ApplyMerge