Skip to content

Commit f67e064

Browse files
committed
static join enabled
1 parent 44041f2 commit f67e064

File tree

4 files changed

+130
-37
lines changed

4 files changed

+130
-37
lines changed

src/main/kotlin/no/uio/microobject/ast/stmt/PushStaticStmt.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,31 +40,37 @@ data class PushStaticStmt(val target : Location, val sources: Expression, val po
4040
"urlOntology" to false,
4141
"fmos" to false
4242
)
43+
var reasonerMode: ReasonerMode = ReasonerMode.off
4344
for (s in sourcesList) {
4445
val trimmed = s.trim()
4546
if (sourcesMap.containsKey(trimmed)) {
4647
sourcesMap[trimmed] = true
48+
} else if (trimmed == "reasoner") {
49+
reasonerMode = interpreter.settings.reasoner
4750
} else {
48-
throw Exception("Unknown source '$trimmed' in pushStatic statement, only comma-separated [heap, staticTable, vocabularyFile, externalOntology, urlOntology, fmos] are allowed")
51+
throw Exception("Unknown source '$trimmed' in pushStatic statement, only comma-separated [heap, staticTable, vocabularyFile, externalOntology, urlOntology, fmos (or reasoner)] are allowed")
4952
}
5053
}
5154

55+
// load settings from source map
5256
val ts = TripleSettings(
5357
sources = sourcesMap,
5458
guards = hashMapOf("heap" to true, "staticTable" to true),
5559
virtualization = hashMapOf("heap" to true, "staticTable" to true, "fmos" to true),
56-
jenaReasoner = interpreter.settings.reasoner,
60+
jenaReasoner = reasonerMode,
5761
cachedModel = null
5862
)
5963

64+
// write output to a file
6065
val model = interpreter.tripleManager.getModel(ts)
61-
// todo implement
6266
val file = "output.ttl"
6367
File(interpreter.settings.outdir).mkdirs()
6468
File("${interpreter.settings.outdir}/${file}").createNewFile()
6569
model.write(FileWriter("${interpreter.settings.outdir}/${file}"),"TTL")
6670
val resultPath = "<file://${interpreter.settings.outdir}/${file}>"
6771

72+
// return the file path as a string literal
73+
println("Saving static data to $resultPath")
6874
val resultLit = LiteralExpr(resultPath, STRINGTYPE)
6975
return replaceStmt(AssignStmt(target, resultLit, declares = declares), stackFrame)
7076
}

src/main/kotlin/no/uio/microobject/data/StreamManager.kt

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -230,18 +230,6 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
230230
return queryWithPrefixes
231231
}
232232

233-
// todo implement
234-
// public fun putStaticNamedGraph(iri: String, model: Model) {
235-
// if (!engineInitialized) initEngine()
236-
237-
// // serialize the model (RDF/XML matches the engine's first attempt)
238-
// val sw = StringWriter()
239-
// model.write(sw, "RDF/XML")
240-
241-
// // hand it to the C-SPARQL engine
242-
// engine.putStaticNamedModel(iri, sw.toString())
243-
// }
244-
245233
public fun getStaticNamedIri(): String {
246234
val s = "${settings.runPrefix}loadStatic${nStaticGraphsPushed}"
247235
nStaticGraphsPushed += 1

src/main/kotlin/no/uio/microobject/type/TypeChecker.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,6 +1106,26 @@ class TypeChecker(private val ctx: WhileParser.ProgramContext, private val setti
11061106
if(ctx.target != null && ctx.target !is WhileParser.Var_expressionContext && inRule)
11071107
log("Non-local access in rule method.", ctx)
11081108
}
1109+
is WhileParser.PushStatic_statementContext -> {
1110+
val innerType = getType(ctx.sources, inner, vars, thisType, inRule)
1111+
if(innerType != ERRORTYPE && innerType != STRINGTYPE)
1112+
log("PushStatic expects a string as its parameter", ctx)
1113+
if (ctx.declType != null) {
1114+
val lhs = ctx.target
1115+
if (lhs !is WhileParser.Var_expressionContext) {
1116+
log("Variable declaration must declare a variable.", ctx)
1117+
} else {
1118+
val name = lhs.NAME().text
1119+
if (vars.keys.contains(name)) log("Variable $name declared twice.", ctx)
1120+
else {
1121+
val expType = translateType(ctx.type(), className, generics)
1122+
vars[name] = expType
1123+
}
1124+
}
1125+
if (translateType(ctx.declType, className, generics) != STRINGTYPE)
1126+
log("PushStatic expects a String variable to assign to", ctx)
1127+
}
1128+
}
11091129
else -> {
11101130
log("Statements with class ${ctx.javaClass} cannot be type checked",ctx)
11111131
}
Lines changed: 101 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,63 @@
1-
streamer class C(Int x, Monitor<Int> m)
1+
// reada src/test/resources/csparql/13_static_join.smol
22

3-
Unit register()
4-
String static = pushStatic("heap,staticTable");
3+
class Observation(Int x, Int ts) end
4+
5+
class Controller(List<C> streamers)
6+
7+
Unit register(String static)
8+
List<C> objs = this.streamers;
9+
while objs != null do
10+
C o = objs.content;
11+
o.register(static);
12+
objs = objs.next;
13+
end
14+
end
15+
16+
Unit doStream(Int ts)
17+
List<C> objs = this.streamers;
18+
while objs != null do
19+
C o = objs.content;
20+
o.doStream(ts);
21+
objs = objs.next;
22+
end
23+
end
24+
25+
String windowToString()
26+
String s = "";
27+
List<C> objs = this.streamers;
28+
while objs != null do
29+
C o = objs.content;
30+
String res = o.windowToString();
31+
s = s ++ o ++ " ( " ++ res ++ ") ";
32+
objs = objs.next;
33+
end
34+
return s;
35+
end
36+
end
37+
38+
streamer class C(Int x, Int mult, Observation obs, Monitor<Int> m)
39+
40+
Unit register(String static)
541
Monitor<Int> m = monitor("
6-
SELECT (SUM(?x) as ?sumX)
7-
FROM STREAM %1 [RANGE 3s TUMBLING]
8-
FROM <%2>
9-
WHERE { ?s prog:C_x ?x . ?s a prog:C . }
10-
", this, static);
42+
SELECT (?x * ?k AS ?res)
43+
FROM NAMED WINDOW <win> ON %1 [RANGE PT5.001s STEP PT2s]
44+
FROM %2
45+
WHERE {
46+
?streamer prog:C_mult ?k.
47+
WINDOW ?w {
48+
?streamer prog:C_obs ?obs .
49+
?obs prog:Observation_x ?x .
50+
}
51+
}
52+
",
53+
this,
54+
static
55+
);
1156
this.m = m;
1257
end
1358

14-
Unit doStream() emits(this.x)
59+
Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts)
60+
this.obs = new Observation(this.x, ts);
1561
this.x = this.x + 1;
1662
end
1763

@@ -28,29 +74,62 @@ streamer class C(Int x, Monitor<Int> m)
2874
end
2975

3076
main
31-
C o = new C(10, null);
32-
o.register();
33-
77+
// configure simulation
3478
clock Int i = 100;
3579
Int endAt = 200;
3680

81+
Observation obs = new Observation(1, i);
82+
C o1 = new C(0, 2, obs, null);
83+
C o2 = new C(10, 3, null, null);
84+
C o3 = new C(20, 10, null, null);
85+
86+
// push static data
87+
String static = pushStatic("heap,staticTable");
88+
89+
// initialize stream
90+
List<C> l3 = new List<C>(o3, null);
91+
List<C> l2 = new List<C>(o2, l3);
92+
List<C> streamers = new List<C>(o1, l2);
93+
Controller ctrl = new Controller(streamers);
94+
95+
// initialize monitor
96+
ctrl.register(static);
97+
98+
// run simulation
3799
while i < endAt do
38-
o.doStream();
39-
String res = o.windowToString();
100+
101+
// put triples in the stream
102+
ctrl.doStream(i);
103+
104+
// read window contents
105+
String res = ctrl.windowToString();
40106
print(">>" ++ intToString(i) ++ ": " ++ res);
107+
108+
// advance clock (timestamp)
41109
i = i + 1;
42110
end
43111
end
44112

45113
// Output:
46-
// >>100:
47-
// >>101:
48-
// >>102:
49-
// >>103: 0
50-
// >>104: 0
51-
// >>105: 0
52-
// >>106: 0
114+
// >>100: obj8 ( ) obj9 ( ) obj10 ( )
115+
// >>101: obj8 ( ) obj9 ( ) obj10 ( )
116+
// >>102: obj8 ( ) obj9 ( ) obj10 ( )
117+
// >>103: obj8 ( 0 2 ) obj9 ( 30 33 ) obj10 ( 200 210 )
118+
// >>104: obj8 ( 0 2 ) obj9 ( 30 33 ) obj10 ( 200 210 )
119+
// >>105: obj8 ( 0 2 4 6 ) obj9 ( 30 33 36 39 ) obj10 ( 200 210 220 230 )
120+
// >>106: obj8 ( 0 2 4 6 ) obj9 ( 30 33 36 39 ) obj10 ( 200 210 220 230 )
121+
// >>107: obj8 ( 2 4 6 8 10 ) obj9 ( 33 36 39 42 45 ) obj10 ( 210 220 230 240 250 )
122+
// >>108: obj8 ( 2 4 6 8 10 ) obj9 ( 33 36 39 42 45 ) obj10 ( 210 220 230 240 250 )
123+
// >>109: obj8 ( 6 8 10 12 14 ) obj9 ( 39 42 45 48 51 ) obj10 ( 230 240 250 260 270 )
124+
// >>110: obj8 ( 6 8 10 12 14 ) obj9 ( 39 42 45 48 51 ) obj10 ( 230 240 250 260 270 )
125+
// >>111: obj8 ( 10 12 14 16 18 ) obj9 ( 45 48 51 54 57 ) obj10 ( 250 260 270 280 290 )
53126
// ..
54127

55128
// Notes:
56-
// Error: Static join does not work!
129+
// - pushStatic("heap, staticTable") dumps static data to output.ttl
130+
// - This file contains triples for C_mult property of each streamer object (static data)
131+
132+
// - Results from streamers are multiplied by {2, 3, 10} respectively
133+
// - Stream data is inside the window: ?x
134+
// - Static data is outside the window: ?k
135+
// - Static data is joined with stream data: ?x * ?k

0 commit comments

Comments
 (0)