diff --git a/docs/src/sdp/api.rst b/docs/src/sdp/api.rst index 23d58495..4400cd4c 100644 --- a/docs/src/sdp/api.rst +++ b/docs/src/sdp/api.rst @@ -279,7 +279,7 @@ Miscellaneous .. autodata:: sdp.processors.CreateInitialManifestByExt :annotation: -.. autodata:: sdp.processors.ApplyInnerJoin +.. autodata:: sdp.processors.ApplyMerge :annotation: .. _sdp-base-classes: diff --git a/sdp/processors/__init__.py b/sdp/processors/__init__.py index 23079d84..f9ac7967 100644 --- a/sdp/processors/__init__.py +++ b/sdp/processors/__init__.py @@ -24,9 +24,6 @@ from sdp.processors.datasets.fleurs.create_initial_manifest import ( CreateInitialManifestFleurs, ) -from sdp.processors.datasets.uzbekvoice.create_initial_manifest import ( - CreateInitialManifestUzbekvoice, -) from sdp.processors.datasets.ksc2.create_initial_manifest import ( CreateInitialManifestKSC2, ) @@ -51,6 +48,9 @@ CreateInitialManifestSLR140, CustomDataSplitSLR140, ) +from sdp.processors.datasets.uzbekvoice.create_initial_manifest import ( + CreateInitialManifestUzbekvoice, +) from sdp.processors.datasets.voxpopuli.create_initial_manifest import ( CreateInitialManifestVoxpopuli, ) @@ -61,6 +61,7 @@ from sdp.processors.modify_manifest.common import ( AddConstantFields, ApplyInnerJoin, + ApplyMerge, ChangeToRelativePath, CombineSources, DuplicateFields, @@ -69,7 +70,10 @@ SortManifest, SplitOnFixedDuration, ) -from sdp.processors.modify_manifest.create_manifest import CreateInitialManifestByExt +from sdp.processors.modify_manifest.create_manifest import ( + CreateInitialManifestByExt, + ReadCsv, +) from sdp.processors.modify_manifest.data_to_data import ( CountNumWords, FfmpegConvert, @@ -98,8 +102,8 @@ DropLowWordMatchRate, DropNonAlphabet, DropOnAttribute, - PreserveByValue, DropRepeatedFields, + PreserveByValue, ) from sdp.processors.modify_manifest.make_letters_uppercase_after_period import ( MakeLettersUppercaseAfterPeriod, diff --git a/sdp/processors/modify_manifest/common.py b/sdp/processors/modify_manifest/common.py index d4e8fad6..6a3076a1 100644 --- a/sdp/processors/modify_manifest/common.py +++ b/sdp/processors/modify_manifest/common.py @@ -15,11 +15,12 @@ import json import os from pathlib import Path -from typing import Dict, List, Union, Optional +from typing import Dict, List, Optional, Union import pandas as pd from tqdm import tqdm +from sdp.logging import logger from sdp.processors.base_processor import ( BaseParallelProcessor, BaseProcessor, @@ -27,6 +28,7 @@ ) from sdp.utils.common import load_manifest + class CombineSources(BaseParallelProcessor): """Can be used to create a single field from two alternative sources. @@ -346,8 +348,8 @@ def process(self): fout.write(json.dumps(new_line, ensure_ascii=False) + "\n") -class ApplyInnerJoin(BaseProcessor): - """Applies inner join to two manifests, i.e. creates a manifest from records that have matching values in both manifests. +class ApplyMerge(BaseProcessor): + """Applies merge to two manifests, i.e. creates a manifest from records that have matching values in both manifests. For more information, please refer to the Pandas merge function documentation: https://pandas.pydata.org/docs/reference/api/pandas.merge.html#pandas.merge @@ -356,11 +358,13 @@ class ApplyInnerJoin(BaseProcessor): column_id (Union[str, List[str], None]): Field names to join on. These must be found in both manifests. If `column_id` is None then this defaults to the intersection of the columns in both manifests. Defaults to None. + how (str): Similar to "how" parameter in DataFrame.merge(how=""), can be "left", "right", "outer", "inner", "cross", default "inner". + See more https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html left_manifest_file (Optional[str]): path to the left manifest. Defaults to `input_manifest_file`. right_manifest_file (str): path to the right manifest. Returns: - Inner join of two manifests. + Merge of two manifests. """ def __init__( @@ -368,18 +372,25 @@ def __init__( left_manifest_file: Optional[str], right_manifest_file: str, column_id: Union[str, List[str], None] = None, + how: str = "inner", **kwargs, ): super().__init__(**kwargs) self.left_manifest_file = left_manifest_file if left_manifest_file != None else self.input_manifest_file self.right_manifest_file = right_manifest_file self.column_id = column_id + self.how = how def process(self): m1 = pd.DataFrame.from_records(load_manifest(Path(self.left_manifest_file))) m2 = pd.DataFrame.from_records(load_manifest(Path(self.right_manifest_file))) - m3 = pd.merge(m1, m2, on=self.column_id, how="inner") + m3 = pd.merge(m1, m2, on=self.column_id, how=self.how) with open(self.output_manifest_file, "wt", encoding="utf8") as fout: for _, line in m3.iterrows(): fout.write(json.dumps(dict(line), ensure_ascii=False) + "\n") + + logger.info("Total number of entries after join: " + str(m3.shape[0])) + + +ApplyInnerJoin = ApplyMerge diff --git a/sdp/processors/modify_manifest/create_manifest.py b/sdp/processors/modify_manifest/create_manifest.py index 77096b3e..df7bbf9b 100644 --- a/sdp/processors/modify_manifest/create_manifest.py +++ b/sdp/processors/modify_manifest/create_manifest.py @@ -12,9 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json from pathlib import Path -from sdp.processors.base_processor import BaseParallelProcessor, DataEntry +import pandas as pd + +from sdp.processors.base_processor import ( + BaseParallelProcessor, + BaseProcessor, + DataEntry, +) class CreateInitialManifestByExt(BaseParallelProcessor): @@ -48,3 +55,35 @@ def read_manifest(self): def process_dataset_entry(self, data_entry): data = {self.output_file_key: data_entry} return [DataEntry(data=data)] + + +class ReadCsv(BaseProcessor): + """ + Processor for reading a CSV file and converting its content into a JSON lines format. + + This class reads a CSV file using pandas, processes each row, and writes the output to a specified manifest file in JSON lines format. + + Args: + header (int, optional): Row number to use as the column names. Defaults to None, meaning no header. + sep (str, optional): Delimiter to use for separating values in the CSV file. Defaults to ','. + **kwargs: Additional keyword arguments to be passed to the base class `BaseProcessor`. + + Methods: + process(): Reads the input CSV file and writes its content as JSON lines to the output manifest file. + """ + + def __init__( + self, + header: int = None, + sep: str = ",", + **kwargs, + ): + super().__init__(**kwargs) + self.header = header + self.sep = sep + + def process(self): + df1 = pd.read_csv(self.input_manifest_file, header=self.header, sep=self.sep) + with open(self.output_manifest_file, "w") as out_file: + for j, row in df1.iterrows(): + out_file.write(json.dumps(dict(row), ensure_ascii=False) + "\n") diff --git a/tests/test_modify_manifest.py b/tests/test_modify_manifest.py index 99583c26..0836bf94 100644 --- a/tests/test_modify_manifest.py +++ b/tests/test_modify_manifest.py @@ -19,7 +19,7 @@ import pytest -from sdp.processors import ApplyInnerJoin, DropNonAlphabet +from sdp.processors import ApplyMerge, DropNonAlphabet def _write_manifest(manifest: Path, entries: List[Dict[str, Union[str, float]]]): @@ -154,11 +154,12 @@ def test_apply_inner_join( _write_manifest(manifest1, input1) _write_manifest(manifest2, input2) - processor = ApplyInnerJoin( + processor = ApplyMerge( left_manifest_file=manifest1, right_manifest_file=manifest2, column_id=coloumn_id, output_manifest_file=manifest_out, + how="inner", ) processor.process()