Skip to content
This repository was archived by the owner on Apr 28, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions src/genegraph/database/dataset.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
(ns genegraph.database.dataset
"Namespace for handling operations on persistent Jena datasets.
Specifically designed around handling asychronous writes. "
(:require [clojure.spec.alpha :as spec]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't appear to be used currently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now my plan is to use spec to describe and validate the expected parameters for dataset commands and the parameters for opening a dataset. Have not put that in yet though...

[clojure.core.async :as async])
(:import [org.apache.jena.tdb2 TDB2Factory]
[org.apache.jena.query Dataset ReadWrite]
[java.util.concurrent BlockingQueue ArrayBlockingQueue TimeUnit]
[java.util List ArrayList]
[org.apache.jena.query.text TextDatasetFactory]))


(defrecord PersistentDataset [dataset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know defrecord doesn't take a doc string, but it would be nice to describe the args here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, per above comment would like to use spec for this.

run-atom
complete-promise
write-queue
write-queue-size
name]

java.io.Closeable
(close [this]
(reset! run-atom false)
(if (deref complete-promise (* 5 1000) false)
(.close dataset)
(do (throw (ex-info "Timeout closing dataset."
(select-keys this [:name])))
false))))


(defn execute
"Execute command on dataset"
[dataset command]
;; consider raising exception if bad command
(case (:command command)
:replace (.replaceNamedModel dataset
(:model-name command)
(:model command))
:remove (.removeNamedModel dataset
(:model-name command))
(throw (ex-info "Invalid command" command))))

(defn deliver-commit-promise
"Deliver promise when command is committed to the database."
[command]
(when-let [committed-promise (:committed command)]
(deliver committed-promise true)))

(defn write-loop
"Read commands from queue and execute them on dataset. Pulls multiple records
from the queue if possible given buffer limit and availability"
[{:keys [write-queue-size write-queue run-atom complete-promise dataset] :as dataset-record}]
(let [write-buffer (ArrayList. write-queue-size)]
(while @run-atom
(try
(when-let [first-command (.poll write-queue 1000 TimeUnit/MILLISECONDS)]
(.drainTo write-queue write-buffer write-queue-size)
(let [commands (cons first-command write-buffer)]
(.begin dataset ReadWrite/WRITE)
(run! #(execute dataset %) commands)
(.clear write-buffer)
(.commit dataset)
(.end dataset)
(run! deliver-commit-promise commands)))
(catch Exception e (println e))))
(deliver complete-promise true)))

(defn execute-async
"execute command list asynchronously"
[{:keys [run-atom write-queue] :as dataset} commands]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dataset not referenced

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, I ought to clean that up.

(when @run-atom
(run! #(.put write-queue %) commands)))

(defn execute-sync
"Execute command list synchronously. Will claim write transaction."
[{:keys [dataset]} commands]
(.begin dataset ReadWrite/WRITE)
(run! #(execute dataset %) commands)
(.commit dataset)
(.end dataset))

(defn dataset [opts]
(let [write-queue-size (or (:write-queue-size opts) 100)
persistent-dataset (map->PersistentDataset
(merge
opts
{:dataset (cond
(:assembly-path opts)
(TextDatasetFactory/create (:assembly-path))
(:path opts)
(TDB2Factory/connectDataset (:path opts))
:else (TDB2Factory/createDataset))
:run-atom (atom true)
:complete-promise (promise)
:write-queue-size write-queue-size
:write-queue (ArrayBlockingQueue.
write-queue-size)}))]
(.start (Thread. #(write-loop persistent-dataset)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concerned about how many threads are going to be started here. Wouldn't a configurable threadpool be more prudent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be just one thread per dataset. All the thread is doing is looking at the write queue and writing out the effects of the commands there to the dataset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what would be the delineation of a dataset? What datasets are you thinking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it stands now, there might be just one, the one we define in genegraph.database.instance. This would leave the flexibility to have more if needed, much in the same way we have a few rocks-db instances per running Genegraph. Perhaps the data aggregations necessary to put together the data for ClinVar could happen in their own dataset, for instance.

persistent-dataset))
2 changes: 1 addition & 1 deletion src/genegraph/database/query/algebra.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns genegraph.database.query.algebra
(:require [genegraph.database.instance :refer [db]]
(:require #_[genegraph.database.instance :refer [db]] ; TODO remove if this works
[genegraph.database.query.types :as types]
[genegraph.database.util :as util :refer [tx]]
[genegraph.database.names :as names :refer
Expand Down
3 changes: 2 additions & 1 deletion src/genegraph/source/graphql/gene_dosage.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[genegraph.database.query :as q]
[genegraph.source.graphql.common.cache :refer [defresolver]]
[com.walmartlabs.lacinia.schema :refer [tag-with-type]]
[genegraph.database.instance :refer [db]])
#_[genegraph.database.instance :refer [db]] ; TODO remove if this works
)
(:import [org.apache.jena.query ReadWrite QueryFactory QueryExecutionFactory]
[org.apache.jena.sparql.util QueryExecUtils]))

Expand Down
2 changes: 1 addition & 1 deletion src/genegraph/suggest/infix_suggester.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(:require [clojure.string :as str]
[mount.core :as mount :refer [defstate]]
[genegraph.env :as env]
[genegraph.database.instance :refer [db]]
#_[genegraph.database.instance :refer [db]] ;; TODO remove if this works
[genegraph.database.query :as q]
[genegraph.source.graphql.condition :as condition]
[genegraph.source.graphql.common.curation :as curation]
Expand Down