Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
d9ef34e
Release all test connections. Fix create-database without arguments.
whilo Mar 25, 2023
e41b843
Remove randomization from tests.
whilo Mar 25, 2023
54ac472
Release connections locally on deletions, but warn if they were not r…
whilo Mar 25, 2023
31927e7
Add connections file...
whilo Mar 25, 2023
b1baacb
Fix create-database and connection spec.
whilo Mar 25, 2023
319345c
First take on parallel operations for writer.
whilo Mar 20, 2023
52b0b9b
Fix format.
whilo Mar 20, 2023
3efce5b
Implement batching transactor.
whilo Mar 25, 2023
692e0c8
Expose buffer sizes and warn on back pressure.
whilo Mar 25, 2023
0e3ad93
Provide synchronous flushing of pending writes.
whilo Mar 26, 2023
49caee4
Fix outstanding tests.
whilo Mar 28, 2023
80d574b
Merge main.
whilo Jun 4, 2023
c363ae9
Merge branch 'main' into 617-improve-writer-latency
whilo Aug 8, 2023
8ea4d54
Merge branch 'main' into 617-improve-writer-latency
whilo Nov 13, 2023
ea4c165
Update Datahike API and writer
whilo Nov 13, 2023
b69466c
Update persistent-sorted-set version and branching
whilo Nov 13, 2023
5cebfaf
Fix formatting and whitespace in versioning and
whilo Nov 13, 2023
8edbb24
Add Leaf type hint for native compilation.
whilo Nov 13, 2023
fc89bd3
Fix indentation for persistent set
whilo Nov 13, 2023
0e58828
Add boolean argument to update-and-commit function
whilo Nov 13, 2023
2c00059
Deref transact! function call in server.clj
whilo Nov 13, 2023
bcc1201
Fix transact! function to handle updates correctly
whilo Nov 13, 2023
b96bc89
Add Leaf type hint for native compilation
whilo Nov 13, 2023
f3cb0aa
Minor cleanups.
whilo Nov 14, 2023
4547590
Fix commit-db variable assignment
whilo Nov 18, 2023
73cf977
Fix callback bug in commit function
whilo Nov 18, 2023
d616ce8
Refactor connection handling and fix writer test
whilo Nov 18, 2023
386f307
Add missing files
whilo Nov 18, 2023
1f7a428
Fix format
whilo Nov 18, 2023
64226de
Ensure atomic extraction of pending write operations
whilo Nov 18, 2023
b3b96e0
Refactor flush-pending-writes function
whilo Nov 18, 2023
c8583f4
Refactor config normalization in connector
whilo Nov 18, 2023
40ad4f3
Format.
whilo Nov 18, 2023
f8c14b7
Only update commit-id of connection in writer.
whilo Nov 18, 2023
6febe0a
Separate update logic from commiting.
whilo Nov 19, 2023
a642251
Fix commit! function arguments
whilo Nov 19, 2023
3d55ca5
Add parents to connection metadata
whilo Nov 19, 2023
648a2d2
Fix parents assignment in writer.cljc
whilo Nov 19, 2023
3f3467b
Fix commit metadata in writer.cljc
whilo Nov 19, 2023
e1807fe
Update commit-id in merge!
whilo Nov 20, 2023
d7c85ac
Add add-commit-meta function
whilo Nov 20, 2023
c8526cf
Fix commit-queue input channel in writer.cljc
whilo Nov 20, 2023
633d3fb
Try to simplify commit!
whilo Nov 20, 2023
eb5b132
Simplify commit!
whilo Nov 20, 2023
a9ed290
Add current branch head again.
whilo Nov 20, 2023
935d2c7
Send committed db as db-after to ensure data structure integrity.
whilo Nov 20, 2023
0eabf43
Improve error handling and use JSON for writer.
whilo Nov 20, 2023
6a364da
Add missing paren.
whilo Nov 20, 2023
efdd8da
Improve error handling.
whilo Nov 20, 2023
f7faf37
Fix format.
whilo Nov 20, 2023
7a1c9e0
Fix server connections.
whilo Nov 20, 2023
1e841b9
Update DEFAULT_COMMIT_WAIT_TIME to 0
whilo Nov 20, 2023
457b483
Add resources to native compilation, fixes libdatahike build.
whilo Nov 21, 2023
3c38e14
Use konserve git version konserve upstream filestore fix.
whilo Nov 21, 2023
302c822
Instrumented konserve release
whilo Nov 22, 2023
77886ed
Use new konserve version.
whilo Nov 22, 2023
75bcce2
Update git/sha for io.replikativ/konserve
whilo Nov 22, 2023
bfb2e97
Update git/sha for io.replikativ/konserve
whilo Nov 22, 2023
f347d69
Add simple stress test for file store
whilo Nov 23, 2023
c80cf14
Fix format
whilo Nov 23, 2023
14b2cd2
Update git/sha for io.replikativ/konserve
whilo Nov 23, 2023
2f0fb13
Don't run spec tests on stress-test
whilo Nov 23, 2023
f667a31
Fix JSON handlers in writer. Small fixes.
whilo Nov 24, 2023
05b91b7
Test with old konserve version
whilo Nov 24, 2023
a3167ff
Use konserve release and remove commented code
whilo Nov 25, 2023
4458d5e
Update datahike logo
whilo Nov 25, 2023
559f7b8
Add missing flush statement on db creation.
whilo Dec 1, 2023
c2e4d5e
Use Exception for node not found. last -> peek.
whilo Dec 3, 2023
1faaefb
Fix condition for node exception.
whilo Dec 3, 2023
d21e65a
Use dt/raise instead of throw.
whilo Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.edn
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
:native {;; jvm build
:src-dirs ["src" "libdatahike/src"]
:java-src-dirs ["java"]
:resource-dir "resources"
:target-dir "target"
:class-dir "target/classes"
:deps-file "deps.edn"
Expand Down
4 changes: 2 additions & 2 deletions deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
:exclusions [org.clojure/clojurescript]}
io.replikativ/hitchhiker-tree {:mvn/version "0.2.222"
:exclusions [org.clojure/clojurescript]}
io.replikativ/konserve {:mvn/version "0.7.311"
io.replikativ/konserve {:mvn/version "0.7.314"
:exclusions [org.clojure/clojurescript]}
io.replikativ/superv.async {:mvn/version "0.3.46"
:exclusions [org.clojure/clojurescript]}
io.replikativ/datalog-parser {:mvn/version "0.2.28"}
io.replikativ/zufall {:mvn/version "0.2.9"}
persistent-sorted-set/persistent-sorted-set {:mvn/version "0.2.3"}
persistent-sorted-set/persistent-sorted-set {:mvn/version "0.3.0"}
environ/environ {:mvn/version "1.2.0"}
nrepl/bencode {:mvn/version "1.1.0"}
com.taoensso/timbre {:mvn/version "6.3.1"}
Expand Down
112 changes: 69 additions & 43 deletions http-server/datahike/http/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
(:require
[clojure.string :as str]
[clojure.edn :as edn]
[datahike.connections :refer [*connections*]]
[datahike.api.specification :refer [api-specification ->url]]
[datahike.http.middleware :as middleware]
[datahike.readers :refer [edn-readers]]
[datahike.transit :as transit]
[datahike.json :as json]
[datahike.api :refer :all :as api]
[datahike.writing :as datahike.writing]
[datahike.writing]
[datahike.writer]
[reitit.ring :as ring]
[reitit.coercion.spec]
[reitit.swagger :as swagger]
Expand All @@ -34,33 +36,38 @@

(defn generic-handler [config f]
(fn [request]
(let [{{body :body} :parameters
:keys [headers params method]} request
_ (log/trace "req-body" f body (= f #'api/create-database))
(try
(let [{{body :body} :parameters
:keys [headers params method]} request
_ (log/trace "request body" f body)
;; TODO move this to client
ret-body
(cond (= f #'api/create-database)
ret-body
(cond (= f #'api/create-database)
;; remove remote-peer and re-add
(assoc
(apply f (dissoc (first body) :remote-peer) (rest body))
:remote-peer (:remote-peer (first body)))
(assoc
(apply f (dissoc (first body) :remote-peer) (rest body))
:remote-peer (:remote-peer (first body)))

(= f #'api/delete-database)
(apply f (dissoc (first body) :remote-peer) (rest body))
(= f #'api/delete-database)
(apply f (dissoc (first body) :remote-peer) (rest body))

:else
(apply f body))]
(log/trace "ret-body" ret-body)
(merge
{:status 200
:body
(when-not (headers "no-return-value")
ret-body)}
(when (and (= method :get)
(get params "args-id")
(get-in config [:cache :get :max-age]))
{:headers {"Cache-Control" (str (when-not (:token config) "public, ")
"max-age=" (get-in config [:cache :get :max-age]))}})))))
:else
(apply f body))]
(log/trace "return body" ret-body)
(merge
{:status 200
:body
(when-not (headers "no-return-value")
ret-body)}
(when (and (= method :get)
(get params "args-id")
(get-in config [:cache :get :max-age]))
{:headers {"Cache-Control" (str (when-not (:token config) "public, ")
"max-age=" (get-in config [:cache :get :max-age]))}})))
(catch Exception e
{:status 500
:body {:msg (ex-message e)
:ex-data (ex-data e)}}))))

(declare create-routes)

Expand Down Expand Up @@ -100,11 +107,7 @@
{:handlers transit/write-handlers}))))

(defn default-route-opts [muuntaja-with-opts]
{;; :reitit.middleware/transform dev/print-request-diffs ;; pretty diffs
;; :validate spec/validate ;; enable spec validation for route data
;; :reitit.spec/wrap spell/closed ;; strict top-level validation
;; :exception pretty/exception
:data {:coercion reitit.coercion.spec/coercion
{:data {:coercion reitit.coercion.spec/coercion
:muuntaja muuntaja-with-opts
:middleware [swagger/swagger-feature
parameters/parameters-middleware
Expand All @@ -119,15 +122,25 @@
multipart/multipart-middleware
middleware/patch-swagger-json]}})

(def internal-writer-routes
(defn internal-writer-routes [server-connections]
[["/delete-database-writer"
{:post {:parameters {:body (st/spec {:spec any?
:name "delete-database-writer"})},
:summary "Internal endpoint. DO NOT USE!"
:no-doc true
:handler (fn [{{:keys [body]} :parameters}]
{:status 200
:body (apply datahike.writing/delete-database body)})
(binding [*connections* server-connections]
(let [cfg (dissoc (first body) :remote-peer :writer)]
(try
(try
(api/release (api/connect cfg) true)
(catch Exception _))
{:status 200
:body (apply datahike.writing/delete-database cfg (rest body))}
(catch Exception e
{:status 500
:body {:msg (ex-message e)
:ex-data (ex-data e)}})))))
:operationId "delete-database"},
:swagger {:tags ["Internal"]}}]
["/create-database-writer"
Expand All @@ -136,25 +149,38 @@
:summary "Internal endpoint. DO NOT USE!"
:no-doc true
:handler (fn [{{:keys [body]} :parameters}]
{:status 200
:body (apply datahike.writing/create-database
(dissoc (first body) :remote-peer)
(rest body))})
(let [cfg (dissoc (first body) :remote-peer :writer)]
(try
{:status 200
:body (apply datahike.writing/create-database
cfg
(rest body))}
(catch Exception e
{:status 500
:body {:msg (ex-message e)
:ex-data (ex-data e)}}))))
:operationId "create-database"},
:swagger {:tags ["Internal"]}}]
["/transact-writer"
["/transact!-writer"
{:post {:parameters {:body (st/spec {:spec any?
:name "transact-writer"})},
:summary "Internal endpoint. DO NOT USE!"
:no-doc true
:handler (fn [{{:keys [body]} :parameters}]
(let [res (apply datahike.writing/transact! body)]
{:status 200
:body res}))
(binding [*connections* server-connections]
(try
(let [conn (api/connect (dissoc (first body) :remote-peer :writer)) ;; TODO maybe release?
res @(apply datahike.writer/transact! conn (rest body))]
{:status 200
:body res})
(catch Exception e
{:status 500
:body {:msg (ex-message e)
:ex-data (ex-data e)}}))))
:operationId "transact"},
:swagger {:tags ["Internal"]}}]])

(defn app [config route-opts]
(defn app [config route-opts server-connections]
(-> (ring/ring-handler
(ring/router
(concat
Expand All @@ -169,7 +195,7 @@
[(partial middleware/token-auth config)
(partial middleware/auth config)])))
(concat (create-routes config)
internal-writer-routes))) route-opts)
(internal-writer-routes server-connections)))) route-opts)
(ring/routes
(swagger-ui/create-swagger-ui-handler
{:path "/"
Expand All @@ -181,7 +207,7 @@
:access-control-allow-methods [:get :put :post :delete])))

(defn start-server [config]
(run-jetty (app config (default-route-opts muuntaja-with-opts)) config))
(run-jetty (app config (default-route-opts muuntaja-with-opts) (atom {})) config))

(defn stop-server [^org.eclipse.jetty.server.Server server]
(.stop server))
Expand Down
65 changes: 25 additions & 40 deletions resources/datahike-logo.txt
Original file line number Diff line number Diff line change
@@ -1,40 +1,25 @@
:;
.==.
::-= ..
:: -= ==:
::. -;. .=:=.
.= ==: =.
.= :=: =. ..
.;. .= - =: ==:
:=: .= :; .=::=:
.=-:; =` :: ;:- -=.
.= -=. =- :;._=- -:;
.;` -=. =- ==: -=.
=- ===: .=` -:+:
=- == :=:
=- --
::
::
.::.
.==
=:




. . .: .
.== =: ==. =:
.== . =: - =:
.== .=: =: =:
.== .=: =: =:
.::...== _::.. _;=::. .;:_. =: .::. .. =: ._ .._..
.=====:== .======: .======. .======. =::=;==: .=. =: :=- :=====.
.==:---=== ==- -== -:=:-- :=: -== ==: :=. .=. =: :=- .=- :=.
:=: ==; :-. ==. .=; :: ==. =; =: .=. =: :=- =: =:
;=. .=; ... ==. .+: ....:=. =: =: .=. =:.=- =: . .;;
==. .== .=======. :=: .=======. =: =: .=. =:==. .=========
==. .== .==----==. .=: .==----:=. =: =: .=. =: =: =:- - - -
:=: :=; :=: ;=. .+: .=: :=. =: =: .=. =: -=: =:
.==. .==; :=: .==. .+: .=: .==. =: =: .=. =: -=. := =:
:==;;===; .==:::===. .===:. +==.:===. =: =: .=. =: :=. =:_..==
====- == -;===-:=. -===. -====-.=. =: =: .=. =: == -:===:
::
::. .
; .:. .=.
: ::: -:
: = -. ..
.= :` -: .;;.
;-:. .- -: :- -:.
; :. .- .=: -:
: -=: - -=.
: - -
:`
:-
=:

.. .. .
=: :. ; =
=. .=. :. =
.. =. ... :=.. .._ :. .. . = . ..
=====. :=:== :===; .==:= :::-=: = = .; .=-:=
:= .=..:` =: .= =- =: :: = = = :; :` .;
=: =. ._.=: .= ._.=: :. = = =:: =....=
=. ;. ==--=: .= =:--=: :. = = =.=. =-----
:: =;.=. =: .= .= =: :. = = = -: = .
.=:.==..=:.;=: =:. =:.;=: :. = = = :; -:..::
-:=-- -:=--` -=: -:=--- -` - - - - -::-
2 changes: 1 addition & 1 deletion src/datahike/api/specification.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Exists for Datomic API compatibility. Prefer using `@conn` directly if possible.
:doc "Same as transact, but asynchronously returns a future."
:supports-remote? false
:referentially-transparent? false
:impl datahike.writing/transact!}
:impl datahike.writer/transact!}

transact
{:args (s/cat :conn spec/SConnection :txs spec/STransactions)
Expand Down
10 changes: 5 additions & 5 deletions src/datahike/connections.cljc
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
(ns ^:no-doc datahike.connections)

(def connections (atom {}))
(def ^:dynamic *connections* (atom {}))

(defn get-connection [conn-id]
(when-let [conn (get-in @connections [conn-id :conn])]
(swap! connections update-in [conn-id :count] inc)
(when-let [conn (get-in @*connections* [conn-id :conn])]
(swap! *connections* update-in [conn-id :count] inc)
conn))

(defn add-connection! [conn-id conn]
(swap! connections assoc conn-id {:conn conn :count 1}))
(swap! *connections* assoc conn-id {:conn conn :count 1}))

(defn delete-connection! [conn-id]
(when-let [conn (get-connection conn-id)]
(reset! conn :released)
(swap! connections dissoc conn-id)))
(swap! *connections* dissoc conn-id)))
17 changes: 11 additions & 6 deletions src/datahike/connector.cljc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns ^:no-doc datahike.connector
(:require [datahike.connections :refer [get-connection add-connection! delete-connection!
connections]]
*connections*]]
[datahike.readers]
[datahike.store :as ds]
[datahike.writing :as dsi]
Expand Down Expand Up @@ -38,7 +38,7 @@
IMeta
(meta [_] (meta wrapped-atom))

IRef
IRef ;; TODO This is unoffically supported, it triggers watches on each update, not on commits. For proper listeners use the API.
(addWatch [_ key f] (add-watch wrapped-atom key f))
(removeWatch [_ key] (remove-watch wrapped-atom key)))

Expand Down Expand Up @@ -133,6 +133,11 @@
:stored-config stored-config
:diff (diff config stored-config)}))))

(defn- normalize-config [cfg]
(-> cfg
(update :store ds/store-identity)
(dissoc :writer)))

(extend-protocol PConnector
String
(-connect [uri]
Expand All @@ -148,8 +153,8 @@
(if-let [conn (get-connection conn-id)]
(let [conn-config (:config @(:wrapped-atom conn))
;; replace store config with its identity
cfg (update config :store ds/store-identity)
conn-cfg (update conn-config :store ds/store-identity)]
cfg (normalize-config config)
conn-cfg (normalize-config conn-config)]
(when-not (= cfg conn-cfg)
(dt/raise "Configuration does not match existing connections."
{:type :config-does-not-match-existing-connections
Expand Down Expand Up @@ -201,9 +206,9 @@
(let [db @(:wrapped-atom connection)
conn-id [(ds/store-identity (get-in db [:config :store]))
(get-in db [:config :branch])]]
(if-not (get @connections conn-id)
(if-not (get @*connections* conn-id)
(log/info "Connection already released." conn-id)
(let [new-conns (swap! connections update-in [conn-id :count] dec)]
(let [new-conns (swap! *connections* update-in [conn-id :count] dec)]
(when (or release-all? (zero? (get-in new-conns [conn-id :count])))
(delete-connection! conn-id)
(w/shutdown (:writer db))
Expand Down
6 changes: 3 additions & 3 deletions src/datahike/db/transaction.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[datahike.db.search :as dbs]
[datahike.db.utils :as dbu]
[datahike.constants :refer [tx0]]
[datahike.tools :refer [get-time raise]]
[datahike.tools :refer [get-date raise]]
[datahike.schema :as ds]
[me.tonsky.persistent-sorted-set.arrays :as arrays])
#?(:cljs (:require-macros [datahike.datom :refer [datom]]
Expand Down Expand Up @@ -488,7 +488,7 @@
(defn flush-tx-meta
"Generates add-operations for transaction meta data."
[{:keys [tx-meta db-before] :as report}]
(let [;; tx-meta (merge {:db/txInstant (get-time)} tx-meta)
(let [;; tx-meta (merge {:db/txInstant (get-date)} tx-meta)
tid (current-tx report)
{:keys [attribute-refs?]} (dbi/-config db-before)]
(reduce-kv
Expand Down Expand Up @@ -749,7 +749,7 @@
(interleave initial-es (repeat ::flush-tuples))
initial-es)
initial-report (update initial-report :tx-meta
#(merge {:db/txInstant (get-time)} %))
#(merge {:db/txInstant (get-date)} %))
meta-entities (flush-tx-meta initial-report)]
(loop [report (update initial-report :db-after transient)
es (if (dbi/-keep-history? db-before)
Expand Down
Loading