Skip to content

add ReadCsv processor #93

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/sdp/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ Miscellaneous
.. autodata:: sdp.processors.CreateInitialManifestByExt
:annotation:

.. autodata:: sdp.processors.ApplyInnerJoin
.. autodata:: sdp.processors.ApplyMerge
:annotation:

.. _sdp-base-classes:
Expand Down
14 changes: 9 additions & 5 deletions sdp/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
from sdp.processors.datasets.fleurs.create_initial_manifest import (
CreateInitialManifestFleurs,
)
from sdp.processors.datasets.uzbekvoice.create_initial_manifest import (
CreateInitialManifestUzbekvoice,
)
from sdp.processors.datasets.ksc2.create_initial_manifest import (
CreateInitialManifestKSC2,
)
Expand All @@ -51,6 +48,9 @@
CreateInitialManifestSLR140,
CustomDataSplitSLR140,
)
from sdp.processors.datasets.uzbekvoice.create_initial_manifest import (
CreateInitialManifestUzbekvoice,
)
from sdp.processors.datasets.voxpopuli.create_initial_manifest import (
CreateInitialManifestVoxpopuli,
)
Expand All @@ -61,6 +61,7 @@
from sdp.processors.modify_manifest.common import (
AddConstantFields,
ApplyInnerJoin,
ApplyMerge,
ChangeToRelativePath,
CombineSources,
DuplicateFields,
Expand All @@ -69,7 +70,10 @@
SortManifest,
SplitOnFixedDuration,
)
from sdp.processors.modify_manifest.create_manifest import CreateInitialManifestByExt
from sdp.processors.modify_manifest.create_manifest import (
CreateInitialManifestByExt,
ReadCsv,
)
from sdp.processors.modify_manifest.data_to_data import (
CountNumWords,
FfmpegConvert,
Expand Down Expand Up @@ -98,8 +102,8 @@
DropLowWordMatchRate,
DropNonAlphabet,
DropOnAttribute,
PreserveByValue,
DropRepeatedFields,
PreserveByValue,
)
from sdp.processors.modify_manifest.make_letters_uppercase_after_period import (
MakeLettersUppercaseAfterPeriod,
Expand Down
21 changes: 16 additions & 5 deletions sdp/processors/modify_manifest/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
import json
import os
from pathlib import Path
from typing import Dict, List, Union, Optional
from typing import Dict, List, Optional, Union

import pandas as pd
from tqdm import tqdm

from sdp.logging import logger
from sdp.processors.base_processor import (
BaseParallelProcessor,
BaseProcessor,
DataEntry,
)
from sdp.utils.common import load_manifest


class CombineSources(BaseParallelProcessor):
"""Can be used to create a single field from two alternative sources.

Expand Down Expand Up @@ -346,8 +348,8 @@ def process(self):
fout.write(json.dumps(new_line, ensure_ascii=False) + "\n")


class ApplyMerge(BaseProcessor):
    """Merges two manifests with a database-style join and writes the result as JSON lines.

    Both manifests are loaded into pandas DataFrames and combined with ``pandas.merge``.
    With the default ``how="inner"`` the output contains only records that have matching
    values in both manifests; other ``how`` values keep unmatched records as well.
    For more information, please refer to the Pandas merge function documentation:
    https://pandas.pydata.org/docs/reference/api/pandas.merge.html#pandas.merge

    Args:
        column_id (Union[str, List[str], None]): Field names to join on. These must be found in both manifests.
            If `column_id` is None then this defaults to the intersection of the columns in both manifests.
            Defaults to None.
        how (str): Similar to "how" parameter in DataFrame.merge(how=""), can be "left", "right", "outer", "inner",
            "cross", default "inner".
            See more https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.merge.html
        left_manifest_file (Optional[str]): path to the left manifest. If None, `input_manifest_file` is used.
        right_manifest_file (str): path to the right manifest.

    Returns:
        Merge of two manifests, written to `output_manifest_file` with one JSON object per line.
    """

    def __init__(
        self,
        left_manifest_file: Optional[str],
        right_manifest_file: str,
        column_id: Union[str, List[str], None] = None,
        how: str = "inner",
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Fall back to the processor's own input manifest when no explicit left manifest is given.
        # NOTE(review): `input_manifest_file` is presumably set by BaseProcessor.__init__ - confirm.
        if left_manifest_file is None:
            left_manifest_file = self.input_manifest_file
        self.left_manifest_file = left_manifest_file
        self.right_manifest_file = right_manifest_file
        self.column_id = column_id
        self.how = how

    def process(self):
        """Load both manifests, merge them, and write the merged records to the output manifest."""
        m1 = pd.DataFrame.from_records(load_manifest(Path(self.left_manifest_file)))
        m2 = pd.DataFrame.from_records(load_manifest(Path(self.right_manifest_file)))
        m3 = pd.merge(m1, m2, on=self.column_id, how=self.how)

        with open(self.output_manifest_file, "wt", encoding="utf8") as fout:
            # to_dict(orient="records") yields plain Python scalars per record, whereas
            # iterrows() can hand numpy scalars (e.g. int64) to json.dumps, which cannot
            # serialize them, and may upcast ints to floats within a row.
            for record in m3.to_dict(orient="records"):
                fout.write(json.dumps(record, ensure_ascii=False) + "\n")

        logger.info("Total number of entries after join: " + str(m3.shape[0]))


# Backward-compatible alias: existing configs and imports that refer to ApplyInnerJoin keep working.
ApplyInnerJoin = ApplyMerge
41 changes: 40 additions & 1 deletion sdp/processors/modify_manifest/create_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from pathlib import Path

from sdp.processors.base_processor import BaseParallelProcessor, DataEntry
import pandas as pd

from sdp.processors.base_processor import (
BaseParallelProcessor,
BaseProcessor,
DataEntry,
)


class CreateInitialManifestByExt(BaseParallelProcessor):
Expand Down Expand Up @@ -48,3 +55,35 @@ def read_manifest(self):
def process_dataset_entry(self, data_entry):
data = {self.output_file_key: data_entry}
return [DataEntry(data=data)]


class ReadCsv(BaseProcessor):
    """
    Processor for reading a CSV file and converting its content into a JSON lines format.

    This class reads the CSV file at ``input_manifest_file`` using pandas and writes every row
    as one JSON object per line to ``output_manifest_file`` (UTF-8 encoded).

    Args:
        header (int, optional): Row number to use as the column names. Defaults to None, meaning no header;
            pandas then labels the columns with integers, which end up as the string keys "0", "1", ...
            in the JSON output.
        sep (str, optional): Delimiter to use for separating values in the CSV file. Defaults to ','.
        **kwargs: Additional keyword arguments to be passed to the base class `BaseProcessor`.

    Methods:
        process(): Reads the input CSV file and writes its content as JSON lines to the output manifest file.
    """

    def __init__(
        self,
        header: int = None,
        sep: str = ",",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.header = header
        self.sep = sep

    def process(self):
        """Read the input CSV and write one JSON object per row to the output manifest."""
        df = pd.read_csv(self.input_manifest_file, header=self.header, sep=self.sep)
        # Explicit UTF-8: the records are dumped with ensure_ascii=False, so relying on the
        # platform default encoding could fail on non-ASCII text.
        with open(self.output_manifest_file, "wt", encoding="utf8") as out_file:
            # to_dict(orient="records") yields plain Python scalars; iterating with iterrows()
            # can produce numpy scalars (e.g. int64) that json.dumps cannot serialize.
            for record in df.to_dict(orient="records"):
                out_file.write(json.dumps(record, ensure_ascii=False) + "\n")
5 changes: 3 additions & 2 deletions tests/test_modify_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import pytest

from sdp.processors import ApplyInnerJoin, DropNonAlphabet
from sdp.processors import ApplyMerge, DropNonAlphabet


def _write_manifest(manifest: Path, entries: List[Dict[str, Union[str, float]]]):
Expand Down Expand Up @@ -154,11 +154,12 @@ def test_apply_inner_join(
_write_manifest(manifest1, input1)
_write_manifest(manifest2, input2)

processor = ApplyInnerJoin(
processor = ApplyMerge(
left_manifest_file=manifest1,
right_manifest_file=manifest2,
column_id=coloumn_id,
output_manifest_file=manifest_out,
how="inner",
)

processor.process()
Expand Down
Loading