Commit 5f701bf

delta003 authored and bulldozer-bot[bot] committed
[SPARK-26200] Fix transposed column values in pyspark.Row (apache-spark-on-k8s#462)
## Upstream SPARK-XXXXX ticket and PR link (if not applicable, explain)

https://issues.apache.org/jira/browse/SPARK-26200

## What changes were proposed in this pull request?

The Row type is handled differently depending on the value of `_needSerializeAnyField`. When `_needSerializeAnyField` is true, a `Row` falls through to the tuple branch of `StructType.toInternal` and is zipped with the schema fields positionally. Because `Row(**kwargs)` sorts its fields alphabetically, that positional zip pairs values with the wrong fields whenever the schema declares them in a different order, so column values end up transposed (see the upstream ticket for details).

## How was this patch tested?

Unit test.
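For context, here is a minimal sketch of the transposition on a pre-fix build (any PySpark 2.x, where `Row(**kwargs)` sorts field names; the schema and timestamps are illustrative). `StructType.toInternal` is pure Python, so no SparkSession is needed:

```python
import datetime

from pyspark.sql import Row
from pyspark.sql.types import StructType, TimestampType

# The schema declares "b" before "a"; Row(**kwargs) sorts its fields
# alphabetically, so the row stores its values in (a, b) order.
schema = StructType().add("b", TimestampType()).add("a", TimestampType())
row = Row(a=datetime.datetime(2018, 1, 1), b=datetime.datetime(2019, 1, 1))

# TimestampType needs conversion, which makes _needSerializeAnyField true.
# Before this fix, the Row fell through to the (tuple, list) branch and was
# zipped positionally: field "b" got a's value and field "a" got b's value.
print(schema.toInternal(row))
```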
1 parent b50649d · commit 5f701bf

File tree: 2 files changed, +28 -0 lines

python/pyspark/sql/tests.py

Lines changed: 25 additions & 0 deletions
@@ -1792,6 +1792,31 @@ def test_struct_type(self):
         self.assertRaises(IndexError, lambda: struct1[9])
         self.assertRaises(TypeError, lambda: struct1[9.9])
 
+    def test_struct_type_to_internal(self):
+        # Verify when not needSerializeAnyField
+        struct = StructType().add("b", StringType()).add("a", StringType())
+        string_a = "value_a"
+        string_b = "value_b"
+        row = Row(a=string_a, b=string_b)
+        tupleResult = struct.toInternal(row)
+        # Reversed because of struct
+        self.assertEqual(tupleResult, (string_b, string_a))
+
+        # Verify when needSerializeAnyField
+        struct1 = StructType().add("b", TimestampType()).add("a", TimestampType())
+        timestamp_a = datetime.datetime(2018, 1, 1, 1, 1, 1)
+        timestamp_b = datetime.datetime(2019, 1, 1, 1, 1, 1)
+        row = Row(a=timestamp_a, b=timestamp_b)
+        tupleResult = struct1.toInternal(row)
+        # Reversed because of struct
+        d = 1000000
+        ts_b = tupleResult[0]
+        new_timestamp_b = datetime.datetime.fromtimestamp(ts_b // d).replace(microsecond=ts_b % d)
+        ts_a = tupleResult[1]
+        new_timestamp_a = datetime.datetime.fromtimestamp(ts_a // d).replace(microsecond=ts_a % d)
+        self.assertEqual(timestamp_a, new_timestamp_a)
+        self.assertEqual(timestamp_b, new_timestamp_b)
+
     def test_parse_datatype_string(self):
         from pyspark.sql.types import _all_atomic_types, _parse_datatype_string
         for k, t in _all_atomic_types.items():
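The `fromtimestamp(ts // d).replace(microsecond=ts % d)` dance in the test undoes `TimestampType.toInternal`, which encodes a datetime as microseconds since the epoch. A standalone sketch of that round trip, using a hypothetical `to_internal` helper that mirrors the naive-datetime (local time) path of the real method:

```python
import datetime
import time

D = 1000000  # microseconds per second

def to_internal(dt):
    # Hypothetical stand-in for TimestampType.toInternal: naive datetimes
    # are interpreted in local time; timetuple() drops microseconds, so
    # they are added back separately.
    return int(time.mktime(dt.timetuple())) * D + dt.microsecond

original = datetime.datetime(2018, 1, 1, 1, 1, 1)
ts = to_internal(original)
restored = datetime.datetime.fromtimestamp(ts // D).replace(microsecond=ts % D)
assert restored == original
```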

python/pyspark/sql/types.py

Lines changed: 3 additions & 0 deletions
@@ -599,6 +599,9 @@ def toInternal(self, obj):
             if isinstance(obj, dict):
                 return tuple(f.toInternal(obj.get(n)) if c else obj.get(n)
                              for n, f, c in zip(self.names, self.fields, self._needConversion))
+            elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
+                return tuple(f.toInternal(obj[n]) if c else obj[n]
+                             for n, f, c in zip(self.names, self.fields, self._needConversion))
             elif isinstance(obj, (tuple, list)):
                 return tuple(f.toInternal(v) if c else v
                              for f, v, c in zip(self.fields, obj, self._needConversion))
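With the new branch in place, a keyword-constructed `Row` (which sets `__from_dict__`) is resolved per field name via `obj[n]` instead of positionally. A quick sanity check of the fixed behaviour, again runnable without a SparkSession (the values are illustrative):

```python
import datetime

from pyspark.sql import Row
from pyspark.sql.types import StructType, TimestampType

schema = StructType().add("b", TimestampType()).add("a", TimestampType())
ts_a = datetime.datetime(2018, 1, 1, 1, 1, 1)
ts_b = datetime.datetime(2019, 1, 1, 1, 1, 1)
row = Row(a=ts_a, b=ts_b)  # Row(**kwargs) sets __from_dict__ = True

# Fields are looked up by name, so schema order no longer matters:
# slot 0 (field "b") holds b's value, slot 1 (field "a") holds a's value.
assert schema.toInternal(row) == (TimestampType().toInternal(ts_b),
                                  TimestampType().toInternal(ts_a))
```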
