-
|
I have created a new migration file using the command: Now I go to apply the migration using: This is my import asyncio
from typing import TYPE_CHECKING
from advanced_alchemy.base import metadata_registry
from alembic import context
from alembic.autogenerate import rewriter
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine, async_engine_from_config
if TYPE_CHECKING:
from advanced_alchemy.alembic.commands import AlembicCommandConfig
from sqlalchemy.engine import Connection
__all__ = ("do_run_migrations", "run_migrations_offline", "run_migrations_online")
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config: "AlembicCommandConfig" = context.config # type: ignore
writer = rewriter.Rewriter()
target_metadata = metadata_registry.get(config.bind_key)
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
context.configure(
url=config.db_url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named", "foreign_keys": True},
transaction_per_migration=True,
include_schemas=True,
compare_type=config.compare_type,
compare_server_default=True,
version_table=config.version_table_name,
version_table_pk=config.version_table_pk,
user_module_prefix=config.user_module_prefix,
render_as_batch=config.render_as_batch,
process_revision_directives=writer,
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: "Connection") -> None:
"""Run migrations."""
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=config.compare_type,
compare_server_default=True,
include_schemas=True,
dialect_opts={"paramstyle": "named", "foreign_keys": True},
transaction_per_migration=True,
version_table=config.version_table_name,
version_table_pk=config.version_table_pk,
user_module_prefix=config.user_module_prefix,
render_as_batch=config.render_as_batch,
process_revision_directives=writer,
)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine and associate a
connection with the context.
Raises:
RuntimeError: If the engine cannot be created from the config.
"""
configuration = config.get_section(config.config_ini_section) or {}
configuration["sqlalchemy.url"] = config.db_url
connectable = config.engine or async_engine_from_config(
configuration,
prefix="sqlalchemy.",
poolclass=pool.NullPool,
future=True,
)
if connectable is None: # pyright: ignore[reportUnnecessaryComparison]
raise RuntimeError(
"Could not get engine from config. Please ensure your `alembic.ini` according to the official Alembic documentation.",
)
try:
if isinstance(connectable, AsyncEngine):
async with connectable.begin() as conn:
await conn.run_sync(do_run_migrations)
else:
with connectable.begin() as conn:
do_run_migrations(conn)
finally:
if isinstance(connectable, AsyncEngine):
await connectable.dispose()
else:
connectable.dispose()
if context.is_offline_mode():
run_migrations_offline()
else:
asyncio.run(run_migrations_online())I've tried to debug the error and it doesn't make sense how that assertion could be throwing an error. The sequence of executed lines of code that are called to reach that assertion already checks if a transaction is active: _in_connection_transaction = self._in_connection_transaction()
''' the above expands to:
_in_connection_transaction = self._transaction is not None and self._transaction.is_active
'''
if self.impl.transactional_ddl and self.as_sql:
self.impl.emit_commit()
elif _in_connection_transaction:
assert self._transaction is not None # <-- error occurs here |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
|
After some debugging, caught the mistake in try:
if isinstance(connectable, AsyncEngine):
async with connectable.connect() as conn: # <-- here
await conn.run_sync(do_run_migrations)
else:
with connectable.connect() as conn: # <-- and here
do_run_migrations(conn)
finally:
if isinstance(connectable, AsyncEngine):
await connectable.dispose()
else:
connectable.dispose()With the previous code, it starts a transaction before the migration starts, which causes the final code that executes in the migration script to fail because a new transaction wasn't started. This could very well be a bug. |
Beta Was this translation helpful? Give feedback.
After some debugging, caught the mistake in
env.py:With the previous code, it starts a transaction before the migration starts, which causes the final code that executes in the migration script to fail because a new transaction wasn't started. This could very well be a bug.