From 67e8624e0dd2352ccda680e695f27715a87d5a15 Mon Sep 17 00:00:00 2001 From: exfly Date: Thu, 22 Oct 2020 23:36:59 +0000 Subject: [PATCH 1/5] fix(gomod): go mod supported --- README | 5 ++ cache-benchmark/main.go | 3 +- chapter1/server/http/server.go | 3 +- chapter1/server/main.go | 4 +- chapter1/test/test.sh | 4 +- chapter2/server/main.go | 6 +-- chapter2/server/tcp/new.go | 3 +- chapter2/test/test.sh | 10 ++-- chapter3/server/cache/rocksdb.go | 2 +- chapter3/server/cache/rocksdb_del.go | 2 +- chapter3/server/cache/rocksdb_get.go | 2 +- chapter3/server/cache/rocksdb_getstat.go | 2 +- chapter3/server/cache/rocksdb_new.go | 2 +- chapter3/server/cache/rocksdb_set.go | 2 +- chapter3/server/main.go | 7 +-- client/main.go | 3 +- go.mod | 9 ++++ go.sum | 67 ++++++++++++++++++++++++ 18 files changed, 111 insertions(+), 25 deletions(-) create mode 100644 go.mod create mode 100644 go.sum diff --git a/README b/README index 52522db..09950c8 100644 --- a/README +++ b/README @@ -6,3 +6,8 @@ go get stathat.com/c/consistent git submodule update --init cd rocksdb && make static_lib + +go build -o cache-benchmark/cache-benchmark cache-benchmark/main.go +go build -o client/client client/main.go + +sudo apt-get install make g++ libz-dev libsnappy-dev libboost-dev liblz4-dev libzstd-dev diff --git a/cache-benchmark/main.go b/cache-benchmark/main.go index 8e45c98..69e9b99 100644 --- a/cache-benchmark/main.go +++ b/cache-benchmark/main.go @@ -1,12 +1,13 @@ package main import ( - "./cacheClient" "flag" "fmt" "math/rand" "strings" "time" + + "github.com/stuarthu/go-implement-your-cache-server/cache-benchmark/cacheClient" ) type statistic struct { diff --git a/chapter1/server/http/server.go b/chapter1/server/http/server.go index 9109f51..ec786f9 100644 --- a/chapter1/server/http/server.go +++ b/chapter1/server/http/server.go @@ -1,7 +1,8 @@ package http import ( - "../cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter3/server/cache" + "net/http" ) diff --git a/chapter1/server/main.go b/chapter1/server/main.go index 86ef4fd..590a1c1 100644 --- a/chapter1/server/main.go +++ b/chapter1/server/main.go @@ -1,8 +1,8 @@ package main import ( - "./cache" - "./http" + "github.com/stuarthu/go-implement-your-cache-server/chapter1/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter1/server/http" ) func main() { diff --git a/chapter1/test/test.sh b/chapter1/test/test.sh index 6851eb0..b8c9205 100755 --- a/chapter1/test/test.sh +++ b/chapter1/test/test.sh @@ -10,8 +10,8 @@ curl 127.0.0.1:12345/cache/testkey -XDELETE curl 127.0.0.1:12345/status -./cache-benchmark -type http -n 100000 -r 100000 -t set +./cache-benchmark/cache-benchmark -type http -n 100000 -r 100000 -t set -./cache-benchmark -type http -n 100000 -r 100000 -t get +./cache-benchmark/cache-benchmark -type http -n 100000 -r 100000 -t get redis-benchmark -c 1 -n 100000 -d 1000 -t set,get -r 100000 diff --git a/chapter2/server/main.go b/chapter2/server/main.go index 344a38e..a53b918 100644 --- a/chapter2/server/main.go +++ b/chapter2/server/main.go @@ -1,9 +1,9 @@ package main import ( - "./cache" - "./http" - "./tcp" + "github.com/stuarthu/go-implement-your-cache-server/chapter2/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter2/server/http" + "github.com/stuarthu/go-implement-your-cache-server/chapter2/server/tcp" ) func main() { diff --git a/chapter2/server/tcp/new.go b/chapter2/server/tcp/new.go index 13c8cd0..136e25c 100644 --- a/chapter2/server/tcp/new.go +++ b/chapter2/server/tcp/new.go @@ -1,8 +1,9 @@ package tcp import ( - "../cache" "net" + + "github.com/stuarthu/go-implement-your-cache-server/chapter3/server/cache" ) type Server struct { diff --git a/chapter2/test/test.sh b/chapter2/test/test.sh index 85dc33b..313e884 100755 --- a/chapter2/test/test.sh +++ b/chapter2/test/test.sh @@ -1,13 +1,13 @@ -./client -c set -k testkey -v testvalue +./client/client -c set -k testkey -v testvalue -./client -c get -k testkey +./client/client -c get -k testkey curl 127.0.0.1:12345/status -./client -c del -k testkey +./client/client -c del -k testkey curl 127.0.0.1:12345/status -./cache-benchmark -type tcp -n 100000 -r 100000 -t set +./cache-benchmark/cache-benchmark -type tcp -n 100000 -r 100000 -t set -./cache-benchmark -type tcp -n 100000 -r 100000 -t get +./cache-benchmark/cache-benchmark -type tcp -n 100000 -r 100000 -t get diff --git a/chapter3/server/cache/rocksdb.go b/chapter3/server/cache/rocksdb.go index 0813522..43f597f 100644 --- a/chapter3/server/cache/rocksdb.go +++ b/chapter3/server/cache/rocksdb.go @@ -2,7 +2,7 @@ package cache // #include "rocksdb/c.h" // #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include -// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 import "C" type rocksdbCache struct { diff --git a/chapter3/server/cache/rocksdb_del.go b/chapter3/server/cache/rocksdb_del.go index 3022e30..0670c9f 100644 --- a/chapter3/server/cache/rocksdb_del.go +++ b/chapter3/server/cache/rocksdb_del.go @@ -3,7 +3,7 @@ package cache // #include // #include "rocksdb/c.h" // #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include -// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 import "C" import ( "errors" diff --git a/chapter3/server/cache/rocksdb_get.go b/chapter3/server/cache/rocksdb_get.go index 4445388..97b5ea7 100644 --- a/chapter3/server/cache/rocksdb_get.go +++ b/chapter3/server/cache/rocksdb_get.go @@ -3,7 +3,7 @@ package cache // #include // #include "rocksdb/c.h" // #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include -// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 import "C" import ( "errors" diff --git a/chapter3/server/cache/rocksdb_getstat.go b/chapter3/server/cache/rocksdb_getstat.go index 6795562..9f49c25 100644 --- a/chapter3/server/cache/rocksdb_getstat.go +++ b/chapter3/server/cache/rocksdb_getstat.go @@ -3,7 +3,7 @@ package cache // #include // #include "rocksdb/c.h" // #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include -// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 import "C" import ( "regexp" diff --git a/chapter3/server/cache/rocksdb_new.go b/chapter3/server/cache/rocksdb_new.go index f097b27..53101ac 100644 --- a/chapter3/server/cache/rocksdb_new.go +++ b/chapter3/server/cache/rocksdb_new.go @@ -2,7 +2,7 @@ package cache // #include "rocksdb/c.h" // #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include -// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 import "C" import "runtime" diff --git a/chapter3/server/cache/rocksdb_set.go b/chapter3/server/cache/rocksdb_set.go index dc1062b..87e58df 100644 --- a/chapter3/server/cache/rocksdb_set.go +++ b/chapter3/server/cache/rocksdb_set.go @@ -3,7 +3,7 @@ package cache // #include // #include "rocksdb/c.h" // #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include -// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 import "C" import ( "errors" diff --git a/chapter3/server/main.go b/chapter3/server/main.go index d06d942..2974e55 100644 --- a/chapter3/server/main.go +++ b/chapter3/server/main.go @@ -1,9 +1,10 @@ package main import ( - "./cache" - "./http" - "./tcp" + "github.com/stuarthu/go-implement-your-cache-server/chapter3/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter3/server/http" + "github.com/stuarthu/go-implement-your-cache-server/chapter3/server/tcp" + "flag" "log" ) diff --git a/client/main.go b/client/main.go index ea63bf7..55dc775 100644 --- a/client/main.go +++ b/client/main.go @@ -1,9 +1,10 @@ package main import ( - "../cache-benchmark/cacheClient" "flag" "fmt" + + "github.com/stuarthu/go-implement-your-cache-server/cache-benchmark/cacheClient" ) func main() { diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3ba6a2c --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/stuarthu/go-implement-your-cache-server + +go 1.14 + +require ( + github.com/go-redis/redis v6.15.9+incompatible + github.com/onsi/ginkgo v1.14.2 // indirect + github.com/onsi/gomega v1.10.3 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..471c874 --- /dev/null +++ b/go.sum @@ -0,0 +1,67 @@ +github.com/exfly/bookcode v0.0.0-20200604001342-1f6544c7d803 h1:mt/s8v5DWobxzXpHSlHYx4v/PE1CGr/PHCX9uO4oMm8= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.2 h1:8mVmC9kjFFmA8H4pKMUhcblgifdkOIXPvbhN1T36q1M= +github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA= +github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= +github.com/stuarthu/go-implement-your-cache-server v0.0.0-20180319190417-da1ea4d68b56 h1:cEM+OORtQKDc/RIaOMy4vj+/LLDhMHUP2Bl2h4aSUYM= +github.com/stuarthu/go-implement-your-cache-server v0.0.0-20180319190417-da1ea4d68b56/go.mod h1:TOMkdtgcOSs41p0N+n5hxWIyiv5wUHlZUrYf3cSNHh0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M= +golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= From 13566d471c864dab2d066e40f34a613cc507ea3c Mon Sep 17 00:00:00 2001 From: exfly Date: Thu, 22 Oct 2020 23:39:41 +0000 Subject: [PATCH 2/5] fix(chapter03): fix ln --- chapter3/server/cache/cache.go | 9 ++++- chapter3/server/cache/inmemory.go | 47 +++++++++++++++++++++++- chapter3/server/cache/stat.go | 20 +++++++++- chapter3/server/http | 1 - chapter3/server/http/cache.go | 59 ++++++++++++++++++++++++++++++ chapter3/server/http/server.go | 21 +++++++++++ chapter3/server/http/status.go | 29 +++++++++++++++ chapter3/server/tcp | 1 - chapter3/server/tcp/new.go | 29 +++++++++++++++ chapter3/server/tcp/process.go | 61 +++++++++++++++++++++++++++++++ chapter3/server/tcp/read_key.go | 41 +++++++++++++++++++++ chapter3/server/tcp/utils.go | 33 +++++++++++++++++ chapter3/test | 1 - chapter3/test/test.sh | 13 +++++++ 14 files changed, 359 insertions(+), 6 deletions(-) mode change 120000 => 100644 chapter3/server/cache/cache.go mode change 120000 => 100644 chapter3/server/cache/inmemory.go mode change 120000 => 100644 chapter3/server/cache/stat.go delete mode 120000 chapter3/server/http create mode 100644 chapter3/server/http/cache.go create mode 100644 chapter3/server/http/server.go create mode 100644 chapter3/server/http/status.go delete mode 120000 chapter3/server/tcp create mode 100644 chapter3/server/tcp/new.go create mode 100644 chapter3/server/tcp/process.go create mode 100644 chapter3/server/tcp/read_key.go create mode 100644 chapter3/server/tcp/utils.go delete mode 120000 chapter3/test create mode 100755 chapter3/test/test.sh diff --git a/chapter3/server/cache/cache.go b/chapter3/server/cache/cache.go deleted file mode 120000 index 877c86a..0000000 --- a/chapter3/server/cache/cache.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/cache/cache.go \ No newline at end of file diff --git a/chapter3/server/cache/cache.go b/chapter3/server/cache/cache.go new file mode 100644 index 0000000..16903e2 --- /dev/null +++ b/chapter3/server/cache/cache.go @@ -0,0 +1,8 @@ +package cache + +type Cache interface { + Set(string, []byte) error + Get(string) ([]byte, error) + Del(string) error + GetStat() Stat +} diff --git a/chapter3/server/cache/inmemory.go b/chapter3/server/cache/inmemory.go deleted file mode 120000 index effc873..0000000 --- a/chapter3/server/cache/inmemory.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/cache/inmemory.go \ No newline at end of file diff --git a/chapter3/server/cache/inmemory.go b/chapter3/server/cache/inmemory.go new file mode 100644 index 0000000..633bff6 --- /dev/null +++ b/chapter3/server/cache/inmemory.go @@ -0,0 +1,46 @@ +package cache + +import "sync" + +type inMemoryCache struct { + c map[string][]byte + mutex sync.RWMutex + Stat +} + +func (c *inMemoryCache) Set(k string, v []byte) error { + c.mutex.Lock() + defer c.mutex.Unlock() + tmp, exist := c.c[k] + if exist { + c.del(k, tmp) + } + c.c[k] = v + c.add(k, v) + return nil +} + +func (c *inMemoryCache) Get(k string) ([]byte, error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.c[k], nil +} + +func (c *inMemoryCache) Del(k string) error { + c.mutex.Lock() + defer c.mutex.Unlock() + v, exist := c.c[k] + if exist { + delete(c.c, k) + c.del(k, v) + } + return nil +} + +func (c *inMemoryCache) GetStat() Stat { + return c.Stat +} + +func newInMemoryCache() *inMemoryCache { + return &inMemoryCache{make(map[string][]byte), sync.RWMutex{}, Stat{}} +} diff --git a/chapter3/server/cache/stat.go b/chapter3/server/cache/stat.go deleted file mode 120000 index 7e92408..0000000 --- a/chapter3/server/cache/stat.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/cache/stat.go \ No newline at end of file diff --git a/chapter3/server/cache/stat.go b/chapter3/server/cache/stat.go new file mode 100644 index 0000000..9b11a2e --- /dev/null +++ b/chapter3/server/cache/stat.go @@ -0,0 +1,19 @@ +package cache + +type Stat struct { + Count int64 + KeySize int64 + ValueSize int64 +} + +func (s *Stat) add(k string, v []byte) { + s.Count += 1 + s.KeySize += int64(len(k)) + s.ValueSize += int64(len(v)) +} + +func (s *Stat) del(k string, v []byte) { + s.Count -= 1 + s.KeySize -= int64(len(k)) + s.ValueSize -= int64(len(v)) +} diff --git a/chapter3/server/http b/chapter3/server/http deleted file mode 120000 index 408a195..0000000 --- a/chapter3/server/http +++ /dev/null @@ -1 +0,0 @@ -../../chapter1/server/http \ No newline at end of file diff --git a/chapter3/server/http/cache.go b/chapter3/server/http/cache.go new file mode 100644 index 0000000..2b6234b --- /dev/null +++ b/chapter3/server/http/cache.go @@ -0,0 +1,59 @@ +package http + +import ( + "io/ioutil" + "log" + "net/http" + "strings" +) + +type cacheHandler struct { + *Server +} + +func (h *cacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + key := strings.Split(r.URL.EscapedPath(), "/")[2] + if len(key) == 0 { + w.WriteHeader(http.StatusBadRequest) + return + } + m := r.Method + if m == http.MethodPut { + b, _ := ioutil.ReadAll(r.Body) + if len(b) != 0 { + e := h.Set(key, b) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + } + } + return + } + if m == http.MethodGet { + b, e := h.Get(key) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + if len(b) == 0 { + w.WriteHeader(http.StatusNotFound) + return + } + w.Write(b) + return + } + if m == http.MethodDelete { + e := h.Del(key) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + } + return + } + w.WriteHeader(http.StatusMethodNotAllowed) +} + +func (s *Server) cacheHandler() http.Handler { + return &cacheHandler{s} +} diff --git a/chapter3/server/http/server.go b/chapter3/server/http/server.go new file mode 100644 index 0000000..ec786f9 --- /dev/null +++ b/chapter3/server/http/server.go @@ -0,0 +1,21 @@ +package http + +import ( + "github.com/stuarthu/go-implement-your-cache-server/chapter3/server/cache" + + "net/http" +) + +type Server struct { + cache.Cache +} + +func (s *Server) Listen() { + http.Handle("/cache/", s.cacheHandler()) + http.Handle("/status", s.statusHandler()) + http.ListenAndServe(":12345", nil) +} + +func New(c cache.Cache) *Server { + return &Server{c} +} diff --git a/chapter3/server/http/status.go b/chapter3/server/http/status.go new file mode 100644 index 0000000..e05e140 --- /dev/null +++ b/chapter3/server/http/status.go @@ -0,0 +1,29 @@ +package http + +import ( + "encoding/json" + "log" + "net/http" +) + +type statusHandler struct { + *Server +} + +func (h *statusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + b, e := json.Marshal(h.GetStat()) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(b) +} + +func (s *Server) statusHandler() http.Handler { + return &statusHandler{s} +} diff --git a/chapter3/server/tcp b/chapter3/server/tcp deleted file mode 120000 index 45235d3..0000000 --- a/chapter3/server/tcp +++ /dev/null @@ -1 +0,0 @@ -../../chapter2/server/tcp \ No newline at end of file diff --git a/chapter3/server/tcp/new.go b/chapter3/server/tcp/new.go new file mode 100644 index 0000000..136e25c --- /dev/null +++ b/chapter3/server/tcp/new.go @@ -0,0 +1,29 @@ +package tcp + +import ( + "net" + + "github.com/stuarthu/go-implement-your-cache-server/chapter3/server/cache" +) + +type Server struct { + cache.Cache +} + +func (s *Server) Listen() { + l, e := net.Listen("tcp", ":12346") + if e != nil { + panic(e) + } + for { + c, e := l.Accept() + if e != nil { + panic(e) + } + go s.process(c) + } +} + +func New(c cache.Cache) *Server { + return &Server{c} +} diff --git a/chapter3/server/tcp/process.go b/chapter3/server/tcp/process.go new file mode 100644 index 0000000..a552f34 --- /dev/null +++ b/chapter3/server/tcp/process.go @@ -0,0 +1,61 @@ +package tcp + +import ( + "bufio" + "io" + "log" + "net" +) + +func (s *Server) get(conn net.Conn, r *bufio.Reader) error { + k, e := s.readKey(r) + if e != nil { + return e + } + v, e := s.Get(k) + return sendResponse(v, e, conn) +} + +func (s *Server) set(conn net.Conn, r *bufio.Reader) error { + k, v, e := s.readKeyAndValue(r) + if e != nil { + return e + } + return sendResponse(nil, s.Set(k, v), conn) +} + +func (s *Server) del(conn net.Conn, r *bufio.Reader) error { + k, e := s.readKey(r) + if e != nil { + return e + } + return sendResponse(nil, s.Del(k), conn) +} + +func (s *Server) process(conn net.Conn) { + defer conn.Close() + r := bufio.NewReader(conn) + for { + op, e := r.ReadByte() + if e != nil { + if e != io.EOF { + log.Println("close connection due to error:", e) + } + return + } + if op == 'S' { + e = s.set(conn, r) + } else if op == 'G' { + e = s.get(conn, r) + } else if op == 'D' { + e = s.del(conn, r) + } else { + log.Println("close connection due to invalid operation:", op) + return + } + if e != nil { + log.Println("close connection due to error:", e) + return + } + } +} diff --git a/chapter3/server/tcp/read_key.go b/chapter3/server/tcp/read_key.go new file mode 100644 index 0000000..09e7894 --- /dev/null +++ b/chapter3/server/tcp/read_key.go @@ -0,0 +1,41 @@ +package tcp + +import ( + "bufio" + "io" +) + +func (s *Server) readKey(r *bufio.Reader) (string, error) { + klen, e := readLen(r) + if e != nil { + return "", e + } + k := make([]byte, klen) + _, e = io.ReadFull(r, k) + if e != nil { + return "", e + } + return string(k), nil +} + +func (s *Server) readKeyAndValue(r *bufio.Reader) (string, []byte, error) { + klen, e := readLen(r) + if e != nil { + return "", nil, e + } + vlen, e := readLen(r) + if e != nil { + return "", nil, e + } + k := make([]byte, klen) + _, e = io.ReadFull(r, k) + if e != nil { + return "", nil, e + } + v := make([]byte, vlen) + _, e = io.ReadFull(r, v) + if e != nil { + return "", nil, e + } + return string(k), v, nil +} diff --git a/chapter3/server/tcp/utils.go b/chapter3/server/tcp/utils.go new file mode 100644 index 0000000..e7d65da --- /dev/null +++ b/chapter3/server/tcp/utils.go @@ -0,0 +1,33 @@ +package tcp + +import ( + "bufio" + "fmt" + "net" + "strconv" + "strings" +) + +func readLen(r *bufio.Reader) (int, error) { + tmp, e := r.ReadString(' ') + if e != nil { + return 0, e + } + l, e := strconv.Atoi(strings.TrimSpace(tmp)) + if e != nil { + return 0, e + } + return l, nil +} + +func sendResponse(value []byte, err error, conn net.Conn) error { + if err != nil { + errString := err.Error() + tmp := fmt.Sprintf("-%d ", len(errString)) + errString + _, e := conn.Write([]byte(tmp)) + return e + } + vlen := fmt.Sprintf("%d ", len(value)) + _, e := conn.Write(append([]byte(vlen), value...)) + return e +} diff --git a/chapter3/test b/chapter3/test deleted file mode 120000 index f3fc111..0000000 --- a/chapter3/test +++ /dev/null @@ -1 +0,0 @@ -../chapter2/test \ No newline at end of file diff --git a/chapter3/test/test.sh b/chapter3/test/test.sh new file mode 100755 index 0000000..313e884 --- /dev/null +++ b/chapter3/test/test.sh @@ -0,0 +1,13 @@ +./client/client -c set -k testkey -v testvalue + +./client/client -c get -k testkey + +curl 127.0.0.1:12345/status + +./client/client -c del -k testkey + +curl 127.0.0.1:12345/status + +./cache-benchmark/cache-benchmark -type tcp -n 100000 -r 100000 -t set + +./cache-benchmark/cache-benchmark -type tcp -n 100000 -r 100000 -t get From b7501bd88b7eead5a50d097226a97831c7907e96 Mon Sep 17 00:00:00 2001 From: exfly Date: Fri, 23 Oct 2020 00:05:06 +0000 Subject: [PATCH 3/5] fix(chapter07): cluster works --- chapter7/server/cache | 1 - chapter7/server/cache/cache.go | 8 ++ chapter7/server/cache/inmemory.go | 46 ++++++++++++ chapter7/server/cache/new.go | 18 +++++ chapter7/server/cache/pair.go | 6 ++ chapter7/server/cache/rocksdb.go | 15 ++++ chapter7/server/cache/rocksdb_del.go | 21 ++++++ chapter7/server/cache/rocksdb_get.go | 23 ++++++ chapter7/server/cache/rocksdb_getstat.go | 32 ++++++++ chapter7/server/cache/rocksdb_new.go | 23 ++++++ chapter7/server/cache/rocksdb_set.go | 58 ++++++++++++++ chapter7/server/cache/stat.go | 19 +++++ chapter7/server/http/cache.go | 60 ++++++++++++++- chapter7/server/http/server.go | 5 +- chapter7/server/http/status.go | 30 +++++++- chapter7/server/main.go | 9 ++- chapter7/server/tcp/new.go | 4 +- chapter7/server/tcp/process.go | 96 +++++++++++++++++++++++- chapter7/server/tcp/utils.go | 34 ++++++++- go.mod | 2 + go.sum | 38 ++++++++++ 21 files changed, 535 insertions(+), 13 deletions(-) delete mode 120000 chapter7/server/cache create mode 100644 chapter7/server/cache/cache.go create mode 100644 chapter7/server/cache/inmemory.go create mode 100644 chapter7/server/cache/new.go create mode 100644 chapter7/server/cache/pair.go create mode 100644 chapter7/server/cache/rocksdb.go create mode 100644 chapter7/server/cache/rocksdb_del.go create mode 100644 chapter7/server/cache/rocksdb_get.go create mode 100644 chapter7/server/cache/rocksdb_getstat.go create mode 100644 chapter7/server/cache/rocksdb_new.go create mode 100644 chapter7/server/cache/rocksdb_set.go create mode 100644 chapter7/server/cache/stat.go mode change 120000 => 100644 chapter7/server/http/cache.go mode change 120000 => 100644 chapter7/server/http/status.go mode change 120000 => 100644 chapter7/server/tcp/process.go mode change 120000 => 100644 chapter7/server/tcp/utils.go diff --git a/chapter7/server/cache b/chapter7/server/cache deleted file mode 120000 index d971c32..0000000 --- a/chapter7/server/cache +++ /dev/null @@ -1 +0,0 @@ -../../chapter5/server/cache \ No newline at end of file diff --git a/chapter7/server/cache/cache.go b/chapter7/server/cache/cache.go new file mode 100644 index 0000000..16903e2 --- /dev/null +++ b/chapter7/server/cache/cache.go @@ -0,0 +1,8 @@ +package cache + +type Cache interface { + Set(string, []byte) error + Get(string) ([]byte, error) + Del(string) error + GetStat() Stat +} diff --git a/chapter7/server/cache/inmemory.go b/chapter7/server/cache/inmemory.go new file mode 100644 index 0000000..633bff6 --- /dev/null +++ b/chapter7/server/cache/inmemory.go @@ -0,0 +1,46 @@ +package cache + +import "sync" + +type inMemoryCache struct { + c map[string][]byte + mutex sync.RWMutex + Stat +} + +func (c *inMemoryCache) Set(k string, v []byte) error { + c.mutex.Lock() + defer c.mutex.Unlock() + tmp, exist := c.c[k] + if exist { + c.del(k, tmp) + } + c.c[k] = v + c.add(k, v) + return nil +} + +func (c *inMemoryCache) Get(k string) ([]byte, error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.c[k], nil +} + +func (c *inMemoryCache) Del(k string) error { + c.mutex.Lock() + defer c.mutex.Unlock() + v, exist := c.c[k] + if exist { + delete(c.c, k) + c.del(k, v) + } + return nil +} + +func (c *inMemoryCache) GetStat() Stat { + return c.Stat +} + +func newInMemoryCache() *inMemoryCache { + return &inMemoryCache{make(map[string][]byte), sync.RWMutex{}, Stat{}} +} diff --git a/chapter7/server/cache/new.go b/chapter7/server/cache/new.go new file mode 100644 index 0000000..ddcb7e3 --- /dev/null +++ b/chapter7/server/cache/new.go @@ -0,0 +1,18 @@ +package cache + +import "log" + +func New(typ string) Cache { + var c Cache + if typ == "inmemory" { + c = newInMemoryCache() + } + if typ == "rocksdb" { + c = newRocksdbCache() + } + if c == nil { + panic("unknown cache type " + typ) + } + log.Println(typ, "ready to serve") + return c +} diff --git a/chapter7/server/cache/pair.go b/chapter7/server/cache/pair.go new file mode 100644 index 0000000..a0e09c8 --- /dev/null +++ b/chapter7/server/cache/pair.go @@ -0,0 +1,6 @@ +package cache + +type pair struct { + k string + v []byte +} diff --git a/chapter7/server/cache/rocksdb.go b/chapter7/server/cache/rocksdb.go new file mode 100644 index 0000000..d88f99a --- /dev/null +++ b/chapter7/server/cache/rocksdb.go @@ -0,0 +1,15 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" + +type rocksdbCache struct { + db *C.rocksdb_t + ro *C.rocksdb_readoptions_t + wo *C.rocksdb_writeoptions_t + e *C.char + ch chan *pair +} diff --git a/chapter7/server/cache/rocksdb_del.go b/chapter7/server/cache/rocksdb_del.go new file mode 100644 index 0000000..0670c9f --- /dev/null +++ b/chapter7/server/cache/rocksdb_del.go @@ -0,0 +1,21 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "errors" + "unsafe" +) + +func (c *rocksdbCache) Del(key string) error { + k := C.CString(key) + defer C.free(unsafe.Pointer(k)) + C.rocksdb_delete(c.db, c.wo, k, C.size_t(len(key)), &c.e) + if c.e != nil { + return errors.New(C.GoString(c.e)) + } + return nil +} diff --git a/chapter7/server/cache/rocksdb_get.go b/chapter7/server/cache/rocksdb_get.go new file mode 100644 index 0000000..97b5ea7 --- /dev/null +++ b/chapter7/server/cache/rocksdb_get.go @@ -0,0 +1,23 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "errors" + "unsafe" +) + +func (c *rocksdbCache) Get(key string) ([]byte, error) { + k := C.CString(key) + defer C.free(unsafe.Pointer(k)) + var length C.size_t + v := C.rocksdb_get(c.db, c.ro, k, C.size_t(len(key)), &length, &c.e) + if c.e != nil { + return nil, errors.New(C.GoString(c.e)) + } + defer C.free(unsafe.Pointer(v)) + return C.GoBytes(unsafe.Pointer(v), C.int(length)), nil +} diff --git a/chapter7/server/cache/rocksdb_getstat.go b/chapter7/server/cache/rocksdb_getstat.go new file mode 100644 index 0000000..9f49c25 --- /dev/null +++ b/chapter7/server/cache/rocksdb_getstat.go @@ -0,0 +1,32 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "regexp" + "strconv" + "unsafe" +) + +func (c *rocksdbCache) GetStat() Stat { + k := C.CString("rocksdb.aggregated-table-properties") + defer C.free(unsafe.Pointer(k)) + v := C.rocksdb_property_value(c.db, k) + defer C.free(unsafe.Pointer(v)) + p := C.GoString(v) + r := regexp.MustCompile(`([^;]+)=([^;]+);`) + s := Stat{} + for _, submatches := range r.FindAllStringSubmatch(p, -1) { + if submatches[1] == " # entries" { + s.Count, _ = strconv.ParseInt(submatches[2], 10, 64) + } else if submatches[1] == " raw key size" { + s.KeySize, _ = strconv.ParseInt(submatches[2], 10, 64) + } else if submatches[1] == " raw value size" { + s.ValueSize, _ = strconv.ParseInt(submatches[2], 10, 64) + } + } + return s +} diff --git a/chapter7/server/cache/rocksdb_new.go b/chapter7/server/cache/rocksdb_new.go new file mode 100644 index 0000000..eea6678 --- /dev/null +++ b/chapter7/server/cache/rocksdb_new.go @@ -0,0 +1,23 @@ +package cache + +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import "runtime" + +func newRocksdbCache() *rocksdbCache { + options := C.rocksdb_options_create() + C.rocksdb_options_increase_parallelism(options, C.int(runtime.NumCPU())) + C.rocksdb_options_set_create_if_missing(options, 1) + var e *C.char + db := C.rocksdb_open(options, C.CString("/tmp/rocksdb"), &e) + if e != nil { + panic(C.GoString(e)) + } + C.rocksdb_options_destroy(options) + c := make(chan *pair, 5000) + wo := C.rocksdb_writeoptions_create() + go write_func(db, c, wo) + return &rocksdbCache{db, C.rocksdb_readoptions_create(), wo, e, c} +} diff --git a/chapter7/server/cache/rocksdb_set.go b/chapter7/server/cache/rocksdb_set.go new file mode 100644 index 0000000..d38f73a --- /dev/null +++ b/chapter7/server/cache/rocksdb_set.go @@ -0,0 +1,58 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "time" + "unsafe" +) + +const BATCH_SIZE = 100 + +func flush_batch(db *C.rocksdb_t, b *C.rocksdb_writebatch_t, o *C.rocksdb_writeoptions_t) { + var e *C.char + C.rocksdb_write(db, o, b, &e) + if e != nil { + panic(C.GoString(e)) + } + C.rocksdb_writebatch_clear(b) +} + +func write_func(db *C.rocksdb_t, c chan *pair, o *C.rocksdb_writeoptions_t) { + count := 0 + t := time.NewTimer(time.Second) + b := C.rocksdb_writebatch_create() + for { + select { + case p := <-c: + count++ + key := C.CString(p.k) + value := C.CBytes(p.v) + C.rocksdb_writebatch_put(b, key, C.size_t(len(p.k)), (*C.char)(value), C.size_t(len(p.v))) + C.free(unsafe.Pointer(key)) + C.free(value) + if count == BATCH_SIZE { + flush_batch(db, b, o) + count = 0 + } + if !t.Stop() { + <-t.C + } + t.Reset(time.Second) + case <-t.C: + if count != 0 { + flush_batch(db, b, o) + count = 0 + } + t.Reset(time.Second) + } + } +} + +func (c *rocksdbCache) Set(key string, value []byte) error { + c.ch <- &pair{key, value} + return nil +} diff --git a/chapter7/server/cache/stat.go b/chapter7/server/cache/stat.go new file mode 100644 index 0000000..9b11a2e --- /dev/null +++ b/chapter7/server/cache/stat.go @@ -0,0 +1,19 @@ +package cache + +type Stat struct { + Count int64 + KeySize int64 + ValueSize int64 +} + +func (s *Stat) add(k string, v []byte) { + s.Count += 1 + s.KeySize += int64(len(k)) + s.ValueSize += int64(len(v)) +} + +func (s *Stat) del(k string, v []byte) { + s.Count -= 1 + s.KeySize -= int64(len(k)) + s.ValueSize -= int64(len(v)) +} diff --git a/chapter7/server/http/cache.go b/chapter7/server/http/cache.go deleted file mode 120000 index a048cd9..0000000 --- a/chapter7/server/http/cache.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/http/cache.go \ No newline at end of file diff --git a/chapter7/server/http/cache.go b/chapter7/server/http/cache.go new file mode 100644 index 0000000..2b6234b --- /dev/null +++ b/chapter7/server/http/cache.go @@ -0,0 +1,59 @@ +package http + +import ( + "io/ioutil" + "log" + "net/http" + "strings" +) + +type cacheHandler struct { + *Server +} + +func (h *cacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + key := strings.Split(r.URL.EscapedPath(), "/")[2] + if len(key) == 0 { + w.WriteHeader(http.StatusBadRequest) + return + } + m := r.Method + if m == http.MethodPut { + b, _ := ioutil.ReadAll(r.Body) + if len(b) != 0 { + e := h.Set(key, b) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + } + } + return + } + if m == http.MethodGet { + b, e := h.Get(key) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + if len(b) == 0 { + w.WriteHeader(http.StatusNotFound) + return + } + w.Write(b) + return + } + if m == http.MethodDelete { + e := h.Del(key) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + } + return + } + w.WriteHeader(http.StatusMethodNotAllowed) +} + +func (s *Server) cacheHandler() http.Handler { + return &cacheHandler{s} +} diff --git a/chapter7/server/http/server.go b/chapter7/server/http/server.go index d38c248..7562b20 100644 --- a/chapter7/server/http/server.go +++ b/chapter7/server/http/server.go @@ -1,9 +1,10 @@ package http import ( - "../cache" - "../cluster" "net/http" + + "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/cluster" ) type Server struct { diff --git a/chapter7/server/http/status.go b/chapter7/server/http/status.go deleted file mode 120000 index c31e2af..0000000 --- a/chapter7/server/http/status.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/http/status.go \ No newline at end of file diff --git a/chapter7/server/http/status.go b/chapter7/server/http/status.go new file mode 100644 index 0000000..e05e140 --- /dev/null +++ b/chapter7/server/http/status.go @@ -0,0 +1,29 @@ +package http + +import ( + "encoding/json" + "log" + "net/http" +) + +type statusHandler struct { + *Server +} + +func (h *statusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + b, e := json.Marshal(h.GetStat()) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(b) +} + +func (s *Server) statusHandler() http.Handler { + return &statusHandler{s} +} diff --git a/chapter7/server/main.go b/chapter7/server/main.go index bcb7957..d1f01f0 100644 --- a/chapter7/server/main.go +++ b/chapter7/server/main.go @@ -1,12 +1,13 @@ package main import ( - "./cache" - "./cluster" - "./http" - "./tcp" "flag" "log" + + "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/cluster" + "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/http" + "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/tcp" ) func main() { diff --git a/chapter7/server/tcp/new.go b/chapter7/server/tcp/new.go index be5ca0a..0081dc3 100644 --- a/chapter7/server/tcp/new.go +++ b/chapter7/server/tcp/new.go @@ -1,8 +1,8 @@ package tcp import ( - "../cache" - "../cluster" + "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/cluster" "net" ) diff --git a/chapter7/server/tcp/process.go b/chapter7/server/tcp/process.go deleted file mode 120000 index e6a9d06..0000000 --- a/chapter7/server/tcp/process.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter6/server/tcp/process.go \ No newline at end of file diff --git a/chapter7/server/tcp/process.go b/chapter7/server/tcp/process.go new file mode 100644 index 0000000..95daa54 --- /dev/null +++ b/chapter7/server/tcp/process.go @@ -0,0 +1,95 @@ +package tcp + +import ( + "bufio" + "io" + "log" + "net" +) + +type result struct { + v []byte + e error +} + +func (s *Server) get(ch chan chan *result, r *bufio.Reader) { + c := make(chan *result) + ch <- c + k, e := s.readKey(r) + if e != nil { + c <- &result{nil, e} + return + } + go func() { + v, e := s.Get(k) + c <- &result{v, e} + }() +} + +func (s *Server) set(ch chan chan *result, r *bufio.Reader) { + c := make(chan *result) + ch <- c + k, v, e := s.readKeyAndValue(r) + if e != nil { + c <- &result{nil, e} + return + } + go func() { + c <- &result{nil, s.Set(k, v)} + }() +} + +func (s *Server) del(ch chan chan *result, r *bufio.Reader) { + c := make(chan *result) + ch <- c + k, e := s.readKey(r) + if e != nil { + c <- &result{nil, e} + return + } + go func() { + c <- &result{nil, s.Del(k)} + }() +} + +func reply(conn net.Conn, resultCh chan chan *result) { + defer conn.Close() + for { + c, open := <-resultCh + if !open { + return + } + r := <-c + e := sendResponse(r.v, r.e, conn) + if e != nil { + log.Println("close connection due to error:", e) + return + } + } +} + +func (s *Server) process(conn net.Conn) { + r := bufio.NewReader(conn) + resultCh := make(chan chan *result, 5000) + defer close(resultCh) + go reply(conn, resultCh) + for { + op, e := r.ReadByte() + if e != nil { + if e != io.EOF { + log.Println("close connection due to error:", e) + } + return + } + if op == 'S' { + s.set(resultCh, r) + } else if op == 'G' { + s.get(resultCh, r) + } else if op == 'D' { + s.del(resultCh, r) + } else { + log.Println("close connection due to invalid operation:", op) + return + } + } +} diff --git a/chapter7/server/tcp/utils.go b/chapter7/server/tcp/utils.go deleted file mode 120000 index 8bbfda2..0000000 --- a/chapter7/server/tcp/utils.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter6/server/tcp/utils.go \ No newline at end of file diff --git a/chapter7/server/tcp/utils.go b/chapter7/server/tcp/utils.go new file mode 100644 index 0000000..e7d65da --- /dev/null +++ b/chapter7/server/tcp/utils.go @@ -0,0 +1,33 @@ +package tcp + +import ( + "bufio" + "fmt" + "net" + "strconv" + "strings" +) + +func readLen(r *bufio.Reader) (int, error) { + tmp, e := r.ReadString(' ') + if e != nil { + return 0, e + } + l, e := strconv.Atoi(strings.TrimSpace(tmp)) + if e != nil { + return 0, e + } + return l, nil +} + +func sendResponse(value []byte, err error, conn net.Conn) error { + if err != nil { + errString := err.Error() + tmp := fmt.Sprintf("-%d ", len(errString)) + errString + _, e := conn.Write([]byte(tmp)) + return e + } + vlen := fmt.Sprintf("%d ", len(value)) + _, e := conn.Write(append([]byte(vlen), value...)) + return e +} diff --git a/go.mod b/go.mod index 3ba6a2c..0249196 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.14 require ( github.com/go-redis/redis v6.15.9+incompatible + github.com/hashicorp/memberlist v0.2.2 github.com/onsi/ginkgo v1.14.2 // indirect github.com/onsi/gomega v1.10.3 // indirect + stathat.com/c/consistent v1.0.0 ) diff --git a/go.sum b/go.sum index 471c874..cdae6c3 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,6 @@ +github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= +github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/exfly/bookcode v0.0.0-20200604001342-1f6544c7d803 h1:mt/s8v5DWobxzXpHSlHYx4v/PE1CGr/PHCX9uO4oMm8= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= @@ -11,10 +14,29 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= +github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4= +github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs= +github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/memberlist v0.2.2 h1:5+RffWKwqJ71YPu9mWsF7ZOscZmwfasdA8kbdC7AO2g= +github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= +github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -25,20 +47,32 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= +github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= +github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stuarthu/go-implement-your-cache-server v0.0.0-20180319190417-da1ea4d68b56 h1:cEM+OORtQKDc/RIaOMy4vj+/LLDhMHUP2Bl2h4aSUYM= github.com/stuarthu/go-implement-your-cache-server v0.0.0-20180319190417-da1ea4d68b56/go.mod h1:TOMkdtgcOSs41p0N+n5hxWIyiv5wUHlZUrYf3cSNHh0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M= golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -50,6 +84,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -65,3 +101,5 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= From a754be36a8de21efaba4ffefbcd92954a2feadc9 Mon Sep 17 00:00:00 2001 From: exfly Date: Fri, 23 Oct 2020 00:13:14 +0000 Subject: [PATCH 4/5] fix(chapter08): rebalence works --- chapter7/server/tcp/new.go | 3 +- chapter8/server/cache/inmemory.go | 47 +++++++++++- chapter8/server/cache/new.go | 19 ++++- chapter8/server/cache/pair.go | 7 +- chapter8/server/cache/rocksdb.go | 16 +++- chapter8/server/cache/rocksdb_del.go | 22 +++++- chapter8/server/cache/rocksdb_get.go | 24 +++++- chapter8/server/cache/rocksdb_getstat.go | 33 +++++++- chapter8/server/cache/rocksdb_new.go | 24 +++++- chapter8/server/cache/rocksdb_scanner.go | 2 +- chapter8/server/cache/rocksdb_set.go | 59 ++++++++++++++- chapter8/server/cache/stat.go | 20 ++++- chapter8/server/cluster | 1 - chapter8/server/cluster/cluster.go | 61 +++++++++++++++ chapter8/server/http/cache.go | 60 ++++++++++++++- chapter8/server/http/cluster.go | 31 +++++++- chapter8/server/http/server.go | 5 +- chapter8/server/http/status.go | 30 +++++++- chapter8/server/main.go | 29 +++++++- chapter8/server/tcp | 1 - chapter8/server/tcp/new.go | 30 ++++++++ chapter8/server/tcp/process.go | 95 ++++++++++++++++++++++++ chapter8/server/tcp/read_key.go | 52 +++++++++++++ chapter8/server/tcp/utils.go | 33 ++++++++ 24 files changed, 684 insertions(+), 20 deletions(-) mode change 120000 => 100644 chapter8/server/cache/inmemory.go mode change 120000 => 100644 chapter8/server/cache/new.go mode change 120000 => 100644 chapter8/server/cache/pair.go mode change 120000 => 100644 chapter8/server/cache/rocksdb.go mode change 120000 => 100644 chapter8/server/cache/rocksdb_del.go mode change 120000 => 100644 chapter8/server/cache/rocksdb_get.go mode change 120000 => 100644 chapter8/server/cache/rocksdb_getstat.go mode change 120000 => 100644 chapter8/server/cache/rocksdb_new.go mode change 120000 => 100644 chapter8/server/cache/rocksdb_set.go mode change 120000 => 100644 chapter8/server/cache/stat.go delete mode 120000 chapter8/server/cluster create mode 100644 chapter8/server/cluster/cluster.go mode change 120000 => 100644 chapter8/server/http/cache.go mode change 120000 => 100644 chapter8/server/http/cluster.go mode change 120000 => 100644 chapter8/server/http/status.go mode change 120000 => 100644 chapter8/server/main.go delete mode 120000 chapter8/server/tcp create mode 100644 chapter8/server/tcp/new.go create mode 100644 chapter8/server/tcp/process.go create mode 100644 chapter8/server/tcp/read_key.go create mode 100644 chapter8/server/tcp/utils.go diff --git a/chapter7/server/tcp/new.go b/chapter7/server/tcp/new.go index 0081dc3..5292197 100644 --- a/chapter7/server/tcp/new.go +++ b/chapter7/server/tcp/new.go @@ -1,9 +1,10 @@ package tcp import ( + "net" + "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/cache" "github.com/stuarthu/go-implement-your-cache-server/chapter7/server/cluster" - "net" ) type Server struct { diff --git a/chapter8/server/cache/inmemory.go b/chapter8/server/cache/inmemory.go deleted file mode 120000 index effc873..0000000 --- a/chapter8/server/cache/inmemory.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/cache/inmemory.go \ No newline at end of file diff --git a/chapter8/server/cache/inmemory.go b/chapter8/server/cache/inmemory.go new file mode 100644 index 0000000..633bff6 --- /dev/null +++ b/chapter8/server/cache/inmemory.go @@ -0,0 +1,46 @@ +package cache + +import "sync" + +type inMemoryCache struct { + c map[string][]byte + mutex sync.RWMutex + Stat +} + +func (c *inMemoryCache) Set(k string, v []byte) error { + c.mutex.Lock() + defer c.mutex.Unlock() + tmp, exist := c.c[k] + if exist { + c.del(k, tmp) + } + c.c[k] = v + c.add(k, v) + return nil +} + +func (c *inMemoryCache) Get(k string) ([]byte, error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + return c.c[k], nil +} + +func (c *inMemoryCache) Del(k string) error { + c.mutex.Lock() + defer c.mutex.Unlock() + v, exist := c.c[k] + if exist { + delete(c.c, k) + c.del(k, v) + } + return nil +} + +func (c *inMemoryCache) GetStat() Stat { + return c.Stat +} + +func newInMemoryCache() *inMemoryCache { + return &inMemoryCache{make(map[string][]byte), sync.RWMutex{}, Stat{}} +} diff --git a/chapter8/server/cache/new.go b/chapter8/server/cache/new.go deleted file mode 120000 index 421410c..0000000 --- a/chapter8/server/cache/new.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter3/server/cache/new.go \ No newline at end of file diff --git a/chapter8/server/cache/new.go b/chapter8/server/cache/new.go new file mode 100644 index 0000000..ddcb7e3 --- /dev/null +++ b/chapter8/server/cache/new.go @@ -0,0 +1,18 @@ +package cache + +import "log" + +func New(typ string) Cache { + var c Cache + if typ == "inmemory" { + c = newInMemoryCache() + } + if typ == "rocksdb" { + c = newRocksdbCache() + } + if c == nil { + panic("unknown cache type " + typ) + } + log.Println(typ, "ready to serve") + return c +} diff --git a/chapter8/server/cache/pair.go b/chapter8/server/cache/pair.go deleted file mode 120000 index 57c9db1..0000000 --- a/chapter8/server/cache/pair.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter5/server/cache/pair.go \ No newline at end of file diff --git a/chapter8/server/cache/pair.go b/chapter8/server/cache/pair.go new file mode 100644 index 0000000..a0e09c8 --- /dev/null +++ b/chapter8/server/cache/pair.go @@ -0,0 +1,6 @@ +package cache + +type pair struct { + k string + v []byte +} diff --git a/chapter8/server/cache/rocksdb.go b/chapter8/server/cache/rocksdb.go deleted file mode 120000 index 7de3a47..0000000 --- a/chapter8/server/cache/rocksdb.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter5/server/cache/rocksdb.go \ No newline at end of file diff --git a/chapter8/server/cache/rocksdb.go b/chapter8/server/cache/rocksdb.go new file mode 100644 index 0000000..d88f99a --- /dev/null +++ b/chapter8/server/cache/rocksdb.go @@ -0,0 +1,15 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" + +type rocksdbCache struct { + db *C.rocksdb_t + ro *C.rocksdb_readoptions_t + wo *C.rocksdb_writeoptions_t + e *C.char + ch chan *pair +} diff --git a/chapter8/server/cache/rocksdb_del.go b/chapter8/server/cache/rocksdb_del.go deleted file mode 120000 index b8ae6c6..0000000 --- a/chapter8/server/cache/rocksdb_del.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter3/server/cache/rocksdb_del.go \ No newline at end of file diff --git a/chapter8/server/cache/rocksdb_del.go b/chapter8/server/cache/rocksdb_del.go new file mode 100644 index 0000000..0670c9f --- /dev/null +++ b/chapter8/server/cache/rocksdb_del.go @@ -0,0 +1,21 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "errors" + "unsafe" +) + +func (c *rocksdbCache) Del(key string) error { + k := C.CString(key) + defer C.free(unsafe.Pointer(k)) + C.rocksdb_delete(c.db, c.wo, k, C.size_t(len(key)), &c.e) + if c.e != nil { + return errors.New(C.GoString(c.e)) + } + return nil +} diff --git a/chapter8/server/cache/rocksdb_get.go b/chapter8/server/cache/rocksdb_get.go deleted file mode 120000 index 0095047..0000000 --- a/chapter8/server/cache/rocksdb_get.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter3/server/cache/rocksdb_get.go \ No newline at end of file diff --git a/chapter8/server/cache/rocksdb_get.go b/chapter8/server/cache/rocksdb_get.go new file mode 100644 index 0000000..97b5ea7 --- /dev/null +++ b/chapter8/server/cache/rocksdb_get.go @@ -0,0 +1,23 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "errors" + "unsafe" +) + +func (c *rocksdbCache) Get(key string) ([]byte, error) { + k := C.CString(key) + defer C.free(unsafe.Pointer(k)) + var length C.size_t + v := C.rocksdb_get(c.db, c.ro, k, C.size_t(len(key)), &length, &c.e) + if c.e != nil { + return nil, errors.New(C.GoString(c.e)) + } + defer C.free(unsafe.Pointer(v)) + return C.GoBytes(unsafe.Pointer(v), C.int(length)), nil +} diff --git a/chapter8/server/cache/rocksdb_getstat.go b/chapter8/server/cache/rocksdb_getstat.go deleted file mode 120000 index e9bc48c..0000000 --- a/chapter8/server/cache/rocksdb_getstat.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter3/server/cache/rocksdb_getstat.go \ No newline at end of file diff --git a/chapter8/server/cache/rocksdb_getstat.go b/chapter8/server/cache/rocksdb_getstat.go new file mode 100644 index 0000000..9f49c25 --- /dev/null +++ b/chapter8/server/cache/rocksdb_getstat.go @@ -0,0 +1,32 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "regexp" + "strconv" + "unsafe" +) + +func (c *rocksdbCache) GetStat() Stat { + k := C.CString("rocksdb.aggregated-table-properties") + defer C.free(unsafe.Pointer(k)) + v := C.rocksdb_property_value(c.db, k) + defer C.free(unsafe.Pointer(v)) + p := C.GoString(v) + r := regexp.MustCompile(`([^;]+)=([^;]+);`) + s := Stat{} + for _, submatches := range r.FindAllStringSubmatch(p, -1) { + if submatches[1] == " # entries" { + s.Count, _ = strconv.ParseInt(submatches[2], 10, 64) + } else if submatches[1] == " raw key size" { + s.KeySize, _ = strconv.ParseInt(submatches[2], 10, 64) + } else if submatches[1] == " raw value size" { + s.ValueSize, _ = strconv.ParseInt(submatches[2], 10, 64) + } + } + return s +} diff --git a/chapter8/server/cache/rocksdb_new.go b/chapter8/server/cache/rocksdb_new.go deleted file mode 120000 index b8e3907..0000000 --- a/chapter8/server/cache/rocksdb_new.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter5/server/cache/rocksdb_new.go \ No newline at end of file diff --git a/chapter8/server/cache/rocksdb_new.go b/chapter8/server/cache/rocksdb_new.go new file mode 100644 index 0000000..8a14449 --- /dev/null +++ b/chapter8/server/cache/rocksdb_new.go @@ -0,0 +1,23 @@ +package cache + +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import "runtime" + +func newRocksdbCache() *rocksdbCache { + options := C.rocksdb_options_create() + C.rocksdb_options_increase_parallelism(options, C.int(runtime.NumCPU())) + C.rocksdb_options_set_create_if_missing(options, 1) + var e *C.char + db := C.rocksdb_open(options, C.CString("/mnt/rocksdb"), &e) + if e != nil { + panic(C.GoString(e)) + } + C.rocksdb_options_destroy(options) + c := make(chan *pair, 5000) + wo := C.rocksdb_writeoptions_create() + go write_func(db, c, wo) + return &rocksdbCache{db, C.rocksdb_readoptions_create(), wo, e, c} +} diff --git a/chapter8/server/cache/rocksdb_scanner.go b/chapter8/server/cache/rocksdb_scanner.go index 0cdd5d8..f534f1d 100644 --- a/chapter8/server/cache/rocksdb_scanner.go +++ b/chapter8/server/cache/rocksdb_scanner.go @@ -2,7 +2,7 @@ package cache // #include "rocksdb/c.h" // #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include -// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 import "C" import "unsafe" diff --git a/chapter8/server/cache/rocksdb_set.go b/chapter8/server/cache/rocksdb_set.go deleted file mode 120000 index 1b57bb5..0000000 --- a/chapter8/server/cache/rocksdb_set.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter5/server/cache/rocksdb_set.go \ No newline at end of file diff --git a/chapter8/server/cache/rocksdb_set.go b/chapter8/server/cache/rocksdb_set.go new file mode 100644 index 0000000..d38f73a --- /dev/null +++ b/chapter8/server/cache/rocksdb_set.go @@ -0,0 +1,58 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "time" + "unsafe" +) + +const BATCH_SIZE = 100 + +func flush_batch(db *C.rocksdb_t, b *C.rocksdb_writebatch_t, o *C.rocksdb_writeoptions_t) { + var e *C.char + C.rocksdb_write(db, o, b, &e) + if e != nil { + panic(C.GoString(e)) + } + C.rocksdb_writebatch_clear(b) +} + +func write_func(db *C.rocksdb_t, c chan *pair, o *C.rocksdb_writeoptions_t) { + count := 0 + t := time.NewTimer(time.Second) + b := C.rocksdb_writebatch_create() + for { + select { + case p := <-c: + count++ + key := C.CString(p.k) + value := C.CBytes(p.v) + C.rocksdb_writebatch_put(b, key, C.size_t(len(p.k)), (*C.char)(value), C.size_t(len(p.v))) + C.free(unsafe.Pointer(key)) + C.free(value) + if count == BATCH_SIZE { + flush_batch(db, b, o) + count = 0 + } + if !t.Stop() { + <-t.C + } + t.Reset(time.Second) + case <-t.C: + if count != 0 { + flush_batch(db, b, o) + count = 0 + } + t.Reset(time.Second) + } + } +} + +func (c *rocksdbCache) Set(key string, value []byte) error { + c.ch <- &pair{key, value} + return nil +} diff --git a/chapter8/server/cache/stat.go b/chapter8/server/cache/stat.go deleted file mode 120000 index 7e92408..0000000 --- a/chapter8/server/cache/stat.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/cache/stat.go \ No newline at end of file diff --git a/chapter8/server/cache/stat.go b/chapter8/server/cache/stat.go new file mode 100644 index 0000000..9b11a2e --- /dev/null +++ b/chapter8/server/cache/stat.go @@ -0,0 +1,19 @@ +package cache + +type Stat struct { + Count int64 + KeySize int64 + ValueSize int64 +} + +func (s *Stat) add(k string, v []byte) { + s.Count += 1 + s.KeySize += int64(len(k)) + s.ValueSize += int64(len(v)) +} + +func (s *Stat) del(k string, v []byte) { + s.Count -= 1 + s.KeySize -= int64(len(k)) + s.ValueSize -= int64(len(v)) +} diff --git a/chapter8/server/cluster b/chapter8/server/cluster deleted file mode 120000 index aeaa6ac..0000000 --- a/chapter8/server/cluster +++ /dev/null @@ -1 +0,0 @@ -../../chapter7/server/cluster \ No newline at end of file diff --git a/chapter8/server/cluster/cluster.go b/chapter8/server/cluster/cluster.go new file mode 100644 index 0000000..11896e7 --- /dev/null +++ b/chapter8/server/cluster/cluster.go @@ -0,0 +1,61 @@ +package cluster + +import ( + "github.com/hashicorp/memberlist" + "io/ioutil" + "stathat.com/c/consistent" + "time" +) + +type Node interface { + ShouldProcess(key string) (string, bool) + Members() []string + Addr() string +} + +type node struct { + *consistent.Consistent + addr string +} + +func (n *node) Addr() string { + return n.addr +} + +func New(addr, cluster string) (Node, error) { + conf := memberlist.DefaultLANConfig() + conf.Name = addr + conf.BindAddr = addr + conf.LogOutput = ioutil.Discard + l, e := memberlist.Create(conf) + if e != nil { + return nil, e + } + if cluster == "" { + cluster = addr + } + clu := []string{cluster} + _, e = l.Join(clu) + if e != nil { + return nil, e + } + circle := consistent.New() + circle.NumberOfReplicas = 256 + go func() { + for { + m := l.Members() + nodes := make([]string, len(m)) + for i, n := range m { + nodes[i] = n.Name + } + circle.Set(nodes) + time.Sleep(time.Second) + } + }() + return &node{circle, addr}, nil +} + +func (n *node) ShouldProcess(key string) (string, bool) { + addr, _ := n.Get(key) + return addr, addr == n.addr +} diff --git a/chapter8/server/http/cache.go b/chapter8/server/http/cache.go deleted file mode 120000 index a048cd9..0000000 --- a/chapter8/server/http/cache.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/http/cache.go \ No newline at end of file diff --git a/chapter8/server/http/cache.go b/chapter8/server/http/cache.go new file mode 100644 index 0000000..2b6234b --- /dev/null +++ b/chapter8/server/http/cache.go @@ -0,0 +1,59 @@ +package http + +import ( + "io/ioutil" + "log" + "net/http" + "strings" +) + +type cacheHandler struct { + *Server +} + +func (h *cacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + key := strings.Split(r.URL.EscapedPath(), "/")[2] + if len(key) == 0 { + w.WriteHeader(http.StatusBadRequest) + return + } + m := r.Method + if m == http.MethodPut { + b, _ := ioutil.ReadAll(r.Body) + if len(b) != 0 { + e := h.Set(key, b) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + } + } + return + } + if m == http.MethodGet { + b, e := h.Get(key) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + if len(b) == 0 { + w.WriteHeader(http.StatusNotFound) + return + } + w.Write(b) + return + } + if m == http.MethodDelete { + e := h.Del(key) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + } + return + } + w.WriteHeader(http.StatusMethodNotAllowed) +} + +func (s *Server) cacheHandler() http.Handler { + return &cacheHandler{s} +} diff --git a/chapter8/server/http/cluster.go b/chapter8/server/http/cluster.go deleted file mode 120000 index e88a865..0000000 --- a/chapter8/server/http/cluster.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter7/server/http/cluster.go \ No newline at end of file diff --git a/chapter8/server/http/cluster.go b/chapter8/server/http/cluster.go new file mode 100644 index 0000000..cb83390 --- /dev/null +++ b/chapter8/server/http/cluster.go @@ -0,0 +1,30 @@ +package http + +import ( + "encoding/json" + "log" + "net/http" +) + +type clusterHandler struct { + *Server +} + +func (h *clusterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + m := h.Members() + b, e := json.Marshal(m) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(b) +} + +func (s *Server) clusterHandler() http.Handler { + return &clusterHandler{s} +} diff --git a/chapter8/server/http/server.go b/chapter8/server/http/server.go index 4dd093b..0f5c98d 100644 --- a/chapter8/server/http/server.go +++ b/chapter8/server/http/server.go @@ -1,9 +1,10 @@ package http import ( - "../cache" - "../cluster" "net/http" + + "github.com/stuarthu/go-implement-your-cache-server/chapter8/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter8/server/cluster" ) type Server struct { diff --git a/chapter8/server/http/status.go b/chapter8/server/http/status.go deleted file mode 120000 index c31e2af..0000000 --- a/chapter8/server/http/status.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/http/status.go \ No newline at end of file diff --git a/chapter8/server/http/status.go b/chapter8/server/http/status.go new file mode 100644 index 0000000..e05e140 --- /dev/null +++ b/chapter8/server/http/status.go @@ -0,0 +1,29 @@ +package http + +import ( + "encoding/json" + "log" + "net/http" +) + +type statusHandler struct { + *Server +} + +func (h *statusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + b, e := json.Marshal(h.GetStat()) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(b) +} + +func (s *Server) statusHandler() http.Handler { + return &statusHandler{s} +} diff --git a/chapter8/server/main.go b/chapter8/server/main.go deleted file mode 120000 index 42bcd0c..0000000 --- a/chapter8/server/main.go +++ /dev/null @@ -1 +0,0 @@ -../../chapter7/server/main.go \ No newline at end of file diff --git a/chapter8/server/main.go b/chapter8/server/main.go new file mode 100644 index 0000000..25511ad --- /dev/null +++ b/chapter8/server/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "flag" + "log" + + "github.com/stuarthu/go-implement-your-cache-server/chapter8/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter8/server/cluster" + "github.com/stuarthu/go-implement-your-cache-server/chapter8/server/http" + "github.com/stuarthu/go-implement-your-cache-server/chapter8/server/tcp" +) + +func main() { + typ := flag.String("type", "inmemory", "cache type") + node := flag.String("node", "127.0.0.1", "node address") + clus := flag.String("cluster", "", "cluster address") + flag.Parse() + log.Println("type is", *typ) + log.Println("node is", *node) + log.Println("cluster is", *clus) + c := cache.New(*typ) + n, e := cluster.New(*node, *clus) + if e != nil { + panic(e) + } + go tcp.New(c, n).Listen() + http.New(c, n).Listen() +} diff --git a/chapter8/server/tcp b/chapter8/server/tcp deleted file mode 120000 index b87ec76..0000000 --- a/chapter8/server/tcp +++ /dev/null @@ -1 +0,0 @@ -../../chapter7/server/tcp \ No newline at end of file diff --git a/chapter8/server/tcp/new.go b/chapter8/server/tcp/new.go new file mode 100644 index 0000000..e245e8c --- /dev/null +++ b/chapter8/server/tcp/new.go @@ -0,0 +1,30 @@ +package tcp + +import ( + "github.com/stuarthu/go-implement-your-cache-server/chapter8/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter8/server/cluster" + "net" +) + +type Server struct { + cache.Cache + cluster.Node +} + +func (s *Server) Listen() { + l, e := net.Listen("tcp", s.Addr()+":12346") + if e != nil { + panic(e) + } + for { + c, e := l.Accept() + if e != nil { + panic(e) + } + go s.process(c) + } +} + +func New(c cache.Cache, n cluster.Node) *Server { + return &Server{c, n} +} diff --git a/chapter8/server/tcp/process.go b/chapter8/server/tcp/process.go new file mode 100644 index 0000000..95daa54 --- /dev/null +++ b/chapter8/server/tcp/process.go @@ -0,0 +1,95 @@ +package tcp + +import ( + "bufio" + "io" + "log" + "net" +) + +type result struct { + v []byte + e error +} + +func (s *Server) get(ch chan chan *result, r *bufio.Reader) { + c := make(chan *result) + ch <- c + k, e := s.readKey(r) + if e != nil { + c <- &result{nil, e} + return + } + go func() { + v, e := s.Get(k) + c <- &result{v, e} + }() +} + +func (s *Server) set(ch chan chan *result, r *bufio.Reader) { + c := make(chan *result) + ch <- c + k, v, e := s.readKeyAndValue(r) + if e != nil { + c <- &result{nil, e} + return + } + go func() { + c <- &result{nil, s.Set(k, v)} + }() +} + +func (s *Server) del(ch chan chan *result, r *bufio.Reader) { + c := make(chan *result) + ch <- c + k, e := s.readKey(r) + if e != nil { + c <- &result{nil, e} + return + } + go func() { + c <- &result{nil, s.Del(k)} + }() +} + +func reply(conn net.Conn, resultCh chan chan *result) { + defer conn.Close() + for { + c, open := <-resultCh + if !open { + return + } + r := <-c + e := sendResponse(r.v, r.e, conn) + if e != nil { + log.Println("close connection due to error:", e) + return + } + } +} + +func (s *Server) process(conn net.Conn) { + r := bufio.NewReader(conn) + resultCh := make(chan chan *result, 5000) + defer close(resultCh) + go reply(conn, resultCh) + for { + op, e := r.ReadByte() + if e != nil { + if e != io.EOF { + log.Println("close connection due to error:", e) + } + return + } + if op == 'S' { + s.set(resultCh, r) + } else if op == 'G' { + s.get(resultCh, r) + } else if op == 'D' { + s.del(resultCh, r) + } else { + log.Println("close connection due to invalid operation:", op) + return + } + } +} diff --git a/chapter8/server/tcp/read_key.go b/chapter8/server/tcp/read_key.go new file mode 100644 index 0000000..3a619c1 --- /dev/null +++ b/chapter8/server/tcp/read_key.go @@ -0,0 +1,52 @@ +package tcp + +import ( + "bufio" + "errors" + "io" +) + +func (s *Server) readKey(r *bufio.Reader) (string, error) { + klen, e := readLen(r) + if e != nil { + return "", e + } + k := make([]byte, klen) + _, e = io.ReadFull(r, k) + if e != nil { + return "", e + } + key := string(k) + addr, ok := s.ShouldProcess(key) + if !ok { + return "", errors.New("redirect " + addr) + } + return key, nil +} + +func (s *Server) readKeyAndValue(r *bufio.Reader) (string, []byte, error) { + klen, e := readLen(r) + if e != nil { + return "", nil, e + } + vlen, e := readLen(r) + if e != nil { + return "", nil, e + } + k := make([]byte, klen) + _, e = io.ReadFull(r, k) + if e != nil { + return "", nil, e + } + key := string(k) + addr, ok := s.ShouldProcess(key) + if !ok { + return "", nil, errors.New("redirect " + addr) + } + v := make([]byte, vlen) + _, e = io.ReadFull(r, v) + if e != nil { + return "", nil, e + } + return key, v, nil +} diff --git a/chapter8/server/tcp/utils.go b/chapter8/server/tcp/utils.go new file mode 100644 index 0000000..e7d65da --- /dev/null +++ b/chapter8/server/tcp/utils.go @@ -0,0 +1,33 @@ +package tcp + +import ( + "bufio" + "fmt" + "net" + "strconv" + "strings" +) + +func readLen(r *bufio.Reader) (int, error) { + tmp, e := r.ReadString(' ') + if e != nil { + return 0, e + } + l, e := strconv.Atoi(strings.TrimSpace(tmp)) + if e != nil { + return 0, e + } + return l, nil +} + +func sendResponse(value []byte, err error, conn net.Conn) error { + if err != nil { + errString := err.Error() + tmp := fmt.Sprintf("-%d ", len(errString)) + errString + _, e := conn.Write([]byte(tmp)) + return e + } + vlen := fmt.Sprintf("%d ", len(value)) + _, e := conn.Write(append([]byte(vlen), value...)) + return e +} From 0a05b499ddcd47011b7a215c2a730405a373c01c Mon Sep 17 00:00:00 2001 From: exfly Date: Fri, 23 Oct 2020 00:27:14 +0000 Subject: [PATCH 5/5] fix(chapter09): ttl works --- chapter9/server/cache/cache.go | 10 ++- chapter9/server/cache/pair.go | 7 +- chapter9/server/cache/rocksdb.go | 16 +++- chapter9/server/cache/rocksdb_del.go | 22 +++++- chapter9/server/cache/rocksdb_get.go | 24 +++++- chapter9/server/cache/rocksdb_getstat.go | 33 +++++++- chapter9/server/cache/rocksdb_new.go | 2 +- chapter9/server/cache/rocksdb_scanner.go | 43 ++++++++++- chapter9/server/cache/rocksdb_set.go | 59 ++++++++++++++- chapter9/server/cache/scanner.go | 9 ++- chapter9/server/cache/stat.go | 20 ++++- chapter9/server/cluster | 1 - chapter9/server/cluster/cluster.go | 61 +++++++++++++++ chapter9/server/http | 1 - chapter9/server/http/cache.go | 59 +++++++++++++++ chapter9/server/http/cluster.go | 30 ++++++++ chapter9/server/http/rebalance.go | 36 +++++++++ chapter9/server/http/server.go | 25 +++++++ chapter9/server/http/status.go | 29 ++++++++ chapter9/server/main.go | 8 +- chapter9/server/tcp | 1 - chapter9/server/tcp/new.go | 31 ++++++++ chapter9/server/tcp/process.go | 95 ++++++++++++++++++++++++ chapter9/server/tcp/read_key.go | 52 +++++++++++++ chapter9/server/tcp/utils.go | 33 ++++++++ 25 files changed, 689 insertions(+), 18 deletions(-) mode change 120000 => 100644 chapter9/server/cache/cache.go mode change 120000 => 100644 chapter9/server/cache/pair.go mode change 120000 => 100644 chapter9/server/cache/rocksdb.go mode change 120000 => 100644 chapter9/server/cache/rocksdb_del.go mode change 120000 => 100644 chapter9/server/cache/rocksdb_get.go mode change 120000 => 100644 chapter9/server/cache/rocksdb_getstat.go mode change 120000 => 100644 chapter9/server/cache/rocksdb_scanner.go mode change 120000 => 100644 chapter9/server/cache/rocksdb_set.go mode change 120000 => 100644 chapter9/server/cache/scanner.go mode change 120000 => 100644 chapter9/server/cache/stat.go delete mode 120000 chapter9/server/cluster create mode 100644 chapter9/server/cluster/cluster.go delete mode 120000 chapter9/server/http create mode 100644 chapter9/server/http/cache.go create mode 100644 chapter9/server/http/cluster.go create mode 100644 chapter9/server/http/rebalance.go create mode 100644 chapter9/server/http/server.go create mode 100644 chapter9/server/http/status.go delete mode 120000 chapter9/server/tcp create mode 100644 chapter9/server/tcp/new.go create mode 100644 chapter9/server/tcp/process.go create mode 100644 chapter9/server/tcp/read_key.go create mode 100644 chapter9/server/tcp/utils.go diff --git a/chapter9/server/cache/cache.go b/chapter9/server/cache/cache.go deleted file mode 120000 index fab5571..0000000 --- a/chapter9/server/cache/cache.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter8/server/cache/cache.go \ No newline at end of file diff --git a/chapter9/server/cache/cache.go b/chapter9/server/cache/cache.go new file mode 100644 index 0000000..62eccf9 --- /dev/null +++ b/chapter9/server/cache/cache.go @@ -0,0 +1,9 @@ +package cache + +type Cache interface { + Set(string, []byte) error + Get(string) ([]byte, error) + Del(string) error + GetStat() Stat + NewScanner() Scanner +} diff --git a/chapter9/server/cache/pair.go b/chapter9/server/cache/pair.go deleted file mode 120000 index 57c9db1..0000000 --- a/chapter9/server/cache/pair.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter5/server/cache/pair.go \ No newline at end of file diff --git a/chapter9/server/cache/pair.go b/chapter9/server/cache/pair.go new file mode 100644 index 0000000..a0e09c8 --- /dev/null +++ b/chapter9/server/cache/pair.go @@ -0,0 +1,6 @@ +package cache + +type pair struct { + k string + v []byte +} diff --git a/chapter9/server/cache/rocksdb.go b/chapter9/server/cache/rocksdb.go deleted file mode 120000 index 7de3a47..0000000 --- a/chapter9/server/cache/rocksdb.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter5/server/cache/rocksdb.go \ No newline at end of file diff --git a/chapter9/server/cache/rocksdb.go b/chapter9/server/cache/rocksdb.go new file mode 100644 index 0000000..d88f99a --- /dev/null +++ b/chapter9/server/cache/rocksdb.go @@ -0,0 +1,15 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" + +type rocksdbCache struct { + db *C.rocksdb_t + ro *C.rocksdb_readoptions_t + wo *C.rocksdb_writeoptions_t + e *C.char + ch chan *pair +} diff --git a/chapter9/server/cache/rocksdb_del.go b/chapter9/server/cache/rocksdb_del.go deleted file mode 120000 index b8ae6c6..0000000 --- a/chapter9/server/cache/rocksdb_del.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter3/server/cache/rocksdb_del.go \ No newline at end of file diff --git a/chapter9/server/cache/rocksdb_del.go b/chapter9/server/cache/rocksdb_del.go new file mode 100644 index 0000000..0670c9f --- /dev/null +++ b/chapter9/server/cache/rocksdb_del.go @@ -0,0 +1,21 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "errors" + "unsafe" +) + +func (c *rocksdbCache) Del(key string) error { + k := C.CString(key) + defer C.free(unsafe.Pointer(k)) + C.rocksdb_delete(c.db, c.wo, k, C.size_t(len(key)), &c.e) + if c.e != nil { + return errors.New(C.GoString(c.e)) + } + return nil +} diff --git a/chapter9/server/cache/rocksdb_get.go b/chapter9/server/cache/rocksdb_get.go deleted file mode 120000 index 0095047..0000000 --- a/chapter9/server/cache/rocksdb_get.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter3/server/cache/rocksdb_get.go \ No newline at end of file diff --git a/chapter9/server/cache/rocksdb_get.go b/chapter9/server/cache/rocksdb_get.go new file mode 100644 index 0000000..97b5ea7 --- /dev/null +++ b/chapter9/server/cache/rocksdb_get.go @@ -0,0 +1,23 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "errors" + "unsafe" +) + +func (c *rocksdbCache) Get(key string) ([]byte, error) { + k := C.CString(key) + defer C.free(unsafe.Pointer(k)) + var length C.size_t + v := C.rocksdb_get(c.db, c.ro, k, C.size_t(len(key)), &length, &c.e) + if c.e != nil { + return nil, errors.New(C.GoString(c.e)) + } + defer C.free(unsafe.Pointer(v)) + return C.GoBytes(unsafe.Pointer(v), C.int(length)), nil +} diff --git a/chapter9/server/cache/rocksdb_getstat.go b/chapter9/server/cache/rocksdb_getstat.go deleted file mode 120000 index e9bc48c..0000000 --- a/chapter9/server/cache/rocksdb_getstat.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter3/server/cache/rocksdb_getstat.go \ No newline at end of file diff --git a/chapter9/server/cache/rocksdb_getstat.go b/chapter9/server/cache/rocksdb_getstat.go new file mode 100644 index 0000000..9f49c25 --- /dev/null +++ b/chapter9/server/cache/rocksdb_getstat.go @@ -0,0 +1,32 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "regexp" + "strconv" + "unsafe" +) + +func (c *rocksdbCache) GetStat() Stat { + k := C.CString("rocksdb.aggregated-table-properties") + defer C.free(unsafe.Pointer(k)) + v := C.rocksdb_property_value(c.db, k) + defer C.free(unsafe.Pointer(v)) + p := C.GoString(v) + r := regexp.MustCompile(`([^;]+)=([^;]+);`) + s := Stat{} + for _, submatches := range r.FindAllStringSubmatch(p, -1) { + if submatches[1] == " # entries" { + s.Count, _ = strconv.ParseInt(submatches[2], 10, 64) + } else if submatches[1] == " raw key size" { + s.KeySize, _ = strconv.ParseInt(submatches[2], 10, 64) + } else if submatches[1] == " raw value size" { + s.ValueSize, _ = strconv.ParseInt(submatches[2], 10, 64) + } + } + return s +} diff --git a/chapter9/server/cache/rocksdb_new.go b/chapter9/server/cache/rocksdb_new.go index 645c53e..1aebbaa 100644 --- a/chapter9/server/cache/rocksdb_new.go +++ b/chapter9/server/cache/rocksdb_new.go @@ -2,7 +2,7 @@ package cache // #include "rocksdb/c.h" // #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include -// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 import "C" import "runtime" diff --git a/chapter9/server/cache/rocksdb_scanner.go b/chapter9/server/cache/rocksdb_scanner.go deleted file mode 120000 index 70d5ea1..0000000 --- a/chapter9/server/cache/rocksdb_scanner.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter8/server/cache/rocksdb_scanner.go \ No newline at end of file diff --git a/chapter9/server/cache/rocksdb_scanner.go b/chapter9/server/cache/rocksdb_scanner.go new file mode 100644 index 0000000..f534f1d --- /dev/null +++ b/chapter9/server/cache/rocksdb_scanner.go @@ -0,0 +1,42 @@ +package cache + +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lbz2 -llz4 -lzstd -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import "unsafe" + +type rocksdbScanner struct { + i *C.rocksdb_iterator_t + initialized bool +} + +func (s *rocksdbScanner) Close() { + C.rocksdb_iter_destroy(s.i) +} + +func (s *rocksdbScanner) Scan() bool { + if !s.initialized { + C.rocksdb_iter_seek_to_first(s.i) + s.initialized = true + } else { + C.rocksdb_iter_next(s.i) + } + return C.rocksdb_iter_valid(s.i) != 0 +} + +func (s *rocksdbScanner) Key() string { + var length C.size_t + k := C.rocksdb_iter_key(s.i, &length) + return C.GoString(k) +} + +func (s *rocksdbScanner) Value() []byte { + var length C.size_t + v := C.rocksdb_iter_value(s.i, &length) + return C.GoBytes(unsafe.Pointer(v), C.int(length)) +} + +func (c *rocksdbCache) NewScanner() Scanner { + return &rocksdbScanner{C.rocksdb_create_iterator(c.db, c.ro), false} +} diff --git a/chapter9/server/cache/rocksdb_set.go b/chapter9/server/cache/rocksdb_set.go deleted file mode 120000 index 1b57bb5..0000000 --- a/chapter9/server/cache/rocksdb_set.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter5/server/cache/rocksdb_set.go \ No newline at end of file diff --git a/chapter9/server/cache/rocksdb_set.go b/chapter9/server/cache/rocksdb_set.go new file mode 100644 index 0000000..d38f73a --- /dev/null +++ b/chapter9/server/cache/rocksdb_set.go @@ -0,0 +1,58 @@ +package cache + +// #include +// #include "rocksdb/c.h" +// #cgo CFLAGS: -I${SRCDIR}/../../../rocksdb/include +// #cgo LDFLAGS: -L${SRCDIR}/../../../rocksdb -lrocksdb -lz -lpthread -lsnappy -lstdc++ -lm -O3 +import "C" +import ( + "time" + "unsafe" +) + +const BATCH_SIZE = 100 + +func flush_batch(db *C.rocksdb_t, b *C.rocksdb_writebatch_t, o *C.rocksdb_writeoptions_t) { + var e *C.char + C.rocksdb_write(db, o, b, &e) + if e != nil { + panic(C.GoString(e)) + } + C.rocksdb_writebatch_clear(b) +} + +func write_func(db *C.rocksdb_t, c chan *pair, o *C.rocksdb_writeoptions_t) { + count := 0 + t := time.NewTimer(time.Second) + b := C.rocksdb_writebatch_create() + for { + select { + case p := <-c: + count++ + key := C.CString(p.k) + value := C.CBytes(p.v) + C.rocksdb_writebatch_put(b, key, C.size_t(len(p.k)), (*C.char)(value), C.size_t(len(p.v))) + C.free(unsafe.Pointer(key)) + C.free(value) + if count == BATCH_SIZE { + flush_batch(db, b, o) + count = 0 + } + if !t.Stop() { + <-t.C + } + t.Reset(time.Second) + case <-t.C: + if count != 0 { + flush_batch(db, b, o) + count = 0 + } + t.Reset(time.Second) + } + } +} + +func (c *rocksdbCache) Set(key string, value []byte) error { + c.ch <- &pair{key, value} + return nil +} diff --git a/chapter9/server/cache/scanner.go b/chapter9/server/cache/scanner.go deleted file mode 120000 index 90f7ee9..0000000 --- a/chapter9/server/cache/scanner.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter8/server/cache/scanner.go \ No newline at end of file diff --git a/chapter9/server/cache/scanner.go b/chapter9/server/cache/scanner.go new file mode 100644 index 0000000..4b0766a --- /dev/null +++ b/chapter9/server/cache/scanner.go @@ -0,0 +1,8 @@ +package cache + +type Scanner interface { + Scan() bool + Key() string + Value() []byte + Close() +} diff --git a/chapter9/server/cache/stat.go b/chapter9/server/cache/stat.go deleted file mode 120000 index 7e92408..0000000 --- a/chapter9/server/cache/stat.go +++ /dev/null @@ -1 +0,0 @@ -../../../chapter1/server/cache/stat.go \ No newline at end of file diff --git a/chapter9/server/cache/stat.go b/chapter9/server/cache/stat.go new file mode 100644 index 0000000..9b11a2e --- /dev/null +++ b/chapter9/server/cache/stat.go @@ -0,0 +1,19 @@ +package cache + +type Stat struct { + Count int64 + KeySize int64 + ValueSize int64 +} + +func (s *Stat) add(k string, v []byte) { + s.Count += 1 + s.KeySize += int64(len(k)) + s.ValueSize += int64(len(v)) +} + +func (s *Stat) del(k string, v []byte) { + s.Count -= 1 + s.KeySize -= int64(len(k)) + s.ValueSize -= int64(len(v)) +} diff --git a/chapter9/server/cluster b/chapter9/server/cluster deleted file mode 120000 index aeaa6ac..0000000 --- a/chapter9/server/cluster +++ /dev/null @@ -1 +0,0 @@ -../../chapter7/server/cluster \ No newline at end of file diff --git a/chapter9/server/cluster/cluster.go b/chapter9/server/cluster/cluster.go new file mode 100644 index 0000000..11896e7 --- /dev/null +++ b/chapter9/server/cluster/cluster.go @@ -0,0 +1,61 @@ +package cluster + +import ( + "github.com/hashicorp/memberlist" + "io/ioutil" + "stathat.com/c/consistent" + "time" +) + +type Node interface { + ShouldProcess(key string) (string, bool) + Members() []string + Addr() string +} + +type node struct { + *consistent.Consistent + addr string +} + +func (n *node) Addr() string { + return n.addr +} + +func New(addr, cluster string) (Node, error) { + conf := memberlist.DefaultLANConfig() + conf.Name = addr + conf.BindAddr = addr + conf.LogOutput = ioutil.Discard + l, e := memberlist.Create(conf) + if e != nil { + return nil, e + } + if cluster == "" { + cluster = addr + } + clu := []string{cluster} + _, e = l.Join(clu) + if e != nil { + return nil, e + } + circle := consistent.New() + circle.NumberOfReplicas = 256 + go func() { + for { + m := l.Members() + nodes := make([]string, len(m)) + for i, n := range m { + nodes[i] = n.Name + } + circle.Set(nodes) + time.Sleep(time.Second) + } + }() + return &node{circle, addr}, nil +} + +func (n *node) ShouldProcess(key string) (string, bool) { + addr, _ := n.Get(key) + return addr, addr == n.addr +} diff --git a/chapter9/server/http b/chapter9/server/http deleted file mode 120000 index d993783..0000000 --- a/chapter9/server/http +++ /dev/null @@ -1 +0,0 @@ -../../chapter8/server/http \ No newline at end of file diff --git a/chapter9/server/http/cache.go b/chapter9/server/http/cache.go new file mode 100644 index 0000000..2b6234b --- /dev/null +++ b/chapter9/server/http/cache.go @@ -0,0 +1,59 @@ +package http + +import ( + "io/ioutil" + "log" + "net/http" + "strings" +) + +type cacheHandler struct { + *Server +} + +func (h *cacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + key := strings.Split(r.URL.EscapedPath(), "/")[2] + if len(key) == 0 { + w.WriteHeader(http.StatusBadRequest) + return + } + m := r.Method + if m == http.MethodPut { + b, _ := ioutil.ReadAll(r.Body) + if len(b) != 0 { + e := h.Set(key, b) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + } + } + return + } + if m == http.MethodGet { + b, e := h.Get(key) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + if len(b) == 0 { + w.WriteHeader(http.StatusNotFound) + return + } + w.Write(b) + return + } + if m == http.MethodDelete { + e := h.Del(key) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + } + return + } + w.WriteHeader(http.StatusMethodNotAllowed) +} + +func (s *Server) cacheHandler() http.Handler { + return &cacheHandler{s} +} diff --git a/chapter9/server/http/cluster.go b/chapter9/server/http/cluster.go new file mode 100644 index 0000000..cb83390 --- /dev/null +++ b/chapter9/server/http/cluster.go @@ -0,0 +1,30 @@ +package http + +import ( + "encoding/json" + "log" + "net/http" +) + +type clusterHandler struct { + *Server +} + +func (h *clusterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + m := h.Members() + b, e := json.Marshal(m) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(b) +} + +func (s *Server) clusterHandler() http.Handler { + return &clusterHandler{s} +} diff --git a/chapter9/server/http/rebalance.go b/chapter9/server/http/rebalance.go new file mode 100644 index 0000000..9e55920 --- /dev/null +++ b/chapter9/server/http/rebalance.go @@ -0,0 +1,36 @@ +package http + +import ( + "bytes" + "net/http" +) + +type rebalanceHandler struct { + *Server +} + +func (h *rebalanceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + } + go h.rebalance() +} + +func (h *rebalanceHandler) rebalance() { + s := h.NewScanner() + defer s.Close() + c := &http.Client{} + for s.Scan() { + k := s.Key() + n, ok := h.ShouldProcess(k) + if !ok { + r, _ := http.NewRequest(http.MethodPut, "http://"+n+":12345/cache/"+k, bytes.NewReader(s.Value())) + c.Do(r) + h.Del(k) + } + } +} + +func (s *Server) rebalanceHandler() http.Handler { + return &rebalanceHandler{s} +} diff --git a/chapter9/server/http/server.go b/chapter9/server/http/server.go new file mode 100644 index 0000000..cd1abc2 --- /dev/null +++ b/chapter9/server/http/server.go @@ -0,0 +1,25 @@ +package http + +import ( + "net/http" + + "github.com/stuarthu/go-implement-your-cache-server/chapter9/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter9/server/cluster" +) + +type Server struct { + cache.Cache + cluster.Node +} + +func (s *Server) Listen() { + http.Handle("/cache/", s.cacheHandler()) + http.Handle("/status", s.statusHandler()) + http.Handle("/cluster", s.clusterHandler()) + http.Handle("/rebalance", s.rebalanceHandler()) + http.ListenAndServe(s.Addr()+":12345", nil) +} + +func New(c cache.Cache, n cluster.Node) *Server { + return &Server{c, n} +} diff --git a/chapter9/server/http/status.go b/chapter9/server/http/status.go new file mode 100644 index 0000000..e05e140 --- /dev/null +++ b/chapter9/server/http/status.go @@ -0,0 +1,29 @@ +package http + +import ( + "encoding/json" + "log" + "net/http" +) + +type statusHandler struct { + *Server +} + +func (h *statusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + b, e := json.Marshal(h.GetStat()) + if e != nil { + log.Println(e) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(b) +} + +func (s *Server) statusHandler() http.Handler { + return &statusHandler{s} +} diff --git a/chapter9/server/main.go b/chapter9/server/main.go index 0ecf5e3..491a1c5 100644 --- a/chapter9/server/main.go +++ b/chapter9/server/main.go @@ -1,10 +1,10 @@ package main import ( - "./cache" - "./cluster" - "./http" - "./tcp" + "github.com/stuarthu/go-implement-your-cache-server/chapter9/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter9/server/cluster" + "github.com/stuarthu/go-implement-your-cache-server/chapter9/server/http" + "github.com/stuarthu/go-implement-your-cache-server/chapter9/server/tcp" "flag" "log" ) diff --git a/chapter9/server/tcp b/chapter9/server/tcp deleted file mode 120000 index b87ec76..0000000 --- a/chapter9/server/tcp +++ /dev/null @@ -1 +0,0 @@ -../../chapter7/server/tcp \ No newline at end of file diff --git a/chapter9/server/tcp/new.go b/chapter9/server/tcp/new.go new file mode 100644 index 0000000..11a8d8a --- /dev/null +++ b/chapter9/server/tcp/new.go @@ -0,0 +1,31 @@ +package tcp + +import ( + "net" + + "github.com/stuarthu/go-implement-your-cache-server/chapter9/server/cache" + "github.com/stuarthu/go-implement-your-cache-server/chapter9/server/cluster" +) + +type Server struct { + cache.Cache + cluster.Node +} + +func (s *Server) Listen() { + l, e := net.Listen("tcp", s.Addr()+":12346") + if e != nil { + panic(e) + } + for { + c, e := l.Accept() + if e != nil { + panic(e) + } + go s.process(c) + } +} + +func New(c cache.Cache, n cluster.Node) *Server { + return &Server{c, n} +} diff --git a/chapter9/server/tcp/process.go b/chapter9/server/tcp/process.go new file mode 100644 index 0000000..95daa54 --- /dev/null +++ b/chapter9/server/tcp/process.go @@ -0,0 +1,95 @@ +package tcp + +import ( + "bufio" + "io" + "log" + "net" +) + +type result struct { + v []byte + e error +} + +func (s *Server) get(ch chan chan *result, r *bufio.Reader) { + c := make(chan *result) + ch <- c + k, e := s.readKey(r) + if e != nil { + c <- &result{nil, e} + return + } + go func() { + v, e := s.Get(k) + c <- &result{v, e} + }() +} + +func (s *Server) set(ch chan chan *result, r *bufio.Reader) { + c := make(chan *result) + ch <- c + k, v, e := s.readKeyAndValue(r) + if e != nil { + c <- &result{nil, e} + return + } + go func() { + c <- &result{nil, s.Set(k, v)} + }() +} + +func (s *Server) del(ch chan chan *result, r *bufio.Reader) { + c := make(chan *result) + ch <- c + k, e := s.readKey(r) + if e != nil { + c <- &result{nil, e} + return + } + go func() { + c <- &result{nil, s.Del(k)} + }() +} + +func reply(conn net.Conn, resultCh chan chan *result) { + defer conn.Close() + for { + c, open := <-resultCh + if !open { + return + } + r := <-c + e := sendResponse(r.v, r.e, conn) + if e != nil { + log.Println("close connection due to error:", e) + return + } + } +} + +func (s *Server) process(conn net.Conn) { + r := bufio.NewReader(conn) + resultCh := make(chan chan *result, 5000) + defer close(resultCh) + go reply(conn, resultCh) + for { + op, e := r.ReadByte() + if e != nil { + if e != io.EOF { + log.Println("close connection due to error:", e) + } + return + } + if op == 'S' { + s.set(resultCh, r) + } else if op == 'G' { + s.get(resultCh, r) + } else if op == 'D' { + s.del(resultCh, r) + } else { + log.Println("close connection due to invalid operation:", op) + return + } + } +} diff --git a/chapter9/server/tcp/read_key.go b/chapter9/server/tcp/read_key.go new file mode 100644 index 0000000..3a619c1 --- /dev/null +++ b/chapter9/server/tcp/read_key.go @@ -0,0 +1,52 @@ +package tcp + +import ( + "bufio" + "errors" + "io" +) + +func (s *Server) readKey(r *bufio.Reader) (string, error) { + klen, e := readLen(r) + if e != nil { + return "", e + } + k := make([]byte, klen) + _, e = io.ReadFull(r, k) + if e != nil { + return "", e + } + key := string(k) + addr, ok := s.ShouldProcess(key) + if !ok { + return "", errors.New("redirect " + addr) + } + return key, nil +} + +func (s *Server) readKeyAndValue(r *bufio.Reader) (string, []byte, error) { + klen, e := readLen(r) + if e != nil { + return "", nil, e + } + vlen, e := readLen(r) + if e != nil { + return "", nil, e + } + k := make([]byte, klen) + _, e = io.ReadFull(r, k) + if e != nil { + return "", nil, e + } + key := string(k) + addr, ok := s.ShouldProcess(key) + if !ok { + return "", nil, errors.New("redirect " + addr) + } + v := make([]byte, vlen) + _, e = io.ReadFull(r, v) + if e != nil { + return "", nil, e + } + return key, v, nil +} diff --git a/chapter9/server/tcp/utils.go b/chapter9/server/tcp/utils.go new file mode 100644 index 0000000..e7d65da --- /dev/null +++ b/chapter9/server/tcp/utils.go @@ -0,0 +1,33 @@ +package tcp + +import ( + "bufio" + "fmt" + "net" + "strconv" + "strings" +) + +func readLen(r *bufio.Reader) (int, error) { + tmp, e := r.ReadString(' ') + if e != nil { + return 0, e + } + l, e := strconv.Atoi(strings.TrimSpace(tmp)) + if e != nil { + return 0, e + } + return l, nil +} + +func sendResponse(value []byte, err error, conn net.Conn) error { + if err != nil { + errString := err.Error() + tmp := fmt.Sprintf("-%d ", len(errString)) + errString + _, e := conn.Write([]byte(tmp)) + return e + } + vlen := fmt.Sprintf("%d ", len(value)) + _, e := conn.Write(append([]byte(vlen), value...)) + return e +}