Skip to content

Commit 986ce2a

Browse files
authored
Fix: Stop reapplying changes which have already been applied (#101)
Co-authored by @beaucollins This fix finds itself in the midst of a struggle to find the root cause of the so-called "ghost-writing" or "infinite-duplication" bug. Under certain relatively-rare and hard-to-reproduce circumstances we find a given change reapplied as quickly as the library can communicate with the server. This tends to destroy entities when transforming the change against itself. In this patch we're applying two techniques to prevent cycles in our change update flow: stop applying any patch whose end version is below our ghost's version; and make sure we ackwnoledge a queued change if the server sends it back as a duplicate change.. If we have a newer ghost than the change then it implies that the change has already been applied. Likewise, if we submit a patch the server has already received then we should flush any local buffers holding on to it. These two changes will prevent a subset of possible ways that we can trigger ghost-writing but it's unclear how significant that subset is relative to all cases. We know that in some cases the local client library continues to send changes to the server without updating its local `sv` and it does not appear like this change will address those cases.
1 parent 4c6d3c5 commit 986ce2a

File tree

5 files changed

+235
-3
lines changed

5 files changed

+235
-3
lines changed

RELEASE-NOTES.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
# Changelog
22

3-
## 1.1.0
3+
## 1.1.0 - 1.1.1
44

55
- Increase usability in other projects by separating `node` and browser modules [#100](https://github.com/Simperium/node-simperium/pull/100)
6+
- Stop applying changes that have already been applied to an entity [#101](https://github.com/Simperium/node-simperium/pull/101)
67

78
## 1.0.4
89

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "simperium",
3-
"version": "1.1.0",
3+
"version": "1.1.1",
44
"description": "A simperium client for node.js",
55
"main": "./lib/simperium/index.js",
66
"browser": {

src/simperium/channel.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ internal.updateAcknowledged = function( change ) {
133133
};
134134

135135
internal.findAcknowledgedChange = function( change ) {
136+
if (this.localQueue.seenChanges.has(change.id)) {
137+
return this.localQueue.seenChanges.get(change.id);
138+
}
139+
136140
var possibleChange = this.localQueue.sent[change.id];
137141
if ( possibleChange ) {
138142
if ( ( change.ccids || [] ).indexOf( possibleChange.ccid ) > -1 ) {
@@ -175,6 +179,10 @@ internal.applyChange = function( change, ghost ) {
175179
}
176180

177181
if ( change.o === operation.MODIFY ) {
182+
if ( ghost && ghost.version >= change.ev ) {
183+
return;
184+
}
185+
178186
if ( ghost && ( ghost.version !== change.sv ) ) {
179187
internal.requestObjectVersion.call( this, change.id, change.sv ).then( data => {
180188
internal.applyChange.call( this, change, { version: change.sv, data } )
@@ -714,6 +722,7 @@ Queue.prototype.run = function() {
714722
function LocalQueue( store ) {
715723
this.store = store;
716724
this.sent = {};
725+
this.seenChanges = new Map();
717726
this.queues = {};
718727
this.ready = false;
719728
}
@@ -734,6 +743,7 @@ LocalQueue.prototype.pause = function() {
734743

735744
LocalQueue.prototype.acknowledge = function( change ) {
736745
if ( this.sent[change.id] === change ) {
746+
this.seenChanges.set( change.id, change );
737747
delete this.sent[change.id];
738748
}
739749

test/simperium/crossed_wires_test.js

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,227 @@ describe( 'Crossed Wires', () => {
105105
notes.map( note => note.data.content ),
106106
)
107107
} )
108+
109+
it( 'ignores ccid after receiving a 409 for it', async() => {
110+
/**
111+
* Scenario:
112+
*
113+
* Client 1 sends a change (ccid x) "AC" => "ACD" : "=2\t+D"
114+
* Client 2 sends a change (ccid y) "AC" => "ABC" : "=1\t+B\t=1"
115+
*
116+
* Server accepts ccid x as is and broadcasts back to clients
117+
*
118+
* c:{ccids:[y],v:"=1\t+B\t=1"}
119+
*
120+
* Server accepts ccid y, server sees that the change needs to be modified because of x:
121+
*
122+
* c:{ccids:[x],v:"=3\t+D"}
123+
*
124+
* Client 1 and Client 2 should now have Ghosts that match.
125+
*/
126+
127+
// Two clients that need indexes downloaded
128+
const bucketX = createBucket();
129+
bucketX.id = 'x';
130+
const bucketY = createBucket();
131+
bucketY.id = 'y';
132+
const clients = [bucketX, bucketY];
133+
134+
const responses = await Promise.all( [
135+
waitForClient( bucketX, () => bucketX.channel.handleMessage( 'auth:user' ) ),
136+
waitForClient( bucketY, () => bucketY.channel.handleMessage( 'auth:user' ) ),
137+
] );
138+
139+
deepEqual(
140+
Array( 2 ).fill( 'i:1:::10' ),
141+
responses
142+
);
143+
144+
const cvs = await Promise.all( clients.map( client => {
145+
const indexed = new Promise( resolve => {
146+
client.once( 'index', resolve );
147+
} );
148+
client.channel.handleMessage( 'i:' + JSON.stringify( {
149+
index: [{
150+
id: 'note-id',
151+
v: 1,
152+
d: { content: 'AC' }
153+
}],
154+
current: 'cv-1',
155+
} ) );
156+
return indexed;
157+
} ) );
158+
159+
deepEqual( Array( 2 ).fill( 'cv-1' ), cvs );
160+
161+
deepEqual(
162+
Array( 2 ).fill( { data: { content: 'AC' }, id: 'note-id' } ),
163+
await Promise.all( clients.map( client => client.get( 'note-id' ) ) ),
164+
);
165+
166+
const [changeY, changeX] = ( await Promise.all( [
167+
waitForClient( bucketY, () => bucketY.update( 'note-id', { content: 'ABC' } ) ),
168+
waitForClient( bucketX, () => bucketX.update( 'note-id', { content: 'ACD' } ) ),
169+
] ) ).map( msg => JSON.parse( parseMessage( msg ).data ) );
170+
171+
equal( '=1\t+B\t=1', changeY.v.content.v );
172+
equal( '=2\t+D', changeX.v.content.v );
173+
174+
/**
175+
* At this point, both clients have sent a change and are waiting for the
176+
* server to respond. Their `localQueue`s should have a `.sent['note-id']`.
177+
*
178+
* If a client were to update `note-id` at this moment, since it is waiting
179+
* for the sent change to be acknowledged by the server it will indicate
180+
* that with a `localQueue.queues['note-id']`.
181+
*/
182+
const [serverChange1] = [
183+
[ { cv: 'cv-2', ccids: [changeY.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: {
184+
o: 'd', v: '=1\t+B\t=1'
185+
} } } ],
186+
// This ccid/change is modified by the server, see: '=3\t+D' vs '=2\t+D'
187+
[ { cv: 'cv-3', ccids: [changeX.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: {
188+
o: 'd', v: '=3\t+D'
189+
} } } ],
190+
];
191+
192+
const notes = await Promise.all( [
193+
new Promise( ( resolve, reject ) => {
194+
bucketY.channel.on( 'acknowledge', () => {
195+
setTimeout(() => resolve(bucketY.get('note-id')), 10);
196+
} );
197+
198+
bucketY.channel.on( 'send', (data) => {
199+
reject(new Error( 'should not send more things' ) );
200+
} );
201+
202+
bucketY.channel.handleMessage( 'c:' + JSON.stringify([{
203+
id: 'note-id',
204+
error: 409,
205+
ccids: serverChange1[0].ccids,
206+
}] ) );
207+
208+
bucketY.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) );
209+
} ),
210+
new Promise( resolve => {
211+
bucketX.once( 'update', () => resolve( bucketX.get( 'note-id' ) ) );
212+
bucketX.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) );
213+
} )
214+
] );
215+
216+
deepEqual(
217+
[ 'ABC', 'ABCD' ],
218+
notes.map( note => note.data.content ),
219+
)
220+
} )
221+
222+
it( 'ignores inbound changes after they have already been applied', async() => {
223+
/**
224+
* Scenario:
225+
*
226+
* Client 1 sends a change (ccid x) "AC" => "ACD" : "=2\t+D"
227+
* Client 2 sends a change (ccid y) "AC" => "ABC" : "=1\t+B\t=1"
228+
*
229+
* Server accepts ccid x as is and broadcasts back to clients
230+
*
231+
* c:{ccids:[y],v:"=1\t+B\t=1"}
232+
*
233+
* Server accepts ccid y, server sees that the change needs to be modified because of x:
234+
*
235+
* c:{ccids:[x],v:"=3\t+D"}
236+
*
237+
* Client 1 and Client 2 should now have Ghosts that match.
238+
*/
239+
240+
// Two clients that need indexes downloaded
241+
const bucketX = createBucket();
242+
bucketX.id = 'x';
243+
const bucketY = createBucket();
244+
bucketY.id = 'y';
245+
const clients = [bucketX, bucketY];
246+
247+
const responses = await Promise.all( [
248+
waitForClient( bucketX, () => bucketX.channel.handleMessage( 'auth:user' ) ),
249+
waitForClient( bucketY, () => bucketY.channel.handleMessage( 'auth:user' ) ),
250+
] );
251+
252+
deepEqual(
253+
Array( 2 ).fill( 'i:1:::10' ),
254+
responses
255+
);
256+
257+
const cvs = await Promise.all( clients.map( client => {
258+
const indexed = new Promise( resolve => {
259+
client.once( 'index', resolve );
260+
} );
261+
client.channel.handleMessage( 'i:' + JSON.stringify( {
262+
index: [{
263+
id: 'note-id',
264+
v: 1,
265+
d: { content: 'AC' }
266+
}],
267+
current: 'cv-1',
268+
} ) );
269+
return indexed;
270+
} ) );
271+
272+
deepEqual( Array( 2 ).fill( 'cv-1' ), cvs );
273+
274+
deepEqual(
275+
Array( 2 ).fill( { data: { content: 'AC' }, id: 'note-id' } ),
276+
await Promise.all( clients.map( client => client.get( 'note-id' ) ) ),
277+
);
278+
279+
const [changeY, changeX] = ( await Promise.all( [
280+
waitForClient( bucketY, () => bucketY.update( 'note-id', { content: 'ABC' } ) ),
281+
waitForClient( bucketX, () => bucketX.update( 'note-id', { content: 'ACD' } ) ),
282+
] ) ).map( msg => JSON.parse( parseMessage( msg ).data ) );
283+
284+
equal( '=1\t+B\t=1', changeY.v.content.v );
285+
equal( '=2\t+D', changeX.v.content.v );
286+
287+
/**
288+
* At this point, both clients have sent a change and are waiting for the
289+
* server to respond. Their `localQueue`s should have a `.sent['note-id']`.
290+
*
291+
* If a client were to update `note-id` at this moment, since it is waiting
292+
* for the sent change to be acknowledged by the server it will indicate
293+
* that with a `localQueue.queues['note-id']`.
294+
*/
295+
const [serverChange1] = [
296+
[ { cv: 'cv-2', ccids: [changeY.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: {
297+
o: 'd', v: '=1\t+B\t=1'
298+
} } } ],
299+
// This ccid/change is modified by the server, see: '=3\t+D' vs '=2\t+D'
300+
[ { cv: 'cv-3', ccids: [changeX.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: {
301+
o: 'd', v: '=3\t+D'
302+
} } } ],
303+
];
304+
305+
const notes = await Promise.all( [
306+
new Promise( ( resolve, reject ) => {
307+
bucketY.channel.on( 'acknowledge', () => {
308+
setTimeout(() => resolve(bucketY.get('note-id')), 10);
309+
} );
310+
311+
bucketY.channel.on( 'send', (data) => {
312+
reject(new Error( 'should not send more things' ) );
313+
} );
314+
315+
bucketY.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) );
316+
bucketY.channel.handleMessage('c:' + JSON.stringify(serverChange1));
317+
} ),
318+
new Promise( resolve => {
319+
bucketX.once( 'update', () => resolve( bucketX.get( 'note-id' ) ) );
320+
bucketX.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) );
321+
} )
322+
] );
323+
324+
deepEqual(
325+
[ 'ABC', 'ABCD' ],
326+
notes.map( note => note.data.content ),
327+
)
328+
} )
108329
} );
109330

110331
function waitForClient( client, action ) {

0 commit comments

Comments
 (0)