-
Notifications
You must be signed in to change notification settings - Fork 1.1k
main and shadow table as a Side Input in spanner-to-sourcedb #2950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @sm745052, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant refactoring to the Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
Codecov Report❌ Patch coverage is ❌ Your patch check has failed because the patch coverage (63.36%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #2950 +/- ##
============================================
+ Coverage 50.26% 50.43% +0.16%
- Complexity 5024 5031 +7
============================================
Files 967 970 +3
Lines 59271 59524 +253
Branches 6459 6492 +33
============================================
+ Hits 29794 30020 +226
+ Misses 27376 27375 -1
- Partials 2101 2129 +28
🚀 New features to boost your workflow:
|
manitgupta
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It always a great practice to add details of the testing done in the PR description! ;)
Please be as elaborate as you want to.
Addressed |
| PCollectionTuple ddlTuple = | ||
| pipeline.apply( | ||
| "Process Information Schema", | ||
| new SpannerToSourceDbProcessInformationSchema( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTransform names should end in *Transform to reflect that they are PTransforms. Should be named to reflect the action that the class is doing: SpannerInformationSchemaProcessorTransform.
| } | ||
|
|
||
| private static ISchemaMapper getSchemaMapper(Options options, Ddl ddl) { | ||
| public static ISchemaMapper getSchemaMapper(Options options, Ddl ddl) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did the access modifier change here?
| @Teardown | ||
| public void teardown() { | ||
| if (spannerConfig != null) { | ||
| if (spannerAccessor != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What was breaking earlier?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was not breaking but this seems more logical
| if (this.schemaMapper == null) { | ||
| SpannerToSourceDb.Options options = | ||
| c.getPipelineOptions().as(SpannerToSourceDb.Options.class); | ||
| this.schemaMapper = SpannerToSourceDb.getSchemaMapper(options, ddl); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like an anti-pattern and is very confusing.
@ProcessElementis called for every element. This code would result in a schemaMapper being initialized at the time of processing an element. Logically, schema mapping should be initialized prior to processing elements.- You should not be importing
SpannerToSourceDbhere. This looks like an inverted import.
Overall the code flow seems very odd to me. Please take a look.
| if (this.schemaMapper == null) { | ||
| SpannerToSourceDb.Options options = | ||
| c.getPipelineOptions().as(SpannerToSourceDb.Options.class); | ||
| this.schemaMapper = SpannerToSourceDb.getSchemaMapper(options, ddl); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as earlier.
| this.ddlView, | ||
| this.shadowTableDdlView)) | ||
| .withSideInputs(ddlView, shadowTableDdlView) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is ddlView and shadowTableDdlView passed both as a parameter and as a SideInput? Won't the sideInput already be available inside the DoFn?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need a reference for the sideinput which we get via the constructor. LMK if i am missing something.
|
|
||
| @Override | ||
| public PCollectionTuple expand(PBegin p) { | ||
| return p.apply("Create empty", Create.of((Void) null)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a confusing name. Root transforms are generally called Pulse or something similar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
| .withOutputTags(MAIN_DDL_TAG, TupleTagList.of(SHADOW_TABLE_DDL_TAG))); | ||
| } | ||
|
|
||
| static class ProcessInformationSchemaFn extends DoFn<Void, Ddl> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: My personal preference is to have a DoFns which non-trivial amount of logic into their own class. It makes the PTransform using these DoFns easy to extend in the future without increasing the class length too much.
|
|
||
| @ProcessElement | ||
| public void processElement(ProcessContext c) { | ||
| LOG.info("Starting processing of Information Schema..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? Let's add logs so that they add value when something goes wrong..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
Fixes b/456048565
Added main and shadow table as a Side Input in spanner-to-sourcedb so that the job startup time is less.
Context:
when main and shadow table are given as main input, when there are large number of tables, the startup time exceeded 12 min which is the default startup timeout for a dataflow job.
Earlier:
Using 4k tables: Startup time exceeded 12 min
Now:
Tested 4k tables: Startup time was around 4 min
NOTE: