Skip to content

Commit 8e56b94

Browse files
ashtulgkorland
authored andcommitted
Add pipeline support (#9)
1 parent a9aeb39 commit 8e56b94

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

redisbloom/client.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import six
22
import redis
33
from redis import Redis, RedisError
4+
from redis.client import Pipeline
45
from redis.client import bool_ok
56
from redis._compat import (long, nativestr)
67
from redis.exceptions import DataError
@@ -192,7 +193,6 @@ def appendBucketSize(params, bucket_size):
192193
params.extend(['BUCKETSIZE', bucket_size])
193194

194195
################## Bloom Filter Functions ######################
195-
196196
def bfCreate(self, key, errorRate, capacity, expansion=None, noScale=None):
197197
"""
198198
Creates a new Bloom Filter ``key`` with desired probability of false
@@ -502,3 +502,21 @@ def topkInfo(self, key):
502502

503503
return self.execute_command(self.TOPK_INFO, key)
504504

505+
def pipeline(self, transaction=True, shard_hint=None):
506+
"""
507+
Return a new pipeline object that can queue multiple commands for
508+
later execution. ``transaction`` indicates whether all commands
509+
should be executed atomically. Apart from making a group of operations
510+
atomic, pipelines are useful for reducing the back-and-forth overhead
511+
between the client and server.
512+
Overridden in order to provide the right client through the pipeline.
513+
"""
514+
p = Pipeline(
515+
connection_pool=self.connection_pool,
516+
response_callbacks=self.response_callbacks,
517+
transaction=transaction,
518+
shard_hint=shard_hint)
519+
return p
520+
521+
class Pipeline(Pipeline, Client):
522+
"Pipeline for RedisBloom Client"

test_commands.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,5 +180,22 @@ def testTopK(self):
180180
self.assertEqual(3, info.depth)
181181
self.assertAlmostEqual(0.9, float(info.decay))
182182

183+
def test_pipeline(self):
184+
pipeline = rb.pipeline()
185+
186+
self.assertFalse(rb.execute_command('get pipeline'))
187+
188+
self.assertTrue(rb.bfCreate('pipeline', 0.01, 1000))
189+
for i in range(100):
190+
pipeline.bfAdd('pipeline', i)
191+
for i in range(100):
192+
self.assertFalse(rb.bfExists('pipeline', i))
193+
194+
pipeline.execute()
195+
196+
for i in range(100):
197+
self.assertTrue(rb.bfExists('pipeline', i))
198+
199+
183200
if __name__ == '__main__':
184201
unittest.main()

0 commit comments

Comments
 (0)