-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
118 lines (91 loc) · 3.29 KB
/
main.py
File metadata and controls
118 lines (91 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import pyspark
print(pyspark.version.__version__)
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
# Build (or reuse) the SparkSession. The spark.jars.packages setting names
# the spark-protobuf connector by its Maven coordinates, which is what makes
# to_protobuf / from_protobuf usable from this session.
spark = SparkSession.builder.config(
    "spark.jars.packages",
    "org.apache.spark:spark-protobuf_2.13:4.0.0",
).appName("Protobuf Example").getOrCreate()
# Spark schema for one person record: a required string id, optional scalar
# fields, a nested "home" address struct, a list of e-mail strings and a
# string-to-string tag map.
# NOTE(review): presumably mirrors the tutorial.Person message in the
# descriptor file used for encoding -- confirm against the .proto definition.
person_schema = (
    T.StructType()
    .add("id", T.StringType(), nullable=False)
    .add("name", T.StringType(), nullable=True)
    .add("age", T.IntegerType(), nullable=True)
    .add(
        "home",
        T.StructType()
        .add("line1", T.StringType(), nullable=True)
        .add("city", T.StringType(), nullable=True)
        .add("country", T.StringType(), nullable=True),
        nullable=True,
    )
    .add("emails", T.ArrayType(T.StringType()), nullable=True)
    .add("tags", T.MapType(T.StringType(), T.StringType()), nullable=True)
)
# Sample person records, written as compact tuples and expanded into the
# nested dict shape (scalar fields, "home" struct, "emails" list, "tags" map).
# The second record deliberately has an empty e-mail list and an empty tag map.
rows = [
    {
        "id": person_id,
        "name": full_name,
        "age": years,
        "home": {"line1": street, "city": town, "country": nation},
        "emails": addresses,
        "tags": labels,
    }
    for person_id, full_name, years, street, town, nation, addresses, labels in (
        (
            "p1", "Alice", 30,
            "1 King St", "London", "UK",
            ["alice@ex.com", "a@work.com"],
            {"role": "engineer", "dept": "data"},
        ),
        (
            "p2", "Bob", 28,
            "5 Queen Rd", "Leeds", "UK",
            [],
            {},
        ),
    )
]
df = spark.createDataFrame(data=rows, schema=person_schema)

# to_protobuf works on a single struct column, so pack every top-level
# column of df into one struct named "person" before encoding.
df_struct = df.select(
    F.struct([F.col(field_name) for field_name in df.columns]).alias("person")
)

BIN_COL = "person_bin"
DESC = "./person.desc"  # <-- update this path

# Encode: serialize the "person" struct as a tutorial.Person message,
# using the descriptor file at DESC.
df_encoded = df_struct.select(
    to_protobuf(
        data="person",
        messageName="tutorial.Person",
        descFilePath=DESC,
    ).alias(BIN_COL)
)
df_encoded.show(truncate=False)
# The encoded column is binary (bytes) -- ready to store to Kafka/S3/etc.

# Decode: parse the bytes back into a struct, then flatten the struct's
# fields into top-level columns to inspect the round-tripped data.
df_decoded = df_encoded.select(
    from_protobuf(
        data=BIN_COL,
        messageName="tutorial.Person",
        descFilePath=DESC,
    ).alias("person")
)
df_roundtrip = df_decoded.select(F.col("person.*"))
df_roundtrip.show(truncate=False)
df_roundtrip.printSchema()
# kdf = (spark.read
# .format("kafka")
# .option("kafka.bootstrap.servers", "broker:9092")
# .option("subscribe", "people")
# .load())
# decoded = kdf.select(
# from_protobuf(F.col("value"), "tutorial.Person", DESC).alias("person")
# ).select("person.*")
# encoded = df.select(F.struct(*df.columns).alias("person")) \
# .select(to_protobuf("person", "tutorial.Person", DESC).alias("value"))
# (encoded
# .write
# .format("kafka")
# .option("kafka.bootstrap.servers", "broker:9092")
# .option("topic", "people")
# .save())
# # Write bytes to Parquet (binary column)
# df_encoded.write.mode("overwrite").parquet("/tmp/people_protobuf")
# # Later, read and decode
# bdf = spark.read.parquet("/tmp/people_protobuf")
# decoded = bdf.select(from_protobuf(F.col(BIN_COL), "tutorial.Person", DESC).alias("person")).select("person.*")
# # Options you can pass to from_protobuf (use when needed)
# decoded = df_encoded.select(
# from_protobuf(
# F.col(BIN_COL),
# "tutorial.Person",
# DESC,
# {"ignoreUnknownFields": "true"}
# ).alias("person")
# )