diff --git a/LICENSE-binary b/LICENSE-binary index 0c3c7aecb71ac..905cca552d479 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -462,7 +462,6 @@ org.antlr:antlr-runtime org.antlr:antlr4-runtime org.codehaus.janino:commons-compiler org.codehaus.janino:janino -org.fusesource.leveldbjni:leveldbjni-all org.jline:jline org.jpmml:pmml-model org.threeten:threeten-extra diff --git a/NOTICE-binary b/NOTICE-binary index a3f302b1cb04d..847fced211d83 100644 --- a/NOTICE-binary +++ b/NOTICE-binary @@ -77,12 +77,6 @@ OW2 Consortium (http://asm.ow2.org/) This product includes software developed by The Apache Software Foundation (http://www.apache.org/). -The binary distribution of this product bundles binaries of -org.iq80.leveldb:leveldb-api (https://github.com/dain/leveldb), which has the -following notices: -* Copyright 2011 Dain Sundstrom -* Copyright 2011 FuseSource Corp. http://fusesource.com - The binary distribution of this product bundles binaries of org.fusesource.hawtjni:hawtjni-runtime (https://github.com/fusesource/hawtjni), which has the following notices: diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml index b96aca3d842f5..682d42d90189a 100644 --- a/common/kvstore/pom.xml +++ b/common/kvstore/pom.xml @@ -50,10 +50,6 @@ com.google.guava guava - - ${leveldbjni.group} - leveldbjni-all - com.fasterxml.jackson.core jackson-core diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java deleted file mode 100644 index 74843806b3ea0..0000000000000 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util.kvstore; - -import java.io.File; -import java.io.IOException; -import java.lang.ref.Reference; -import java.lang.ref.WeakReference; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import org.fusesource.leveldbjni.JniDBFactory; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.DBIterator; -import org.iq80.leveldb.Options; -import org.iq80.leveldb.WriteBatch; - -import org.apache.spark.annotation.Private; - -/** - * Implementation of KVStore that uses LevelDB as the underlying data store. 
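For context, the class whose deletion follows implemented the small, typed KVStore API on top of LevelDB. The sketch below shows how callers typically used that API; the AppInfo entity type is invented for illustration (it mirrors the SimpleType/CustomType1 classes in the deleted tests further down) and is not part of this patch.

import org.apache.spark.util.kvstore.KVIndex;
import org.apache.spark.util.kvstore.KVStore;

class KVStoreUsageSketch {

  // Invented entity type; a bare @KVIndex marks the natural (primary) key.
  public static class AppInfo {
    @KVIndex public String id;
    public String name;
  }

  static void demo(KVStore store) throws Exception {
    AppInfo info = new AppInfo();
    info.id = "app-1";
    info.name = "example";
    store.write(info);                               // insert or update, keyed by the natural index
    AppInfo read = store.read(AppInfo.class, "app-1");
    long total = store.count(AppInfo.class);         // answered from the index end markers
    store.delete(AppInfo.class, "app-1");
  }
}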
- */ -@Private -public class LevelDB implements KVStore { - - @VisibleForTesting - static final long STORE_VERSION = 1L; - - @VisibleForTesting - static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8); - - /** DB key where app metadata is stored. */ - private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8); - - /** DB key where type aliases are stored. */ - private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8); - - final AtomicReference _db; - final KVStoreSerializer serializer; - - /** - * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two - * purposes: make the keys stored on disk shorter, and spread out the keys, since class names - * will often have a long, redundant prefix (think "org.apache.spark."). - */ - private final ConcurrentMap typeAliases; - private final ConcurrentMap, LevelDBTypeInfo> types; - - /** - * Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to - * ensure that all iterators are correctly closed before LevelDB is closed. Use weak references - * to ensure that the iterator can be GCed, when it is only referenced here. - */ - private final ConcurrentLinkedQueue>> iteratorTracker; - - public LevelDB(File path) throws Exception { - this(path, new KVStoreSerializer()); - } - - public LevelDB(File path, KVStoreSerializer serializer) throws Exception { - this.serializer = serializer; - this.types = new ConcurrentHashMap<>(); - - Options options = new Options(); - options.createIfMissing(true); - this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options)); - - byte[] versionData = db().get(STORE_VERSION_KEY); - if (versionData != null) { - long version = serializer.deserializeLong(versionData); - if (version != STORE_VERSION) { - close(); - throw new UnsupportedStoreVersionException(); - } - } else { - db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION)); - } - - Map aliases; - try { - aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases; - } catch (NoSuchElementException e) { - aliases = new HashMap<>(); - } - typeAliases = new ConcurrentHashMap<>(aliases); - - iteratorTracker = new ConcurrentLinkedQueue<>(); - } - - @Override - public T getMetadata(Class klass) throws Exception { - try { - return get(METADATA_KEY, klass); - } catch (NoSuchElementException nsee) { - return null; - } - } - - @Override - public void setMetadata(Object value) throws Exception { - if (value != null) { - put(METADATA_KEY, value); - } else { - db().delete(METADATA_KEY); - } - } - - T get(byte[] key, Class klass) throws Exception { - byte[] data = db().get(key); - if (data == null) { - throw new NoSuchElementException(new String(key, UTF_8)); - } - return serializer.deserialize(data, klass); - } - - private void put(byte[] key, Object value) throws Exception { - Preconditions.checkArgument(value != null, "Null values are not allowed."); - db().put(key, serializer.serialize(value)); - } - - @Override - public T read(Class klass, Object naturalKey) throws Exception { - Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed."); - byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey); - return get(key, klass); - } - - @Override - public void write(Object value) throws Exception { - Preconditions.checkArgument(value != null, "Null values are not allowed."); - LevelDBTypeInfo ti = getTypeInfo(value.getClass()); - - try (WriteBatch batch = db().createWriteBatch()) { - byte[] data = 
serializer.serialize(value); - synchronized (ti) { - updateBatch(batch, value, data, value.getClass(), ti.naturalIndex(), ti.indices()); - db().write(batch); - } - } - } - - public void writeAll(List values) throws Exception { - Preconditions.checkArgument(values != null && !values.isEmpty(), - "Non-empty values required."); - - // Group by class, in case there are values from different classes in the values - // Typical usecase is for this to be a single class. - // A NullPointerException will be thrown if values contain null object. - for (Map.Entry, ? extends List> entry : - values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) { - - final Iterator valueIter = entry.getValue().iterator(); - final Iterator serializedValueIter; - - // Deserialize outside synchronized block - List list = new ArrayList<>(entry.getValue().size()); - for (Object value : entry.getValue()) { - list.add(serializer.serialize(value)); - } - serializedValueIter = list.iterator(); - - final Class klass = entry.getKey(); - final LevelDBTypeInfo ti = getTypeInfo(klass); - - synchronized (ti) { - final LevelDBTypeInfo.Index naturalIndex = ti.naturalIndex(); - final Collection indices = ti.indices(); - - try (WriteBatch batch = db().createWriteBatch()) { - while (valueIter.hasNext()) { - assert serializedValueIter.hasNext(); - updateBatch(batch, valueIter.next(), serializedValueIter.next(), klass, - naturalIndex, indices); - } - db().write(batch); - } - } - } - } - - private void updateBatch( - WriteBatch batch, - Object value, - byte[] data, - Class klass, - LevelDBTypeInfo.Index naturalIndex, - Collection indices) throws Exception { - Object existing; - try { - existing = get(naturalIndex.entityKey(null, value), klass); - } catch (NoSuchElementException e) { - existing = null; - } - - PrefixCache cache = new PrefixCache(value); - byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value)); - for (LevelDBTypeInfo.Index idx : indices) { - byte[] prefix = cache.getPrefix(idx); - idx.add(batch, value, existing, data, naturalKey, prefix); - } - } - - @Override - public void delete(Class type, Object naturalKey) throws Exception { - Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed."); - try (WriteBatch batch = db().createWriteBatch()) { - LevelDBTypeInfo ti = getTypeInfo(type); - byte[] key = ti.naturalIndex().start(null, naturalKey); - synchronized (ti) { - byte[] data = db().get(key); - if (data != null) { - Object existing = serializer.deserialize(data, type); - PrefixCache cache = new PrefixCache(existing); - byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing)); - for (LevelDBTypeInfo.Index idx : ti.indices()) { - idx.remove(batch, existing, keyBytes, cache.getPrefix(idx)); - } - db().write(batch); - } - } - } catch (NoSuchElementException nse) { - // Ignore. 
- } - } - - @Override - public KVStoreView view(Class type) throws Exception { - return new KVStoreView() { - @Override - public Iterator iterator() { - try { - LevelDBIterator it = new LevelDBIterator<>(type, LevelDB.this, this); - iteratorTracker.add(new WeakReference<>(it)); - return it; - } catch (Exception e) { - Throwables.throwIfUnchecked(e); - throw new RuntimeException(e); - } - } - }; - } - - @Override - public boolean removeAllByIndexValues( - Class klass, - String index, - Collection indexValues) throws Exception { - LevelDBTypeInfo.Index naturalIndex = getTypeInfo(klass).naturalIndex(); - boolean removed = false; - KVStoreView view = view(klass).index(index); - - for (Object indexValue : indexValues) { - try (KVStoreIterator iterator = - view.first(indexValue).last(indexValue).closeableIterator()) { - while (iterator.hasNext()) { - T value = iterator.next(); - Object itemKey = naturalIndex.getValue(value); - delete(klass, itemKey); - removed = true; - } - } - } - - return removed; - } - - @Override - public long count(Class type) throws Exception { - LevelDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex(); - return idx.getCount(idx.end(null)); - } - - @Override - public long count(Class type, String index, Object indexedValue) throws Exception { - LevelDBTypeInfo.Index idx = getTypeInfo(type).index(index); - return idx.getCount(idx.end(null, indexedValue)); - } - - @Override - public void close() throws IOException { - synchronized (this._db) { - DB _db = this._db.getAndSet(null); - if (_db == null) { - return; - } - - try { - if (iteratorTracker != null) { - for (Reference> ref: iteratorTracker) { - LevelDBIterator it = ref.get(); - if (it != null) { - it.close(); - } - } - } - _db.close(); - } catch (IOException ioe) { - throw ioe; - } catch (Exception e) { - throw new IOException(e.getMessage(), e); - } - } - } - - /** - * Closes the given iterator if the DB is still open. Trying to close a JNI LevelDB handle - * with a closed DB can cause JVM crashes, so this ensures that situation does not happen. - */ - void closeIterator(DBIterator it) throws IOException { - notifyIteratorClosed(it); - synchronized (this._db) { - DB _db = this._db.get(); - if (_db != null) { - it.close(); - } - } - } - - /** - * Remove iterator from iterator tracker. `LevelDBIterator` calls it to notify - * iterator is closed. - */ - void notifyIteratorClosed(DBIterator dbIterator) { - iteratorTracker.removeIf(ref -> { - LevelDBIterator it = ref.get(); - return it != null && dbIterator.equals(it.internalIterator()); - }); - } - - /** Returns metadata about indices for the given type. */ - LevelDBTypeInfo getTypeInfo(Class type) throws Exception { - LevelDBTypeInfo ti = types.get(type); - if (ti == null) { - LevelDBTypeInfo tmp = new LevelDBTypeInfo(this, type, getTypeAlias(type)); - ti = types.putIfAbsent(type, tmp); - if (ti == null) { - ti = tmp; - } - } - return ti; - } - - /** - * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't - * prevent methods that retrieved the instance from using it after close, but hopefully will - * catch most cases; otherwise, we'll need some kind of locking. 
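The comments above (on closeIterator and on the db() accessor) describe one defensive pattern: the native handle lives in an AtomicReference, every operation goes through an accessor that fails fast once the handle is gone, and close() atomically takes ownership of the handle so it is released exactly once. A distilled, stand-alone sketch of that pattern, with class and method names that are illustrative only and not taken from this patch:

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Guard a native handle so that use-after-close fails fast instead of crashing the JVM.
class HandleGuard<H extends Closeable> {
  private final AtomicReference<H> handle;

  HandleGuard(H h) {
    this.handle = new AtomicReference<>(h);
  }

  // Every operation goes through this accessor; it throws once the handle has been released.
  H get() {
    H h = handle.get();
    if (h == null) {
      throw new IllegalStateException("DB is closed.");
    }
    return h;
  }

  // Atomically take the handle so it is closed exactly once, even under concurrent close() calls.
  void close() throws IOException {
    H h = handle.getAndSet(null);
    if (h != null) {
      h.close();
    }
  }
}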
- */ - DB db() { - DB _db = this._db.get(); - if (_db == null) { - throw new IllegalStateException("DB is closed."); - } - return _db; - } - - private byte[] getTypeAlias(Class klass) throws Exception { - byte[] alias = typeAliases.get(klass.getName()); - if (alias == null) { - synchronized (typeAliases) { - byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8); - alias = typeAliases.putIfAbsent(klass.getName(), tmp); - if (alias == null) { - alias = tmp; - put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases)); - } - } - } - return alias; - } - - /** Needs to be public for Jackson. */ - public static class TypeAliases { - - public Map aliases; - - TypeAliases(Map aliases) { - this.aliases = aliases; - } - - TypeAliases() { - this(null); - } - - } - - private static class PrefixCache { - - private final Object entity; - private final Map prefixes; - - PrefixCache(Object entity) { - this.entity = entity; - this.prefixes = new HashMap<>(); - } - - byte[] getPrefix(LevelDBTypeInfo.Index idx) throws Exception { - byte[] prefix = null; - if (idx.isChild()) { - prefix = prefixes.get(idx.parent()); - if (prefix == null) { - prefix = idx.parent().childPrefix(idx.parent().getValue(entity)); - prefixes.put(idx.parent(), prefix); - } - } - return prefix; - } - - } - -} diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java deleted file mode 100644 index 29ed37ffa44e5..0000000000000 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util.kvstore; - -import java.io.IOException; -import java.lang.ref.Cleaner; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import org.iq80.leveldb.DBIterator; - -import org.apache.spark.internal.SparkLogger; -import org.apache.spark.internal.SparkLoggerFactory; - -class LevelDBIterator implements KVStoreIterator { - - private static final Cleaner CLEANER = Cleaner.create(); - - private final LevelDB db; - private final boolean ascending; - private final DBIterator it; - private final Class type; - private final LevelDBTypeInfo ti; - private final LevelDBTypeInfo.Index index; - private final byte[] indexKeyPrefix; - private final byte[] end; - private final long max; - - private final ResourceCleaner resourceCleaner; - private final Cleaner.Cleanable cleanable; - - private boolean checkedNext; - private byte[] next; - private boolean closed; - private long count; - - LevelDBIterator(Class type, LevelDB db, KVStoreView params) throws Exception { - this.db = db; - this.ascending = params.ascending; - this.it = db.db().iterator(); - this.type = type; - this.ti = db.getTypeInfo(type); - this.index = ti.index(params.index); - this.max = params.max; - this.resourceCleaner = new ResourceCleaner(it, db); - this.cleanable = CLEANER.register(this, this.resourceCleaner); - - Preconditions.checkArgument(!index.isChild() || params.parent != null, - "Cannot iterate over child index %s without parent value.", params.index); - byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null; - - this.indexKeyPrefix = index.keyPrefix(parent); - - byte[] firstKey; - if (params.first != null) { - if (ascending) { - firstKey = index.start(parent, params.first); - } else { - firstKey = index.end(parent, params.first); - } - } else if (ascending) { - firstKey = index.keyPrefix(parent); - } else { - firstKey = index.end(parent); - } - it.seek(firstKey); - - byte[] end = null; - if (ascending) { - if (params.last != null) { - end = index.end(parent, params.last); - } else { - end = index.end(parent); - } - } else { - if (params.last != null) { - end = index.start(parent, params.last); - } - if (it.hasNext()) { - // When descending, the caller may have set up the start of iteration at a non-existent - // entry that is guaranteed to be after the desired entry. For example, if you have a - // compound key (a, b) where b is a, integer, you may seek to the end of the elements that - // have the same "a" value by specifying Integer.MAX_VALUE for "b", and that value may not - // exist in the database. So need to check here whether the next value actually belongs to - // the set being returned by the iterator before advancing. 
- byte[] nextKey = it.peekNext().getKey(); - if (compare(nextKey, indexKeyPrefix) <= 0) { - it.next(); - } - } - } - this.end = end; - - if (params.skip > 0) { - skip(params.skip); - } - } - - @Override - public boolean hasNext() { - if (!checkedNext && !closed) { - next = loadNext(); - checkedNext = true; - } - if (!closed && next == null) { - try { - close(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - return next != null; - } - - @Override - public T next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - checkedNext = false; - - try { - T ret; - if (index == null || index.isCopy()) { - ret = db.serializer.deserialize(next, type); - } else { - byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next); - ret = db.get(key, type); - } - next = null; - return ret; - } catch (Exception e) { - Throwables.throwIfUnchecked(e); - throw new RuntimeException(e); - } - } - - @Override - public List next(int max) { - List list = new ArrayList<>(max); - while (hasNext() && list.size() < max) { - list.add(next()); - } - return list; - } - - @Override - public boolean skip(long n) { - if (closed) return false; - - long skipped = 0; - while (skipped < n) { - if (next != null) { - checkedNext = false; - next = null; - skipped++; - continue; - } - - boolean hasNext = ascending ? it.hasNext() : it.hasPrev(); - if (!hasNext) { - checkedNext = true; - return false; - } - - Map.Entry e = ascending ? it.next() : it.prev(); - if (!isEndMarker(e.getKey())) { - skipped++; - } - } - - return hasNext(); - } - - @Override - public synchronized void close() throws IOException { - db.notifyIteratorClosed(it); - if (!closed) { - try { - it.close(); - } finally { - closed = true; - next = null; - cancelResourceClean(); - } - } - } - - /** - * Prevent ResourceCleaner from trying to release resources after close. - */ - private void cancelResourceClean() { - this.resourceCleaner.setStartedToFalse(); - this.cleanable.clean(); - } - - DBIterator internalIterator() { - return it; - } - - @VisibleForTesting - ResourceCleaner getResourceCleaner() { - return resourceCleaner; - } - - private byte[] loadNext() { - if (count >= max) { - return null; - } - - while (true) { - boolean hasNext = ascending ? it.hasNext() : it.hasPrev(); - if (!hasNext) { - return null; - } - - Map.Entry nextEntry; - try { - // Avoid races if another thread is updating the DB. - nextEntry = ascending ? it.next() : it.prev(); - } catch (NoSuchElementException e) { - return null; - } - - byte[] nextKey = nextEntry.getKey(); - // Next key is not part of the index, stop. - if (!startsWith(nextKey, indexKeyPrefix)) { - return null; - } - - // If the next key is an end marker, then skip it. - if (isEndMarker(nextKey)) { - continue; - } - - // If there's a known end key and iteration has gone past it, stop. - if (end != null) { - int comp = compare(nextKey, end) * (ascending ? 1 : -1); - if (comp > 0) { - return null; - } - } - - count++; - - // Next element is part of the iteration, return it. 
- return nextEntry.getValue(); - } - } - - @VisibleForTesting - static boolean startsWith(byte[] key, byte[] prefix) { - if (key.length < prefix.length) { - return false; - } - - for (int i = 0; i < prefix.length; i++) { - if (key[i] != prefix[i]) { - return false; - } - } - - return true; - } - - private boolean isEndMarker(byte[] key) { - return (key.length > 2 && - key[key.length - 2] == LevelDBTypeInfo.KEY_SEPARATOR && - key[key.length - 1] == LevelDBTypeInfo.END_MARKER[0]); - } - - static int compare(byte[] a, byte[] b) { - int diff = 0; - int minLen = Math.min(a.length, b.length); - for (int i = 0; i < minLen; i++) { - diff += (a[i] - b[i]); - if (diff != 0) { - return diff; - } - } - - return a.length - b.length; - } - - static class ResourceCleaner implements Runnable { - private static final SparkLogger LOG = SparkLoggerFactory.getLogger(ResourceCleaner.class); - - private final DBIterator dbIterator; - - private final LevelDB levelDB; - - private final AtomicBoolean started = new AtomicBoolean(true); - - ResourceCleaner(DBIterator dbIterator, LevelDB levelDB) { - this.dbIterator = dbIterator; - this.levelDB = levelDB; - } - - @Override - public void run() { - if (started.compareAndSet(true, false)) { - try { - levelDB.closeIterator(dbIterator); - } catch (IOException e) { - LOG.warn("Failed to close iterator", e); - } - } - } - - void setStartedToFalse() { - started.set(false); - } - - @VisibleForTesting - boolean isCompleted() { - return !started.get(); - } - } -} diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java deleted file mode 100644 index 21a412a36f39b..0000000000000 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java +++ /dev/null @@ -1,512 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util.kvstore; - -import java.lang.reflect.Array; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.google.common.base.Preconditions; -import org.iq80.leveldb.WriteBatch; - -/** - * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected - * via reflection, to make it cheaper to access it multiple times. - * - *

- * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures
- * that iteration over indices is easy, and that updating values in the store is not overly
- * expensive. Of note, indices choose to use more disk space (one value per key) instead of keeping
- * lists of pointers, which would be more expensive to update at runtime.
- * </p>
- *
- * <p>
- * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full
- * key would be the concatenation of everything up to that point in the hierarchy, with each
- * component separated by a NULL byte.
- * </p>
- *
- * <pre>
- * +TYPE_NAME
- *   NATURAL_INDEX
- *     +NATURAL_KEY
- *     -
- *   -NATURAL_INDEX
- *   INDEX_NAME
- *     +INDEX_VALUE
- *       +NATURAL_KEY
- *     -INDEX_VALUE
- *     .INDEX_VALUE
- *       CHILD_INDEX_NAME
- *         +CHILD_INDEX_VALUE
- *           NATURAL_KEY_OR_DATA
- *         -
- *   -INDEX_NAME
- * </pre>
- *
- * <p>
- * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
- * that end with "+". A count of all objects that match a particular top-level index
- * value is kept at the end marker ("-"). A count is also kept at the natural index's end
- * marker, to make it easy to retrieve the number of all elements of a particular type.
- * </p>
- *
- * <p>
- * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
- * have these keys and values in the store for two instances, one with natural key "key1" and the
- * other "key2", both with value "yes" for "bar":
- * </p>
- *
- * <pre>
- * Foo __main__ +key1   [data for instance 1]
- * Foo __main__ +key2   [data for instance 2]
- * Foo __main__ -       [count of all Foo]
- * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
- * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
- * Foo bar +yes -       [count of all Foo with "bar=yes" ]
- * </pre>
- *
- * <p>
- * Note that all indexed values are prepended with "+", even if the index itself does not have an
- * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB
- * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part
- * of the full LevelDB key is generally referred to as the "index value" of the entity.
- * </p>
- *
- * <p>
- * Child indices are stored after their parent index. In the example above, let's assume there is
- * a child index "child", whose parent is "bar". If both instances have value "no" for this field,
- * the data in the store would look something like the following:
- * </p>
- *
- * <pre>
- * ...
- * Foo bar +yes -
- * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index type]
- * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on index type]
- * ...
- * </pre>
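As a concrete illustration of the layout sketched above: a full LevelDB key is simply the components shown in one row joined with a NULL byte, with "+", "-" and "." marking entry, end-marker and child-index components. The rough sketch below shows how the first example row and its count key would be assembled; the helper is illustrative only, and in the removed code the type-name component is actually a short numeric alias rather than the literal class name.

import java.io.ByteArrayOutputStream;
import static java.nio.charset.StandardCharsets.UTF_8;

class KeyLayoutSketch {
  // Join key components with the NULL separator used by the removed buildKey() logic.
  static byte[] joinKey(String... components) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int i = 0; i < components.length; i++) {
      if (i > 0) {
        out.write(0x0);  // KEY_SEPARATOR
      }
      byte[] part = components[i].getBytes(UTF_8);
      out.write(part, 0, part.length);
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] dataKey = joinKey("Foo", "__main__", "+key1");  // data row for instance "key1"
    byte[] countKey = joinKey("Foo", "__main__", "-");     // end marker holding the count of all Foo
    System.out.println(dataKey.length + " vs " + countKey.length);
  }
}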
- */ -class LevelDBTypeInfo { - - static final byte[] END_MARKER = new byte[] { '-' }; - static final byte ENTRY_PREFIX = (byte) '+'; - static final byte KEY_SEPARATOR = 0x0; - static byte TRUE = (byte) '1'; - static byte FALSE = (byte) '0'; - - private static final byte SECONDARY_IDX_PREFIX = (byte) '.'; - private static final byte POSITIVE_MARKER = (byte) '='; - private static final byte NEGATIVE_MARKER = (byte) '*'; - private static final byte[] HEX_BYTES = new byte[] { - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' - }; - - private final LevelDB db; - private final Class type; - private final Map indices; - private final byte[] typePrefix; - - LevelDBTypeInfo(LevelDB db, Class type, byte[] alias) throws Exception { - this.db = db; - this.type = type; - this.indices = new HashMap<>(); - - KVTypeInfo ti = new KVTypeInfo(type); - - // First create the parent indices, then the child indices. - ti.indices().forEach(idx -> { - // In LevelDB, there is no parent index for the NATURAL INDEX. - if (idx.parent().isEmpty() || idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) { - indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null)); - } - }); - ti.indices().forEach(idx -> { - if (!idx.parent().isEmpty() && !idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) { - indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), - indices.get(idx.parent()))); - } - }); - - this.typePrefix = alias; - } - - Class type() { - return type; - } - - byte[] keyPrefix() { - return typePrefix; - } - - Index naturalIndex() { - return index(KVIndex.NATURAL_INDEX_NAME); - } - - Index index(String name) { - Index i = indices.get(name); - Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name, - type.getName()); - return i; - } - - Collection indices() { - return indices.values(); - } - - byte[] buildKey(byte[]... components) { - return buildKey(true, components); - } - - byte[] buildKey(boolean addTypePrefix, byte[]... components) { - int len = 0; - if (addTypePrefix) { - len += typePrefix.length + 1; - } - for (byte[] comp : components) { - len += comp.length; - } - len += components.length - 1; - - byte[] dest = new byte[len]; - int written = 0; - - if (addTypePrefix) { - System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length); - dest[typePrefix.length] = KEY_SEPARATOR; - written += typePrefix.length + 1; - } - - for (byte[] comp : components) { - System.arraycopy(comp, 0, dest, written, comp.length); - written += comp.length; - if (written < dest.length) { - dest[written] = KEY_SEPARATOR; - written++; - } - } - - return dest; - } - - /** - * Models a single index in LevelDB. See top-level class's javadoc for a description of how the - * keys are generated. 
- */ - class Index { - - private final boolean copy; - private final boolean isNatural; - private final byte[] name; - private final KVTypeInfo.Accessor accessor; - private final Index parent; - - private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) { - byte[] name = self.value().getBytes(UTF_8); - if (parent != null) { - byte[] child = new byte[name.length + 1]; - child[0] = SECONDARY_IDX_PREFIX; - System.arraycopy(name, 0, child, 1, name.length); - } - - this.name = name; - this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME); - this.copy = isNatural || self.copy(); - this.accessor = accessor; - this.parent = parent; - } - - boolean isCopy() { - return copy; - } - - boolean isChild() { - return parent != null; - } - - Index parent() { - return parent; - } - - /** - * Creates a key prefix for child indices of this index. This allows the prefix to be - * calculated only once, avoiding redundant work when multiple child indices of the - * same parent index exist. - */ - byte[] childPrefix(Object value) { - Preconditions.checkState(parent == null, "Not a parent index."); - return buildKey(name, toParentKey(value)); - } - - /** - * Gets the index value for a particular entity (which is the value of the field or method - * tagged with the index annotation). This is used as part of the LevelDB key where the - * entity (or its id) is stored. - */ - Object getValue(Object entity) throws Exception { - return accessor.get(entity); - } - - private void checkParent(byte[] prefix) { - if (prefix != null) { - Preconditions.checkState(parent != null, "Parent prefix provided for parent index."); - } else { - Preconditions.checkState(parent == null, "Parent prefix missing for child index."); - } - } - - /** The prefix for all keys that belong to this index. */ - byte[] keyPrefix(byte[] prefix) { - checkParent(prefix); - return (parent != null) ? buildKey(false, prefix, name) : buildKey(name); - } - - /** - * The key where to start ascending iteration for entities whose value for the indexed field - * match the given value. - */ - byte[] start(byte[] prefix, Object value) { - checkParent(prefix); - return (parent != null) ? buildKey(false, prefix, name, toKey(value)) - : buildKey(name, toKey(value)); - } - - /** The key for the index's end marker. */ - byte[] end(byte[] prefix) { - checkParent(prefix); - return (parent != null) ? buildKey(false, prefix, name, END_MARKER) - : buildKey(name, END_MARKER); - } - - /** The key for the end marker for entries with the given value. */ - byte[] end(byte[] prefix, Object value) { - checkParent(prefix); - return (parent != null) ? buildKey(false, prefix, name, toKey(value), END_MARKER) - : buildKey(name, toKey(value), END_MARKER); - } - - /** The full key in the index that identifies the given entity. 
*/ - byte[] entityKey(byte[] prefix, Object entity) throws Exception { - Object indexValue = getValue(entity); - Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.", - name, type.getName()); - byte[] entityKey = start(prefix, indexValue); - if (!isNatural) { - entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity))); - } - return entityKey; - } - - private void updateCount(WriteBatch batch, byte[] key, long delta) { - long updated = getCount(key) + delta; - if (updated > 0) { - batch.put(key, db.serializer.serialize(updated)); - } else { - batch.delete(key); - } - } - - private void addOrRemove( - WriteBatch batch, - Object entity, - Object existing, - byte[] data, - byte[] naturalKey, - byte[] prefix) throws Exception { - Object indexValue = getValue(entity); - Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.", - name, type.getName()); - - byte[] entityKey = start(prefix, indexValue); - if (!isNatural) { - entityKey = buildKey(false, entityKey, naturalKey); - } - - boolean needCountUpdate = (existing == null); - - // Check whether there's a need to update the index. The index needs to be updated in two - // cases: - // - // - There is no existing value for the entity, so a new index value will be added. - // - If there is a previously stored value for the entity, and the index value for the - // current index does not match the new value, the old entry needs to be deleted and - // the new one added. - // - // Natural indices don't need to be checked, because by definition both old and new entities - // will have the same key. The put() call is all that's needed in that case. - // - // Also check whether we need to update the counts. If the indexed value is changing, we - // need to decrement the count at the old index value, and the new indexed value count needs - // to be incremented. - if (existing != null && !isNatural) { - byte[] oldPrefix = null; - Object oldIndexedValue = getValue(existing); - boolean removeExisting = !indexValue.equals(oldIndexedValue); - if (!removeExisting && isChild()) { - oldPrefix = parent().childPrefix(parent().getValue(existing)); - removeExisting = LevelDBIterator.compare(prefix, oldPrefix) != 0; - } - - if (removeExisting) { - if (oldPrefix == null && isChild()) { - oldPrefix = parent().childPrefix(parent().getValue(existing)); - } - - byte[] oldKey = entityKey(oldPrefix, existing); - batch.delete(oldKey); - - // If the indexed value has changed, we need to update the counts at the old and new - // end markers for the indexed value. - if (!isChild()) { - byte[] oldCountKey = end(null, oldIndexedValue); - updateCount(batch, oldCountKey, -1L); - needCountUpdate = true; - } - } - } - - if (data != null) { - byte[] stored = copy ? data : naturalKey; - batch.put(entityKey, stored); - } else { - batch.delete(entityKey); - } - - if (needCountUpdate && !isChild()) { - long delta = data != null ? 1L : -1L; - byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue); - updateCount(batch, countKey, delta); - } - } - - /** - * Add an entry to the index. - * - * @param batch Write batch with other related changes. - * @param entity The entity being added to the index. - * @param existing The entity being replaced in the index, or null. - * @param data Serialized entity to store (when storing the entity, not a reference). - * @param naturalKey The value's natural key (to avoid re-computing it for every index). - * @param prefix The parent index prefix, if this is a child index. 
- */ - void add( - WriteBatch batch, - Object entity, - Object existing, - byte[] data, - byte[] naturalKey, - byte[] prefix) throws Exception { - addOrRemove(batch, entity, existing, data, naturalKey, prefix); - } - - /** - * Remove a value from the index. - * - * @param batch Write batch with other related changes. - * @param entity The entity being removed, to identify the index entry to modify. - * @param naturalKey The value's natural key (to avoid re-computing it for every index). - * @param prefix The parent index prefix, if this is a child index. - */ - void remove( - WriteBatch batch, - Object entity, - byte[] naturalKey, - byte[] prefix) throws Exception { - addOrRemove(batch, entity, null, null, naturalKey, prefix); - } - - long getCount(byte[] key) { - byte[] data = db.db().get(key); - return data != null ? db.serializer.deserializeLong(data) : 0; - } - - byte[] toParentKey(Object value) { - return toKey(value, SECONDARY_IDX_PREFIX); - } - - byte[] toKey(Object value) { - return toKey(value, ENTRY_PREFIX); - } - - /** - * Translates a value to be used as part of the store key. - * - * Integral numbers are encoded as a string in a way that preserves lexicographical - * ordering. The string is prepended with a marker telling whether the number is negative - * or positive ("*" for negative and "=" for positive are used since "-" and "+" have the - * opposite of the desired order), and then the number is encoded into a hex string (so - * it occupies twice the number of bytes as the original type). - * - * Arrays are encoded by encoding each element separately, separated by KEY_SEPARATOR. - */ - byte[] toKey(Object value, byte prefix) { - final byte[] result; - - if (value instanceof String str) { - byte[] bytes = str.getBytes(UTF_8); - result = new byte[bytes.length + 1]; - result[0] = prefix; - System.arraycopy(bytes, 0, result, 1, bytes.length); - } else if (value instanceof Boolean bool) { - result = new byte[] { prefix, bool ? TRUE : FALSE }; - } else if (value.getClass().isArray()) { - int length = Array.getLength(value); - byte[][] components = new byte[length][]; - for (int i = 0; i < length; i++) { - components[i] = toKey(Array.get(value, i)); - } - result = buildKey(false, components); - } else { - int bytes; - - if (value instanceof Integer) { - bytes = Integer.SIZE; - } else if (value instanceof Long) { - bytes = Long.SIZE; - } else if (value instanceof Short) { - bytes = Short.SIZE; - } else if (value instanceof Byte) { - bytes = Byte.SIZE; - } else { - throw new IllegalArgumentException(String.format("Type %s not allowed as key.", - value.getClass().getName())); - } - - bytes = bytes / Byte.SIZE; - - byte[] key = new byte[bytes * 2 + 2]; - long longValue = ((Number) value).longValue(); - key[0] = prefix; - key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER; - - for (int i = 0; i < key.length - 2; i++) { - int masked = (int) ((longValue >>> (4 * i)) & 0xF); - key[key.length - i - 1] = HEX_BYTES[masked]; - } - - result = key; - } - - return result; - } - - } - -} diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java deleted file mode 100644 index ff6db8fc34c96..0000000000000 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBBenchmark.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util.kvstore; - -import java.io.File; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Slf4jReporter; -import com.codahale.metrics.Snapshot; -import com.codahale.metrics.Timer; -import org.apache.commons.io.FileUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -// checkstyle.off: RegexpSinglelineJava -import org.slf4j.LoggerFactory; -// checkstyle.on: RegexpSinglelineJava -import static org.junit.jupiter.api.Assertions.*; - -/** - * A set of small benchmarks for the LevelDB implementation. - * - * The benchmarks are run over two different types (one with just a natural index, and one - * with a ref index), over a set of 2^20 elements, and the following tests are performed: - * - * - write (then update) elements in sequential natural key order - * - write (then update) elements in random natural key order - * - iterate over natural index, ascending and descending - * - iterate over ref index, ascending and descending - */ -@Disabled -public class LevelDBBenchmark { - - private static final int COUNT = 1024; - private static final AtomicInteger IDGEN = new AtomicInteger(); - private static final MetricRegistry metrics = new MetricRegistry(); - private static final Timer dbCreation = metrics.timer("dbCreation"); - private static final Timer dbClose = metrics.timer("dbClose"); - - private LevelDB db; - private File dbpath; - - @BeforeEach - public void setup() throws Exception { - dbpath = File.createTempFile("test.", ".ldb"); - dbpath.delete(); - try(Timer.Context ctx = dbCreation.time()) { - db = new LevelDB(dbpath); - } - } - - @AfterEach - public void cleanup() throws Exception { - if (db != null) { - try(Timer.Context ctx = dbClose.time()) { - db.close(); - } - } - if (dbpath != null) { - FileUtils.deleteQuietly(dbpath); - } - } - - @AfterAll - public static void report() { - if (metrics.getTimers().isEmpty()) { - return; - } - - int headingPrefix = 0; - for (Map.Entry e : metrics.getTimers().entrySet()) { - headingPrefix = Math.max(e.getKey().length(), headingPrefix); - } - headingPrefix += 4; - - StringBuilder heading = new StringBuilder(); - for (int i = 0; i < headingPrefix; i++) { - heading.append(" "); - } - heading.append("\tcount"); - heading.append("\tmean"); - heading.append("\tmin"); - heading.append("\tmax"); - heading.append("\t95th"); - System.out.println(heading); - - for (Map.Entry e : metrics.getTimers().entrySet()) { - StringBuilder row = new StringBuilder(); - row.append(e.getKey()); - for (int i = 0; i < headingPrefix - 
e.getKey().length(); i++) { - row.append(" "); - } - - Snapshot s = e.getValue().getSnapshot(); - row.append("\t").append(e.getValue().getCount()); - row.append("\t").append(toMs(s.getMean())); - row.append("\t").append(toMs(s.getMin())); - row.append("\t").append(toMs(s.getMax())); - row.append("\t").append(toMs(s.get95thPercentile())); - - System.out.println(row); - } - - Slf4jReporter.forRegistry(metrics).outputTo(LoggerFactory.getLogger(LevelDBBenchmark.class)) - .build().report(); - } - - private static String toMs(double nanos) { - return String.format("%.3f", nanos / 1000 / 1000); - } - - @Test - public void sequentialWritesNoIndex() throws Exception { - List entries = createSimpleType(); - writeAll(entries, "sequentialWritesNoIndex"); - writeAll(entries, "sequentialUpdatesNoIndex"); - deleteNoIndex(entries, "sequentialDeleteNoIndex"); - } - - @Test - public void randomWritesNoIndex() throws Exception { - List entries = createSimpleType(); - - Collections.shuffle(entries); - writeAll(entries, "randomWritesNoIndex"); - - Collections.shuffle(entries); - writeAll(entries, "randomUpdatesNoIndex"); - - Collections.shuffle(entries); - deleteNoIndex(entries, "randomDeletesNoIndex"); - } - - @Test - public void sequentialWritesIndexedType() throws Exception { - List entries = createIndexedType(); - writeAll(entries, "sequentialWritesIndexed"); - writeAll(entries, "sequentialUpdatesIndexed"); - deleteIndexed(entries, "sequentialDeleteIndexed"); - } - - @Test - public void randomWritesIndexedTypeAndIteration() throws Exception { - List entries = createIndexedType(); - - Collections.shuffle(entries); - writeAll(entries, "randomWritesIndexed"); - - Collections.shuffle(entries); - writeAll(entries, "randomUpdatesIndexed"); - - // Run iteration benchmarks here since we've gone through the trouble of writing all - // the data already. - KVStoreView view = db.view(IndexedType.class); - iterate(view, "naturalIndex"); - iterate(view.reverse(), "naturalIndexDescending"); - iterate(view.index("name"), "refIndex"); - iterate(view.index("name").reverse(), "refIndexDescending"); - - Collections.shuffle(entries); - deleteIndexed(entries, "randomDeleteIndexed"); - } - - private void iterate(KVStoreView view, String name) throws Exception { - Timer create = metrics.timer(name + "CreateIterator"); - Timer iter = metrics.timer(name + "Iteration"); - KVStoreIterator it = null; - { - // Create the iterator several times, just to have multiple data points. 
- for (int i = 0; i < 1024; i++) { - if (it != null) { - it.close(); - } - try(Timer.Context ctx = create.time()) { - it = view.closeableIterator(); - } - } - } - - try { - while (it.hasNext()) { - try (Timer.Context ctx = iter.time()) { - it.next(); - } - } - } finally { - if (it != null) { - it.close(); - } - } - } - - private void writeAll(List entries, String timerName) throws Exception { - Timer timer = newTimer(timerName); - for (Object o : entries) { - try(Timer.Context ctx = timer.time()) { - db.write(o); - } - } - } - - private void deleteNoIndex(List entries, String timerName) throws Exception { - Timer delete = newTimer(timerName); - for (SimpleType i : entries) { - try(Timer.Context ctx = delete.time()) { - db.delete(i.getClass(), i.key); - } - } - } - - private void deleteIndexed(List entries, String timerName) throws Exception { - Timer delete = newTimer(timerName); - for (IndexedType i : entries) { - try(Timer.Context ctx = delete.time()) { - db.delete(i.getClass(), i.key); - } - } - } - - private List createSimpleType() { - List entries = new ArrayList<>(); - for (int i = 0; i < COUNT; i++) { - SimpleType t = new SimpleType(); - t.key = IDGEN.getAndIncrement(); - t.name = "name" + (t.key % 1024); - entries.add(t); - } - return entries; - } - - private List createIndexedType() { - List entries = new ArrayList<>(); - for (int i = 0; i < COUNT; i++) { - IndexedType t = new IndexedType(); - t.key = IDGEN.getAndIncrement(); - t.name = "name" + (t.key % 1024); - entries.add(t); - } - return entries; - } - - private Timer newTimer(String name) { - assertNull(metrics.getTimers().get(name), "Timer already exists: " + name); - return metrics.timer(name); - } - - public static class SimpleType { - - @KVIndex - public int key; - - public String name; - - } - - public static class IndexedType { - - @KVIndex - public int key; - - @KVIndex("name") - public String name; - - } - -} diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java deleted file mode 100644 index 6ff6286654450..0000000000000 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBIteratorSuite.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util.kvstore; - -import java.io.File; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.SystemUtils; -import org.junit.jupiter.api.AfterAll; -import static org.junit.jupiter.api.Assumptions.assumeFalse; - -public class LevelDBIteratorSuite extends DBIteratorSuite { - - private static File dbpath; - private static LevelDB db; - - @AfterAll - public static void cleanup() throws Exception { - if (db != null) { - db.close(); - } - if (dbpath != null) { - FileUtils.deleteQuietly(dbpath); - } - } - - @Override - protected KVStore createStore() throws Exception { - assumeFalse(SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64")); - dbpath = File.createTempFile("test.", ".ldb"); - dbpath.delete(); - db = new LevelDB(dbpath); - return db; - } - -} diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java deleted file mode 100644 index 040ccce70b5a1..0000000000000 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java +++ /dev/null @@ -1,492 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util.kvstore; - -import java.io.File; -import java.lang.ref.Reference; -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Spliterators; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import com.google.common.collect.ImmutableSet; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.SystemUtils; -import org.iq80.leveldb.DBIterator; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.*; -import static org.junit.jupiter.api.Assumptions.assumeFalse; - -public class LevelDBSuite { - - private LevelDB db; - private File dbpath; - - @AfterEach - public void cleanup() throws Exception { - if (db != null) { - db.close(); - } - if (dbpath != null) { - FileUtils.deleteQuietly(dbpath); - } - } - - @BeforeEach - public void setup() throws Exception { - assumeFalse(SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64")); - dbpath = File.createTempFile("test.", ".ldb"); - dbpath.delete(); - db = new LevelDB(dbpath); - } - - @Test - public void testReopenAndVersionCheckDb() throws Exception { - db.close(); - db = null; - assertTrue(dbpath.exists()); - - db = new LevelDB(dbpath); - assertEquals(LevelDB.STORE_VERSION, - db.serializer.deserializeLong(db.db().get(LevelDB.STORE_VERSION_KEY))); - db.db().put(LevelDB.STORE_VERSION_KEY, db.serializer.serialize(LevelDB.STORE_VERSION + 1)); - db.close(); - db = null; - - assertThrows(UnsupportedStoreVersionException.class, () -> db = new LevelDB(dbpath)); - } - - @Test - public void testObjectWriteReadDelete() throws Exception { - CustomType1 t = createCustomType1(1); - - assertThrows(NoSuchElementException.class, () -> db.read(CustomType1.class, t.key)); - - db.write(t); - assertEquals(t, db.read(t.getClass(), t.key)); - assertEquals(1L, db.count(t.getClass())); - - db.delete(t.getClass(), t.key); - assertThrows(NoSuchElementException.class, () -> db.read(t.getClass(), t.key)); - - // Look into the actual DB and make sure that all the keys related to the type have been - // removed. - assertEquals(0, countKeys(t.getClass())); - } - - @Test - public void testMultipleObjectWriteReadDelete() throws Exception { - CustomType1 t1 = createCustomType1(1); - CustomType1 t2 = createCustomType1(2); - t2.id = t1.id; - - db.write(t1); - db.write(t2); - - assertEquals(t1, db.read(t1.getClass(), t1.key)); - assertEquals(t2, db.read(t2.getClass(), t2.key)); - assertEquals(2L, db.count(t1.getClass())); - - // There should be one "id" index entry with two values. - assertEquals(2, db.count(t1.getClass(), "id", t1.id)); - - // Delete the first entry; now there should be 3 remaining keys, since one of the "name" - // index entries should have been removed. - db.delete(t1.getClass(), t1.key); - - // Make sure there's a single entry in the "id" index now. - assertEquals(1, db.count(t2.getClass(), "id", t2.id)); - - // Delete the remaining entry, make sure all data is gone. 
- db.delete(t2.getClass(), t2.key); - assertEquals(0, countKeys(t2.getClass())); - } - - @Test - public void testMultipleTypesWriteReadDelete() throws Exception { - CustomType1 t1 = createCustomType1(1); - - IntKeyType t2 = new IntKeyType(); - t2.key = 2; - t2.id = "2"; - t2.values = Arrays.asList("value1", "value2"); - - ArrayKeyIndexType t3 = new ArrayKeyIndexType(); - t3.key = new int[] { 42, 84 }; - t3.id = new String[] { "id1", "id2" }; - - db.write(t1); - db.write(t2); - db.write(t3); - - assertEquals(t1, db.read(t1.getClass(), t1.key)); - assertEquals(t2, db.read(t2.getClass(), t2.key)); - assertEquals(t3, db.read(t3.getClass(), t3.key)); - - // There should be one "id" index with a single entry for each type. - assertEquals(1, db.count(t1.getClass(), "id", t1.id)); - assertEquals(1, db.count(t2.getClass(), "id", t2.id)); - assertEquals(1, db.count(t3.getClass(), "id", t3.id)); - - // Delete the first entry; this should not affect the entries for the second type. - db.delete(t1.getClass(), t1.key); - assertEquals(0, countKeys(t1.getClass())); - assertEquals(1, db.count(t2.getClass(), "id", t2.id)); - assertEquals(1, db.count(t3.getClass(), "id", t3.id)); - - // Delete the remaining entries, make sure all data is gone. - db.delete(t2.getClass(), t2.key); - assertEquals(0, countKeys(t2.getClass())); - - db.delete(t3.getClass(), t3.key); - assertEquals(0, countKeys(t3.getClass())); - } - - @Test - public void testMetadata() throws Exception { - assertNull(db.getMetadata(CustomType1.class)); - - CustomType1 t = createCustomType1(1); - - db.setMetadata(t); - assertEquals(t, db.getMetadata(CustomType1.class)); - - db.setMetadata(null); - assertNull(db.getMetadata(CustomType1.class)); - } - - @Test - public void testUpdate() throws Exception { - CustomType1 t = createCustomType1(1); - - db.write(t); - - t.name = "anotherName"; - - db.write(t); - - assertEquals(1, db.count(t.getClass())); - assertEquals(1, db.count(t.getClass(), "name", "anotherName")); - assertEquals(0, db.count(t.getClass(), "name", "name")); - } - - @Test - public void testRemoveAll() throws Exception { - for (int i = 0; i < 2; i++) { - for (int j = 0; j < 2; j++) { - ArrayKeyIndexType o = new ArrayKeyIndexType(); - o.key = new int[] { i, j, 0 }; - o.id = new String[] { "things" }; - db.write(o); - - o = new ArrayKeyIndexType(); - o.key = new int[] { i, j, 1 }; - o.id = new String[] { "more things" }; - db.write(o); - } - } - - ArrayKeyIndexType o = new ArrayKeyIndexType(); - o.key = new int[] { 2, 2, 2 }; - o.id = new String[] { "things" }; - db.write(o); - - assertEquals(9, db.count(ArrayKeyIndexType.class)); - - db.removeAllByIndexValues( - ArrayKeyIndexType.class, - KVIndex.NATURAL_INDEX_NAME, - ImmutableSet.of(new int[] {0, 0, 0}, new int[] { 2, 2, 2 })); - assertEquals(7, db.count(ArrayKeyIndexType.class)); - - db.removeAllByIndexValues( - ArrayKeyIndexType.class, - "id", - ImmutableSet.of(new String[] { "things" })); - assertEquals(4, db.count(ArrayKeyIndexType.class)); - - db.removeAllByIndexValues( - ArrayKeyIndexType.class, - "id", - ImmutableSet.of(new String[] { "more things" })); - assertEquals(0, db.count(ArrayKeyIndexType.class)); - } - - @Test - public void testSkip() throws Exception { - for (int i = 0; i < 10; i++) { - db.write(createCustomType1(i)); - } - - try (KVStoreIterator it = db.view(CustomType1.class).closeableIterator()) { - assertTrue(it.hasNext()); - assertTrue(it.skip(5)); - assertEquals("key5", it.next().key); - assertTrue(it.skip(3)); - assertEquals("key9", it.next().key); - 
assertFalse(it.hasNext()); - } - } - - @Test - public void testNegativeIndexValues() throws Exception { - List expected = Arrays.asList(-100, -50, 0, 50, 100); - - expected.forEach(i -> { - try { - db.write(createCustomType1(i)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - - try (KVStoreIterator iterator = - db.view(CustomType1.class).index("int").closeableIterator()) { - List results = StreamSupport - .stream(Spliterators.spliteratorUnknownSize(iterator, 0), false) - .map(e -> e.num) - .collect(Collectors.toList()); - - assertEquals(expected, results); - } - } - - @Test - public void testCloseLevelDBIterator() throws Exception { - // SPARK-31929: test when LevelDB.close() is called, related LevelDBIterators - // are closed. And files opened by iterators are also closed. - File dbPathForCloseTest = File - .createTempFile( - "test_db_close.", - ".ldb"); - dbPathForCloseTest.delete(); - LevelDB dbForCloseTest = new LevelDB(dbPathForCloseTest); - for (int i = 0; i < 8192; i++) { - dbForCloseTest.write(createCustomType1(i)); - } - String key = dbForCloseTest - .view(CustomType1.class).iterator().next().key; - assertEquals("key0", key); - Iterator it0 = dbForCloseTest - .view(CustomType1.class).max(1).iterator(); - while (it0.hasNext()) { - it0.next(); - } - System.gc(); - Iterator it1 = dbForCloseTest - .view(CustomType1.class).iterator(); - assertEquals("key0", it1.next().key); - try (KVStoreIterator it2 = dbForCloseTest - .view(CustomType1.class).closeableIterator()) { - assertEquals("key0", it2.next().key); - } - dbForCloseTest.close(); - assertTrue(dbPathForCloseTest.exists()); - FileUtils.deleteQuietly(dbPathForCloseTest); - assertTrue(!dbPathForCloseTest.exists()); - } - - @Test - public void testHasNextAfterIteratorClose() throws Exception { - db.write(createCustomType1(0)); - KVStoreIterator iter = - db.view(CustomType1.class).closeableIterator(); - // iter should be true - assertTrue(iter.hasNext()); - // close iter - iter.close(); - // iter.hasNext should be false after iter close - assertFalse(iter.hasNext()); - } - - @Test - public void testHasNextAfterDBClose() throws Exception { - db.write(createCustomType1(0)); - KVStoreIterator iter = - db.view(CustomType1.class).closeableIterator(); - // iter should be true - assertTrue(iter.hasNext()); - // close db - db.close(); - // iter.hasNext should be false after db close - assertFalse(iter.hasNext()); - } - - @Test - public void testNextAfterIteratorClose() throws Exception { - db.write(createCustomType1(0)); - KVStoreIterator iter = - db.view(CustomType1.class).closeableIterator(); - // iter should be true - assertTrue(iter.hasNext()); - // close iter - iter.close(); - // iter.next should throw NoSuchElementException after iter close - assertThrows(NoSuchElementException.class, iter::next); - } - - @Test - public void testNextAfterDBClose() throws Exception { - db.write(createCustomType1(0)); - KVStoreIterator iter = - db.view(CustomType1.class).closeableIterator(); - // iter should be true - assertTrue(iter.hasNext()); - // close db - iter.close(); - // iter.next should throw NoSuchElementException after db close - assertThrows(NoSuchElementException.class, iter::next); - } - - @Test - public void testSkipAfterIteratorClose() throws Exception { - db.write(createCustomType1(0)); - KVStoreIterator iter = - db.view(CustomType1.class).closeableIterator(); - // close iter - iter.close(); - // skip should always return false after iter close - assertFalse(iter.skip(0)); - assertFalse(iter.skip(1)); - } - - @Test 
- public void testSkipAfterDBClose() throws Exception { - db.write(createCustomType1(0)); - KVStoreIterator iter = - db.view(CustomType1.class).closeableIterator(); - // iter should be true - assertTrue(iter.hasNext()); - // close db - db.close(); - // skip should always return false after db close - assertFalse(iter.skip(0)); - assertFalse(iter.skip(1)); - } - - @Test - public void testResourceCleaner() throws Exception { - File dbPathForCleanerTest = File.createTempFile( - "test_db_cleaner.", ".rdb"); - dbPathForCleanerTest.delete(); - - LevelDB dbForCleanerTest = new LevelDB(dbPathForCleanerTest); - try { - for (int i = 0; i < 8192; i++) { - dbForCleanerTest.write(createCustomType1(i)); - } - LevelDBIterator levelDBIterator = - (LevelDBIterator) dbForCleanerTest.view(CustomType1.class).iterator(); - Reference> reference = new WeakReference<>(levelDBIterator); - assertNotNull(reference); - LevelDBIterator.ResourceCleaner resourceCleaner = levelDBIterator.getResourceCleaner(); - assertFalse(resourceCleaner.isCompleted()); - // Manually set levelDBIterator to null, to be GC. - levelDBIterator = null; - // 100 times gc, the levelDBIterator should be GCed. - int count = 0; - while (count < 100 && !reference.refersTo(null)) { - System.gc(); - count++; - Thread.sleep(100); - } - // check rocksDBIterator should be GCed - assertTrue(reference.refersTo(null)); - // Verify that the Cleaner will be executed after a period of time, isAllocated is true. - assertTrue(resourceCleaner.isCompleted()); - } finally { - dbForCleanerTest.close(); - FileUtils.deleteQuietly(dbPathForCleanerTest); - } - } - - @Test - public void testMultipleTypesWriteAll() throws Exception { - - List type1List = Arrays.asList( - createCustomType1(1), - createCustomType1(2), - createCustomType1(3), - createCustomType1(4) - ); - - List type2List = Arrays.asList( - createCustomType2(10), - createCustomType2(11), - createCustomType2(12), - createCustomType2(13) - ); - - List fullList = new ArrayList(); - fullList.addAll(type1List); - fullList.addAll(type2List); - - db.writeAll(fullList); - for (CustomType1 value : type1List) { - assertEquals(value, db.read(value.getClass(), value.key)); - } - for (CustomType2 value : type2List) { - assertEquals(value, db.read(value.getClass(), value.key)); - } - } - - - private CustomType1 createCustomType1(int i) { - CustomType1 t = new CustomType1(); - t.key = "key" + i; - t.id = "id" + i; - t.name = "name" + i; - t.num = i; - t.child = "child" + i; - return t; - } - - private CustomType2 createCustomType2(int i) { - CustomType2 t = new CustomType2(); - t.key = "key" + i; - t.id = "id" + i; - t.parentId = "parent_id" + (i / 2); - return t; - } - - private int countKeys(Class type) throws Exception { - byte[] prefix = db.getTypeInfo(type).keyPrefix(); - int count = 0; - - try (DBIterator it = db.db().iterator()) { - it.seek(prefix); - - while (it.hasNext()) { - byte[] key = it.next().getKey(); - if (LevelDBIterator.startsWith(key, prefix)) { - count++; - } - } - } - - return count; - } -} diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBTypeInfoSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBTypeInfoSuite.java deleted file mode 100644 index bc1b039316912..0000000000000 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBTypeInfoSuite.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util.kvstore; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; - -public class LevelDBTypeInfoSuite { - - @Test - public void testIndexAnnotation() throws Exception { - KVTypeInfo ti = new KVTypeInfo(CustomType1.class); - assertEquals(5, ti.indices().count()); - - CustomType1 t1 = new CustomType1(); - t1.key = "key"; - t1.id = "id"; - t1.name = "name"; - t1.num = 42; - t1.child = "child"; - - assertEquals(t1.key, ti.getIndexValue(KVIndex.NATURAL_INDEX_NAME, t1)); - assertEquals(t1.id, ti.getIndexValue("id", t1)); - assertEquals(t1.name, ti.getIndexValue("name", t1)); - assertEquals(t1.num, ti.getIndexValue("int", t1)); - assertEquals(t1.child, ti.getIndexValue("child", t1)); - } - - @Test - public void testNoNaturalIndex() { - assertThrows(IllegalArgumentException.class, - () -> newTypeInfo(NoNaturalIndex.class)); - } - - @Test - public void testNoNaturalIndex2() { - assertThrows(IllegalArgumentException.class, - () -> newTypeInfo(NoNaturalIndex2.class)); - } - - @Test - public void testDuplicateIndex() { - assertThrows(IllegalArgumentException.class, - () -> newTypeInfo(DuplicateIndex.class)); - } - - @Test - public void testEmptyIndexName() { - assertThrows(IllegalArgumentException.class, - () -> newTypeInfo(EmptyIndexName.class)); - } - - @Test - public void testIllegalIndexName() { - assertThrows(IllegalArgumentException.class, - () -> newTypeInfo(IllegalIndexName.class)); - } - - @Test - public void testIllegalIndexMethod() { - assertThrows(IllegalArgumentException.class, - () -> newTypeInfo(IllegalIndexMethod.class)); - } - - @Test - public void testKeyClashes() throws Exception { - LevelDBTypeInfo ti = newTypeInfo(CustomType1.class); - - CustomType1 t1 = new CustomType1(); - t1.key = "key1"; - t1.name = "a"; - - CustomType1 t2 = new CustomType1(); - t2.key = "key2"; - t2.name = "aa"; - - CustomType1 t3 = new CustomType1(); - t3.key = "key3"; - t3.name = "aaa"; - - // Make sure entries with conflicting names are sorted correctly. 
- assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t2)); - assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t3)); - assertBefore(ti.index("name").entityKey(null, t2), ti.index("name").entityKey(null, t3)); - } - - @Test - public void testNumEncoding() throws Exception { - LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next(); - - assertEquals("+=00000001", new String(idx.toKey(1), UTF_8)); - assertEquals("+=00000010", new String(idx.toKey(16), UTF_8)); - assertEquals("+=7fffffff", new String(idx.toKey(Integer.MAX_VALUE), UTF_8)); - - assertBefore(idx.toKey(1), idx.toKey(2)); - assertBefore(idx.toKey(-1), idx.toKey(2)); - assertBefore(idx.toKey(-11), idx.toKey(2)); - assertBefore(idx.toKey(-11), idx.toKey(-1)); - assertBefore(idx.toKey(1), idx.toKey(11)); - assertBefore(idx.toKey(Integer.MIN_VALUE), idx.toKey(Integer.MAX_VALUE)); - - assertBefore(idx.toKey(1L), idx.toKey(2L)); - assertBefore(idx.toKey(-1L), idx.toKey(2L)); - assertBefore(idx.toKey(Long.MIN_VALUE), idx.toKey(Long.MAX_VALUE)); - - assertBefore(idx.toKey((short) 1), idx.toKey((short) 2)); - assertBefore(idx.toKey((short) -1), idx.toKey((short) 2)); - assertBefore(idx.toKey(Short.MIN_VALUE), idx.toKey(Short.MAX_VALUE)); - - assertBefore(idx.toKey((byte) 1), idx.toKey((byte) 2)); - assertBefore(idx.toKey((byte) -1), idx.toKey((byte) 2)); - assertBefore(idx.toKey(Byte.MIN_VALUE), idx.toKey(Byte.MAX_VALUE)); - - byte prefix = LevelDBTypeInfo.ENTRY_PREFIX; - assertSame(new byte[] { prefix, LevelDBTypeInfo.FALSE }, idx.toKey(false)); - assertSame(new byte[] { prefix, LevelDBTypeInfo.TRUE }, idx.toKey(true)); - } - - @Test - public void testArrayIndices() throws Exception { - LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next(); - - assertBefore(idx.toKey(new String[] { "str1" }), idx.toKey(new String[] { "str2" })); - assertBefore(idx.toKey(new String[] { "str1", "str2" }), - idx.toKey(new String[] { "str1", "str3" })); - - assertBefore(idx.toKey(new int[] { 1 }), idx.toKey(new int[] { 2 })); - assertBefore(idx.toKey(new int[] { 1, 2 }), idx.toKey(new int[] { 1, 3 })); - } - - private LevelDBTypeInfo newTypeInfo(Class type) throws Exception { - return new LevelDBTypeInfo(null, type, type.getName().getBytes(UTF_8)); - } - - private void assertBefore(byte[] key1, byte[] key2) { - assertBefore(new String(key1, UTF_8), new String(key2, UTF_8)); - } - - private void assertBefore(String str1, String str2) { - assertTrue(str1.compareTo(str2) < 0, String.format("%s < %s failed", str1, str2)); - } - - private void assertSame(byte[] key1, byte[] key2) { - assertEquals(new String(key1, UTF_8), new String(key2, UTF_8)); - } - - public static class NoNaturalIndex { - - public String id; - - } - - public static class NoNaturalIndex2 { - - @KVIndex("id") - public String id; - - } - - public static class DuplicateIndex { - - @KVIndex - public String key; - - @KVIndex("id") - public String id; - - @KVIndex("id") - public String id2; - - } - - public static class EmptyIndexName { - - @KVIndex("") - public String id; - - } - - public static class IllegalIndexName { - - @KVIndex("__invalid") - public String id; - - } - - public static class IllegalIndexMethod { - - @KVIndex("id") - public String id(boolean illegalParam) { - return null; - } - - } - -} diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 2f2be88ac9dd1..e5d7c7a44a11b 100644 --- a/common/network-common/pom.xml +++ 
b/common/network-common/pom.xml @@ -87,11 +87,6 @@ org.apache.commons commons-lang3
- - ${leveldbjni.group} - leveldbjni-all - 1.8 - org.rocksdb rocksdbjni diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DB.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DB.java index 0ac0c9e8fe4d1..f26093818b99b 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DB.java +++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DB.java @@ -23,7 +23,7 @@ /** * The local KV storage used to persist the shuffle state, - * the implementations may include LevelDB, RocksDB, etc. + * the implementations may include RocksDB, etc. */ @Private public interface DB extends Closeable { diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java index e09ccd37b4d9b..06a793a5f65dd 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java +++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java @@ -21,10 +21,10 @@ /** * The enum `DBBackend` use to specify a disk-based store used in shuffle service local db. - * Support the use of LevelDB and RocksDB. + * Support the use of RocksDB. */ public enum DBBackend { - LEVELDB(".ldb"), ROCKSDB(".rdb"); + ROCKSDB(".rdb"); private final String fileSuffix; diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDB.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDB.java deleted file mode 100644 index 62e6450a9c7c9..0000000000000 --- a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDB.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.network.shuffledb; - -import java.io.IOException; - -public class LevelDB implements DB { - private final org.iq80.leveldb.DB db; - - public LevelDB(org.iq80.leveldb.DB db) { - this.db = db; - } - - @Override - public void put(byte[] key, byte[] value) { - db.put(key, value); - } - - @Override - public byte[] get(byte[] key) { - return db.get(key); - } - - @Override - public void delete(byte[] key) { - db.delete(key); - } - - @Override - public void close() throws IOException { - db.close(); - } - - @Override - public DBIterator iterator() { - return new LevelDBIterator(db.iterator()); - } -} diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java deleted file mode 100644 index 2ac549775449a..0000000000000 --- a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.network.shuffledb; - -import java.io.IOException; -import java.util.Map; -import java.util.NoSuchElementException; - -public class LevelDBIterator implements DBIterator { - - private final org.iq80.leveldb.DBIterator it; - - private boolean checkedNext; - - private boolean closed; - - private Map.Entry next; - - public LevelDBIterator(org.iq80.leveldb.DBIterator it) { - this.it = it; - } - - @Override - public boolean hasNext() { - if (!checkedNext && !closed) { - next = loadNext(); - checkedNext = true; - } - if (!closed && next == null) { - try { - close(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - return next != null; - } - - @Override - public Map.Entry next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - checkedNext = false; - Map.Entry ret = next; - next = null; - return ret; - } - - @Override - public void close() throws IOException { - if (!closed) { - it.close(); - closed = true; - next = null; - } - } - - @Override - public void seek(byte[] key) { - it.seek(key); - } - - private Map.Entry loadNext() { - boolean hasNext = it.hasNext(); - if (!hasNext) { - return null; - } - return it.next(); - } -} diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java index 94a64b3f4037c..9d204bfc00867 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java @@ -22,16 +22,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import org.apache.spark.internal.SparkLogger; -import org.apache.spark.internal.SparkLoggerFactory; import org.apache.spark.network.shuffledb.DB; import org.apache.spark.network.shuffledb.DBBackend; -import org.apache.spark.network.shuffledb.LevelDB; import org.apache.spark.network.shuffledb.RocksDB; import org.apache.spark.network.shuffledb.StoreVersion; public class DBProvider { - private static final SparkLogger logger = SparkLoggerFactory.getLogger(DBProvider.class); public static DB initDB( DBBackend dbBackend, File dbFile, @@ -39,11 +35,6 @@ public static DB initDB( ObjectMapper mapper) throws IOException { if (dbFile != null) { return switch (dbBackend) { - case LEVELDB -> { - org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version, mapper); - logger.warn("The LEVELDB is deprecated. Please use ROCKSDB instead."); - yield levelDB != null ? new LevelDB(levelDB) : null; - } case ROCKSDB -> { org.rocksdb.RocksDB rocksDB = RocksDBProvider.initRockDB(dbFile, version, mapper); yield rocksDB != null ? new RocksDB(rocksDB) : null; @@ -57,10 +48,6 @@ public static DB initDB( public static DB initDB(DBBackend dbBackend, File file) throws IOException { if (file != null) { return switch (dbBackend) { - case LEVELDB -> { - logger.warn("The LEVELDB is deprecated. 
Please use ROCKSDB instead."); - yield new LevelDB(LevelDBProvider.initLevelDB(file)); - } case ROCKSDB -> new RocksDB(RocksDBProvider.initRocksDB(file)); }; } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java deleted file mode 100644 index 391931961a474..0000000000000 --- a/common/network-common/src/main/java/org/apache/spark/network/util/LevelDBProvider.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.util; - -import java.io.File; -import java.io.IOException; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; -import org.fusesource.leveldbjni.JniDBFactory; -import org.fusesource.leveldbjni.internal.NativeDB; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.Options; - -import org.apache.spark.internal.SparkLogger; -import org.apache.spark.internal.SparkLoggerFactory; -import org.apache.spark.internal.LogKeys; -import org.apache.spark.internal.MDC; -import org.apache.spark.network.shuffledb.StoreVersion; - -/** - * LevelDB utility class available in the network package. - */ -public class LevelDBProvider { - private static final SparkLogger logger = SparkLoggerFactory.getLogger(LevelDBProvider.class); - - public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper mapper) throws - IOException { - DB tmpDb = null; - if (dbFile != null) { - Options options = new Options(); - options.createIfMissing(false); - options.logger(new LevelDBLogger()); - try { - tmpDb = JniDBFactory.factory.open(dbFile, options); - } catch (NativeDB.DBException e) { - if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { - logger.info("Creating state database at {}", MDC.of(LogKeys.PATH$.MODULE$, dbFile)); - options.createIfMissing(true); - try { - tmpDb = JniDBFactory.factory.open(dbFile, options); - } catch (NativeDB.DBException dbExc) { - throw new IOException("Unable to create state store", dbExc); - } - } else { - // the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new - // one, so we can keep processing new apps - logger.error("error opening leveldb file {}. 
Creating new file, will not be able to " + - "recover state for existing applications", e, MDC.of(LogKeys.PATH$.MODULE$, dbFile)); - if (dbFile.isDirectory()) { - for (File f : dbFile.listFiles()) { - if (!f.delete()) { - logger.warn("error deleting {}", MDC.of(LogKeys.PATH$.MODULE$, f.getPath())); - } - } - } - if (!dbFile.delete()) { - logger.warn("error deleting {}", MDC.of(LogKeys.PATH$.MODULE$, dbFile.getPath())); - } - options.createIfMissing(true); - try { - tmpDb = JniDBFactory.factory.open(dbFile, options); - } catch (NativeDB.DBException dbExc) { - throw new IOException("Unable to create state store", dbExc); - } - - } - } - // if there is a version mismatch, we throw an exception, which means the service is unusable - try { - checkVersion(tmpDb, version, mapper); - } catch (IOException ioe) { - tmpDb.close(); - throw ioe; - } - } - return tmpDb; - } - - @VisibleForTesting - static DB initLevelDB(File file) throws IOException { - Options options = new Options(); - options.createIfMissing(true); - JniDBFactory factory = new JniDBFactory(); - return factory.open(file, options); - } - - private static class LevelDBLogger implements org.iq80.leveldb.Logger { - private static final SparkLogger LOG = SparkLoggerFactory.getLogger(LevelDBLogger.class); - - @Override - public void log(String message) { - LOG.info(message); - } - } - - /** - * Simple major.minor versioning scheme. Any incompatible changes should be across major - * versions. Minor version differences are allowed -- meaning we should be able to read - * dbs that are either earlier *or* later on the minor version. - */ - public static void checkVersion(DB db, StoreVersion newversion, ObjectMapper mapper) throws - IOException { - byte[] bytes = db.get(StoreVersion.KEY); - if (bytes == null) { - storeVersion(db, newversion, mapper); - } else { - StoreVersion version = mapper.readValue(bytes, StoreVersion.class); - if (version.major != newversion.major) { - throw new IOException("cannot read state DB with version " + version + ", incompatible " + - "with current version " + newversion); - } - storeVersion(db, newversion, mapper); - } - } - - public static void storeVersion(DB db, StoreVersion version, ObjectMapper mapper) - throws IOException { - db.put(StoreVersion.KEY, mapper.writeValueAsBytes(version)); - } -} diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/DBProviderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/DBProviderSuite.java index 81bfc55264c4c..62dff6b8d17cc 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/util/DBProviderSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/util/DBProviderSuite.java @@ -18,7 +18,6 @@ package org.apache.spark.network.util; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang3.SystemUtils; import org.apache.spark.network.shuffledb.DBBackend; import org.apache.spark.network.shuffledb.StoreVersion; import org.junit.jupiter.api.Assertions; @@ -27,8 +26,6 @@ import java.io.File; import java.io.IOException; -import static org.junit.jupiter.api.Assumptions.assumeFalse; - public class DBProviderSuite { @Test @@ -36,12 +33,6 @@ public void testRockDBCheckVersionFailed() throws IOException, InterruptedExcept testCheckVersionFailed(DBBackend.ROCKSDB, "rocksdb"); } - @Test - public void testLevelDBCheckVersionFailed() throws IOException, InterruptedException { - assumeFalse(SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64")); - 
testCheckVersionFailed(DBBackend.LEVELDB, "leveldb"); - } - private void testCheckVersionFailed(DBBackend dbBackend, String namePrefix) throws IOException, InterruptedException { String root = System.getProperty("java.io.tmpdir"); diff --git a/common/tags/src/test/java/org/apache/spark/tags/ExtendedLevelDBTest.java b/common/tags/src/test/java/org/apache/spark/tags/ExtendedLevelDBTest.java deleted file mode 100644 index e76c477c52656..0000000000000 --- a/common/tags/src/test/java/org/apache/spark/tags/ExtendedLevelDBTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.tags; - -import org.scalatest.TagAnnotation; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@TagAnnotation -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.METHOD, ElementType.TYPE}) -public @interface ExtendedLevelDBTest { } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala index 122ed299242f5..0ffd1fe11c34a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala @@ -27,9 +27,7 @@ import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.internal.config.History import org.apache.spark.internal.config.History._ -import org.apache.spark.internal.config.History.HybridStoreDiskBackend.ROCKSDB import org.apache.spark.status.KVUtils import org.apache.spark.status.KVUtils._ import org.apache.spark.util.{Clock, Utils} @@ -58,8 +56,7 @@ private class HistoryServerDiskManager( if (!appStoreDir.isDirectory() && !appStoreDir.mkdir()) { throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).") } - private val extension = - if (conf.get(History.HYBRID_STORE_DISK_BACKEND) == ROCKSDB.toString) ".rdb" else ".ldb" + private val extension = ".rdb" private val tmpStoreDir = new File(path, "temp") if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala index b8df873ba871c..c5f54654fd9a4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala @@ -149,7 +149,6 @@ private[history] class HybridStore extends KVStore { val values = 
Lists.newArrayList( inMemoryStore.view(klass).closeableIterator()) diskStore match { - case db: LevelDB => db.writeAll(values) case db: RocksDB => db.writeAll(values) case _ => throw new IllegalStateException("Unknown disk-based KVStore") } diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index bbd4afcaebab4..974f223da88ac 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -293,11 +293,11 @@ private[spark] object History { .createWithDefaultString("2g") object HybridStoreDiskBackend extends Enumeration { - val LEVELDB, ROCKSDB = Value + val ROCKSDB = Value } val HYBRID_STORE_DISK_BACKEND = ConfigBuilder("spark.history.store.hybridStore.diskBackend") - .doc("Specifies a disk-based store used in hybrid store; ROCKSDB or LEVELDB (deprecated).") + .doc("Specifies a disk-based store used in hybrid store; only ROCKSDB is supported now.") .version("3.3.0") .stringConf .transform(_.toUpperCase(Locale.ROOT)) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7cb3d068b676f..eb16e91cfa444 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -813,7 +813,7 @@ package object config { private[spark] val SHUFFLE_SERVICE_DB_BACKEND = ConfigBuilder(Constants.SHUFFLE_SERVICE_DB_BACKEND) .doc("Specifies a disk-based store used in shuffle service local db. " + - "ROCKSDB or LEVELDB (deprecated).") + "Only ROCKSDB is supported now.") .version("3.4.0") .enumConf(classOf[DBBackend]) .createWithDefault(DBBackend.ROCKSDB) diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index e334626413dc0..a0cd394c36f25 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -26,7 +26,6 @@ import scala.reflect.{classTag, ClassTag} import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.module.scala.DefaultScalaModule -import org.fusesource.leveldbjni.internal.NativeDB import org.rocksdb.RocksDBException import org.apache.spark.SparkConf @@ -48,9 +47,7 @@ private[spark] object KVUtils extends Logging { private def backend(conf: SparkConf, live: Boolean) = { if (live) { - // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now, - // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with - // improvements on writes and reads. + // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now. HybridStoreDiskBackend.ROCKSDB } else { HybridStoreDiskBackend.withName(conf.get(HYBRID_STORE_DISK_BACKEND)) @@ -96,9 +93,6 @@ private[spark] object KVUtils extends Logging { val kvSerializer = serializer(conf, live) val db = backend(conf, live) match { - case LEVELDB => - logWarning("The LEVELDB is deprecated. 
Please use ROCKSDB instead.") - new LevelDB(path, kvSerializer) case ROCKSDB => new RocksDB(path, kvSerializer) } val dbMeta = db.getMetadata(classTag[M].runtimeClass) @@ -131,7 +125,6 @@ private[spark] object KVUtils extends Logging { val diskBackend = backend(conf, live) val dir = diskBackend match { - case LEVELDB => "listing.ldb" case ROCKSDB => "listing.rdb" } @@ -153,7 +146,7 @@ private[spark] object KVUtils extends Logging { logInfo("Detected incompatible DB versions, deleting...") path.listFiles().foreach(Utils.deleteRecursively) open(dbPath, metadata, conf, live) - case dbExc @ (_: NativeDB.DBException | _: RocksDBException) => + case dbExc: RocksDBException => // Get rid of the corrupted data and re-create it. logWarning(log"Failed to load disk store ${MDC(PATH, dbPath)} :", dbExc) Utils.deleteRecursively(dbPath) diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala index 921175bd41038..66850305522e3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala @@ -27,7 +27,6 @@ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.{ExternalBlockHandler, ExternalShuffleBlockResolver} import org.apache.spark.network.shuffle.TestShuffleDataContext import org.apache.spark.network.shuffledb.DBBackend -import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.Utils /** @@ -148,11 +147,6 @@ abstract class ExternalShuffleServiceDbSuite extends SparkFunSuite { } } -@ExtendedLevelDBTest -class ExternalShuffleServiceLevelDBSuite extends ExternalShuffleServiceDbSuite { - override protected def shuffleDBBackend(): DBBackend = DBBackend.LEVELDB -} - class ExternalShuffleServiceRocksDBSuite extends ExternalShuffleServiceDbSuite { override protected def shuffleDBBackend(): DBBackend = DBBackend.ROCKSDB } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala index ec9278f81b6c2..a2adc48c6aa95 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala @@ -21,7 +21,7 @@ import org.openqa.selenium.WebDriver import org.openqa.selenium.chrome.{ChromeDriver, ChromeOptions} import org.apache.spark.internal.config.History.HybridStoreDiskBackend -import org.apache.spark.tags.{ChromeUITest, ExtendedLevelDBTest, WebBrowserTest} +import org.apache.spark.tags.ChromeUITest /** @@ -50,14 +50,6 @@ abstract class ChromeUIHistoryServerSuite } } -@WebBrowserTest -@ChromeUITest -@ExtendedLevelDBTest -class LevelDBBackendChromeUIHistoryServerSuite extends ChromeUIHistoryServerSuite { - override protected def diskBackend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.LEVELDB -} - -@WebBrowserTest @ChromeUITest class RocksDBBackendChromeUIHistoryServerSuite extends ChromeUIHistoryServerSuite { override protected def diskBackend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.ROCKSDB diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index d58c996f23655..f8609a9462c06 100644 --- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -53,7 +53,6 @@ import org.apache.spark.status.KVUtils import org.apache.spark.status.KVUtils.KVStoreScalaSerializer import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.status.protobuf.KVStoreProtobufSerializer -import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} import org.apache.spark.util.kvstore.InMemoryStore import org.apache.spark.util.logging.DriverLogger @@ -1890,17 +1889,6 @@ class TestGroupsMappingProvider extends GroupMappingServiceProvider { } } -@ExtendedLevelDBTest -class LevelDBBackendFsHistoryProviderSuite extends FsHistoryProviderSuite { - override protected def diskBackend: HybridStoreDiskBackend.Value = - HybridStoreDiskBackend.LEVELDB -} - -@ExtendedLevelDBTest -class LevelDBBackendWithProtobufSerializerSuite extends LevelDBBackendFsHistoryProviderSuite { - override protected def serializer: LocalStoreSerializer.Value = LocalStoreSerializer.PROTOBUF -} - class RocksDBBackendFsHistoryProviderSuite extends FsHistoryProviderSuite { override protected def diskBackend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.ROCKSDB diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala index e4248a49b90a9..b7d75aee131e5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala @@ -28,7 +28,6 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.History.HybridStoreDiskBackend import org.apache.spark.status.KVUtils -import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.{ManualClock, Utils} import org.apache.spark.util.kvstore.KVStore @@ -227,12 +226,6 @@ abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAn } } -@ExtendedLevelDBTest -class HistoryServerDiskManagerUseLevelDBSuite extends HistoryServerDiskManagerSuite { - override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.LEVELDB - override protected def extension: String = ".ldb" -} - class HistoryServerDiskManagerUseRocksDBSuite extends HistoryServerDiskManagerSuite { override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.ROCKSDB override protected def extension: String = ".rdb" diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 10092f416f9e1..0e47087b270f2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -49,7 +49,7 @@ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI._ import org.apache.spark.status.api.v1.ApplicationInfo import org.apache.spark.status.api.v1.JobData -import org.apache.spark.tags.{ExtendedLevelDBTest, WebBrowserTest} +import org.apache.spark.tags.WebBrowserTest import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ResetSystemProperties, 
ShutdownHookManager, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -808,13 +808,6 @@ object FakeAuthFilter { val FAKE_HTTP_USER = "HTTP_USER" } -@WebBrowserTest -@ExtendedLevelDBTest -class LevelDBBackendHistoryServerSuite extends HistoryServerSuite { - override protected def diskBackend: History.HybridStoreDiskBackend.Value = - HybridStoreDiskBackend.LEVELDB -} - @WebBrowserTest class RocksDBBackendHistoryServerSuite extends HistoryServerSuite { override protected def diskBackend: History.HybridStoreDiskBackend.Value = diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala index f59ee63cccf73..ae031b7b49fe5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala @@ -28,7 +28,6 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.status.KVUtils._ -import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.Utils import org.apache.spark.util.kvstore._ @@ -79,7 +78,7 @@ abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with T store.setMetadata(t1) assert(store.getMetadata(classOf[CustomType1]) === t1) - // Switch to RocksDB/LevelDB and set a new metadata + // Switch to RocksDB and set a new metadata switchHybridStore(store) val t2 = createCustomType1(2) @@ -172,7 +171,7 @@ abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with T failAfter(2.seconds) { assert(listener.waitUntilDone()) } - while (!store.getStore().isInstanceOf[LevelDB] && !store.getStore().isInstanceOf[RocksDB]) { + while (!store.getStore().isInstanceOf[RocksDB]) { Thread.sleep(10) } } @@ -206,15 +205,6 @@ abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with T } } -@ExtendedLevelDBTest -class LevelDBHybridStoreSuite extends HybridStoreSuite { - before { - dbpath = File.createTempFile("test.", ".ldb") - dbpath.delete() - db = new LevelDB(dbpath, new KVStoreScalaSerializer()) - } -} - class RocksDBHybridStoreSuite extends HybridStoreSuite { before { dbpath = File.createTempFile("test.", ".rdb") diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala index f3bae2066e146..2435265f83c0f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala @@ -347,12 +347,6 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with P testWorkDirCleanupAndRemoveMetadataWithConfig(true, DBBackend.ROCKSDB) } - test("WorkDirCleanup cleans app dirs and shuffle metadata when " + - "spark.shuffle.service.db.enabled=true, spark.shuffle.service.db.backend=LevelDB") { - assume(!Utils.isMacOnAppleSilicon) - testWorkDirCleanupAndRemoveMetadataWithConfig(true, DBBackend.LEVELDB) - } - test("WorkDirCleanup cleans only app dirs when" + "spark.shuffle.service.db.enabled=false") { testWorkDirCleanupAndRemoveMetadataWithConfig(false) diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala index b02424ff6d5e6..ebefe3cc5bb5d 100644 --- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala +++ 
b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala @@ -21,8 +21,8 @@ import java.util.Locale import java.util.concurrent.TimeUnit import org.apache.spark.{SparkConf, SparkFunSuite, SparkIllegalArgumentException} -import org.apache.spark.network.shuffledb.DBBackend import org.apache.spark.network.util.ByteUnit +import org.apache.spark.storage.StorageLevelMapper import org.apache.spark.util.SparkConfWithEnv class ConfigEntrySuite extends SparkFunSuite { @@ -474,21 +474,21 @@ class ConfigEntrySuite extends SparkFunSuite { test("SPARK-51896: Add Java enum support to ConfigBuilder") { val conf = new SparkConf() val enumConf = ConfigBuilder("spark.test.java.enum.key") - .enumConf(classOf[DBBackend]) - .createWithDefault(DBBackend.LEVELDB) - assert(conf.get(enumConf) === DBBackend.LEVELDB) - conf.set(enumConf, DBBackend.ROCKSDB) - assert(conf.get(enumConf) === DBBackend.ROCKSDB) + .enumConf(classOf[StorageLevelMapper]) + .createWithDefault(StorageLevelMapper.MEMORY_ONLY) + assert(conf.get(enumConf) === StorageLevelMapper.MEMORY_ONLY) + conf.set(enumConf, StorageLevelMapper.OFF_HEAP) + assert(conf.get(enumConf) === StorageLevelMapper.OFF_HEAP) checkError( exception = intercept[SparkIllegalArgumentException] { - conf.set(enumConf.key, "ANYDB") + conf.set(enumConf.key, "ANY_LEVEL") conf.get(enumConf) }, condition = "INVALID_CONF_VALUE.OUT_OF_RANGE_OF_OPTIONS", parameters = Map( "confName" -> enumConf.key, - "confValue" -> "ANYDB", - "confOptions" -> DBBackend.values.mkString(", ")) + "confValue" -> "ANY_LEVEL", + "confOptions" -> StorageLevelMapper.values.mkString(", ")) ) } } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 4d75f5d7a1fc7..3d137c4118b24 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -35,7 +35,6 @@ import org.apache.spark.scheduler.cluster._ import org.apache.spark.status.ListenerEventsTestHelper._ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ -import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.Utils import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} @@ -1975,12 +1974,6 @@ class AppStatusListenerWithInMemoryStoreSuite extends AppStatusListenerSuite { override def createKVStore: KVStore = new InMemoryStore() } -@ExtendedLevelDBTest -class AppStatusListenerWithLevelDBSuite extends AppStatusListenerSuite { - override def conf: SparkConf = super.conf - .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString) -} - class AppStatusListenerWithRocksDBSuite extends AppStatusListenerSuite { override def conf: SparkConf = super.conf .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index f2b795764b7e8..16528b1d71c61 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -115,19 +115,12 @@ class AppStatusStoreSuite extends SparkFunSuite { } private val cases = { - val baseCases = Seq( + Seq( "disk rocksdb" -> createAppStore(disk = true, HybridStoreDiskBackend.ROCKSDB, live = false), "in memory" -> createAppStore(disk = false, live = false), "in memory live" -> 
createAppStore(disk = false, live = true), "rocksdb live" -> createAppStore(disk = true, HybridStoreDiskBackend.ROCKSDB, live = true) ) - if (Utils.isMacOnAppleSilicon) { - baseCases - } else { - Seq( - "disk leveldb" -> createAppStore(disk = true, HybridStoreDiskBackend.LEVELDB, live = false) - ) ++ baseCases - } } cases.foreach { case (hint, appStore) => diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index f3f7ad4bcd1f9..ff62c88fe837e 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -185,7 +185,6 @@ kubernetes-model-resource/7.3.1//kubernetes-model-resource-7.3.1.jar kubernetes-model-scheduling/7.3.1//kubernetes-model-scheduling-7.3.1.jar kubernetes-model-storageclass/7.3.1//kubernetes-model-storageclass-7.3.1.jar lapack/3.0.3//lapack-3.0.3.jar -leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.16.0//libthrift-0.16.0.jar listenablefuture/9999.0-empty-to-avoid-conflict-with-guava//listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar diff --git a/docs/configuration.md b/docs/configuration.md index 6347869916470..3d99b03537eb8 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1370,7 +1370,7 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.service.db.backend ROCKSDB - Specifies a disk-based store used in shuffle service local db. Setting as ROCKSDB or LEVELDB (deprecated). + Specifies a disk-based store used in shuffle service local db. Only ROCKSDB is supported now. 3.4.0 diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md index 7d51801edc67c..9882494eebaa4 100644 --- a/docs/core-migration-guide.md +++ b/docs/core-migration-guide.md @@ -27,6 +27,7 @@ license: | - Since Spark 4.1, Spark Master deamon provides REST API by default. To restore the behavior before Spark 4.1, you can set `spark.master.rest.enabled` to `false`. - Since Spark 4.1, Spark will compress RDD checkpoints by default. To restore the behavior before Spark 4.1, you can set `spark.checkpoint.compress` to `false`. - Since Spark 4.1, Spark uses Apache Hadoop Magic Committer for all S3 buckets by default. To restore the behavior before Spark 4.0, you can set `spark.hadoop.fs.s3a.committer.magic.enabled=false`. +- Since Spark 4.1, `spark.shuffle.service.db.backend` no longer supports `LEVELDB` as a backend for the shuffle service. ## Upgrading from Core 3.5 to 4.0 diff --git a/docs/monitoring.md b/docs/monitoring.md index 957ee555191a4..1d87519cd7dcd 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -416,7 +416,7 @@ Security options for the Spark History Server are covered more detail in the spark.history.store.hybridStore.diskBackend ROCKSDB - Specifies a disk-based store used in hybrid store; ROCKSDB or LEVELDB (deprecated). + Specifies a disk-based store used in hybrid store; only ROCKSDB is supported now. 3.3.0 diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 465f3a9d075a2..74417f3f9ff80 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -906,10 +906,7 @@ The following extra configuration options are available when the shuffle service ROCKSDB When work-preserving restart is enabled in YARN, this is used to specify the disk-base store used - in shuffle service state store, supports `ROCKSDB` and `LEVELDB` (deprecated) with `ROCKSDB` as default value. 
- The original data store in `RocksDB/LevelDB` will not be automatically converted to another kind - of storage now. The original data store will be retained and the new type data store will be - created when switching storage types. + in shuffle service state store; only `ROCKSDB` is supported now. 3.4.0 diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 5a8eb3f1e0602..e542ca4c797c4 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -493,8 +493,7 @@ SPARK_WORKER_OPTS supports the following system properties: ROCKSDB When spark.shuffle.service.db.enabled is true, user can use this to specify the kind of disk-based - store used in shuffle service state store. This supports ROCKSDB and LEVELDB (deprecated) now and ROCKSDB as default value. - The original data store in RocksDB/LevelDB will not be automatically convert to another kind of storage now. + store used in shuffle service state store. Only ROCKSDB is supported now. 3.4.0 diff --git a/pom.xml b/pom.xml index 077bbea4380a7..6ed1e38481b77 100644 --- a/pom.xml +++ b/pom.xml @@ -235,8 +235,6 @@ 3.0.1 0.12.6 - - org.fusesource.leveldbjni 7.3.1 ${java.home} @@ -766,11 +764,6 @@ rocksdbjni 9.8.4 - - ${leveldbjni.group} - leveldbjni-all - 1.8 - org.seleniumhq.selenium selenium-java @@ -3559,23 +3552,10 @@ jjwt-provided - - - aarch64 - - org.openlabtesting.leveldbjni - - - - linux - aarch64 - - - applesilicon - org.apache.spark.tags.ChromeUITest,org.apache.spark.tags.ExtendedLevelDBTest + org.apache.spark.tags.ChromeUITest diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 730837ad611ee..9ef25ddebf4cc 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -77,6 +77,10 @@ object MimaExcludes { // SPARK-51267: Match local Spark Connect server logic between Python and Scala ProblemFilters.exclude[MissingFieldProblem]("org.apache.spark.launcher.SparkLauncher.SPARK_LOCAL_REMOTE"), + // SPARK-44223: remove LevelDB support + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.kvstore.LevelDB"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.kvstore.LevelDB$TypeAliases"), + (problem: Problem) => problem match { case MissingClassProblem(cls) => !cls.fullName.startsWith("org.sparkproject.jpmml") && !cls.fullName.startsWith("org.sparkproject.dmg.pmml") diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 77001e6bdf227..ecff2080e0297 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1586,11 +1586,7 @@ object TestSettings { import BuildCommons._ private val defaultExcludedTags = Seq("org.apache.spark.tags.ChromeUITest", "org.apache.spark.deploy.k8s.integrationtest.YuniKornTag", - "org.apache.spark.internal.io.cloud.IntegrationTestSuite") ++ - (if (System.getProperty("os.name").startsWith("Mac OS X") && - System.getProperty("os.arch").equals("aarch64")) { - Seq("org.apache.spark.tags.ExtendedLevelDBTest") - } else Seq.empty) + "org.apache.spark.internal.io.cloud.IntegrationTestSuite") lazy val settings = Seq ( // Fork new JVMs for tests and set Java options for those diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala index 3857fedb7aabf..a87a5d24ec6c7 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala +++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark._ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffledb.DBBackend import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} -import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedYarnTest} +import org.apache.spark.tags.ExtendedYarnTest /** * SPARK-34828: Integration test for the external shuffle service with an alternate name and @@ -77,12 +77,6 @@ abstract class YarnShuffleAlternateNameConfigSuite extends YarnShuffleIntegratio } } } -@ExtendedLevelDBTest -@ExtendedYarnTest -class YarnShuffleAlternateNameConfigWithLevelDBBackendSuite - extends YarnShuffleAlternateNameConfigSuite { - override protected def dbBackend: DBBackend = DBBackend.LEVELDB -} @ExtendedYarnTest class YarnShuffleAlternateNameConfigWithRocksDBBackendSuite diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index f8d69c0ae568e..6503a3c51b255 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.internal.config.Network._ import org.apache.spark.network.shuffle.ShuffleTestAccessor import org.apache.spark.network.shuffledb.DBBackend import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} -import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedYarnTest} +import org.apache.spark.tags.ExtendedYarnTest /** * Integration test for the external shuffle service with a yarn mini-cluster @@ -86,13 +86,6 @@ abstract class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { } } -@ExtendedLevelDBTest -@ExtendedYarnTest -class YarnShuffleIntegrationWithLevelDBBackendSuite - extends YarnShuffleIntegrationSuite { - override protected def dbBackend: DBBackend = DBBackend.LEVELDB -} - @ExtendedYarnTest class YarnShuffleIntegrationWithRocksDBBackendSuite extends YarnShuffleIntegrationSuite { @@ -118,12 +111,6 @@ abstract class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite { } } -@ExtendedLevelDBTest -@ExtendedYarnTest -class YarnShuffleAuthWithLevelDBBackendSuite extends YarnShuffleAuthSuite { - override protected def dbBackend: DBBackend = DBBackend.LEVELDB -} - @ExtendedYarnTest class YarnShuffleAuthWithRocksDBBackendSuite extends YarnShuffleAuthSuite { override protected def dbBackend: DBBackend = DBBackend.ROCKSDB diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 56d7b7ff6a09e..fce2746a09f96 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -55,7 +55,6 @@ import org.apache.spark.network.shuffledb.DBBackend import org.apache.spark.network.util.JavaUtils import org.apache.spark.network.util.TransportConf import org.apache.spark.network.yarn.util.HadoopConfigProvider -import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.Utils abstract class YarnShuffleServiceSuite extends 
SparkFunSuite with Matchers { @@ -1257,11 +1256,6 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { } } -@ExtendedLevelDBTest -class YarnShuffleServiceWithLevelDBBackendSuite extends YarnShuffleServiceSuite { - override protected def shuffleDBBackend(): DBBackend = DBBackend.LEVELDB -} - class YarnShuffleServiceWithRocksDBBackendSuite extends YarnShuffleServiceSuite { override protected def shuffleDBBackend(): DBBackend = DBBackend.ROCKSDB } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala index 1eb3ce6bd82ee..958711a012f60 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala @@ -320,20 +320,6 @@ class StreamingQueryStatusListenerWithDiskStoreSuite extends StreamingQueryStatu } } - test("SPARK-38056: test writing StreamingQueryData to a LevelDB store") { - assume(!Utils.isMacOnAppleSilicon) - val conf = new SparkConf() - .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString) - val testDir = Utils.createTempDir() - val kvStore = KVUtils.open(testDir, getClass.getName, conf, live = false) - try { - testStreamingQueryData(kvStore) - } finally { - kvStore.close() - Utils.deleteRecursively(testDir) - } - } - test("SPARK-38056: test writing StreamingQueryData to a RocksDB store") { val conf = new SparkConf() .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
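For reference, a minimal configuration sketch for the two disk-backend properties whose documentation is updated above. The property names are taken from the docs changes in this diff; the surrounding setup is illustrative only and assumes a plain `SparkConf`-based deployment:

```scala
import org.apache.spark.SparkConf

// Sketch only: with the LEVELDB enum entries removed, ROCKSDB is the sole value
// these settings accept.
val conf = new SparkConf()
  // Disk backend for the History Server hybrid store (ROCKSDB is also the default).
  .set("spark.history.store.hybridStore.diskBackend", "ROCKSDB")
  // Local DB of the external shuffle service; the backend is consulted only when the DB is enabled.
  .set("spark.shuffle.service.db.enabled", "true")
  .set("spark.shuffle.service.db.backend", "ROCKSDB")
```

Configuring `LEVELDB` for either property should now be rejected at validation time, since `HybridStoreDiskBackend.LEVELDB` and `DBBackend.LEVELDB` no longer exist.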