From fe7dfacb4926a2767d5627c9bc0ad3022976ea42 Mon Sep 17 00:00:00 2001 From: cliff moon Date: Tue, 2 Jul 2013 10:46:42 -0700 Subject: [PATCH 1/2] Make all RegisterSet operations atomic and add a concurrency test for HLL. --- .../stream/cardinality/RegisterSet.java | 67 ++++++++++++------- .../stream/cardinality/TestHyperLogLog.java | 40 +++++++++++ 2 files changed, 83 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/RegisterSet.java b/src/main/java/com/clearspring/analytics/stream/cardinality/RegisterSet.java index 6c436a7fe..50045f501 100644 --- a/src/main/java/com/clearspring/analytics/stream/cardinality/RegisterSet.java +++ b/src/main/java/com/clearspring/analytics/stream/cardinality/RegisterSet.java @@ -16,6 +16,8 @@ package com.clearspring.analytics.stream.cardinality; +import java.util.concurrent.atomic.AtomicIntegerArray; + public class RegisterSet { public final static int LOG2_BITS_PER_WORD = 6; @@ -24,7 +26,7 @@ public class RegisterSet public final int count; public final int size; - private final int[] M; + private final AtomicIntegerArray M; public RegisterSet(int count) { @@ -40,22 +42,22 @@ public RegisterSet(int count, int[] initialValues) { if (bits == 0) { - this.M = new int[1]; + this.M = new AtomicIntegerArray(1); } else if (bits % Integer.SIZE == 0) { - this.M = new int[bits]; + this.M = new AtomicIntegerArray(bits); } else { - this.M = new int[bits + 1]; + this.M = new AtomicIntegerArray(bits + 1); } } else { - this.M = initialValues; + this.M = new AtomicIntegerArray(initialValues); } - this.size = this.M.length; + this.size = this.M.length(); } public static int getBits(int count) @@ -67,14 +69,17 @@ public void set(int position, int value) { int bucketPos = position / LOG2_BITS_PER_WORD; int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD)); - this.M[bucketPos] = (this.M[bucketPos] & ~(0x1f << shift)) | (value << shift); + int currentVal; + do { + currentVal = this.M.get(bucketPos); + } while(!this.M.compareAndSet(bucketPos, currentVal, (currentVal & ~(0x1f << shift)) | (value << shift))); } public int get(int position) { int bucketPos = position / LOG2_BITS_PER_WORD; int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD)); - return (this.M[bucketPos] & (0x1f << shift)) >>> shift; + return (this.M.get(bucketPos) & (0x1f << shift)) >>> shift; } public boolean updateIfGreater(int position, int value) @@ -84,37 +89,51 @@ public boolean updateIfGreater(int position, int value) int mask = 0x1f << shift; // Use long to avoid sign issues with the left-most shift - long curVal = this.M[bucket] & mask; + int curM; long newVal = value << shift; - if (curVal < newVal) { - this.M[bucket] = (int)((this.M[bucket] & ~mask) | newVal); - return true; - } else { - return false; + long curVal; + while(true) { + curM = this.M.get(bucket); + curVal = curM & mask; + if (curVal < newVal) { + if (this.M.compareAndSet(bucket, curM, (int)((curM & ~mask) | newVal))) { + return true; + } + } else { + return false; + } } } public void merge(RegisterSet that) { - for (int bucket = 0; bucket < M.length; bucket++) + for (int bucket = 0; bucket < M.length(); bucket++) { int word = 0; - for (int j = 0; j < LOG2_BITS_PER_WORD; j++) + int thisM; + int thatM; + do { - int mask = 0x1f << (REGISTER_SIZE * j); - - int thisVal = (this.M[bucket] & mask); - int thatVal = (that.M[bucket] & mask); - word |= (thisVal < thatVal) ? thatVal : thisVal; - } - this.M[bucket] = word; + thisM = this.M.get(bucket); + thatM = that.M.get(bucket); + for (int j = 0; j < LOG2_BITS_PER_WORD; j++) + { + int mask = 0x1f << (REGISTER_SIZE * j); + + int thisVal = (thisM & mask); + int thatVal = (thatM & mask); + word |= (thisVal < thatVal) ? thatVal : thisVal; + } + } while(!this.M.compareAndSet(bucket, thisM, word)); } } public int[] bits() { int[] copy = new int[size]; - System.arraycopy(M, 0, copy, 0, M.length); + for (int i = 0; i < size; i++) { + copy[i] = M.get(i); + } return copy; } } diff --git a/src/test/java/com/clearspring/analytics/stream/cardinality/TestHyperLogLog.java b/src/test/java/com/clearspring/analytics/stream/cardinality/TestHyperLogLog.java index c4d0d9af8..036599147 100644 --- a/src/test/java/com/clearspring/analytics/stream/cardinality/TestHyperLogLog.java +++ b/src/test/java/com/clearspring/analytics/stream/cardinality/TestHyperLogLog.java @@ -16,13 +16,21 @@ package com.clearspring.analytics.stream.cardinality; +import com.google.common.collect.Lists; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; + import org.junit.Ignore; import org.junit.Test; import java.io.IOException; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -75,6 +83,38 @@ public void testHighCardinality() System.out.println(err); assertTrue(err < .1); } + + @Test + public void testConcurrentUpdate() throws Exception + { + long start = System.currentTimeMillis(); + final HyperLogLog hyperLogLog = new HyperLogLog(10); + final int size = 1000; + final int perThread = 10000; + List> futures = Lists.newArrayListWithCapacity(1000); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(8); + for (int i = 0; i < size; i++) + { + final int base = i * perThread; + futures.add(executor.submit(new Runnable() { + @Override + public void run() { + for (int n = 0; n < perThread; n++) { + hyperLogLog.offer(TestICardinality.streamElement(base + n)); + } + } + })); + } + for (Future f : futures) + { + f.get(); + } + System.out.println("concurrent time: " + (System.currentTimeMillis() - start)); + long estimate = hyperLogLog.cardinality(); + double err = Math.abs(estimate - size * perThread) / (double) (size * perThread); + System.out.println(err); + assertTrue(err < .1); + } @Test public void testHighCardinality_withDefinedRSD() From 410c134e8ec7186d02588875a4bcc97c4476be51 Mon Sep 17 00:00:00 2001 From: cliff moon Date: Tue, 2 Jul 2013 10:52:39 -0700 Subject: [PATCH 2/2] cleanup unused imports in TestHLL --- .../analytics/stream/cardinality/TestHyperLogLog.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/test/java/com/clearspring/analytics/stream/cardinality/TestHyperLogLog.java b/src/test/java/com/clearspring/analytics/stream/cardinality/TestHyperLogLog.java index 036599147..31358e662 100644 --- a/src/test/java/com/clearspring/analytics/stream/cardinality/TestHyperLogLog.java +++ b/src/test/java/com/clearspring/analytics/stream/cardinality/TestHyperLogLog.java @@ -26,12 +26,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue;