-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest2.py
More file actions
44 lines (37 loc) · 1.62 KB
/
test2.py
File metadata and controls
44 lines (37 loc) · 1.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
# Configure the session step by step, then obtain it with getOrCreate().
# "spark.jars.packages" names the spark-protobuf connector artifact to load.
# NOTE(review): the coordinate pins a specific Scala/Spark build
# (_2.13 / 4.0.0) -- presumably it has to match the Spark runtime; confirm.
_session_builder = SparkSession.builder.appName("Protobuf Example")
_session_builder = _session_builder.config(
    "spark.jars.packages",
    "org.apache.spark:spark-protobuf_2.13:4.0.0",
)
spark = _session_builder.getOrCreate()
# Spark-side schema for one person record, built with StructType.add().
# Fields added without an explicit flag keep add()'s default nullable=True;
# "id" is the only required (non-nullable) field.
# NOTE(review): presumably mirrors the tutorial.Person message in the
# descriptor file -- confirm against the .proto definition.
person_schema = (
    T.StructType()
    .add("id", T.StringType(), nullable=False)
    .add("name", T.StringType())
    .add("age", T.IntegerType())
    .add(
        "home",
        # Nested address record.
        T.StructType()
        .add("line1", T.StringType())
        .add("city", T.StringType())
        .add("country", T.StringType()),
    )
    .add("emails", T.ArrayType(T.StringType()))
    .add("tags", T.MapType(T.StringType(), T.StringType()))
)
# Sample input: each record is written as a tuple in field order and zipped
# with the field names, yielding one dict per person.  The second record
# deliberately has an empty e-mail list and an empty tag map.
_PERSON_FIELDS = ("id", "name", "age", "home", "emails", "tags")
rows = [
    dict(zip(_PERSON_FIELDS, record))
    for record in (
        (
            "p1",
            "Alice",
            30,
            {"line1": "1 King St", "city": "London", "country": "UK"},
            ["alice@ex.com", "a@work.com"],
            {"role": "engineer", "dept": "data"},
        ),
        (
            "p2",
            "Bob",
            28,
            {"line1": "5 Queen Rd", "city": "Leeds", "country": "UK"},
            [],
            {},
        ),
    )
]
# Round-trip demo: pack every column of each row into a single struct column,
# serialise that struct to Protobuf bytes with to_protobuf(), parse the bytes
# back with from_protobuf(), and print the flattened result.

# Descriptor file path handed to both protobuf functions.
# NOTE(review): a relative path is resolved against the current working
# directory; presumably this is a descriptor set generated by protoc -- confirm.
DESC = "./person.desc"  # <-- change me
# Message name passed to to_protobuf()/from_protobuf(); kept in one place so
# the encode and decode steps cannot drift apart.
MESSAGE_NAME = "tutorial.Person"
STRUCT_COL = "person"  # name of the intermediate struct column
BIN_COL = "payload"  # name of the binary column holding the encoded message

# Everything that can fail (DataFrame creation, descriptor lookup, the show()
# action) runs inside try/finally so the session is always stopped, even when
# the descriptor file is missing or the schema does not match the message.
try:
    df = spark.createDataFrame(rows, person_schema)

    # Encode: one struct column -> one binary column.
    encoded = df.select(F.struct(*df.columns).alias(STRUCT_COL)).select(
        to_protobuf(F.col(STRUCT_COL), MESSAGE_NAME, DESC).alias(BIN_COL)
    )

    # Decode: binary column -> struct column, then expand its fields back
    # into top-level columns.
    decoded = encoded.select(
        from_protobuf(F.col(BIN_COL), MESSAGE_NAME, DESC).alias(STRUCT_COL)
    ).select(f"{STRUCT_COL}.*")

    decoded.show(truncate=False)
finally:
    spark.stop()