diff --git a/config.edn b/config.edn index c24742a96..dfa2bceb6 100644 --- a/config.edn +++ b/config.edn @@ -31,6 +31,7 @@ :native {;; jvm build :src-dirs ["src" "libdatahike/src"] :java-src-dirs ["java"] + :resource-dir "resources" :target-dir "target" :class-dir "target/classes" :deps-file "deps.edn" diff --git a/deps.edn b/deps.edn index 4ae527b1d..8b21c24d3 100644 --- a/deps.edn +++ b/deps.edn @@ -3,13 +3,13 @@ :exclusions [org.clojure/clojurescript]} io.replikativ/hitchhiker-tree {:mvn/version "0.2.222" :exclusions [org.clojure/clojurescript]} - io.replikativ/konserve {:mvn/version "0.7.311" + io.replikativ/konserve {:mvn/version "0.7.314" :exclusions [org.clojure/clojurescript]} io.replikativ/superv.async {:mvn/version "0.3.46" :exclusions [org.clojure/clojurescript]} io.replikativ/datalog-parser {:mvn/version "0.2.28"} io.replikativ/zufall {:mvn/version "0.2.9"} - persistent-sorted-set/persistent-sorted-set {:mvn/version "0.2.3"} + persistent-sorted-set/persistent-sorted-set {:mvn/version "0.3.0"} environ/environ {:mvn/version "1.2.0"} nrepl/bencode {:mvn/version "1.1.0"} com.taoensso/timbre {:mvn/version "6.3.1"} diff --git a/http-server/datahike/http/server.clj b/http-server/datahike/http/server.clj index 2fc6ff6f8..ca9dc1f2d 100644 --- a/http-server/datahike/http/server.clj +++ b/http-server/datahike/http/server.clj @@ -5,13 +5,15 @@ (:require [clojure.string :as str] [clojure.edn :as edn] + [datahike.connections :refer [*connections*]] [datahike.api.specification :refer [api-specification ->url]] [datahike.http.middleware :as middleware] [datahike.readers :refer [edn-readers]] [datahike.transit :as transit] [datahike.json :as json] [datahike.api :refer :all :as api] - [datahike.writing :as datahike.writing] + [datahike.writing] + [datahike.writer] [reitit.ring :as ring] [reitit.coercion.spec] [reitit.swagger :as swagger] @@ -34,33 +36,38 @@ (defn generic-handler [config f] (fn [request] - (let [{{body :body} :parameters - :keys [headers params method]} request - _ (log/trace "req-body" f body (= f #'api/create-database)) + (try + (let [{{body :body} :parameters + :keys [headers params method]} request + _ (log/trace "request body" f body) ;; TODO move this to client - ret-body - (cond (= f #'api/create-database) + ret-body + (cond (= f #'api/create-database) ;; remove remote-peer and re-add - (assoc - (apply f (dissoc (first body) :remote-peer) (rest body)) - :remote-peer (:remote-peer (first body))) + (assoc + (apply f (dissoc (first body) :remote-peer) (rest body)) + :remote-peer (:remote-peer (first body))) - (= f #'api/delete-database) - (apply f (dissoc (first body) :remote-peer) (rest body)) + (= f #'api/delete-database) + (apply f (dissoc (first body) :remote-peer) (rest body)) - :else - (apply f body))] - (log/trace "ret-body" ret-body) - (merge - {:status 200 - :body - (when-not (headers "no-return-value") - ret-body)} - (when (and (= method :get) - (get params "args-id") - (get-in config [:cache :get :max-age])) - {:headers {"Cache-Control" (str (when-not (:token config) "public, ") - "max-age=" (get-in config [:cache :get :max-age]))}}))))) + :else + (apply f body))] + (log/trace "return body" ret-body) + (merge + {:status 200 + :body + (when-not (headers "no-return-value") + ret-body)} + (when (and (= method :get) + (get params "args-id") + (get-in config [:cache :get :max-age])) + {:headers {"Cache-Control" (str (when-not (:token config) "public, ") + "max-age=" (get-in config [:cache :get :max-age]))}}))) + (catch Exception e + {:status 500 + :body {:msg (ex-message e) + :ex-data (ex-data e)}})))) (declare create-routes) @@ -100,11 +107,7 @@ {:handlers transit/write-handlers})))) (defn default-route-opts [muuntaja-with-opts] - {;; :reitit.middleware/transform dev/print-request-diffs ;; pretty diffs - ;; :validate spec/validate ;; enable spec validation for route data - ;; :reitit.spec/wrap spell/closed ;; strict top-level validation - ;; :exception pretty/exception - :data {:coercion reitit.coercion.spec/coercion + {:data {:coercion reitit.coercion.spec/coercion :muuntaja muuntaja-with-opts :middleware [swagger/swagger-feature parameters/parameters-middleware @@ -119,15 +122,25 @@ multipart/multipart-middleware middleware/patch-swagger-json]}}) -(def internal-writer-routes +(defn internal-writer-routes [server-connections] [["/delete-database-writer" {:post {:parameters {:body (st/spec {:spec any? :name "delete-database-writer"})}, :summary "Internal endpoint. DO NOT USE!" :no-doc true :handler (fn [{{:keys [body]} :parameters}] - {:status 200 - :body (apply datahike.writing/delete-database body)}) + (binding [*connections* server-connections] + (let [cfg (dissoc (first body) :remote-peer :writer)] + (try + (try + (api/release (api/connect cfg) true) + (catch Exception _)) + {:status 200 + :body (apply datahike.writing/delete-database cfg (rest body))} + (catch Exception e + {:status 500 + :body {:msg (ex-message e) + :ex-data (ex-data e)}}))))) :operationId "delete-database"}, :swagger {:tags ["Internal"]}}] ["/create-database-writer" @@ -136,25 +149,38 @@ :summary "Internal endpoint. DO NOT USE!" :no-doc true :handler (fn [{{:keys [body]} :parameters}] - {:status 200 - :body (apply datahike.writing/create-database - (dissoc (first body) :remote-peer) - (rest body))}) + (let [cfg (dissoc (first body) :remote-peer :writer)] + (try + {:status 200 + :body (apply datahike.writing/create-database + cfg + (rest body))} + (catch Exception e + {:status 500 + :body {:msg (ex-message e) + :ex-data (ex-data e)}})))) :operationId "create-database"}, :swagger {:tags ["Internal"]}}] - ["/transact-writer" + ["/transact!-writer" {:post {:parameters {:body (st/spec {:spec any? :name "transact-writer"})}, :summary "Internal endpoint. DO NOT USE!" :no-doc true :handler (fn [{{:keys [body]} :parameters}] - (let [res (apply datahike.writing/transact! body)] - {:status 200 - :body res})) + (binding [*connections* server-connections] + (try + (let [conn (api/connect (dissoc (first body) :remote-peer :writer)) ;; TODO maybe release? + res @(apply datahike.writer/transact! conn (rest body))] + {:status 200 + :body res}) + (catch Exception e + {:status 500 + :body {:msg (ex-message e) + :ex-data (ex-data e)}})))) :operationId "transact"}, :swagger {:tags ["Internal"]}}]]) -(defn app [config route-opts] +(defn app [config route-opts server-connections] (-> (ring/ring-handler (ring/router (concat @@ -169,7 +195,7 @@ [(partial middleware/token-auth config) (partial middleware/auth config)]))) (concat (create-routes config) - internal-writer-routes))) route-opts) + (internal-writer-routes server-connections)))) route-opts) (ring/routes (swagger-ui/create-swagger-ui-handler {:path "/" @@ -181,7 +207,7 @@ :access-control-allow-methods [:get :put :post :delete]))) (defn start-server [config] - (run-jetty (app config (default-route-opts muuntaja-with-opts)) config)) + (run-jetty (app config (default-route-opts muuntaja-with-opts) (atom {})) config)) (defn stop-server [^org.eclipse.jetty.server.Server server] (.stop server)) diff --git a/resources/datahike-logo.txt b/resources/datahike-logo.txt index 95b8ac6a9..538884280 100644 --- a/resources/datahike-logo.txt +++ b/resources/datahike-logo.txt @@ -1,40 +1,25 @@ - :; - .==. - ::-= .. - :: -= ==: - ::. -;. .=:=. - .= ==: =. - .= :=: =. .. - .;. .= - =: ==: - :=: .= :; .=::=: - .=-:; =` :: ;:- -=. - .= -=. =- :;._=- -:; - .;` -=. =- ==: -=. - =- ===: .=` -:+: - =- == :=: - =- -- - :: - :: - .::. - .== - =: - - - - - . . .: . - .== =: ==. =: - .== . =: - =: - .== .=: =: =: - .== .=: =: =: - .::...== _::.. _;=::. .;:_. =: .::. .. =: ._ .._.. - .=====:== .======: .======. .======. =::=;==: .=. =: :=- :=====. -.==:---=== ==- -== -:=:-- :=: -== ==: :=. .=. =: :=- .=- :=. -:=: ==; :-. ==. .=; :: ==. =; =: .=. =: :=- =: =: -;=. .=; ... ==. .+: ....:=. =: =: .=. =:.=- =: . .;; -==. .== .=======. :=: .=======. =: =: .=. =:==. .========= -==. .== .==----==. .=: .==----:=. =: =: .=. =: =: =:- - - - -:=: :=; :=: ;=. .+: .=: :=. =: =: .=. =: -=: =: -.==. .==; :=: .==. .+: .=: .==. =: =: .=. =: -=. := =: - :==;;===; .==:::===. .===:. +==.:===. =: =: .=. =: :=. =:_..== - ====- == -;===-:=. -===. -====-.=. =: =: .=. =: == -:===: + :: + ::. . + ; .:. .=. + : ::: -: + : = -. .. + .= :` -: .;;. + ;-:. .- -: :- -:. + ; :. .- .=: -: + : -=: - -=. + : - - + :` + :- + =: + + .. .. . + =: :. ; = + =. .=. :. = + .. =. ... :=.. .._ :. .. . = . .. + =====. :=:== :===; .==:= :::-=: = = .; .=-:= +:= .=..:` =: .= =- =: :: = = = :; :` .; +=: =. ._.=: .= ._.=: :. = = =:: =....= +=. ;. ==--=: .= =:--=: :. = = =.=. =----- +:: =;.=. =: .= .= =: :. = = = -: = . +.=:.==..=:.;=: =:. =:.;=: :. = = = :; -:..:: + -:=-- -:=--` -=: -:=--- -` - - - - -::- diff --git a/src/datahike/api/specification.cljc b/src/datahike/api/specification.cljc index 7716acdea..c2767b897 100644 --- a/src/datahike/api/specification.cljc +++ b/src/datahike/api/specification.cljc @@ -136,7 +136,7 @@ Exists for Datomic API compatibility. Prefer using `@conn` directly if possible. :doc "Same as transact, but asynchronously returns a future." :supports-remote? false :referentially-transparent? false - :impl datahike.writing/transact!} + :impl datahike.writer/transact!} transact {:args (s/cat :conn spec/SConnection :txs spec/STransactions) diff --git a/src/datahike/connections.cljc b/src/datahike/connections.cljc index 5026d3a3a..a29939bd5 100644 --- a/src/datahike/connections.cljc +++ b/src/datahike/connections.cljc @@ -1,16 +1,16 @@ (ns ^:no-doc datahike.connections) -(def connections (atom {})) +(def ^:dynamic *connections* (atom {})) (defn get-connection [conn-id] - (when-let [conn (get-in @connections [conn-id :conn])] - (swap! connections update-in [conn-id :count] inc) + (when-let [conn (get-in @*connections* [conn-id :conn])] + (swap! *connections* update-in [conn-id :count] inc) conn)) (defn add-connection! [conn-id conn] - (swap! connections assoc conn-id {:conn conn :count 1})) + (swap! *connections* assoc conn-id {:conn conn :count 1})) (defn delete-connection! [conn-id] (when-let [conn (get-connection conn-id)] (reset! conn :released) - (swap! connections dissoc conn-id))) + (swap! *connections* dissoc conn-id))) diff --git a/src/datahike/connector.cljc b/src/datahike/connector.cljc index f6d276b60..7b9d8c5dc 100644 --- a/src/datahike/connector.cljc +++ b/src/datahike/connector.cljc @@ -1,6 +1,6 @@ (ns ^:no-doc datahike.connector (:require [datahike.connections :refer [get-connection add-connection! delete-connection! - connections]] + *connections*]] [datahike.readers] [datahike.store :as ds] [datahike.writing :as dsi] @@ -38,7 +38,7 @@ IMeta (meta [_] (meta wrapped-atom)) - IRef + IRef ;; TODO This is unoffically supported, it triggers watches on each update, not on commits. For proper listeners use the API. (addWatch [_ key f] (add-watch wrapped-atom key f)) (removeWatch [_ key] (remove-watch wrapped-atom key))) @@ -133,6 +133,11 @@ :stored-config stored-config :diff (diff config stored-config)})))) +(defn- normalize-config [cfg] + (-> cfg + (update :store ds/store-identity) + (dissoc :writer))) + (extend-protocol PConnector String (-connect [uri] @@ -148,8 +153,8 @@ (if-let [conn (get-connection conn-id)] (let [conn-config (:config @(:wrapped-atom conn)) ;; replace store config with its identity - cfg (update config :store ds/store-identity) - conn-cfg (update conn-config :store ds/store-identity)] + cfg (normalize-config config) + conn-cfg (normalize-config conn-config)] (when-not (= cfg conn-cfg) (dt/raise "Configuration does not match existing connections." {:type :config-does-not-match-existing-connections @@ -201,9 +206,9 @@ (let [db @(:wrapped-atom connection) conn-id [(ds/store-identity (get-in db [:config :store])) (get-in db [:config :branch])]] - (if-not (get @connections conn-id) + (if-not (get @*connections* conn-id) (log/info "Connection already released." conn-id) - (let [new-conns (swap! connections update-in [conn-id :count] dec)] + (let [new-conns (swap! *connections* update-in [conn-id :count] dec)] (when (or release-all? (zero? (get-in new-conns [conn-id :count]))) (delete-connection! conn-id) (w/shutdown (:writer db)) diff --git a/src/datahike/db/transaction.cljc b/src/datahike/db/transaction.cljc index b88544351..eac7ad990 100644 --- a/src/datahike/db/transaction.cljc +++ b/src/datahike/db/transaction.cljc @@ -9,7 +9,7 @@ [datahike.db.search :as dbs] [datahike.db.utils :as dbu] [datahike.constants :refer [tx0]] - [datahike.tools :refer [get-time raise]] + [datahike.tools :refer [get-date raise]] [datahike.schema :as ds] [me.tonsky.persistent-sorted-set.arrays :as arrays]) #?(:cljs (:require-macros [datahike.datom :refer [datom]] @@ -488,7 +488,7 @@ (defn flush-tx-meta "Generates add-operations for transaction meta data." [{:keys [tx-meta db-before] :as report}] - (let [;; tx-meta (merge {:db/txInstant (get-time)} tx-meta) + (let [;; tx-meta (merge {:db/txInstant (get-date)} tx-meta) tid (current-tx report) {:keys [attribute-refs?]} (dbi/-config db-before)] (reduce-kv @@ -749,7 +749,7 @@ (interleave initial-es (repeat ::flush-tuples)) initial-es) initial-report (update initial-report :tx-meta - #(merge {:db/txInstant (get-time)} %)) + #(merge {:db/txInstant (get-date)} %)) meta-entities (flush-tx-meta initial-report)] (loop [report (update initial-report :db-after transient) es (if (dbi/-keep-history? db-before) diff --git a/src/datahike/experimental/versioning.cljc b/src/datahike/experimental/versioning.cljc index d29e2e11d..e6e83ea05 100644 --- a/src/datahike/experimental/versioning.cljc +++ b/src/datahike/experimental/versioning.cljc @@ -5,7 +5,8 @@ [datahike.core :refer [with]] [datahike.store :refer [store-identity]] [datahike.writing :refer [stored->db db->stored stored-db? - update-and-commit! create-commit-id]] + update-connection! commit! add-commit-meta! + create-commit-id flush-pending-writes]] [superv.async :refer [ - (request-transit :post - "create-database-writer" - writer - (vec (concat [(assoc config :remote-peer writer)] (rest args)))) - (dissoc :remote-peer))) + (deliver p (try (-> + (request-json :post + "create-database-writer" + writer + (vec (concat [(-> config + (assoc :remote-peer writer) + (dissoc :writer))] + (rest args)))) + (dissoc :remote-peer)) + (catch Exception e + e))) p)) (defmethod delete-database :datahike-server @@ -49,9 +56,15 @@ (let [p (throwable-promise) {:keys [writer] :as config} (first args)] ;; redirect call to remote-peer as writer config - (deliver p (-> (request-transit :post - "delete-database-writer" - writer - (vec (concat [(assoc config :remote-peer writer)] (rest args)))) - (dissoc :remote-peer))) + (deliver p (try + (-> (request-json :post + "delete-database-writer" + writer + (vec (concat [(-> config + (assoc :remote-peer writer) + (dissoc :writer))] + (rest args)))) + (dissoc :remote-peer)) + (catch Exception e + e))) p)) diff --git a/src/datahike/index/persistent_set.cljc b/src/datahike/index/persistent_set.cljc index 4d955e51b..3ee125a81 100644 --- a/src/datahike/index/persistent_set.cljc +++ b/src/datahike/index/persistent_set.cljc @@ -6,13 +6,14 @@ [datahike.datom :as dd] [datahike.constants :refer [tx0 txmax]] [datahike.index.interface :as di :refer [IIndex]] + [datahike.tools :as dt] [konserve.core :as k] [konserve.serializers :refer [fressian-serializer]] [hasch.core :refer [uuid]] [taoensso.timbre :refer [trace]]) #?(:clj (:import [datahike.datom Datom] [org.fressian.handlers WriteHandler ReadHandler] - [me.tonsky.persistent_sorted_set PersistentSortedSet IStorage Leaf Branch ANode] + [me.tonsky.persistent_sorted_set PersistentSortedSet IStorage Leaf Branch ANode Settings] [java.util UUID List]))) (defn index-type->cmp @@ -147,13 +148,13 @@ (uuid (mapv (comp vec seq) (.keys node)))) (uuid))) -(defrecord CachedStorage [store config cache stats] +(defrecord CachedStorage [store config cache stats pending-writes] IStorage (store [_ node] (swap! stats update :writes inc) (let [address (gen-address node (:crypto-hash? config)) _ (trace "writing storage: " address " crypto: " (:crypto-hash? config))] - (k/assoc store address node {:sync? true}) + (swap! pending-writes conj (k/assoc store address node {:sync? false})) (wrapped/miss cache address node) address)) (accessed [_ address] @@ -166,6 +167,10 @@ (if-let [cached (wrapped/lookup cache address)] cached (let [node (k/get store address nil {:sync? true})] + (when (nil? node) + (dt/raise "Node not found in storage." {:type :node-not-found + :address address + :store store})) (swap! stats update :reads inc) (wrapped/miss cache address node) node)))) @@ -177,19 +182,19 @@ (defn create-storage [store config] (CachedStorage. store config (atom (cache/lru-cache-factory {} :threshold (:store-cache-size config))) - (atom init-stats))) + (atom init-stats) + (atom []))) -(def ^:const BRANCHING_FACTOR 512) +(def ^:const DEFAULT_BRANCHING_FACTOR 512) (defmethod di/empty-index :datahike.index/persistent-set [_index-name store index-type _] - (psset/set-branching-factor! BRANCHING_FACTOR) - (let [^PersistentSortedSet pset (psset/sorted-set-by (index-type->cmp-quick index-type false))] - (set! (.-_storage pset) (:storage store)) + (let [^PersistentSortedSet pset (psset/sorted-set* {:cmp (index-type->cmp-quick index-type false) + :storage (:storage store) + :branching-factor DEFAULT_BRANCHING_FACTOR})] (with-meta pset {:index-type index-type}))) (defmethod di/init-index :datahike.index/persistent-set [_index-name store datoms index-type _ {:keys [indexed]}] - (psset/set-branching-factor! BRANCHING_FACTOR) (let [arr (if (= index-type :avet) (->> datoms (filter #(contains? indexed (.-a ^Datom %))) @@ -198,14 +203,24 @@ (not (arrays/array? datoms)) (arrays/into-array))) _ (arrays/asort arr (index-type->cmp-quick index-type false)) - ^PersistentSortedSet pset (psset/from-sorted-array (index-type->cmp-quick index-type false) arr)] + ^PersistentSortedSet pset (psset/from-sorted-array (index-type->cmp-quick index-type false) + arr + (alength arr) + {:branching-factor DEFAULT_BRANCHING_FACTOR})] (set! (.-_storage pset) (:storage store)) (with-meta pset {:index-type index-type}))) +;; temporary import from psset until public +(defn- map->settings ^Settings [m] + (Settings. + (int (or (:branching-factor m) 0)) + nil ;; weak ref default + )) (defmethod di/add-konserve-handlers :datahike.index/persistent-set [config store] ;; deal with circular reference between storage and store - (let [storage (atom nil) + (let [settings (map->settings {:branching-factor DEFAULT_BRANCHING_FACTOR}) + storage (atom nil) store (assoc store :serializers {:FressianSerializer (fressian-serializer @@ -217,17 +232,17 @@ ;; The following fields are reset as they cannot be accessed from outside: ;; - 'edit' is set to false, i.e. the set is assumed to be persistent, not transient ;; - 'version' is set back to 0 - (PersistentSortedSet. meta cmp address @storage nil count nil 0)))) + (PersistentSortedSet. meta cmp address @storage nil count settings 0)))) "datahike.index.PersistentSortedSet.Leaf" (reify ReadHandler (read [_ reader _tag _component-count] (let [{:keys [keys level]} (.readObject reader)] - (Leaf. keys)))) + (Leaf. ^List keys settings)))) "datahike.index.PersistentSortedSet.Branch" (reify ReadHandler (read [_ reader _tag _component-count] (let [{:keys [keys level addresses]} (.readObject reader)] - (Branch. (int level) ^List keys ^List (seq addresses))))) + (Branch. (int level) ^List keys ^List (seq addresses) settings)))) "datahike.datom.Datom" (reify ReadHandler (read [_ reader _tag _component-count] @@ -236,8 +251,9 @@ {"datahike.index.PersistentSortedSet" (reify WriteHandler (write [_ writer pset] - (assert (not (nil? (.-_address ^PersistentSortedSet pset))) - "Must be flushed.") + (when (nil? (.-_address ^PersistentSortedSet pset)) + (dt/raise "Must be flushed." {:type :must-be-flushed + :pset pset})) (.writeTag writer "datahike.index.PersistentSortedSet" 1) (.writeObject writer {:meta (meta pset) :address (.-_address ^PersistentSortedSet pset) diff --git a/src/datahike/readers.cljc b/src/datahike/readers.cljc index 538885fc0..80344baa5 100644 --- a/src/datahike/readers.cljc +++ b/src/datahike/readers.cljc @@ -1,5 +1,5 @@ (ns datahike.readers - (:require [datahike.connections :refer [get-connection connections]] + (:require [datahike.connections :refer [get-connection *connections*]] [datahike.writing :as dw] [datahike.datom :refer [datom] :as dd] [datahike.impl.entity :as de] @@ -17,7 +17,7 @@ (defn db-from-reader [{:keys [schema datoms store-id commit-id] :as raw-db}] (if (and store-id commit-id) #?(:cljs (throw (ex-info "Reader not supported." {:type :reader-not-supported - :raw-db db})) + :raw-db raw-db})) :clj (if-let [conn (get-connection store-id)] (let [store (:store @conn)] @@ -38,7 +38,7 @@ (SinceDB. origin time-point)) (defn connection-from-reader [conn-id] - (:conn (@connections conn-id))) + (:conn (@*connections* conn-id))) (defn entity-from-reader [{:keys [db eid]}] (de/entity db eid)) diff --git a/src/datahike/tools.cljc b/src/datahike/tools.cljc index 516ce8e42..4419b0129 100644 --- a/src/datahike/tools.cljc +++ b/src/datahike/tools.cljc @@ -24,10 +24,14 @@ (defmacro case-tree [qs vs] (-case-tree qs vs))) -(defn ^:dynamic get-time [] - #?(:clj (java.util.Date.) +(defn ^:dynamic get-date [] + #?(:clj (Date.) :cljs (js/Date.))) +(defn ^:dynamic get-time-ms [] + #?(:clj (.getTime (Date.)) + :cljs (.getTime (js/Date.)))) + (defmacro raise "Logging an error and throwing an exception with message and structured data. Arguments: diff --git a/src/datahike/writer.cljc b/src/datahike/writer.cljc index 0d3e4d57d..78fd0af06 100644 --- a/src/datahike/writer.cljc +++ b/src/datahike/writer.cljc @@ -1,10 +1,10 @@ (ns ^:no-doc datahike.writer - (:require [superv.async :refer [S thread-try]] + (:require [superv.async :refer [S thread-try ! poll! buffer timeout]]) (:import [clojure.lang ExceptionInfo])) (defprotocol PWriter @@ -12,50 +12,109 @@ (-shutdown [_] "Returns a channel that resolves when the writer has shut down.") (-streaming? [_] "Returns whether the transactor is streaming updates directly into the connection, so it does not need to fetch from store on read.")) -(defrecord LocalWriter [queue thread streaming?] +(defrecord LocalWriter [thread streaming? transaction-queue-size commit-queue-size + transaction-queue commit-queue] PWriter (-dispatch! [_ arg-map] (let [p (promise-chan)] - (put! queue (assoc arg-map :callback p)) + (put! transaction-queue (assoc arg-map :callback p)) p)) (-shutdown [_] - (close! queue) + (close! transaction-queue) thread) (-streaming? [_] streaming?)) +(def ^:const DEFAULT_QUEUE_SIZE 120000) + +;; minimum wait time between commits in ms +;; this reduces write pressure on the storage +(def ^:const DEFAULT_COMMIT_WAIT_TIME 0) ;; in ms + (defn create-thread "Creates new transaction thread" - [connection queue write-fn-map] - (thread-try - S - (go-loop [] - (if-let [{:keys [op args callback] :as invocation} ( (count transaction-queue-buffer) (* 0.9 transaction-queue-size)) + (log/warn "Transaction queue buffer more than 90% full, " + (count transaction-queue-buffer) "of" transaction-queue-size " filled." + "Reduce transaction frequency.")) + (let [op-fn (write-fn-map op) + res (try + (apply op-fn connection args) + ;; Only catch ExceptionInfo here (intentionally rejected transactions). + ;; Any other exceptions should crash the writer and signal the supervisor. + (catch Exception e + (log/error "Error during invocation" invocation e args) + ;; take a guess that a NPE was triggered by an invalid connection + ;; short circuit on errors + (put! callback + (if (= (type e) NullPointerException) + (ex-info "Null pointer encountered in invocation. Connection may have been invalidated, e.g. through db deletion, and needs to be released everywhere." + {:type :writer-error-during-invocation + :invocation invocation + :connection connection + :error e}) + e)) + :error))] + (when-not (= res :error) + (when (> (count commit-queue-buffer) (/ commit-queue-size 2)) + (log/warn "Commit queue buffer more than 50% full, " + (count commit-queue-buffer) "of" commit-queue-size " filled." + "Throttling transaction processing. Reduce transaction frequency and check your storage throughput.") + ( res + (assoc-in [:tx-meta :db/commitId] commit-id) + (assoc :db-after commit-db))] + (put! callback res)))) + (catch Exception e + (doseq [[_ callback] @txs] + (put! callback e)) + (log/error "Writer thread shutting down because of commit error " e) + (close! commit-queue) + (close! transaction-queue))) + (LocalWriter - {:queue queue + {:transaction-queue transaction-queue + :transaction-queue-size transaction-queue-size + :commit-queue commit-queue + :commit-queue-size commit-queue-size :thread thread :streaming? true}))) @@ -105,8 +173,11 @@ writer (:writer @(:wrapped-atom connection))] (go (let [tx-report ( (:listeners (meta connection)) (deref))] + (callback tx-report))) (deliver p tx-report))) p)) diff --git a/src/datahike/writing.cljc b/src/datahike/writing.cljc index 157ba6d42..0c3c60cee 100644 --- a/src/datahike/writing.cljc +++ b/src/datahike/writing.cljc @@ -1,6 +1,6 @@ (ns datahike.writing "Manage all state changes and access to state of durable store." - (:require [datahike.connections :refer [delete-connection! connections]] + (:require [datahike.connections :refer [delete-connection! *connections*]] [datahike.db :as db] [datahike.db.utils :as dbu] [datahike.index :as di] @@ -10,7 +10,10 @@ [datahike.config :as dc] [konserve.core :as k] [taoensso.timbre :as log] - [hasch.core :refer [uuid]])) + [hasch.core :refer [uuid]] + [superv.async :refer [go-try- stored "Maps memory db to storage layout and flushes dirty indices." [db flush?] @@ -86,57 +109,65 @@ (defn branch-heads-as-commits [store parents] (set (doall (for [p parents] - (if-not (keyword? p) p - (let [{{:keys [datahike/commit-id]} :meta :as old-db} - (k/get store p nil {:sync? true})] - (when-not old-db - (dt/raise "Parent does not exist in store." - {:type :parent-does-not-exist-in-store - :parent p})) - commit-id)))))) + (do + (when (nil? p) + (dt/raise "Parent cannot be nil." {:type :parent-cannot-be-nil + :parent p})) + (if-not (keyword? p) p + (let [{{:keys [datahike/commit-id]} :meta :as old-db} + (k/get store p nil {:sync? true})] + (when-not old-db + (dt/raise "Parent does not exist in store." + {:type :parent-does-not-exist-in-store + :parent p})) + commit-id))))))) (defn create-commit-id [db] - (let [{:keys [hash max-tx max-eid config] - {:keys [parents]} :meta} db] - (if (:crypto-hash? config) - (uuid [hash max-tx max-eid parents]) - (uuid)))) - -(defn commit! [store config db parents] - (let [parents (or parents #{(get config :branch)}) - parents (branch-heads-as-commits store parents) - cid (create-commit-id db) - db (-> db - (assoc-in [:meta :datahike/commit-id] cid) - (assoc-in [:meta :datahike/parents] parents)) - db-to-store (db->stored db true)] - (k/assoc store cid db-to-store {:sync? true}) - (k/assoc store (:branch config) db-to-store {:sync? true}) - db)) - -(defn update-and-commit! - ([connection tx-data tx-meta update-fn] - (update-and-commit! connection tx-data tx-meta update-fn nil)) - ([connection tx-data tx-meta update-fn parents] - (let [{:keys [db/noCommit]} tx-meta - {:keys [db-after] - {:keys [db/txInstant]} - :tx-meta - :as tx-report} (update-fn connection tx-data tx-meta) - {:keys [config]} db-after - {:keys [store writer]} @(:wrapped-atom connection) - new-meta (assoc (:meta db-after) :datahike/updated-at txInstant) - db (assoc db-after :meta new-meta :writer writer) - db (if noCommit db (commit! store config db parents)) - tx-report (assoc tx-report :db-after db) - tx-report (if noCommit - tx-report - (assoc-in tx-report [:tx-meta :db/commitId] - (get-in db [:meta :datahike/commit-id])))] - (reset! connection db) - (doseq [[_ callback] (some-> (:listeners (meta connection)) (deref))] - (callback tx-report)) - tx-report))) + (let [{:keys [hash max-tx max-eid meta]} db] + (uuid [hash max-tx max-eid meta]))) + +(defn commit! + ([db parents] + (commit! db parents true)) + ([db parents sync?] + (async+sync sync? *default-sync-translation* + (go-try- + (let [{:keys [store config]} db + parents (or parents #{(get config :branch)}) + parents (branch-heads-as-commits store parents) + cid (create-commit-id db) + db (-> db + (assoc-in [:meta :datahike/commit-id] cid) + (assoc-in [:meta :datahike/parents] parents)) + db-to-store (db->stored db true) + _ ( % + (assoc-in [:meta :datahike/parents] parents) + (assoc-in [:meta :datahike/commit-id] commit-id))))) + +(defn update-connection! [connection tx-data tx-meta update-fn] + (let [ret-atom (atom nil)] + (swap! connection + (fn [old] + (let [{:keys [writer]} old + {:keys [db-after] + {:keys [db/txInstant]} + :tx-meta + :as tx-report} (update-fn old tx-data tx-meta) + new-meta (assoc (:meta db-after) :datahike/updated-at txInstant) + db (assoc db-after :meta new-meta :writer writer) + tx-report (assoc tx-report :db-after db)] + (reset! ret-atom tx-report) + db))) + @ret-atom)) (defprotocol PDatabaseManager (-create-database [config opts]) @@ -201,6 +232,7 @@ {:temporal-eavt-key (di/-flush temporal-eavt backend) :temporal-aevt-key (di/-flush temporal-aevt backend) :temporal-avet-key (di/-flush temporal-avet backend)}))] + (flush-pending-writes store true) (k/assoc store :branches #{:db} {:sync? true}) (k/assoc store cid db-to-store {:sync? true}) (k/assoc store :db db-to-store {:sync? true}) @@ -212,7 +244,7 @@ config-store-id (ds/store-identity (:store config)) active-conns (filter (fn [[store-id _branch]] (= store-id config-store-id)) - (keys @connections))] + (keys @*connections*))] (doseq [conn active-conns] (log/warn "Deleting database without releasing all connections first: " conn "." "All connections will be released now, but this cannot be ensured for remote readers.") @@ -245,8 +277,8 @@ (defn transact! [connection {:keys [tx-data tx-meta]}] (log/debug "Transacting" (count tx-data) " objects with meta: " tx-meta) (log/trace "Transaction data" tx-data) - (update-and-commit! connection tx-data tx-meta #(core/with @%1 %2 %3))) + (update-connection! connection tx-data tx-meta #(core/with %1 %2 %3))) (defn load-entities [connection entities] (log/debug "Loading" (count entities) " entities.") - (update-and-commit! connection entities nil #(core/load-entities-with @%1 %2 %3))) + (update-connection! connection entities nil #(core/load-entities-with %1 %2 %3))) diff --git a/test/datahike/norm/norm_test.clj b/test/datahike/norm/norm_test.clj index 474e9fae3..80e152ff5 100644 --- a/test/datahike/norm/norm_test.clj +++ b/test/datahike/norm/norm_test.clj @@ -21,7 +21,8 @@ (dissoc :db/id)))) (is (= #:db{:ident :tx/norm, :valueType :db.type/keyword, :cardinality :db.cardinality/one} (-> (schema :tx/norm) - (dissoc :db/id)))))) + (dissoc :db/id)))) + (d/release conn))) (defn tx-fn-test-fn [conn] (-> (for [[eid value] (d/q '[:find ?e ?v @@ -44,7 +45,8 @@ (d/q '[:find ?v :where [_ :character/place-of-occupation ?v]] - (d/db conn)))))) + (d/db conn)))) + (d/release conn))) (defn tx-data-and-tx-fn-test-fn [conn] (-> (for [[eid] @@ -89,7 +91,8 @@ [#:character{:name "Bart Simpson"} #:character{:name "Lisa Simpson"} #:character{:name "Maggie Simpson"}]}] - (d/pull-many (d/db conn) '[:character/name {:character/children [:character/name]}] margehomer))))) + (d/pull-many (d/db conn) '[:character/name {:character/children [:character/name]}] margehomer))) + (d/release conn))) (defn naming-and-sorting-test-fn [conn] (-> (for [[eid] (d/q '[:find ?e @@ -117,4 +120,5 @@ {:db/id 11, :character/name "Lisa Simpson", :character/occupation :student}] - (d/pull-many (d/db conn) '[*] lisabart))))) + (d/pull-many (d/db conn) '[*] lisabart))) + (d/release conn))) diff --git a/test/datahike/test/http/writer_test.clj b/test/datahike/test/http/writer_test.clj index e5ffeab18..1263874b5 100644 --- a/test/datahike/test/http/writer_test.clj +++ b/test/datahike/test/http/writer_test.clj @@ -7,13 +7,13 @@ (deftest test-http-writer (testing "Testing distributed datahike.http.writer implementation." - (let [port 31283 + (let [port 31283 server (start-server {:port port :join? false :dev-mode false :token "securerandompassword"})] (try - (let [cfg {:store {:backend :mem :id "distributed_writer"} + (let [cfg {:store {:backend :file :path "/tmp/distributed_writer"} :keep-history? true :schema-flexibility :read :writer {:backend :datahike-server @@ -63,21 +63,21 @@ (do (d/delete-database cfg) (d/create-database cfg) - (d/connect cfg))))) - (testing "Transact fails without writer connection." - (let [port 38217 - cfg {:store {:backend :mem :id "distributed_writer_transact"} - :keep-history? true - :schema-flexibility :read - :writer {:backend :datahike-server - :url (str "http://localhost:" port) - :token "securerandompassword"}} - server-cfg {:store {:backend :mem :id "distributed_writer_transact"} - :keep-history? true - :schema-flexibility :read}] + (d/connect cfg)))))) + (testing "Transact fails without writer connection." + (let [port 38217 + cfg {:store {:backend :mem :id "distributed_writer_transact"} + :keep-history? true + :schema-flexibility :read + :writer {:backend :datahike-server + :url (str "http://localhost:" port) + :token "securerandompassword"}} + server-cfg {:store {:backend :mem :id "distributed_writer_transact"} + :keep-history? true + :schema-flexibility :read}] ;; make sure the database exists before testing transact - (do (d/delete-database server-cfg) - (d/create-database server-cfg)) - (is (thrown? Exception - (d/transact (d/connect cfg) - [{:name "Should fail."}]))))))) + (do (d/delete-database server-cfg) + (d/create-database server-cfg)) + (is (thrown? Exception + (d/transact (d/connect cfg) + [{:name "Should fail."}])))))) diff --git a/test/datahike/test/stress_test.cljc b/test/datahike/test/stress_test.cljc new file mode 100644 index 000000000..df094b8d6 --- /dev/null +++ b/test/datahike/test/stress_test.cljc @@ -0,0 +1,56 @@ +(ns datahike.test.stress-test + (:require [datahike.api :as d] + #?(:cljs [cljs.test :as t :refer-macros [is deftest testing]] + :clj [clojure.test :as t :refer [is deftest testing]]))) + +(deftest ^:no-spec stress-test + (testing "Test lots of parallel reads and writes." + (let [avet? true + num-writes 10000 + num-reads 1000 + + schema [{:db/ident :name + :db/cardinality :db.cardinality/one + :db/index true + :db/unique :db.unique/identity + :db/valueType :db.type/string} + {:db/ident :sibling + :db/cardinality :db.cardinality/many + :db/valueType :db.type/ref} + {:db/ident :age + :db/cardinality :db.cardinality/one + :db/index avet? + :db/valueType :db.type/long}] + + cfg {:store {:backend :file :path "/tmp/dh-stress" + :config {:sync-blob? true :in-place? false}} + :keep-history? false + :schema-flexibility :read + :initial-tx []} + + _ (d/delete-database cfg) + _ (d/create-database cfg) + conn (d/connect cfg) + + _ (d/transact conn schema) + + ;; write in parallel and force the transactor to keep flusing + last-transact + (last + (for [i (shuffle (range num-writes))] + (do #_(prn "write") + (d/transact! conn {:tx-data [[:db/add (inc i) :age i]]}))))] + + ;; read while we are writing + (dotimes [_ num-reads] + #_(prn "read") + (d/q '[:find ?e :where [?e :age ?a]] + @conn)) + + @last-transact + (is (= num-writes + (d/q '[:find (count ?e) . + :where + [?e :age ?a]] + @conn))) + (d/release conn true)))) \ No newline at end of file diff --git a/test/datahike/test/transact_test.cljc b/test/datahike/test/transact_test.cljc index 83e6f86eb..1f18be4b5 100644 --- a/test/datahike/test/transact_test.cljc +++ b/test/datahike/test/transact_test.cljc @@ -432,7 +432,7 @@ (d/release conn))) (testing "manual txInstant is the same as auto-generated" (let [conn (du/setup-db) - date (tools/get-time) + date (tools/get-date) _ (d/transact conn {:tx-data [{:name "Sergey" :age 5}] :tx-meta {:db/txInstant date}})]