diff --git a/README.md b/README.md index 44a9313..7767d7d 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,40 @@ SINGLE_BEAT_REDIS_SERVER='redis://redis-host:6379/1' single-beat celery beat it will try to spawn celerybeat every 60 seconds. +- SINGLE_BEAT_REDIS_SENTINEL_SSL (default False) + + enable SSL mode for Sentinel nodes in Sentinel configuration + +- SINGLE_BEAT_REDIS_SENTINEL_SSL_CA_CERTS + + (optional) CA certificate (single PEM file) for SSL validation for Sentinel nodes + +- SINGLE_BEAT_REDIS_SENTINEL_SSL_CERT_REQS (default "required" if SSL True) + + SSL certificate validation mode for Sentinel nodes. + + The options are: + - "none": doesn't require a certificate from the server for authentication. + - "optional": can accept a certificate from the server, but it doesn't require one. + - "required": must receive and validate a certificate from the server + +- SINGLE_BEAT_REDIS_SSL (default False) + + enable SSL mode for Redis nodes in Sentinel configuration + +- SINGLE_BEAT_REDIS_SSL_CA_CERTS + + (optional) CA certificate (single PEM file) for SSL validation for Redis nodes in Sentinel configuration + +- SINGLE_BEAT_REDIS_SSL_CERT_REQS (default "required" if SSL True) + + SSL certificate validation mode for Redis nodes in Sentinel configuration. + + The options are: + - "none": doesn't require a certificate from the server for authentication. + - "optional": can accept a certificate from the server, but it doesn't require one. + - "required": must receive and validate a certificate from the server + Cli ------------- Single-beat also has a simple cli, that gives info about where your process is living - also can pause single-beat, restart your process etc. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ba5dcd2 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,15 @@ +[tool.black] +target-version = ["py311"] +line-length = 96 +include = '\.pyi?$' +exclude = ''' +/( + | __pycache__ + | .git + | .mypy_cache + | .pytest_cache + | .tox + | .venv + | .vscode +)/ +''' diff --git a/setup.py b/setup.py index baeccd6..3c594c6 100755 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='single-beat', - version='0.6.3', + version='0.6.4', long_description=long_description, long_description_content_type="text/markdown", description='ensures only one instance of your process across your servers', diff --git a/singlebeat/beat.py b/singlebeat/beat.py index b07c7de..fc4da29 100644 --- a/singlebeat/beat.py +++ b/singlebeat/beat.py @@ -22,21 +22,29 @@ def env(identifier, default, type=noop): class Config(object): - REDIS_SERVER = env('REDIS_SERVER', 'redis://localhost:6379') - REDIS_PASSWORD = env('REDIS_PASSWORD', None) - REDIS_SENTINEL = env('REDIS_SENTINEL', None) - REDIS_SENTINEL_MASTER = env('REDIS_SENTINEL_MASTER', 'mymaster') - REDIS_SENTINEL_DB = env('REDIS_SENTINEL_DB', 0) - REDIS_SENTINEL_PASSWORD = env('REDIS_SENTINEL_PASSWORD', None) - IDENTIFIER = env('IDENTIFIER', None) - LOCK_TIME = env('LOCK_TIME', 5, int) - INITIAL_LOCK_TIME = env('INITIAL_LOCK_TIME', LOCK_TIME * 2, int) - HEARTBEAT_INTERVAL = env('HEARTBEAT_INTERVAL', 1, int) - HOST_IDENTIFIER = env('HOST_IDENTIFIER', socket.gethostname()) - LOG_LEVEL = env('LOG_LEVEL', 'warn') + REDIS_SERVER = env("REDIS_SERVER", "redis://localhost:6379") + REDIS_PASSWORD = env("REDIS_PASSWORD", None) + REDIS_SENTINEL = env("REDIS_SENTINEL", None) + REDIS_SENTINEL_MASTER = env("REDIS_SENTINEL_MASTER", "mymaster") + REDIS_SENTINEL_DB = env("REDIS_SENTINEL_DB", 0) + REDIS_SENTINEL_PASSWORD = env("REDIS_SENTINEL_PASSWORD", None) + IDENTIFIER = env("IDENTIFIER", None) + LOCK_TIME = env("LOCK_TIME", 5, int) + INITIAL_LOCK_TIME = env("INITIAL_LOCK_TIME", LOCK_TIME * 2, int) + HEARTBEAT_INTERVAL = env("HEARTBEAT_INTERVAL", 1, int) + HOST_IDENTIFIER = env("HOST_IDENTIFIER", socket.gethostname()) + LOG_LEVEL = env("LOG_LEVEL", "warn") # wait_mode can be, supervisord or heartbeat WAIT_MODE = env("WAIT_MODE", "heartbeat") WAIT_BEFORE_DIE = env("WAIT_BEFORE_DIE", 60, int) + # SSL config + REDIS_SSL = env("REDIS_SSL", False, bool) + REDIS_SSL_CA_CERTS = env("REDIS_SSL_CA_CERTS", None) + REDIS_SSL_CERT_REQS = env("REDIS_SSL_CERT_REQS", None) + REDIS_SENTINEL_SSL = env("REDIS_SENTINEL_SSL", False, bool) + REDIS_SENTINEL_SSL_CA_CERTS = env("REDIS_SENTINEL_SSL_CA_CERTS", None) + REDIS_SENTINEL_SSL_CERT_REQS = env("REDIS_SENTINEL_SSL_CERT_REQS", None) + _host_identifier = None def check(self, cond, message): @@ -60,9 +68,17 @@ def checks(self): def get_redis(self): if self.REDIS_SENTINEL: - return self._sentinel.master_for(self.REDIS_SENTINEL_MASTER, - password=self.REDIS_PASSWORD, - redis_class=redis.Redis) + redis_kwargs = {} + if self.REDIS_SSL: + redis_kwargs["ssl"] = True + redis_kwargs["ssl_ca_certs"] = self.REDIS_SSL_CA_CERTS or None + redis_kwargs["ssl_cert_reqs"] = self.REDIS_SSL_CERT_REQS or "required" + return self._sentinel.master_for( + self.REDIS_SENTINEL_MASTER, + password=self.REDIS_PASSWORD, + redis_class=redis.Redis, + **redis_kwargs, + ) return self._redis def rewrite_redis_url(self): @@ -82,12 +98,22 @@ def rewrite_redis_url(self): def __init__(self): if self.REDIS_SENTINEL: - sentinels = [tuple(s.split(':')) for s in self.REDIS_SENTINEL.split(';')] - self._sentinel = redis.sentinel.Sentinel(sentinels, - db=self.REDIS_SENTINEL_DB, - socket_timeout=0.1, - sentinel_kwargs={"password": self.REDIS_SENTINEL_PASSWORD} - ) + sentinels = [tuple(s.split(":")) for s in self.REDIS_SENTINEL.split(";")] + sentinel_kwargs = { + "password": self.REDIS_SENTINEL_PASSWORD, + } + if self.REDIS_SENTINEL_SSL: + sentinel_kwargs["ssl"] = True + sentinel_kwargs["ssl_ca_certs"] = self.REDIS_SENTINEL_SSL_CA_CERTS or None + sentinel_kwargs["ssl_cert_reqs"] = ( + self.REDIS_SENTINEL_SSL_CERT_REQS or "required" + ) + self._sentinel = redis.sentinel.Sentinel( + sentinels, + db=self.REDIS_SENTINEL_DB, + socket_timeout=0.1, + sentinel_kwargs=sentinel_kwargs, + ) else: self._redis = redis.Redis.from_url(self.rewrite_redis_url()) @@ -107,9 +133,7 @@ def get_host_identifier(self): if self._host_identifier: return self._host_identifier local_ip_addr = ( - self.get_redis() - .connection_pool.get_connection("ping") - ._sock.getsockname()[0] + self.get_redis().connection_pool.get_connection("ping")._sock.getsockname()[0] ) self._host_identifier = "{}:{}".format(local_ip_addr, os.getpid()) return self._host_identifier @@ -209,9 +233,7 @@ async def timer_cb_waiting(self): return self.ioloop.create_task(self.spawn_process()) # couldn't acquire lock if config.WAIT_MODE == "supervised": - logger.debug( - "already running, will exit after %s seconds" % config.WAIT_BEFORE_DIE - ) + logger.debug("already running, will exit after %s seconds" % config.WAIT_BEFORE_DIE) time.sleep(config.WAIT_BEFORE_DIE) sys.exit() @@ -257,7 +279,9 @@ async def timer_cb_running(self): rds.set( "SINGLE_BEAT_{identifier}".format(identifier=self.identifier), "{}:{}:{}".format( - self.fence_token, config.HOST_IDENTIFIER, self.process_pid() + self.fence_token, + config.HOST_IDENTIFIER, + self.process_pid(), ), ex=config.LOCK_TIME, ) @@ -312,7 +336,8 @@ def sigterm_handler(self, signum, loop): if self.state == "RUNNING": logger.debug( - "already running sending signal to child - %s", self.sprocess.pid + "already running sending signal to child - %s", + self.sprocess.pid, ) self.sprocess.send_signal(signum) logger.debug("waiting for subprocess to finish") @@ -325,7 +350,7 @@ async def run(self): await asyncio.sleep(config.HEARTBEAT_INTERVAL) async def _read_stream(self, stream, cb): - decoder = codecs.getincrementaldecoder('utf-8')(errors='strict') + decoder = codecs.getincrementaldecoder("utf-8")(errors="strict") while True: line = await stream.read(100) @@ -344,7 +369,7 @@ async def spawn_process(self): *cmd, env=env, stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE + stderr=asyncio.subprocess.PIPE, ) except FileNotFoundError: """ @@ -356,8 +381,12 @@ async def spawn_process(self): try: await asyncio.wait( [ - asyncio.create_task(self._read_stream(self.sprocess.stdout, self.forward_stdout)), - asyncio.create_task(self._read_stream(self.sprocess.stderr, self.forward_stderr)), + asyncio.create_task( + self._read_stream(self.sprocess.stdout, self.forward_stdout) + ), + asyncio.create_task( + self._read_stream(self.sprocess.stderr, self.forward_stderr) + ), ] ) self.child_exit_cb(self.sprocess.returncode) @@ -506,4 +535,3 @@ def main(): if __name__ == "__main__": main() -