Commit dc9c04d

Authored by jperez999, benfred, Alberto Alvarez, and rjzamora
fix arbitrary output file number bug, shrink number of files and warn… (#1301)
* fix arbitrary output file number bug, shrink number of files and warn if not enough ddf partitions
* add test to verify behavior support
* made changes to ci script to clean pip packages before install to make it fresh
* adding user warning
* add correct if condition for warning
* Update nvtabular/io/dataset.py (Co-authored-by: Richard (Rick) Zamora <[email protected]>)
* Update tests/unit/test_io.py (Co-authored-by: Richard (Rick) Zamora <[email protected]>)
* fix issues in formatting of code

Co-authored-by: Ben Frederickson <[email protected]>
Co-authored-by: Alberto Alvarez <[email protected]>
Co-authored-by: Richard (Rick) Zamora <[email protected]>
1 parent 76b0b5e · commit dc9c04d

File tree

2 files changed: +38 −1 lines changed


nvtabular/io/dataset.py

Lines changed: 8 additions & 1 deletion
@@ -861,7 +861,14 @@ def to_parquet(
             fns = output_files[i : i + files_per_task]
             start = i * split
             stop = min(start + split * len(fns), ddf.npartitions)
-            new[tuple(fns)] = np.arange(start, stop)
+            if start < stop:
+                new[tuple(fns)] = np.arange(start, stop)
+        # Let the user know they will not get the expected number of output files.
+        if len(new.keys()) < len(output_files):
+            warnings.warn(
+                f"Only created {len(new.keys())} files; did not have enough\n"
+                f"partitions to create {len(output_files)} files."
+            )
         output_files = new
         suffix = ""  # Don't add a suffix later - Names already include it
     if not isinstance(output_files, dict):
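
To see what the patched mapping does in isolation, here is a minimal standalone sketch of the loop above. The setup values (npartitions, files_per_task, split) and the part_* names are assumptions for illustration, not part of the diff:

    import warnings

    import numpy as np

    # Assumed setup: 23 ddf partitions, 40 requested output files.
    npartitions = 23
    output_files = [f"part_{i}.parquet" for i in range(40)]
    files_per_task = 1  # assumed: one output file handled per write task
    split = 1           # assumed: partitions mapped to each output file

    new = {}
    for i in range(0, len(output_files), files_per_task):
        fns = output_files[i : i + files_per_task]
        start = i * split
        stop = min(start + split * len(fns), npartitions)
        if start < stop:  # the new guard: skip tasks with no partitions left
            new[tuple(fns)] = np.arange(start, stop)

    # The new warning fires because only 23 of the 40 requested files
    # received any partitions.
    if len(new.keys()) < len(output_files):
        warnings.warn(
            f"Only created {len(new.keys())} files; did not have enough\n"
            f"partitions to create {len(output_files)} files."
        )

With these values, new holds 23 entries and the UserWarning reports that only 23 of the 40 requested files were created, which is exactly the shrink-and-warn behavior the commit message describes.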

tests/unit/test_io.py

Lines changed: 30 additions & 0 deletions
@@ -158,6 +158,36 @@ def test_dask_dataset_itr(tmpdir, datasets, engine, gpu_memory_frac):
     assert len(my_iter) == size
 
 
+def test_io_partitions_push(tmpdir):
+    os.makedirs(os.path.join(tmpdir, "csv"))
+
+    # Generate random csv files
+    files = [os.path.join(tmpdir, f"csv/day_{i}") for i in range(23)]
+    for file in files:
+        with open(file, "w") as f:
+            f.write("0,1,2,3,a,b,c\n" * 1000)
+
+    # Load csv files
+    label_columns = ["label"]
+    cont_columns = ["I1", "I2", "I3"]
+    cat_columns = ["C1", "C2", "C3"]
+    columns = label_columns + cont_columns + cat_columns
+    dataset = nvt.Dataset(files, engine="csv", names=columns)
+    print("npartitions of dataset:", dataset.npartitions)
+
+    for x in range(20):
+        dataset.to_parquet(
+            output_files=x,
+            output_path=os.path.join(tmpdir, f"parquet{x}"),
+            cats=cat_columns,
+            conts=cont_columns,
+            labels=label_columns,
+        )
+
+        df_lib = dispatch.get_lib()
+        df_lib.read_parquet(os.path.join(tmpdir, f"parquet{x}/part_0.parquet"))
+
+
 @pytest.mark.parametrize("engine", ["csv", "parquet", "csv-no-header"])
 @pytest.mark.parametrize("num_files", [1, 2])
 @pytest.mark.parametrize("cpu", [None, True])
