-
Notifications
You must be signed in to change notification settings - Fork 1k
Description
Hello, I'm writing a Storm bolt that will insert data into a Titan graph backed by Cassandra (by DataStax, version ReleaseVersion: 1.2.5).
Titan Version: 0.3.1 (depends on Kryo version: 2.21)
Storm Version: 0.8.2 (depends on Kryo version: 2.17)
I think I have a Catch-22. I can either tell the build tool to use Titan's Kryo dependency or Storm's Kryo dependency. No matter what I do, I see a problem.
Use Titan's Kryo Dependency
First, when I specify that Titan's version of Kryo should be used (version 2.21), I get the following stack trace when I run my Storm topology:
3657 [Thread-7] ERROR backtype.storm.daemon.worker - Error on initialization of server mk-worker
java.lang.NoSuchMethodError: backtype.storm.serialization.DefaultKryoFactory$KryoSerializableDefault.setReferences(Z)V
at backtype.storm.serialization.DefaultKryoFactory.getKryo(DefaultKryoFactory.java:32)
at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:32)
at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15)
at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22)
at backtype.storm.daemon.executor$executor_data$fn__3909.invoke(executor.clj:195)
at backtype.storm.util$assoc_apply_self.invoke(util.clj:731)
at backtype.storm.daemon.executor$executor_data.invoke(executor.clj:195)
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:263)
at backtype.storm.daemon.worker$fn__4348$exec_fn__1228__auto____4349$iter__4354__4358$fn__4359.invoke(worker.clj:354)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core$dorun.invoke(core.clj:2725)
at clojure.core$doall.invoke(core.clj:2741)
at backtype.storm.daemon.worker$fn__4348$exec_fn__1228__auto____4349.invoke(worker.clj:354)
at clojure.lang.AFn.applyToHelper(AFn.java:185)
at clojure.lang.AFn.applyTo(AFn.java:151)
at clojure.core$apply.invoke(core.clj:601)
at backtype.storm.daemon.worker$fn__4348$mk_worker__4404.doInvoke(worker.clj:323)
at clojure.lang.RestFn.invoke(RestFn.java:512)
at backtype.storm.daemon.supervisor$fn__4807.invoke(supervisor.clj:467)
at clojure.lang.MultiFn.invoke(MultiFn.java:177)
at backtype.storm.daemon.supervisor$sync_processes$iter__4684__4688$fn__4689.invoke(supervisor.clj:249)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core$dorun.invoke(core.clj:2725)
at clojure.core$doall.invoke(core.clj:2741)
at backtype.storm.daemon.supervisor$sync_processes.invoke(supervisor.clj:237)
at clojure.lang.AFn.applyToHelper(AFn.java:161)
at clojure.lang.AFn.applyTo(AFn.java:151)
at clojure.core$apply.invoke(core.clj:603)
at clojure.core$partial$fn__4070.doInvoke(core.clj:2343)
at clojure.lang.RestFn.invoke(RestFn.java:397)
at backtype.storm.event$event_manager$fn__2507.invoke(event.clj:24)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:722)
Storm's DefaultKryoFactory defines a nested class named KryoSerializableDefault which extends Kryo. DefaultKryoFactory calls Kryo's setReferences method.
Kryo's setReferences method currently has the following signature:
public boolean setReferences (boolean references)
On August 17, 2012 nathan.sweet checked in a change that changed the return type of setReferences to boolean.
You'll note that the stack trace above shows the missing method to be:
KryoSerializableDefault.setReferences(Z)V
where the V stands for void and Z stands for boolean (see here for more details on type codes).
Use Storm's Kryo Dependency
When I use Storm's Kryo dependency (version 2.17), drop the titan keyspace in cassandra-cli, and then run my Storm topology, every thing seems to work fine at first but then when I want to view my database with gremlin, I start having problems:
I start up gremlin and enter the following:
c = new BaseConfiguration()
c.setProperty("storage.backend", "cassandra")
c.setProperty("storage.hostname", "127.0.0.1")
g = TitanFactory.open(c)
So far, everything works great. So I want to see how many vertices I have:
g.V.count()
And I get this:
13/06/13 11:51:00 INFO thrift.ThriftKeyspaceImpl: Detected partitioner org.apache.cassandra.dht.Murmur3Partitioner for keyspace titan
java.lang.IllegalArgumentException: Can not set com.thinkaurelius.titan.core.TypeGroup field com.thinkaurelius.titan.graphdb.types.AbstractTypeDefinition.group to com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition
Serialization trace:
group (com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition)
I hit Y to see the stack trace and I get this:
com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set com.thinkaurelius.titan.core.TypeGroup field com.thinkaurelius.titan.graphdb.types.AbstractTypeDefinition.group to com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition
Serialization trace:
group (com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
at com.thinkaurelius.titan.graphdb.database.serialize.kryo.KryoSerializer.readObjectNotNull(KryoSerializer.java:109)
at com.thinkaurelius.titan.graphdb.database.EdgeSerializer.parseProperties(EdgeSerializer.java:176)
at com.thinkaurelius.titan.graphdb.database.EdgeSerializer.getProperties(EdgeSerializer.java:120)
at com.thinkaurelius.titan.graphdb.database.EdgeSerializer.readRelation(EdgeSerializer.java:65)
at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx$5$3.apply(StandardTitanTx.java:599)
at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx$5$3.apply(StandardTitanTx.java:595)
at com.google.common.collect.Iterators$9.transform(Iterators.java:893)
at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
at com.thinkaurelius.titan.graphdb.query.QueryProcessor$OuterIterator.nextInternal(QueryProcessor.java:148)
at com.thinkaurelius.titan.graphdb.query.QueryProcessor$OuterIterator.<init>(QueryProcessor.java:137)
at com.thinkaurelius.titan.graphdb.query.QueryProcessor.iterator(QueryProcessor.java:48)
at com.google.common.collect.Iterables$7.iterator(Iterables.java:611)
at com.google.common.collect.Iterables.getOnlyElement(Iterables.java:282)
at com.thinkaurelius.titan.graphdb.query.QueryUtil.queryHiddenUniqueProperty(QueryUtil.java:16)
at com.thinkaurelius.titan.graphdb.types.vertices.TitanKeyVertex.getDefinition(TitanKeyVertex.java:24)
at com.thinkaurelius.titan.graphdb.types.vertices.TitanKeyVertex.getDefinition(TitanKeyVertex.java:11)
at com.thinkaurelius.titan.graphdb.types.vertices.TitanTypeVertex.getName(TitanTypeVertex.java:20)
at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx$3.get(StandardTitanTx.java:241)
at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx$3.get(StandardTitanTx.java:225)
at com.thinkaurelius.titan.graphdb.transaction.vertexcache.SimpleVertexCache.get(SimpleVertexCache.java:32)
at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx.getExistingVertex(StandardTitanTx.java:221)
at com.thinkaurelius.titan.graphdb.transaction.VertexIterable$1.nextVertex(VertexIterable.java:39)
at com.thinkaurelius.titan.graphdb.transaction.VertexIterable$1.next(VertexIterable.java:58)
at com.thinkaurelius.titan.graphdb.transaction.VertexIterable$1.next(VertexIterable.java:29)
at com.tinkerpop.pipes.util.iterators.HistoryIterator.next(HistoryIterator.java:25)
at com.tinkerpop.pipes.IdentityPipe.processNextStart(IdentityPipe.java:19)
at com.tinkerpop.pipes.AbstractPipe.next(AbstractPipe.java:89)
at com.tinkerpop.pipes.util.Pipeline.next(Pipeline.java:115)
at com.tinkerpop.pipes.util.PipeHelper.counter(PipeHelper.java:108)
at com.tinkerpop.gremlin.java.GremlinPipeline.count(GremlinPipeline.java:1397)
at com.tinkerpop.pipes.util.PipesFluentPipeline$count.call(Unknown Source)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:42)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:112)
at groovysh_evaluate.run(groovysh_evaluate:56)
at groovysh_evaluate$run.call(Unknown Source)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:42)
at groovysh_evaluate$run.call(Unknown Source)
at org.codehaus.groovy.tools.shell.Interpreter.evaluate(Interpreter.groovy:67)
at org.codehaus.groovy.tools.shell.Interpreter$evaluate.call(Unknown Source)
at org.codehaus.groovy.tools.shell.Groovysh.execute(Groovysh.groovy:152)
at org.codehaus.groovy.tools.shell.Shell.leftShift(Shell.groovy:114)
at org.codehaus.groovy.tools.shell.Shell$leftShift$0.call(Unknown Source)
at org.codehaus.groovy.tools.shell.ShellRunner.work(ShellRunner.groovy:88)
at org.codehaus.groovy.tools.shell.InteractiveShellRunner.super$2$work(InteractiveShellRunner.groovy)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90)
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:233)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1079)
at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuperN(ScriptBytecodeAdapter.java:128)
at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuper0(ScriptBytecodeAdapter.java:148)
at org.codehaus.groovy.tools.shell.InteractiveShellRunner.work(InteractiveShellRunner.groovy:100)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite$PogoCachedMethodSiteNoUnwrapNoCoerce.invoke(PogoMetaMethodSite.java:272)
at org.codehaus.groovy.runtime.callsite.PogoMetaMethodSite.callCurrent(PogoMetaMethodSite.java:52)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:137)
at org.codehaus.groovy.tools.shell.ShellRunner.run(ShellRunner.groovy:57)
at org.codehaus.groovy.tools.shell.InteractiveShellRunner.super$2$run(InteractiveShellRunner.groovy)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90)
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:233)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1079)
at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuperN(ScriptBytecodeAdapter.java:128)
at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuper0(ScriptBytecodeAdapter.java:148)
at org.codehaus.groovy.tools.shell.InteractiveShellRunner.run(InteractiveShellRunner.groovy:66)
at com.thinkaurelius.titan.tinkerpop.gremlin.Console.<init>(Console.java:65)
at com.thinkaurelius.titan.tinkerpop.gremlin.Console.<init>(Console.java:78)
at com.thinkaurelius.titan.tinkerpop.gremlin.Console.main(Console.java:104)
Caused by: java.lang.IllegalArgumentException: Can not set com.thinkaurelius.titan.core.TypeGroup field com.thinkaurelius.titan.graphdb.types.AbstractTypeDefinition.group to com.thinkaurelius.titan.graphdb.types.StandardKeyDefinition
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:164)
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:168)
at sun.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:81)
at java.lang.reflect.Field.set(Field.java:680)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:619)
... 79 more
This is where I get confused. If we're just using one version of Kryo, then you'd think that we would be able to read anything we had written (unless there was a bug in Kryo).
Do you know of any workarounds, or fixes I could employ here?
Thanks!
Tim Stewart