Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,40 @@ SINGLE_BEAT_REDIS_SERVER='redis://redis-host:6379/1' single-beat celery beat

it will try to spawn celerybeat every 60 seconds.

- SINGLE_BEAT_REDIS_SENTINEL_SSL (default False)

enable SSL mode for Sentinel nodes in Sentinel configuration

- SINGLE_BEAT_REDIS_SENTINEL_SSL_CA_CERTS

(optional) CA certificate (single PEM file) for SSL validation for Sentinel nodes

- SINGLE_BEAT_REDIS_SENTINEL_SSL_CERT_REQS (default "required" if SSL True)

SSL certificate validation mode for Sentinel nodes.

The options are:
- "none": doesn't require a certificate from the server for authentication.
- "optional": can accept a certificate from the server, but it doesn't require one.
- "required": must receive and validate a certificate from the server

- SINGLE_BEAT_REDIS_SSL (default False)

enable SSL mode for Redis nodes in Sentinel configuration

- SINGLE_BEAT_REDIS_SSL_CA_CERTS

(optional) CA certificate (single PEM file) for SSL validation for Redis nodes in Sentinel configuration

- SINGLE_BEAT_REDIS_SSL_CERT_REQS (default "required" if SSL True)

SSL certificate validation mode for Redis nodes in Sentinel configuration.

The options are:
- "none": doesn't require a certificate from the server for authentication.
- "optional": can accept a certificate from the server, but it doesn't require one.
- "required": must receive and validate a certificate from the server

Cli
-------------
Single-beat also has a simple cli, that gives info about where your process is living - also can pause single-beat, restart your process etc.
Expand Down
15 changes: 15 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[tool.black]
target-version = ["py311"]
line-length = 96
include = '\.pyi?$'
exclude = '''
/(
| __pycache__
| .git
| .mypy_cache
| .pytest_cache
| .tox
| .venv
| .vscode
)/
'''
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name='single-beat',
version='0.6.3',
version='0.6.4',
long_description=long_description,
long_description_content_type="text/markdown",
description='ensures only one instance of your process across your servers',
Expand Down
96 changes: 62 additions & 34 deletions singlebeat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,29 @@ def env(identifier, default, type=noop):


class Config(object):
REDIS_SERVER = env('REDIS_SERVER', 'redis://localhost:6379')
REDIS_PASSWORD = env('REDIS_PASSWORD', None)
REDIS_SENTINEL = env('REDIS_SENTINEL', None)
REDIS_SENTINEL_MASTER = env('REDIS_SENTINEL_MASTER', 'mymaster')
REDIS_SENTINEL_DB = env('REDIS_SENTINEL_DB', 0)
REDIS_SENTINEL_PASSWORD = env('REDIS_SENTINEL_PASSWORD', None)
IDENTIFIER = env('IDENTIFIER', None)
LOCK_TIME = env('LOCK_TIME', 5, int)
INITIAL_LOCK_TIME = env('INITIAL_LOCK_TIME', LOCK_TIME * 2, int)
HEARTBEAT_INTERVAL = env('HEARTBEAT_INTERVAL', 1, int)
HOST_IDENTIFIER = env('HOST_IDENTIFIER', socket.gethostname())
LOG_LEVEL = env('LOG_LEVEL', 'warn')
REDIS_SERVER = env("REDIS_SERVER", "redis://localhost:6379")
REDIS_PASSWORD = env("REDIS_PASSWORD", None)
REDIS_SENTINEL = env("REDIS_SENTINEL", None)
REDIS_SENTINEL_MASTER = env("REDIS_SENTINEL_MASTER", "mymaster")
REDIS_SENTINEL_DB = env("REDIS_SENTINEL_DB", 0)
REDIS_SENTINEL_PASSWORD = env("REDIS_SENTINEL_PASSWORD", None)
IDENTIFIER = env("IDENTIFIER", None)
LOCK_TIME = env("LOCK_TIME", 5, int)
INITIAL_LOCK_TIME = env("INITIAL_LOCK_TIME", LOCK_TIME * 2, int)
HEARTBEAT_INTERVAL = env("HEARTBEAT_INTERVAL", 1, int)
HOST_IDENTIFIER = env("HOST_IDENTIFIER", socket.gethostname())
LOG_LEVEL = env("LOG_LEVEL", "warn")
# wait_mode can be, supervisord or heartbeat
WAIT_MODE = env("WAIT_MODE", "heartbeat")
WAIT_BEFORE_DIE = env("WAIT_BEFORE_DIE", 60, int)
# SSL config
REDIS_SSL = env("REDIS_SSL", False, bool)
REDIS_SSL_CA_CERTS = env("REDIS_SSL_CA_CERTS", None)
REDIS_SSL_CERT_REQS = env("REDIS_SSL_CERT_REQS", None)
REDIS_SENTINEL_SSL = env("REDIS_SENTINEL_SSL", False, bool)
REDIS_SENTINEL_SSL_CA_CERTS = env("REDIS_SENTINEL_SSL_CA_CERTS", None)
REDIS_SENTINEL_SSL_CERT_REQS = env("REDIS_SENTINEL_SSL_CERT_REQS", None)

_host_identifier = None

def check(self, cond, message):
Expand All @@ -60,9 +68,17 @@ def checks(self):

def get_redis(self):
if self.REDIS_SENTINEL:
return self._sentinel.master_for(self.REDIS_SENTINEL_MASTER,
password=self.REDIS_PASSWORD,
redis_class=redis.Redis)
redis_kwargs = {}
if self.REDIS_SSL:
redis_kwargs["ssl"] = True
redis_kwargs["ssl_ca_certs"] = self.REDIS_SSL_CA_CERTS or None
redis_kwargs["ssl_cert_reqs"] = self.REDIS_SSL_CERT_REQS or "required"
return self._sentinel.master_for(
self.REDIS_SENTINEL_MASTER,
password=self.REDIS_PASSWORD,
redis_class=redis.Redis,
**redis_kwargs,
)
return self._redis

def rewrite_redis_url(self):
Expand All @@ -82,12 +98,22 @@ def rewrite_redis_url(self):

def __init__(self):
if self.REDIS_SENTINEL:
sentinels = [tuple(s.split(':')) for s in self.REDIS_SENTINEL.split(';')]
self._sentinel = redis.sentinel.Sentinel(sentinels,
db=self.REDIS_SENTINEL_DB,
socket_timeout=0.1,
sentinel_kwargs={"password": self.REDIS_SENTINEL_PASSWORD}
)
sentinels = [tuple(s.split(":")) for s in self.REDIS_SENTINEL.split(";")]
sentinel_kwargs = {
"password": self.REDIS_SENTINEL_PASSWORD,
}
if self.REDIS_SENTINEL_SSL:
sentinel_kwargs["ssl"] = True
sentinel_kwargs["ssl_ca_certs"] = self.REDIS_SENTINEL_SSL_CA_CERTS or None
sentinel_kwargs["ssl_cert_reqs"] = (
self.REDIS_SENTINEL_SSL_CERT_REQS or "required"
)
self._sentinel = redis.sentinel.Sentinel(
sentinels,
db=self.REDIS_SENTINEL_DB,
socket_timeout=0.1,
sentinel_kwargs=sentinel_kwargs,
)
else:
self._redis = redis.Redis.from_url(self.rewrite_redis_url())

Expand All @@ -107,9 +133,7 @@ def get_host_identifier(self):
if self._host_identifier:
return self._host_identifier
local_ip_addr = (
self.get_redis()
.connection_pool.get_connection("ping")
._sock.getsockname()[0]
self.get_redis().connection_pool.get_connection("ping")._sock.getsockname()[0]
)
self._host_identifier = "{}:{}".format(local_ip_addr, os.getpid())
return self._host_identifier
Expand Down Expand Up @@ -209,9 +233,7 @@ async def timer_cb_waiting(self):
return self.ioloop.create_task(self.spawn_process())
# couldn't acquire lock
if config.WAIT_MODE == "supervised":
logger.debug(
"already running, will exit after %s seconds" % config.WAIT_BEFORE_DIE
)
logger.debug("already running, will exit after %s seconds" % config.WAIT_BEFORE_DIE)
time.sleep(config.WAIT_BEFORE_DIE)
sys.exit()

Expand Down Expand Up @@ -257,7 +279,9 @@ async def timer_cb_running(self):
rds.set(
"SINGLE_BEAT_{identifier}".format(identifier=self.identifier),
"{}:{}:{}".format(
self.fence_token, config.HOST_IDENTIFIER, self.process_pid()
self.fence_token,
config.HOST_IDENTIFIER,
self.process_pid(),
),
ex=config.LOCK_TIME,
)
Expand Down Expand Up @@ -312,7 +336,8 @@ def sigterm_handler(self, signum, loop):

if self.state == "RUNNING":
logger.debug(
"already running sending signal to child - %s", self.sprocess.pid
"already running sending signal to child - %s",
self.sprocess.pid,
)
self.sprocess.send_signal(signum)
logger.debug("waiting for subprocess to finish")
Expand All @@ -325,7 +350,7 @@ async def run(self):
await asyncio.sleep(config.HEARTBEAT_INTERVAL)

async def _read_stream(self, stream, cb):
decoder = codecs.getincrementaldecoder('utf-8')(errors='strict')
decoder = codecs.getincrementaldecoder("utf-8")(errors="strict")

while True:
line = await stream.read(100)
Expand All @@ -344,7 +369,7 @@ async def spawn_process(self):
*cmd,
env=env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError:
"""
Expand All @@ -356,8 +381,12 @@ async def spawn_process(self):
try:
await asyncio.wait(
[
asyncio.create_task(self._read_stream(self.sprocess.stdout, self.forward_stdout)),
asyncio.create_task(self._read_stream(self.sprocess.stderr, self.forward_stderr)),
asyncio.create_task(
self._read_stream(self.sprocess.stdout, self.forward_stdout)
),
asyncio.create_task(
self._read_stream(self.sprocess.stderr, self.forward_stderr)
),
]
)
self.child_exit_cb(self.sprocess.returncode)
Expand Down Expand Up @@ -506,4 +535,3 @@ def main():

if __name__ == "__main__":
main()