This repository serves as a template for new python-telegram-bot applications. The library's Python wrapper over the Telegram API is amazing and enables very smooth bot programming. It doesn't, however, provide defaults for persistence, state management, and other shortcuts that are necessary for a maintainable and growable software architecture.
This template is mostly meant for projects that start with quite a bit of complexity and whose requirements will evolve over time.
I made this template to provide an implementation of a few things that I always ended up building in my telegram bot projects: a custom `ApplicationContext` for typing `context.bot_data`, `context.chat_data` and `context.user_data`, plus decorators/wrappers/dependency injection for handlers to cut down on boilerplate and implement common behaviours. This takes your mind off the technicalities and helps you put your focus where it belongs: on the project.
- Make sure poetry is installed on your system.
- Run `poetry install`.
The bot can be run in either production or dev mode. The difference is that dev mode loads your environment variables from the `.env` file in the project and does a complete teardown + buildup of your database, giving you a fresh debugging environment every time.
- Run `poetry run python -m src.main --dev`, or execute the `main` function directly in your debugger, which will default to `dev` mode. (Make sure the environment is activated by running `poetry shell` first.)
Production mode runs alembic migrations against your database before starting the bot. Make sure the following environment variables are set:
- `BOT_TOKEN`: you can get one from Botfather
- `DB_PATH`: the path to your database, relative to where the bot is executing. (I recommend choosing `/data/yourdb.sqlite3`, as a `/data` directory is automatically created in the Docker container and can be mounted to a persistent volume)
- `FIRST_ADMIN`: your `telegram_id`; this can be set to automatically give you the `ADMIN` role when you register in your own bot
The following are optional:
- `LOGGING_CHANNEL`: a telegram `chat_id` that the `ErrorForwarder` can use to send JSON logs to. Very useful; usually set to a shared channel or your own id.
Finally:
- Execute `./entrypoint.sh`
Now that the template runs on SQL, every time your schema changes you will need to run new migrations against your production database to keep it up to date. `env.py` is already set up to read the `DB_PATH` env variable, or to default to the `db.sqlite3` file.
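That lookup amounts to something like the following (a minimal sketch, not the full `env.py`, which also wires up Alembic's configuration):

```python
import os

# Read the database path from the environment, falling back to the local file
db_path = os.environ.get("DB_PATH", "db.sqlite3")
sqlalchemy_url = f"sqlite:///{db_path}"
```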
Define your database schema inside of `db/tables.py`, then run `alembic revision --autogenerate -m "<description>"` to autogenerate the migration. The migrations are then applied whenever the application starts through `entrypoint.sh`.
Let's define a `User` table inside of `tables.py`:
```python
import enum

from sqlalchemy.orm import Mapped, declarative_base, mapped_column

Base = declarative_base()


class UserRole(enum.Enum):
    # Values match the autogenerated Enum in the migration below
    USER = "USER"
    ADMIN = "ADMIN"


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    telegram_id: Mapped[int] = mapped_column(unique=True, nullable=False, index=True)
    is_bot: Mapped[bool] = mapped_column(nullable=False)
    full_name: Mapped[str | None] = mapped_column(nullable=True)
    telegram_username: Mapped[str | None] = mapped_column(nullable=True)
    """
    Can be hidden due to privacy settings
    """
    role: Mapped[UserRole] = mapped_column(nullable=False, default=UserRole.USER)
```

Now let's run `alembic revision --autogenerate -m "user table"`: a wild `b1170ff4029d_user_table.py` just appeared!
"""user table
Revision ID: b1170ff4029d
Revises:
Create Date: 2024-03-16 10:17:30.936366
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'b1170ff4029d'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('telegram_id', sa.Integer(), nullable=False),
sa.Column('is_bot', sa.Boolean(), nullable=False),
sa.Column('full_name', sa.String(), nullable=True),
sa.Column('telegram_username', sa.String(), nullable=True),
sa.Column('role', sa.Enum('USER', 'ADMIN', name='userrole'), nullable=False),
sa.Column('admin', sa.Boolean(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
with op.batch_alter_table('users', schema=None) as batch_op:
batch_op.create_index(batch_op.f('ix_users_telegram_id'), ['telegram_id'], unique=True)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('users', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_users_telegram_id'))
op.drop_table('users')
# ### end Alembic commands ###This file contains SQLAlchemy commands to update the schema in your database! You can also add your own custom commands if you feel like it because the ORM does not fullfill all your needs, here is an example how I added a trigger in one of my projects:
- Run `alembic revision -m "create trigger for product cleanup"`
- Inside of the `upgrade` function in the generated file, add your SQL:
```python
def upgrade() -> None:
    op.execute(
        """
        DELETE FROM products
        WHERE NOT EXISTS (
            SELECT 1 FROM alerts
            WHERE alerts.product_id = products.id
        );
        """
    )
    op.execute(
        """
        CREATE TRIGGER DeleteProductsWithNoAlerts
        AFTER DELETE ON alerts
        BEGIN
            DELETE FROM products
            WHERE id = OLD.product_id
            AND NOT EXISTS (
                SELECT 1 FROM alerts
                WHERE alerts.product_id = OLD.product_id
            );
        END;
        """
    )
```

- Inside of the `downgrade` function, add the code necessary to get back to the previous schema:
```python
def downgrade() -> None:
    op.execute("""DROP TRIGGER IF EXISTS DeleteProductsWithNoAlerts;""")
```

The specific SQL shown in the example is out of scope for this template, but it should showcase how you can tweak the database to your liking!
Often your data goes beyond the constraints of flat tables, and you wish you could just embed some good ol' JSON into your database, use pydantic models in your code, and have everything serialize/deserialize automatically. I took care of the boilerplate for this:
```python
# tables.py
class PydanticType(sa.types.TypeDecorator):
    impl = sa.types.JSON

    def __init__(self, pydantic_type):
        super().__init__()
        self.pydantic_type = pydantic_type

    def load_dialect_impl(self, dialect):
        return dialect.type_descriptor(sa.JSON())

    def process_bind_param(self, value, dialect):
        return value.model_dump() if value else None

    def process_result_value(self, value, dialect):
        return self.pydantic_type.model_validate(value) if value else None
```
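As a usage sketch (the `UserPreferences` model and `Profile` table here are hypothetical, not part of the template; `Base` is the declarative base from `tables.py`), a pydantic model can then be mapped straight onto a JSON column:

```python
from pydantic import BaseModel
from sqlalchemy.orm import Mapped, mapped_column


class UserPreferences(BaseModel):
    language: str = "en"
    notifications_enabled: bool = True


class Profile(Base):
    __tablename__ = "profiles"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Serialized via model_dump() on write, rebuilt via model_validate() on read
    preferences: Mapped[UserPreferences] = mapped_column(PydanticType(UserPreferences))
```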
```python
# env.py
def render_item(type_, obj, autogen_context):
    """Apply custom rendering for PydanticType."""
    if type_ == "type" and isinstance(obj, tables.PydanticType):
        return "sa.JSON()"
    return False
```

This project comes with a barebones CI pipeline.
- It tests your code using pytest, the same as it would run locally with `poetry run python -m pytest`.
- It builds the Docker image.
- It pushes the Docker image to a repository.
- It deploys the bot to a server over some very bare-bones SSH. (Remove `if: false` from the `deploy-ssh` job.)
Set the following GitHub secrets:
- `DOCKERHUB_USERNAME`: Docker Hub username
- `DOCKERHUB_PASSWORD`: Docker Hub password
- `DOCKERHUB_TARGET`: Docker image tag for pushing and pulling
- `SSH_HOST`: SSH host for deployment
- `SSH_USER`: SSH username for deployment
- `SSH_KEY`: SSH private key for deployment
- `SSH_PORT`: SSH port for deployment
- `CONTAINER_NAME`: Name for the Docker container
- `BOT_TOKEN`: Token for the bot
- `FIRST_ADMIN`: Telegram ID of the first admin user you want to create
- `LOGGING_CHANNEL`: Channel for logging purposes
The app gets its configuration from environment variables that are defined in the classes extending `pydantic_settings.BaseSettings` in `settings.py`:
```python
from pydantic_settings import BaseSettings


class DBSettings(BaseSettings):
    DB_PATH: str = "template_app.db"


class TelegramSettings(BaseSettings):
    BOT_TOKEN: str


class Settings(TelegramSettings, DBSettings):
    pass


settings = Settings()
```

This template moved over to structlog: https://www.structlog.org/en/stable/. It is configured to log everything through the std logging module and to use structlog formatters & processors. This allows libraries like ptb to still output good logs whilst enabling you to make full use of structlog. Error logs are sent as JSON inside a code block to the designated logging channel.
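For day-to-day use this just means grabbing a logger and attaching key-value context to your events, for example:

```python
import structlog

log = structlog.get_logger()

# Key-value pairs end up as structured fields in the JSON output
log.info("order_created", user_id=1234, item_count=3)
```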
Now that the app uses dependency injection, I can't abort handlers and execute logic when extracting a dependency fails. This is why I created a global error handler inside of `errors.py`. All uncaught exceptions just get logged with their stacktrace; you can create designated exceptions like `UserNotRegistered` to then execute specific logic when you throw them from your dependency extractors:
```python
async def handle_error(update: Update, context: ApplicationContext):
    e = context.error
    if not e:
        return
    match e:
        case UserNotRegistered():
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                text="You are not registered. Please register first with /start",
            )
        case _:
            # Log out the stacktrace for unhandled exceptions
            log.error("Unhandled exception", exc_info=e)
```

When you use python-telegram-bot you have access to 3 shared objects on your context:
- `context.user_data`: this object is shared between all handlers that interact with updates from the same user
- `context.chat_data`: shared between all updates for the same chat
- `context.bot_data`: this is shared by all handlers and is useful to keep track of your shared application state
Working with raw dicts is error prone, which is why python-telegram-bot lets you define your own `CallbackContext` to replace the usual `ContextTypes.DEFAULT`.
The boilerplate needed for your `ContextTypes` is already set up inside of `src/bot/common/context.py` and comes with batteries included:
```python
class BotData:
    _db: async_sessionmaker[AsyncSession]
    """
    Your database session factory
    """
    _settings: Settings
    """
    Application settings
    """


class ChatData:
    pass


ConversationState = TypeVar("ConversationState")


class UserData:
    _conversation_state: dict[type, Any] = {}
    _current_session: AsyncSession | None = None

    def get_or_init_conversation_state(
        self, cls: Type[ConversationState]
    ) -> ConversationState:
        return self._conversation_state.setdefault(cls, cls())

    def clean_up_conversation_state(self, conversation_type: Type):
        if conversation_type in self._conversation_state:
            del self._conversation_state[conversation_type]


class ApplicationContext(CallbackContext[ExtBot, UserData, ChatData, BotData]):
    # Define custom @property and utility methods here that interact with your context
    @asynccontextmanager
    async def session(self):
        # If called by a user, check if the user has a SQL session already open
        if self.user_data:
            if self.user_data._current_session:
                yield self.user_data._current_session
            else:
                try:
                    async with self.bot_data._db() as session:
                        self.user_data._current_session = session
                        yield session
                finally:
                    self.user_data._current_session = None
        else:
            async with self.bot_data._db() as session:
                yield session

    @property
    def settings(self) -> Settings:
        return self.bot_data._settings


context_types = ContextTypes(
    context=ApplicationContext, chat_data=ChatData, bot_data=BotData, user_data=UserData
)
```

You will find these classes in the `bot.common` module in `context.py`. You can edit the three classes above to define the state in your application depending on the context. The `ApplicationContext` class itself is used in the type signature for the context of your handlers, and you can also define useful `@property` and other utility methods on it. `BotData` comes with `_db` and `_settings`, which are initialized in the `on_startup` method of the `Application`.
To open a SQLAlchemy session just do:

```python
async with context.session() as s:
    ...  # your DB code
```

To make the framework instantiate your custom objects instead of the usual dictionaries, they are passed as a `ContextTypes` object to your `ApplicationBuilder`; the template takes care of this. The `Application` object itself is built inside of `bot.application`, which is also where you will need to register your handlers, either in the `on_startup` method or on the application object.
```python
context_types = ContextTypes(
    context=ApplicationContext,
    chat_data=ChatData,
    bot_data=BotData,
    user_data=UserData
)

application: Application = (
    ApplicationBuilder()
    .token(settings.BOT_TOKEN)
    .context_types(context_types)
    .arbitrary_callback_data(True)
    .post_init(on_startup)
    .build()
)
```

In this version of the template I moved away from decorators for the dependency injection and instead started using something more flexible: https://github.com/lancetnik/FastDepends
To use this, just annotate your methods with `@inject` before turning them into handlers:

```python
@command_handler("deez")
@inject
async def nuts(
    update: Update,
    context: ApplicationContext,
    session: AsyncSession = Depends(tx)
):
    ...
```

`tx` is found inside of `extractors.py`. To learn more about dependency injection, look up the original repo, or just follow the pattern I set.
Note: FastDepends DI can look prettier by using Annotated types, however my Pyright hates it, so I'm not using it.
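To illustrate the pattern, an extractor is just a callable (generators work too, as shown further below) that the DI resolves before your handler runs. A hypothetical extractor that loads the registered user, raising the `UserNotRegistered` exception from the error-handling section when the lookup comes up empty, might look like this (`get_user_by_telegram_id` is an assumed query helper, not part of the template):

```python
async def registered_user(update: Update, context: ApplicationContext) -> User:
    async with context.session() as session:
        user = await get_user_by_telegram_id(session, update.effective_user.id)
        if user is None:
            # Picked up by the global error handler in errors.py
            raise UserNotRegistered()
        return user
```

You would then consume it like any other dependency: `user: User = Depends(registered_user)`.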
As you may have noticed, the three state objects present in the context have user, chat, and global scope. A lot of logic is implemented inside of `ConversationHandler` flows, and for this, custom state management is needed, usually inside either `chat_data` or `user_data`. As most of these flows in my experience have been on a per-user basis, I have provided a default way to achieve this without having to add a new field to your `UserData` class for every conversation flow that you need to implement.
The `UserData` class comes pre-defined with a dictionary to hold conversation state. The type of the state object itself is used as the key to identify it, which means that for a conversation state type `T` there can be at most one active conversation per user that uses this type for its state.
To avoid leaking memory, this object needs to be cleared from the dictionary when you are done with it; this happens automatically in the dependency injection extractor:
```python
def ConversationState(t: type, clear: bool = False):
    def extract_state(context: ApplicationContext):
        try:
            yield context.user_data.get_or_init_conversation_state(t)
            if clear:
                context.user_data.clean_up_conversation_state(t)
        except Exception as e:
            context.user_data.clean_up_conversation_state(t)
            raise e

    return Depends(extract_state)
```

Using `ConversationState(T)` as the default value for your injected parameter handles initialization of your default state object for that conversation. Unhandled exceptions clear the state, and with `clear=True`, which you should set in the last step of your conversation, the state is also cleared once the handler finishes.
For example, let's define an entry point handler and an exit method for a conversation flow where a user needs to follow multiple steps to fill up an `OrderRequest` object. (I will ignore the implementation details of a `ConversationHandler`; if you want to see a good example of how this works, click here.)
```python
@inject
async def start_order_request(
    update: Update,
    context: ApplicationContext,
    order_request: OrderRequest = ConversationState(OrderRequest)
):
    ...


@inject
async def add_item(
    update: Update,
    context: ApplicationContext,
    order_request: OrderRequest = ConversationState(OrderRequest)
):
    ...


@inject
async def file_order(
    update: Update,
    context: ApplicationContext,
    order_request: OrderRequest = ConversationState(OrderRequest, clear=True)
):
    # Complete the order, persist to database, send messages, etc...
    ...
```

```python
def delete_message_after(f: Callable[[Update, ApplicationContext], Awaitable[Any]]):
    @wraps(f)
    async def wrapper(update: Update, context: ApplicationContext):
        result = await f(update, context)
        try:
            await context.bot.delete_message(
                message_id=update.effective_message.id,
                chat_id=update.effective_chat.id
            )
        finally:
            return result

    return wrapper
```

This decorator ensures your handler tries to delete the message after finishing its logic. `update.effective_message.delete()` throws exceptions from time to time even when it shouldn't, as does `bot.delete_message`; this decorator is an easy and safe way to abstract that away and make sure you tried your best to delete the message.
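Applying it is just a matter of stacking it onto a handler callback before the handler wrapper, e.g. with a hypothetical `/cleanup` command and the `command_handler` shortcut shown further below:

```python
@command_handler("cleanup")
@delete_message_after
async def cleanup(update: Update, context: ApplicationContext):
    await update.effective_chat.send_message("Cleaning up!")
```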
Arbitrary callback data is an awesome feature of python-telegram-bot: it increases the security of your application (callback queries are generated client-side and can contain malicious payloads) and makes your development workflow easier.
Since the smoothest interactions are through inline keyboards, your application will be full of `CallbackQueryHandler` flows. The problem is that `callback_data` does not provide a type hint for your objects, making you write the same code over and over again to satisfy the type checker and get type hints:
```python
async def sample_handler(update: Update, context: ApplicationContext):
    my_data = cast(CustomData, context.callback_data)
    ...  # do stuff
    await update.callback_query.answer()
    # if you want you can also clear your callback data from your cache
```

Now with dependency injection:
```python
@inject
async def sample_handler(
    update: Update,
    context: ApplicationContext,
    my_data: CustomData = CallbackQuery(CustomData)
):
    ...  # do stuff
```

I would recommend you keep your code loosely coupled and cohesion high; separate your modules by feature:
```
├── src
│   ├── bot
│   │   ├── application.py
│   │   └── common
│   │       ├── callback.py
│   │       ├── context.py
│   │       ├── wrappers.py
│   │       └── conversation.py
│   ├── orders
│   │   ├── conversations
│   │   │   ├── create_order.py
│   │   │   └── edit_order.py
│   │   ├── models.py
│   │   ├── queries.py
│   │   └── handlers.py
│   ├── db
│   │   ├── config.py
│   │   └── tables.py
│   ├── main.py
│   ├── errors.py
│   ├── extractors.py
│   ├── resources
│   └── settings.py
└── tests
    └── __init__.py
```
I added a folder `orders` that could represent a way to add a feature for interacting with orders:
- `handlers.py` is where you define the handlers needed to interact with this module through the telegram api. Export a list of handlers that you import in `application.py` and then add to the `Application` object through `add_handlers()` (see the sketch after this list). This list of handlers has to contain all the handlers of the module.
- `queries.py`: if you need more than just simple queries and want to move them out, create functions that take an `AsyncSession` as an argument and execute your database logic.
- `conversations` contains a file for every `ConversationHandler` the module defines; since it takes a lot of code to define a single conversation, with its states, state management, fallbacks etc., a single file for every conversation flow seems okay.
These are just examples of how the structure could look.
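A minimal sketch of that wiring, reusing `start_order_request` from the conversation example above (the exact contents are up to you):

```python
# src/orders/handlers.py  (hypothetical contents)
from telegram.ext import CommandHandler

handlers = [
    CommandHandler("order", start_order_request),
    # ... every other handler this module defines
]

# src/bot/application.py
from src.orders.handlers import handlers as order_handlers

application.add_handlers(order_handlers)
```

`Application.add_handlers()` is part of python-telegram-bot and registers the whole list in one call.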
```python
def command_handler(command: str, *, allow_group: bool = False):
    def inner_decorator(f: Callable[[Update, ApplicationContext], Coroutine[Any, Any, RT]]) -> CommandHandler:
        return CommandHandler(
            filters=None if allow_group else filters.ChatType.PRIVATE,
            command=command,
            callback=f
        )

    return inner_decorator
```

This is a shortcut for creating command handlers; by default they only work in private chats and have to be explicitly activated for group chats.
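So a handler that should also respond in groups would be declared like this (hypothetical `/stats` command):

```python
@command_handler("stats", allow_group=True)
async def stats(update: Update, context: ApplicationContext):
    await update.effective_chat.send_message("Here are your stats...")
```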
After programming bots for a while I always found myself using the same pattern to define actions on my entities:
```python
class DeleteItem(BaseModel):
    item: Item


delete_item_button = InlineKeyboardButton(
    text="❌ DELETE ITEM ❌",
    callback_data=DeleteItem(item=my_item)
)

reply_markup = InlineKeyboardMarkup([
    [delete_item_button]
])
```

This would create a menu with a single button, but you can imagine that there could be more, each with its own class for the action it represents. So I came up with a class that turns itself into a button or a single-row keyboard (I found myself often making single-row keyboards). I reference `__class__.__name__` to derive the button text and surround it with an emoji if provided, turning a class like `EDIT_ITEM` into either an `EDIT` or an `EDIT ITEM` button.
```python
class CallbackButton(BaseModel):
    def to_short_button(self, *, emoji: Optional[str] = None) -> InlineKeyboardButton:
        text = self.__class__.__name__.split("_")[0]
        if emoji:
            text = f"{emoji} {text} {emoji}"
        return InlineKeyboardButton(text=text, callback_data=self)

    def to_button(self, *, text: Optional[str] = None, emoji: Optional[str] = None) -> InlineKeyboardButton:
        if text is None:
            text = " ".join(self.__class__.__name__.split("_"))
        if emoji:
            text = f"{emoji} {text} {emoji}"
        return InlineKeyboardButton(text=text, callback_data=self)

    def to_keyboard(
        self,
        *,
        text: Optional[str] = None,
        emoji: Optional[str] = None
    ) -> InlineKeyboardMarkup:
        return InlineKeyboardMarkup([
            [self.to_button(text=text, emoji=emoji)]
        ])
```

Now we can rewrite the block from before as:
```python
class DELETE_ITEM(CallbackButton):
    item: Item


reply_markup = DELETE_ITEM(item=item).to_keyboard()
```

Now that we have an action, we can define its `CallbackQueryHandler` using the decorator I showed before:
```python
@arbitrary_callback_query_handler(DELETE_ITEM)
@inject
async def delete_item(
    update: Update,
    context: ApplicationContext,
    action: DELETE_ITEM = CallbackQuery(DELETE_ITEM)
):
    ...
```

I don't like how verbose building a `ConversationHandler` currently is, which is why I created a builder for it:
```python
builder = ConversationBuilder(conversation_timeout=69)

FIRST_STATE, SECOND_STATE = range(2)


@builder.entry_point
@command_handler("trigger")
async def entrypoint(update: Update, context: ApplicationContext):
    pass


@builder.state(FIRST_STATE)
@any_message
async def do_something(update: Update, context: ApplicationContext):
    pass


handler = builder.build()
```

This builds a `ConversationHandler` that enters on the command "trigger" and, in the state `FIRST_STATE`, executes `do_something`. For now you still need to return the expected states from your handlers as per the documentation; however, in the future I want to add decorators like `@next_state(SECOND_STATE)` so one can't forget.
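Until those decorators exist, the transition is just the handler's return value, following the usual python-telegram-bot convention, e.g.:

```python
@builder.state(FIRST_STATE)
@any_message
async def do_something(update: Update, context: ApplicationContext):
    # Hand the conversation over to the next state
    return SECOND_STATE
```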