diff --git a/.github/workflows/black.yml b/.github/workflows/black.yml deleted file mode 100644 index facd0e2..0000000 --- a/.github/workflows/black.yml +++ /dev/null @@ -1,12 +0,0 @@ -name: Lint - -on: [push, pull_request] - -jobs: - lint: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - uses: psf/black@stable - with: - options: "--check --diff --exclude=migrations" \ No newline at end of file diff --git a/.github/workflows/ruff.yaml b/.github/workflows/ruff.yaml new file mode 100644 index 0000000..00cd0a9 --- /dev/null +++ b/.github/workflows/ruff.yaml @@ -0,0 +1,10 @@ +name: Ruff +on: [pull_request] +jobs: + ruff: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # v4.2.1 + - uses: astral-sh/ruff-action@d0a0e814ec17e92d33be7d24dd922b479f1bcd38 # v1.1.1 + with: + changed-files: "true" diff --git a/migrations/env.py b/migrations/env.py index b25165d..6f5f53a 100644 --- a/migrations/env.py +++ b/migrations/env.py @@ -1,95 +1,93 @@ -from __future__ import with_statement - -import logging -from logging.config import fileConfig - -from flask import current_app - -from alembic import context - -# this is the Alembic Config object, which provides -# access to the values within the .ini file in use. -config = context.config - -# Interpret the config file for Python logging. -# This line sets up loggers basically. -fileConfig(config.config_file_name) -logger = logging.getLogger("alembic.env") - -# add your model's MetaData object here -# for 'autogenerate' support -# from myapp import mymodel -# target_metadata = mymodel.Base.metadata -config.set_main_option( - "sqlalchemy.url", - str(current_app.extensions["migrate"].db.get_engine().url).replace("%", "%%"), -) -target_db = current_app.extensions["migrate"].db - -# other values from the config, defined by the needs of env.py, -# can be acquired: -# my_important_option = config.get_main_option("my_important_option") -# ... etc. - - -def get_metadata(): - if hasattr(target_db, "metadatas"): - return target_db.metadatas[None] - return target_db.metadata - - -def run_migrations_offline(): - """Run migrations in 'offline' mode. - - This configures the context with just a URL - and not an Engine, though an Engine is acceptable - here as well. By skipping the Engine creation - we don't even need a DBAPI to be available. - - Calls to context.execute() here emit the given string to the - script output. - - """ - url = config.get_main_option("sqlalchemy.url") - context.configure(url=url, target_metadata=get_metadata(), literal_binds=True) - - with context.begin_transaction(): - context.run_migrations() - - -def run_migrations_online(): - """Run migrations in 'online' mode. - - In this scenario we need to create an Engine - and associate a connection with the context. - - """ - - # this callback is used to prevent an auto-migration from being generated - # when there are no changes to the schema - # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html - def process_revision_directives(context, revision, directives): - if getattr(config.cmd_opts, "autogenerate", False): - script = directives[0] - if script.upgrade_ops.is_empty(): - directives[:] = [] - logger.info("No changes in schema detected.") - - connectable = current_app.extensions["migrate"].db.get_engine() - - with connectable.connect() as connection: - context.configure( - connection=connection, - target_metadata=get_metadata(), - process_revision_directives=process_revision_directives, - **current_app.extensions["migrate"].configure_args - ) - - with context.begin_transaction(): - context.run_migrations() - - -if context.is_offline_mode(): - run_migrations_offline() -else: - run_migrations_online() + +import logging +from logging.config import fileConfig + +from alembic import context +from flask import current_app + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +fileConfig(config.config_file_name) +logger = logging.getLogger("alembic.env") + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +config.set_main_option( + "sqlalchemy.url", + str(current_app.extensions["migrate"].db.get_engine().url).replace("%", "%%"), +) +target_db = current_app.extensions["migrate"].db + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + + +def get_metadata(): + if hasattr(target_db, "metadatas"): + return target_db.metadatas[None] + return target_db.metadata + + +def run_migrations_offline(): + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure(url=url, target_metadata=get_metadata(), literal_binds=True) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + + """ + + # this callback is used to prevent an auto-migration from being generated + # when there are no changes to the schema + # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html + def process_revision_directives(context, revision, directives): + if getattr(config.cmd_opts, "autogenerate", False): + script = directives[0] + if script.upgrade_ops.is_empty(): + directives[:] = [] + logger.info("No changes in schema detected.") + + connectable = current_app.extensions["migrate"].db.get_engine() + + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=get_metadata(), + process_revision_directives=process_revision_directives, + **current_app.extensions["migrate"].configure_args, + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/migrations/versions/073aa9bc98d2_email_as_signin.py b/migrations/versions/073aa9bc98d2_email_as_signin.py index 90a8b4a..a0e0a49 100644 --- a/migrations/versions/073aa9bc98d2_email_as_signin.py +++ b/migrations/versions/073aa9bc98d2_email_as_signin.py @@ -1,40 +1,36 @@ -"""Email as signin - -Revision ID: 073aa9bc98d2 -Revises: 235dba32858b -Create Date: 2023-09-16 12:24:28.367099 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '073aa9bc98d2' -down_revision = '235dba32858b' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.alter_column('email', - existing_type=sa.VARCHAR(), - nullable=False) - batch_op.create_unique_constraint(None, ['email']) - batch_op.drop_column('username') - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.add_column(sa.Column('username', sa.VARCHAR(), nullable=False)) - batch_op.drop_constraint(None, type_='unique') - batch_op.alter_column('email', - existing_type=sa.VARCHAR(), - nullable=True) - - # ### end Alembic commands ### +"""Email as signin + +Revision ID: 073aa9bc98d2 +Revises: 235dba32858b +Create Date: 2023-09-16 12:24:28.367099 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "073aa9bc98d2" +down_revision = "235dba32858b" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.alter_column("email", existing_type=sa.VARCHAR(), nullable=False) + batch_op.create_unique_constraint(None, ["email"]) + batch_op.drop_column("username") + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.add_column(sa.Column("username", sa.VARCHAR(), nullable=False)) + batch_op.drop_constraint(None, type_="unique") + batch_op.alter_column("email", existing_type=sa.VARCHAR(), nullable=True) + + # ### end Alembic commands ### diff --git a/migrations/versions/235dba32858b_sqlalchemy_2.0.py b/migrations/versions/235dba32858b_sqlalchemy_2.0.py index e08bcd3..dbf94fd 100644 --- a/migrations/versions/235dba32858b_sqlalchemy_2.0.py +++ b/migrations/versions/235dba32858b_sqlalchemy_2.0.py @@ -1,160 +1,152 @@ -"""SQLAlchemy 2.0 - -Revision ID: 235dba32858b -Revises: 7a297468f586 -Create Date: 2023-01-26 23:01:44.635683 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = "235dba32858b" -down_revision = "7a297468f586" -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table("active", schema=None) as batch_op: - batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=False) - batch_op.alter_column("event_id", existing_type=sa.INTEGER(), nullable=False) - batch_op.alter_column( - "start", - existing_type=sa.DATETIME(), - nullable=False, - existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), - ) - - with op.batch_alter_table("badge_awards", schema=None) as batch_op: - batch_op.alter_column( - "received", - existing_type=sa.DATETIME(), - nullable=False, - existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), - ) - - with op.batch_alter_table("badges", schema=None) as batch_op: - batch_op.alter_column("color", existing_type=sa.VARCHAR(), nullable=False) - - with op.batch_alter_table("event_types", schema=None) as batch_op: - batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=False) - batch_op.alter_column("description", existing_type=sa.VARCHAR(), nullable=False) - - with op.batch_alter_table("events", schema=None) as batch_op: - batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=False) - batch_op.alter_column("description", existing_type=sa.VARCHAR(), nullable=False) - batch_op.alter_column("code", existing_type=sa.VARCHAR(), nullable=False) - batch_op.alter_column("location", existing_type=sa.VARCHAR(), nullable=False) - batch_op.alter_column("type_id", existing_type=sa.INTEGER(), nullable=False) - batch_op.alter_column( - "registration_open", - existing_type=sa.BOOLEAN(), - nullable=False, - existing_server_default=sa.text("'false'"), - ) - - with op.batch_alter_table("guardians", schema=None) as batch_op: - batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=False) - batch_op.alter_column( - "contact_order", existing_type=sa.INTEGER(), nullable=False - ) - - with op.batch_alter_table("stamps", schema=None) as batch_op: - batch_op.alter_column( - "user_id", existing_type=sa.VARCHAR(), type_=sa.Integer(), nullable=False - ) - batch_op.alter_column("event_id", existing_type=sa.INTEGER(), nullable=False) - batch_op.alter_column("start", existing_type=sa.DATETIME(), nullable=False) - batch_op.alter_column( - "end", - existing_type=sa.DATETIME(), - nullable=False, - existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), - ) - - with op.batch_alter_table("students", schema=None) as batch_op: - batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=False) - batch_op.alter_column( - "graduation_year", existing_type=sa.INTEGER(), nullable=False - ) - - with op.batch_alter_table("users", schema=None) as batch_op: - batch_op.alter_column("role_id", existing_type=sa.INTEGER(), nullable=False) - batch_op.alter_column("approved", existing_type=sa.BOOLEAN(), nullable=False) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table("users", schema=None) as batch_op: - batch_op.alter_column("approved", existing_type=sa.BOOLEAN(), nullable=True) - batch_op.alter_column("role_id", existing_type=sa.INTEGER(), nullable=True) - - with op.batch_alter_table("students", schema=None) as batch_op: - batch_op.alter_column( - "graduation_year", existing_type=sa.INTEGER(), nullable=True - ) - batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=True) - - with op.batch_alter_table("stamps", schema=None) as batch_op: - batch_op.alter_column( - "end", - existing_type=sa.DATETIME(), - nullable=True, - existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), - ) - batch_op.alter_column("start", existing_type=sa.DATETIME(), nullable=True) - batch_op.alter_column("event_id", existing_type=sa.INTEGER(), nullable=True) - batch_op.alter_column( - "user_id", existing_type=sa.Integer(), type_=sa.VARCHAR(), nullable=True - ) - - with op.batch_alter_table("guardians", schema=None) as batch_op: - batch_op.alter_column( - "contact_order", existing_type=sa.INTEGER(), nullable=True - ) - batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=True) - - with op.batch_alter_table("events", schema=None) as batch_op: - batch_op.alter_column( - "registration_open", - existing_type=sa.BOOLEAN(), - nullable=True, - existing_server_default=sa.text("'false'"), - ) - batch_op.alter_column("type_id", existing_type=sa.INTEGER(), nullable=True) - batch_op.alter_column("location", existing_type=sa.VARCHAR(), nullable=True) - batch_op.alter_column("code", existing_type=sa.VARCHAR(), nullable=True) - batch_op.alter_column("description", existing_type=sa.VARCHAR(), nullable=True) - batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=True) - - with op.batch_alter_table("event_types", schema=None) as batch_op: - batch_op.alter_column("description", existing_type=sa.VARCHAR(), nullable=True) - batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=True) - - with op.batch_alter_table("badges", schema=None) as batch_op: - batch_op.alter_column("color", existing_type=sa.VARCHAR(), nullable=True) - - with op.batch_alter_table("badge_awards", schema=None) as batch_op: - batch_op.alter_column( - "received", - existing_type=sa.DATETIME(), - nullable=True, - existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), - ) - - with op.batch_alter_table("active", schema=None) as batch_op: - batch_op.alter_column( - "start", - existing_type=sa.DATETIME(), - nullable=True, - existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), - ) - batch_op.alter_column("event_id", existing_type=sa.INTEGER(), nullable=True) - batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=True) - - # ### end Alembic commands ### +"""SQLAlchemy 2.0 + +Revision ID: 235dba32858b +Revises: 7a297468f586 +Create Date: 2023-01-26 23:01:44.635683 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "235dba32858b" +down_revision = "7a297468f586" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("active", schema=None) as batch_op: + batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=False) + batch_op.alter_column("event_id", existing_type=sa.INTEGER(), nullable=False) + batch_op.alter_column( + "start", + existing_type=sa.DATETIME(), + nullable=False, + existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), + ) + + with op.batch_alter_table("badge_awards", schema=None) as batch_op: + batch_op.alter_column( + "received", + existing_type=sa.DATETIME(), + nullable=False, + existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), + ) + + with op.batch_alter_table("badges", schema=None) as batch_op: + batch_op.alter_column("color", existing_type=sa.VARCHAR(), nullable=False) + + with op.batch_alter_table("event_types", schema=None) as batch_op: + batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=False) + batch_op.alter_column("description", existing_type=sa.VARCHAR(), nullable=False) + + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=False) + batch_op.alter_column("description", existing_type=sa.VARCHAR(), nullable=False) + batch_op.alter_column("code", existing_type=sa.VARCHAR(), nullable=False) + batch_op.alter_column("location", existing_type=sa.VARCHAR(), nullable=False) + batch_op.alter_column("type_id", existing_type=sa.INTEGER(), nullable=False) + batch_op.alter_column( + "registration_open", + existing_type=sa.BOOLEAN(), + nullable=False, + existing_server_default=sa.text("'false'"), + ) + + with op.batch_alter_table("guardians", schema=None) as batch_op: + batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=False) + batch_op.alter_column("contact_order", existing_type=sa.INTEGER(), nullable=False) + + with op.batch_alter_table("stamps", schema=None) as batch_op: + batch_op.alter_column( + "user_id", existing_type=sa.VARCHAR(), type_=sa.Integer(), nullable=False + ) + batch_op.alter_column("event_id", existing_type=sa.INTEGER(), nullable=False) + batch_op.alter_column("start", existing_type=sa.DATETIME(), nullable=False) + batch_op.alter_column( + "end", + existing_type=sa.DATETIME(), + nullable=False, + existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), + ) + + with op.batch_alter_table("students", schema=None) as batch_op: + batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=False) + batch_op.alter_column("graduation_year", existing_type=sa.INTEGER(), nullable=False) + + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.alter_column("role_id", existing_type=sa.INTEGER(), nullable=False) + batch_op.alter_column("approved", existing_type=sa.BOOLEAN(), nullable=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.alter_column("approved", existing_type=sa.BOOLEAN(), nullable=True) + batch_op.alter_column("role_id", existing_type=sa.INTEGER(), nullable=True) + + with op.batch_alter_table("students", schema=None) as batch_op: + batch_op.alter_column("graduation_year", existing_type=sa.INTEGER(), nullable=True) + batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=True) + + with op.batch_alter_table("stamps", schema=None) as batch_op: + batch_op.alter_column( + "end", + existing_type=sa.DATETIME(), + nullable=True, + existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), + ) + batch_op.alter_column("start", existing_type=sa.DATETIME(), nullable=True) + batch_op.alter_column("event_id", existing_type=sa.INTEGER(), nullable=True) + batch_op.alter_column( + "user_id", existing_type=sa.Integer(), type_=sa.VARCHAR(), nullable=True + ) + + with op.batch_alter_table("guardians", schema=None) as batch_op: + batch_op.alter_column("contact_order", existing_type=sa.INTEGER(), nullable=True) + batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=True) + + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.alter_column( + "registration_open", + existing_type=sa.BOOLEAN(), + nullable=True, + existing_server_default=sa.text("'false'"), + ) + batch_op.alter_column("type_id", existing_type=sa.INTEGER(), nullable=True) + batch_op.alter_column("location", existing_type=sa.VARCHAR(), nullable=True) + batch_op.alter_column("code", existing_type=sa.VARCHAR(), nullable=True) + batch_op.alter_column("description", existing_type=sa.VARCHAR(), nullable=True) + batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=True) + + with op.batch_alter_table("event_types", schema=None) as batch_op: + batch_op.alter_column("description", existing_type=sa.VARCHAR(), nullable=True) + batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=True) + + with op.batch_alter_table("badges", schema=None) as batch_op: + batch_op.alter_column("color", existing_type=sa.VARCHAR(), nullable=True) + + with op.batch_alter_table("badge_awards", schema=None) as batch_op: + batch_op.alter_column( + "received", + existing_type=sa.DATETIME(), + nullable=True, + existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), + ) + + with op.batch_alter_table("active", schema=None) as batch_op: + batch_op.alter_column( + "start", + existing_type=sa.DATETIME(), + nullable=True, + existing_server_default=sa.text("(CURRENT_TIMESTAMP)"), + ) + batch_op.alter_column("event_id", existing_type=sa.INTEGER(), nullable=True) + batch_op.alter_column("user_id", existing_type=sa.INTEGER(), nullable=True) + + # ### end Alembic commands ### diff --git a/migrations/versions/7a297468f586_initial_migration.py b/migrations/versions/7a297468f586_initial_migration.py index 8107cd2..c7e35c8 100644 --- a/migrations/versions/7a297468f586_initial_migration.py +++ b/migrations/versions/7a297468f586_initial_migration.py @@ -1,70 +1,60 @@ -"""Initial Migration - -Revision ID: 7a297468f586 -Revises: -Create Date: 2022-12-02 22:16:52.739438 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = "7a297468f586" -down_revision = None -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table("account_types", schema=None) as batch_op: - batch_op.add_column( - sa.Column( - "receives_funds", sa.Boolean(), nullable=False, server_default="false" - ) - ) - - with op.batch_alter_table("badges", schema=None) as batch_op: - batch_op.add_column(sa.Column("emoji", sa.String(), nullable=True)) - - with op.batch_alter_table("events", schema=None) as batch_op: - batch_op.add_column( - sa.Column( - "registration_open", sa.Boolean(), nullable=True, server_default="false" - ) - ) - batch_op.add_column( - sa.Column("funds", sa.Integer(), nullable=False, server_default="0") - ) - batch_op.add_column( - sa.Column("cost", sa.Integer(), nullable=False, server_default="0") - ) - batch_op.add_column( - sa.Column("overhead", sa.Float(), nullable=False, server_default="0.5") - ) - batch_op.alter_column("start", existing_type=sa.DATETIME(), nullable=False) - batch_op.alter_column("end", existing_type=sa.DATETIME(), nullable=False) - batch_op.drop_column("enabled") - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table("events", schema=None) as batch_op: - batch_op.add_column(sa.Column("enabled", sa.BOOLEAN(), nullable=False)) - batch_op.alter_column("end", existing_type=sa.DATETIME(), nullable=True) - batch_op.alter_column("start", existing_type=sa.DATETIME(), nullable=True) - batch_op.drop_column("overhead") - batch_op.drop_column("cost") - batch_op.drop_column("funds") - batch_op.drop_column("registration_open") - - with op.batch_alter_table("badges", schema=None) as batch_op: - batch_op.drop_column("emoji") - - with op.batch_alter_table("account_types", schema=None) as batch_op: - batch_op.drop_column("receives_funds") - - # ### end Alembic commands ### +"""Initial Migration + +Revision ID: 7a297468f586 +Revises: +Create Date: 2022-12-02 22:16:52.739438 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "7a297468f586" +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("account_types", schema=None) as batch_op: + batch_op.add_column( + sa.Column("receives_funds", sa.Boolean(), nullable=False, server_default="false") + ) + + with op.batch_alter_table("badges", schema=None) as batch_op: + batch_op.add_column(sa.Column("emoji", sa.String(), nullable=True)) + + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.add_column( + sa.Column("registration_open", sa.Boolean(), nullable=True, server_default="false") + ) + batch_op.add_column(sa.Column("funds", sa.Integer(), nullable=False, server_default="0")) + batch_op.add_column(sa.Column("cost", sa.Integer(), nullable=False, server_default="0")) + batch_op.add_column(sa.Column("overhead", sa.Float(), nullable=False, server_default="0.5")) + batch_op.alter_column("start", existing_type=sa.DATETIME(), nullable=False) + batch_op.alter_column("end", existing_type=sa.DATETIME(), nullable=False) + batch_op.drop_column("enabled") + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.add_column(sa.Column("enabled", sa.BOOLEAN(), nullable=False)) + batch_op.alter_column("end", existing_type=sa.DATETIME(), nullable=True) + batch_op.alter_column("start", existing_type=sa.DATETIME(), nullable=True) + batch_op.drop_column("overhead") + batch_op.drop_column("cost") + batch_op.drop_column("funds") + batch_op.drop_column("registration_open") + + with op.batch_alter_table("badges", schema=None) as batch_op: + batch_op.drop_column("emoji") + + with op.batch_alter_table("account_types", schema=None) as batch_op: + batch_op.drop_column("receives_funds") + + # ### end Alembic commands ### diff --git a/migrations/versions/7bafa58010d4_email_as_signin_part_2.py b/migrations/versions/7bafa58010d4_email_as_signin_part_2.py index 5c62f9f..b75e5da 100644 --- a/migrations/versions/7bafa58010d4_email_as_signin_part_2.py +++ b/migrations/versions/7bafa58010d4_email_as_signin_part_2.py @@ -1,38 +1,37 @@ -"""Email as signin part 2 - -Revision ID: 7bafa58010d4 -Revises: 073aa9bc98d2 -Create Date: 2023-09-16 12:28:33.578066 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '7bafa58010d4' -down_revision = '073aa9bc98d2' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('events', schema=None) as batch_op: - batch_op.create_unique_constraint(batch_op.f('uq_events_code'), ['code']) - - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.create_unique_constraint(batch_op.f('uq_users_code'), ['code']) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.drop_constraint(batch_op.f('uq_users_code'), type_='unique') - - with op.batch_alter_table('events', schema=None) as batch_op: - batch_op.drop_constraint(batch_op.f('uq_events_code'), type_='unique') - - # ### end Alembic commands ### +"""Email as signin part 2 + +Revision ID: 7bafa58010d4 +Revises: 073aa9bc98d2 +Create Date: 2023-09-16 12:28:33.578066 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "7bafa58010d4" +down_revision = "073aa9bc98d2" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.create_unique_constraint(batch_op.f("uq_events_code"), ["code"]) + + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.create_unique_constraint(batch_op.f("uq_users_code"), ["code"]) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.drop_constraint(batch_op.f("uq_users_code"), type_="unique") + + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.drop_constraint(batch_op.f("uq_events_code"), type_="unique") + + # ### end Alembic commands ### diff --git a/migrations/versions/9b0e597da71c_pronouns.py b/migrations/versions/9b0e597da71c_pronouns.py index 690333e..78a4434 100644 --- a/migrations/versions/9b0e597da71c_pronouns.py +++ b/migrations/versions/9b0e597da71c_pronouns.py @@ -1,32 +1,38 @@ -"""Pronouns - -Revision ID: 9b0e597da71c -Revises: 7bafa58010d4 -Create Date: 2023-09-16 12:34:12.079018 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '9b0e597da71c' -down_revision = '7bafa58010d4' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.add_column(sa.Column('pronouns', sa.Enum('He_Him', 'She_Her', 'They_Them', 'He_They', 'She_They', name='pronoun'), nullable=True)) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('users', schema=None) as batch_op: - batch_op.drop_column('pronouns') - - # ### end Alembic commands ### +"""Pronouns + +Revision ID: 9b0e597da71c +Revises: 7bafa58010d4 +Create Date: 2023-09-16 12:34:12.079018 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "9b0e597da71c" +down_revision = "7bafa58010d4" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "pronouns", + sa.Enum("He_Him", "She_Her", "They_Them", "He_They", "She_They", name="pronoun"), + nullable=True, + ) + ) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.drop_column("pronouns") + + # ### end Alembic commands ### diff --git a/signinapp/__init__.py b/signinapp/__init__.py index b512787..95f0345 100644 --- a/signinapp/__init__.py +++ b/signinapp/__init__.py @@ -1,322 +1,311 @@ -#!/usr/bin/env python - -import datetime -import locale -import os -import zoneinfo - -import flask_excel as excel -import yaml -from flask import Flask, render_template -from flask_assets import Bundle, Environment -from flask_bootstrap import Bootstrap5 -from flask_migrate import Migrate -from sqlalchemy.future import select - -from . import ( - active, - admin, - auth, - badge, - dbadmin, - event, - events, - proxy, - qr, - search, - team, - user, - finance, -) -from .auth import login_manager -from .jobs import scheduler -from .model import ( - Active, - Badge, - Event, - EventType, - Guardian, - Role, - Student, - Subteam, - User, - db, -) - -locale.setlocale(locale.LC_ALL, "") - -app = Flask(__name__) - - -class Config(object): - TITLE = "Chop Shop Sign In" - BLURB = """ -We are Merrimack High School FIRST Robotics Competition Team 166, Chop Shop, from Merrimack, New Hampshire. -Our mission is to build teamwork and a great robot, along with fostering a love for Science, Technology, Engineering, and Mathematics.""".strip() - DB_NAME = "signin.db" - TIME_ZONE = "America/New_York" - SECRET_KEY = "1234" - PRE_EVENT_ACTIVE_TIME = 30 - POST_EVENT_ACTIVE_TIME = 120 - AUTO_SIGNOUT_BEHAVIOR = "None" # Valid Options (Credit, Discard, None) - PROXY_URL = "http://localhost:8080/kanboard/" - - -class DebugConfig(Config): - TITLE = "Chop Shop Sign In (debug)" - DB_NAME = ":memory:" - - -# First load the default config... -if app.config["DEBUG"]: - app.config.from_object(DebugConfig) -else: - app.config.from_object(Config) -# ...then load the config file if it exists... -rv = os.environ.get("CSSIGNIN_CONFIG") -if rv: - app.config.from_file(rv, load=yaml.safe_load, silent=True) -# ...then load from environment variables -app.config.from_prefixed_env() - -# Now validate the config -assert app.config["TITLE"], "Invalid title given in config" -assert ( - app.config["TIME_ZONE"] in zoneinfo.available_timezones() -), "Invalid time zone given in config" -assert ( - app.config["PRE_EVENT_ACTIVE_TIME"] >= 0 -), "Invalid pre active time given in config" -assert ( - app.config["POST_EVENT_ACTIVE_TIME"] >= 0 -), "Invalid post active time given in config" -assert app.config["AUTO_SIGNOUT_BEHAVIOR"] in ( - "Credit", - "Discard", - "None", -), "Invalid sign out behavior given in config" - -app.config.setdefault("SQLALCHEMY_DATABASE_URI", "sqlite:///" + app.config["DB_NAME"]) -app.config.setdefault("SQLALCHEMY_TRACK_MODIFICATIONS", True) - -scss = Bundle( - "custom.scss", - filters="libsass,cssmin", - depends="scss/*.scss", - output="custom.generated.css", -) -assets = Environment(app) -assets.register("custom_css", scss) - -excel.init_excel(app) - -bootstrap = Bootstrap5(app) - -login_manager.login_view = "auth.login" -login_manager.init_app(app) - -db.init_app(app) -with app.app_context(): - db.create_all() - -migrate = Migrate(app, db) - -scheduler.init_app(app) -scheduler.start() - -active.init_app(app) -admin.init_app(app) -auth.init_app(app) -badge.init_app(app) -dbadmin.init_app(app) -event.init_app(app) -events.init_app(app) -finance.init_app(app) -proxy.init_app(app) -qr.init_app(app) -search.init_app(app) -team.init_app(app) -user.init_app(app) - - -@app.route("/") -def index(): - stmt = select(Event).filter_by(is_active=True) - events = db.session.scalars(stmt) - return render_template("index.html.jinja2", events=events) - - -@app.errorhandler(404) -def page_not_found(e): - return ( - render_template( - "error.html.jinja2", error_headline="Page Not Found", error_msg=e - ), - 404, - ) - - -@app.errorhandler(500) -def internal_server_error(e: int): - return ( - render_template( - "error.html.jinja2", error_headline="Internal Error", error_msg=e - ), - 500, - ) - - -@app.errorhandler(Exception) -def internal_server_error_ex(e: Exception): - import io - import traceback - - buffer = io.StringIO() - - traceback.print_exception(e) - traceback.print_exception(e, file=buffer) - return ( - render_template( - "error.html.jinja2", - error_headline="Internal Error", - error_msg=buffer.getvalue(), - ), - ) - - -def create_if_not_exists(cls, name, **kwargs): - if not cls.from_name(name): - item = cls(name=name, **kwargs) - db.session.add(item) - - -def init_default_db(): - create_if_not_exists(Role, name="admin", mentor=True, can_display=True, admin=True) - create_if_not_exists(Role, name="mentor", mentor=True, can_display=True) - create_if_not_exists(Role, name="display", can_display=True, autoload=True) - create_if_not_exists(Role, name="lead", can_see_subteam=True, receives_funds=True) - create_if_not_exists(Role, name="student", receives_funds=True) - create_if_not_exists(Role, name="guardian_limited", guardian=True, visible=False) - create_if_not_exists(Role, name="guardian", guardian=True) - - create_if_not_exists( - EventType, name="Training", description="Training Session", autoload=True - ) - create_if_not_exists( - EventType, name="Build", description="Build Season", autoload=True - ) - create_if_not_exists(EventType, name="Fundraiser", description="Fundraiser") - create_if_not_exists(EventType, name="Competition", description="Competition") - - create_if_not_exists(Subteam, name="Software") - create_if_not_exists(Subteam, name="Mechanical") - create_if_not_exists(Subteam, name="CAD") - create_if_not_exists(Subteam, name="Marketing") - create_if_not_exists(Subteam, name="Outreach") - - db.session.commit() - - if not User.from_email("admin@signin.chopshoplib.info"): - User.make( - "admin@signin.chopshoplib.info", - "admin", - password="1234", - role="admin", - approved=True, - ) - if not User.from_email("display@signin.chopshoplib.info"): - User.make( - "display@signin.chopshoplib.info", - "display", - password="1234", - role="display", - approved=True, - ) - db.session.commit() - - -if app.config["DEBUG"]: - with app.app_context(): - init_default_db() - - now = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0) - offset = datetime.timedelta(hours=3) - training = Event.create( - name="Training", - description="Test Training Event", - location="D124", - code="5678", - start=now, - end=now + offset, - event_type="Training", - ) - notTraining = Event.create( - name="Not Training", - description="Test Build Event", - location="D124", - code="5679", - start=now, - end=now + offset, - event_type="Build", - ) - - expired_event = Event.create( - name="Ended Training", - description="Test Training Event", - location="D124", - code="8765", - start=now - offset, - end=now - - datetime.timedelta(minutes=app.config["POST_EVENT_ACTIVE_TIME"] - 5), - event_type="Build", - ) - db.session.commit() - - mentor_user = User.make( - "msoucy@signin.chopshoplib.info", - "Matt Soucy", - preferred_name="Matt", - code="code-msoucy", - phone_number="603 555-5555", - address="123 First Street", - tshirt_size="Large", - password="1234", - role="mentor", - approved=True, - ) - student_user = Student.make( - "Jburke@signin.chopshoplib.info", - "Jeff Burke", - preferred_name="Jeff", - code="code-jburke", - password="1234", - graduation_year=2022, - subteam="Software", - approved=True, - tshirt_size="Large", - ) - student_user.student_user_data.add_guardian( - guardian=Guardian.get_from( - name="Parent Burke", - phone_number="(603)555-5555", - email="pburke@signin.chopshoplib.info", - contact_order=1, - ) - ) - - student_training_event = Active( - user_id=student_user.id, event_id=expired_event.id, start=now - offset - ) - db.session.add(student_training_event) - - safe = Badge( - name="Safety Certified", - icon="cone-striped", - color="#FFA500", # Orange - needs to be in hex format for WTForms - description="Passed Safety Training", - ) - db.session.add(safe) - db.session.commit() - - mentor_user.award_badge(safe) - db.session.commit() +#!/usr/bin/env python + +import datetime +import locale +import os +import zoneinfo + +import flask_excel as excel +import yaml +from flask import Flask, render_template +from flask_assets import Bundle, Environment +from flask_bootstrap import Bootstrap5 +from flask_migrate import Migrate +from sqlalchemy.future import select + +from . import ( + active, + admin, + auth, + badge, + dbadmin, + event, + events, + finance, + proxy, + qr, + search, + team, + user, +) +from .auth import login_manager +from .jobs import scheduler +from .model import ( + Active, + Badge, + Event, + EventType, + Guardian, + Role, + Student, + Subteam, + User, + db, +) + +locale.setlocale(locale.LC_ALL, "") + +app = Flask(__name__) + + +class Config: + TITLE = "Chop Shop Sign In" + BLURB = """ +We are Merrimack High School FIRST Robotics Competition Team 166, Chop Shop, \ + from Merrimack, New Hampshire. +Our mission is to build teamwork and a great robot, along with fostering a love for \ + Science, Technology, Engineering, and Mathematics.""".strip() + DB_NAME = "signin.db" + TIME_ZONE = "America/New_York" + SECRET_KEY = "1234" + PRE_EVENT_ACTIVE_TIME = 30 + POST_EVENT_ACTIVE_TIME = 120 + AUTO_SIGNOUT_BEHAVIOR = "None" # Valid Options (Credit, Discard, None) + PROXY_URL = "http://localhost:8080/kanboard/" + + +class DebugConfig(Config): + TITLE = "Chop Shop Sign In (debug)" + DB_NAME = ":memory:" + + +# First load the default config... +if app.config["DEBUG"]: + app.config.from_object(DebugConfig) +else: + app.config.from_object(Config) +# ...then load the config file if it exists... +rv = os.environ.get("CSSIGNIN_CONFIG") +if rv: + app.config.from_file(rv, load=yaml.safe_load, silent=True) +# ...then load from environment variables +app.config.from_prefixed_env() + +# Now validate the config +assert app.config["TITLE"], "Invalid title given in config" +assert ( + app.config["TIME_ZONE"] in zoneinfo.available_timezones() +), "Invalid time zone given in config" +assert app.config["PRE_EVENT_ACTIVE_TIME"] >= 0, "Invalid pre active time given in config" +assert app.config["POST_EVENT_ACTIVE_TIME"] >= 0, "Invalid post active time given in config" +assert app.config["AUTO_SIGNOUT_BEHAVIOR"] in ( + "Credit", + "Discard", + "None", +), "Invalid sign out behavior given in config" + +app.config.setdefault("SQLALCHEMY_DATABASE_URI", "sqlite:///" + app.config["DB_NAME"]) +app.config.setdefault("SQLALCHEMY_TRACK_MODIFICATIONS", True) + +scss = Bundle( + "custom.scss", + filters="libsass,cssmin", + depends="scss/*.scss", + output="custom.generated.css", +) +assets = Environment(app) +assets.register("custom_css", scss) + +excel.init_excel(app) + +bootstrap = Bootstrap5(app) + +login_manager.login_view = "auth.login" +login_manager.init_app(app) + +db.init_app(app) +with app.app_context(): + db.create_all() + +migrate = Migrate(app, db) + +scheduler.init_app(app) +scheduler.start() + +active.init_app(app) +admin.init_app(app) +auth.init_app(app) +badge.init_app(app) +dbadmin.init_app(app) +event.init_app(app) +events.init_app(app) +finance.init_app(app) +proxy.init_app(app) +qr.init_app(app) +search.init_app(app) +team.init_app(app) +user.init_app(app) + + +@app.route("/") +def index(): + stmt = select(Event).filter_by(is_active=True) + events = db.session.scalars(stmt) + return render_template("index.html.jinja2", events=events) + + +@app.errorhandler(404) +def page_not_found(e): + return ( + render_template("error.html.jinja2", error_headline="Page Not Found", error_msg=e), + 404, + ) + + +@app.errorhandler(500) +def internal_server_error(e: int): + return ( + render_template("error.html.jinja2", error_headline="Internal Error", error_msg=e), + 500, + ) + + +@app.errorhandler(Exception) +def internal_server_error_ex(e: Exception): + import io + import traceback + + buffer = io.StringIO() + + traceback.print_exception(e) + traceback.print_exception(e, file=buffer) + return ( + render_template( + "error.html.jinja2", + error_headline="Internal Error", + error_msg=buffer.getvalue(), + ), + ) + + +def create_if_not_exists(cls, name, **kwargs): + if not cls.from_name(name): + item = cls(name=name, **kwargs) + db.session.add(item) + + +def init_default_db(): + create_if_not_exists(Role, name="admin", mentor=True, can_display=True, admin=True) + create_if_not_exists(Role, name="mentor", mentor=True, can_display=True) + create_if_not_exists(Role, name="display", can_display=True, autoload=True) + create_if_not_exists(Role, name="lead", can_see_subteam=True, receives_funds=True) + create_if_not_exists(Role, name="student", receives_funds=True) + create_if_not_exists(Role, name="guardian_limited", guardian=True, visible=False) + create_if_not_exists(Role, name="guardian", guardian=True) + + create_if_not_exists(EventType, name="Training", description="Training Session", autoload=True) + create_if_not_exists(EventType, name="Build", description="Build Season", autoload=True) + create_if_not_exists(EventType, name="Fundraiser", description="Fundraiser") + create_if_not_exists(EventType, name="Competition", description="Competition") + + create_if_not_exists(Subteam, name="Software") + create_if_not_exists(Subteam, name="Mechanical") + create_if_not_exists(Subteam, name="CAD") + create_if_not_exists(Subteam, name="Marketing") + create_if_not_exists(Subteam, name="Outreach") + + db.session.commit() + + if not User.from_email("admin@signin.chopshoplib.info"): + User.make( + "admin@signin.chopshoplib.info", + "admin", + password="1234", + role="admin", + approved=True, + ) + if not User.from_email("display@signin.chopshoplib.info"): + User.make( + "display@signin.chopshoplib.info", + "display", + password="1234", + role="display", + approved=True, + ) + db.session.commit() + + +if app.config["DEBUG"]: + with app.app_context(): + init_default_db() + + now = datetime.datetime.now(tz=datetime.UTC).replace(microsecond=0) + offset = datetime.timedelta(hours=3) + training = Event.create( + name="Training", + description="Test Training Event", + location="D124", + code="5678", + start=now, + end=now + offset, + event_type="Training", + ) + notTraining = Event.create( + name="Not Training", + description="Test Build Event", + location="D124", + code="5679", + start=now, + end=now + offset, + event_type="Build", + ) + + expired_event = Event.create( + name="Ended Training", + description="Test Training Event", + location="D124", + code="8765", + start=now - offset, + end=now - datetime.timedelta(minutes=app.config["POST_EVENT_ACTIVE_TIME"] - 5), + event_type="Build", + ) + db.session.commit() + + mentor_user = User.make( + "msoucy@signin.chopshoplib.info", + "Matt Soucy", + preferred_name="Matt", + code="code-msoucy", + phone_number="603 555-5555", + address="123 First Street", + tshirt_size="Large", + password="1234", + role="mentor", + approved=True, + ) + student_user = Student.make( + "Jburke@signin.chopshoplib.info", + "Jeff Burke", + preferred_name="Jeff", + code="code-jburke", + password="1234", + graduation_year=2022, + subteam="Software", + approved=True, + tshirt_size="Large", + ) + student_user.student_user_data.add_guardian( + guardian=Guardian.get_from( + name="Parent Burke", + phone_number="(603)555-5555", + email="pburke@signin.chopshoplib.info", + contact_order=1, + ) + ) + + student_training_event = Active( + user_id=student_user.id, event_id=expired_event.id, start=now - offset + ) + db.session.add(student_training_event) + + safe = Badge( + name="Safety Certified", + icon="cone-striped", + color="#FFA500", # Orange - needs to be in hex format for WTForms + description="Passed Safety Training", + ) + db.session.add(safe) + db.session.commit() + + mentor_user.award_badge(safe) + db.session.commit() diff --git a/signinapp/active.py b/signinapp/active.py index 4296318..b715f23 100644 --- a/signinapp/active.py +++ b/signinapp/active.py @@ -20,9 +20,7 @@ def view(): @mentor_required def post(): active_event = db.session.get(Active, request.form["active_id"]) - stamp = Stamps( - user=active_event.user, event=active_event.event, start=active_event.start - ) + stamp = Stamps(user=active_event.user, event=active_event.event, start=active_event.start) db.session.delete(active_event) db.session.add(stamp) db.session.commit() diff --git a/signinapp/admin/__init__.py b/signinapp/admin/__init__.py index 15e4c07..71c07b5 100644 --- a/signinapp/admin/__init__.py +++ b/signinapp/admin/__init__.py @@ -1,17 +1,16 @@ -from flask import Flask, redirect, url_for -from flask.templating import render_template -from sqlalchemy import delete - -from ..util import admin_required -from . import role, subteam, users # noqa -from .util import admin - - -@admin.route("/admin") -@admin_required -def admin_main(): - return render_template("admin/main.html.jinja2") - - -def init_app(app: Flask): - app.register_blueprint(admin) +from flask import Flask +from flask.templating import render_template + +from ..util import admin_required +from . import role, subteam, users # noqa +from .util import admin + + +@admin.route("/admin") +@admin_required +def admin_main(): + return render_template("admin/main.html.jinja2") + + +def init_app(app: Flask): + app.register_blueprint(admin) diff --git a/signinapp/admin/users.py b/signinapp/admin/users.py index a5c7582..eeaf467 100644 --- a/signinapp/admin/users.py +++ b/signinapp/admin/users.py @@ -60,9 +60,7 @@ def user_promote(): if form.validate_on_submit(): if User.from_email(form.email.data) not in (None, user): flash(f"Username {form.email.data} already exists") - return redirect( - url_for("admin.user_promote", user_id=request.args["user_id"]) - ) + return redirect(url_for("admin.user_promote", user_id=request.args["user_id"])) # Cannot use form.populate_data because of the password user.email = form.email.data @@ -122,9 +120,7 @@ def edit_user(): form.admin_data.role.process_data(user.role_id) form.admin_data.approved.process_data(user.approved) form.subteam.process_data(user.subteam_id) - form.tshirt_size.process_data( - user.tshirt_size.name if user.tshirt_size else "Large" - ) + form.tshirt_size.process_data(user.tshirt_size.name if user.tshirt_size else "Large") form.pronouns.process_data(user.pronouns.name if user.pronouns else "He/Him") return render_template( "form.html.jinja2", diff --git a/signinapp/auth.py b/signinapp/auth.py index ec49709..92abc0c 100644 --- a/signinapp/auth.py +++ b/signinapp/auth.py @@ -155,9 +155,7 @@ def register_guardian(): if form.validate_on_submit(): # Cannot use form.populate_data because of the password - user = Guardian.get_from( - form.name.data, form.phone_number.data, form.email.data, 0 - ).user + user = Guardian.get_from(form.name.data, form.phone_number.data, form.email.data, 0).user user.name = form.name.data if form.password.data: user.password = generate_password_hash(form.password.data) diff --git a/signinapp/dbadmin.py b/signinapp/dbadmin.py index b6543ee..5d3bbf1 100644 --- a/signinapp/dbadmin.py +++ b/signinapp/dbadmin.py @@ -1,60 +1,60 @@ -from flask import redirect, request, url_for, abort -from flask_admin import Admin, AdminIndexView -from flask_admin.contrib.sqla import ModelView -from flask_login import current_user - -from .model import ( - Badge, - Event, - EventType, - Guardian, - Role, - Stamps, - Student, - Subteam, - User, - db, -) - - -class AuthModelView(ModelView): - def is_accessible(self): - return current_user.is_authenticated and current_user.role.admin - - def inaccessible_callback(self, name, **kwargs): - # redirect to login page if user doesn't have access - return redirect(url_for("auth.login", next=request.url)) - - -class AdminView(AdminIndexView): - def is_accessible(self): - return current_user.is_authenticated and current_user.role.admin - - def inaccessible_callback(self, name, **kwargs): - # redirect to login page if user doesn't have access - if not current_user.is_authenticated: - return redirect(url_for("auth.login", next=request.url)) - elif not current_user.role.admin: - return abort(401) - - -def init_app(app): - app.config["FLASK_ADMIN_SWATCH"] = "cyborg" - flask_admin = Admin( - index_view=AdminView(name="Home", url="/dbadmin", endpoint="dbadmin"), - endpoint="dbadmin", - ) - - flask_admin.add_views( - AuthModelView(Badge, db.session, endpoint="admin_badge"), - AuthModelView(Event, db.session, endpoint="admin_event"), - AuthModelView(EventType, db.session, endpoint="admin_eventtype"), - AuthModelView(Guardian, db.session, endpoint="admin_guardian"), - AuthModelView(Role, db.session, endpoint="admin_role"), - AuthModelView(Student, db.session, endpoint="admin_student"), - AuthModelView(Subteam, db.session, endpoint="admin_subteam"), - AuthModelView(User, db.session, endpoint="admin_user"), - AuthModelView(Stamps, db.session, endpoint="admin_stamps"), - ) - - flask_admin.init_app(app) +from flask import abort, redirect, request, url_for +from flask_admin import Admin, AdminIndexView +from flask_admin.contrib.sqla import ModelView +from flask_login import current_user + +from .model import ( + Badge, + Event, + EventType, + Guardian, + Role, + Stamps, + Student, + Subteam, + User, + db, +) + + +class AuthModelView(ModelView): + def is_accessible(self): + return current_user.is_authenticated and current_user.role.admin + + def inaccessible_callback(self, name, **kwargs): + # redirect to login page if user doesn't have access + return redirect(url_for("auth.login", next=request.url)) + + +class AdminView(AdminIndexView): + def is_accessible(self): + return current_user.is_authenticated and current_user.role.admin + + def inaccessible_callback(self, name, **kwargs): + # redirect to login page if user doesn't have access + if not current_user.is_authenticated: + return redirect(url_for("auth.login", next=request.url)) + elif not current_user.role.admin: + return abort(401) + + +def init_app(app): + app.config["FLASK_ADMIN_SWATCH"] = "cyborg" + flask_admin = Admin( + index_view=AdminView(name="Home", url="/dbadmin", endpoint="dbadmin"), + endpoint="dbadmin", + ) + + flask_admin.add_views( + AuthModelView(Badge, db.session, endpoint="admin_badge"), + AuthModelView(Event, db.session, endpoint="admin_event"), + AuthModelView(EventType, db.session, endpoint="admin_eventtype"), + AuthModelView(Guardian, db.session, endpoint="admin_guardian"), + AuthModelView(Role, db.session, endpoint="admin_role"), + AuthModelView(Student, db.session, endpoint="admin_student"), + AuthModelView(Subteam, db.session, endpoint="admin_subteam"), + AuthModelView(User, db.session, endpoint="admin_user"), + AuthModelView(Stamps, db.session, endpoint="admin_stamps"), + ) + + flask_admin.init_app(app) diff --git a/signinapp/event.py b/signinapp/event.py index 2fd7df6..3bfd0d9 100644 --- a/signinapp/event.py +++ b/signinapp/event.py @@ -33,9 +33,7 @@ def event(): if not event_code or not Event.get_from_code(event_code): flash("Invalid event code") return redirect(url_for("index")) - return render_template( - "event.html.jinja2", url_base=request.host_url, event_code=event_code - ) + return render_template("event.html.jinja2", url_base=request.host_url, event_code=event_code) @eventbp.route("/event/self") @@ -71,9 +69,7 @@ def selfout(): return redirect(url_for("index")) if current_user.is_signed_into(ev): - active: Active = db.session.scalar( - select(Active).filter_by(user=current_user, event=ev) - ) + active: Active = db.session.scalar(select(Active).filter_by(user=current_user, event=ev)) active.convert_to_stamp() return redirect(url_for("index")) @@ -93,9 +89,7 @@ def scan(): event = request.values.get("event_code") if not (user_code := request.values.get("user_code")): - return Response( - f"Error: Not a valid QR code: {user_code}", HTTPStatus.BAD_REQUEST - ) + return Response(f"Error: Not a valid QR code: {user_code}", HTTPStatus.BAD_REQUEST) if not (user := User.from_code(user_code)): return Response("Error: User does not exist", HTTPStatus.NOT_FOUND) diff --git a/signinapp/events.py b/signinapp/events.py index cb61005..c4fa035 100644 --- a/signinapp/events.py +++ b/signinapp/events.py @@ -1,437 +1,429 @@ -from collections import defaultdict -from datetime import date, datetime, timedelta, timezone -from urllib import parse - -from dateutil.rrule import WEEKLY, rrule -from flask import Blueprint, Flask, flash, redirect, request, url_for -from flask.templating import render_template -from flask_login import current_user, login_required -from flask_wtf import FlaskForm -from sqlalchemy import func -from sqlalchemy.future import select -from wtforms import ( - BooleanField, - DateField, - DateTimeLocalField, - DecimalField, - FieldList, - Form, - FormField, - HiddenField, - SelectField, - SelectMultipleField, - StringField, - SubmitField, - TextAreaField, - TimeField, -) -from wtforms.validators import DataRequired, EqualTo, NumberRange, ValidationError - -from .model import ( - Event, - EventRegistration, - EventType, - db, - gen_code, - get_form_ids, - school_year_for_date, -) -from .util import correct_time_for_storage, correct_time_from_storage, mentor_required - -bp = Blueprint("events", __name__, url_prefix="/events") - -DATE_FORMATS = [ - "%Y-%m-%d %H:%M:%S", - "%Y-%m-%dT%H:%M:%S", - "%Y-%m-%d %H:%M", - "%Y-%m-%dT%H:%M", -] - -WEEKDAYS = [ - "Monday", - "Tuesday", - "Wednesday", - "Thursday", - "Friday", - "Saturday", - "Sunday", -] - - -class EventForm(FlaskForm): - name = StringField(validators=[DataRequired()]) - description = StringField() - location = StringField() - code = StringField(validators=[DataRequired()], default=gen_code) - start = DateTimeLocalField(format=DATE_FORMATS) - end = DateTimeLocalField(format=DATE_FORMATS) - type_id = SelectField(label="Type", choices=lambda: get_form_ids(EventType)) - registration_open = BooleanField( - default=False, description="Whether this event shows up for users to register" - ) - funds = DecimalField(label="Funds Received", default=0) - cost = DecimalField(label="Event Cost", default=0) - overhead = DecimalField( - label="Overhead Portion", - validators=[ - NumberRange(min=0.0, max=1.0, message="Must be between 0.0 and 1.0") - ], - default=1.0, - ) - submit = SubmitField() - - def validate_end(self, field): - if self.start.data >= field.data: - raise ValidationError("End time must not be before start time") - - -class BulkEventForm(FlaskForm): - name = StringField(validators=[DataRequired()]) - description = StringField() - location = StringField() - start_day = DateField(validators=[DataRequired()]) - end_day = DateField(validators=[DataRequired()]) - days = SelectMultipleField(choices=WEEKDAYS, validators=[DataRequired()]) - start_time = TimeField(validators=[DataRequired()]) - end_time = TimeField(validators=[DataRequired()]) - type_id = SelectField(label="Type", choices=lambda: get_form_ids(EventType)) - # No funds or cost field because we don't know this amount up front - submit = SubmitField() - - def validate_end_time(self, field): - if self.start_time.data >= field.data: - raise ValidationError("End time must not be before start time") - - def validate_end_day(self, field): - if self.start_day.data >= field.data: - raise ValidationError("End day must not be before start day") - - -class EventSearchForm(FlaskForm): - category = SelectField(choices=lambda: get_form_ids(EventType)) - submit = SubmitField() - - -class EventBlockForm(Form): - start = DateTimeLocalField(render_kw={"readonly": True}) - end = DateTimeLocalField(render_kw={"readonly": True}) - register = BooleanField() - block_id = HiddenField() - comment = TextAreaField( - "Comment", - description="Comment on event signup", - render_kw={"placeholder": "Comment"}, - ) - - -class EventRegistrationForm(FlaskForm): - blocks = FieldList(FormField(EventBlockForm)) - submit = SubmitField() - - -class DeleteEventForm(FlaskForm): - name = StringField(validators=[DataRequired()], render_kw={"readonly": True}) - start = DateTimeLocalField(render_kw={"readonly": True}) - end = DateTimeLocalField(render_kw={"readonly": True}) - verify = StringField( - "Confirm Name", - validators=[DataRequired(), EqualTo("name", message="Enter the event's name")], - ) - submit = SubmitField() - - -@bp.route("/", endpoint="list") -@mentor_required -def list_events(): - events: list[Event] = list( - db.session.scalars( - select(Event) - .where(Event.school_year == school_year_for_date(date.today())) - .order_by(Event.start) - ) - ) - return render_template("events.html.jinja2", events=events) - - -@bp.route("/previous") -@mentor_required -def previous(): - events: list[Event] = list( - db.session.scalars( - select(Event).order_by(Event.start).where(Event.end <= func.now()) - ) - ) - return render_template("events.html.jinja2", prefix="Previous ", events=events) - - -@bp.route("/active") -@mentor_required -def active(): - events: list[Event] = list( - db.session.scalars(select(Event).order_by(Event.start).where(Event.is_active)) - ) - return render_template("events.html.jinja2", prefix="Active ", events=events) - - -@bp.route("/today") -@mentor_required -def todays(): - query = select(Event).order_by(Event.start) - if db.get_engine().name == "postgresql": - query = query.where( - Event.start - < func.date_trunc("day", func.now()) + func.make_interval(0, 0, 0, 1), - Event.end > func.date_trunc("day", func.now()), - ) - elif db.get_engine().name == "sqlite": - query = query.where( - Event.start < func.datetime("now", "+1 day", "start of day"), - Event.end > func.datetime("now", "start of day"), - ) - events: list[Event] = list(db.session.scalars(query)) - return render_template("events.html.jinja2", prefix="Today's ", events=events) - - -@bp.route("/upcoming") -@mentor_required -def upcoming(): - events: list[Event] = list( - db.session.scalars( - select(Event).order_by(Event.start.desc()).where(Event.start > func.now()) - ) - ) - return render_template("events.html.jinja2", prefix="Upcoming ", events=events) - - -@bp.route("/open", endpoint="open") -def list_open(): - events: list[Event] = list( - db.session.scalars( - select(Event) - .filter_by(registration_open=True) - .order_by(Event.start.desc(), Event.end.desc()) - .where(Event.end > func.now()) - ) - ) - return render_template("open_events.html.jinja2", events=events) - - -@bp.route("/stats") -@mentor_required -def stats(): - event: Event = db.session.get(Event, request.args["event_id"]) - users = defaultdict(timedelta) - subteams = defaultdict(timedelta) - blocks = defaultdict(list) - for stamp in event.stamps: - users[stamp.user] += stamp.elapsed - subteams[stamp.user.subteam] += stamp.elapsed - now = datetime.now(tz=timezone.utc) - for active in event.active: - users[active.user] += now - correct_time_from_storage(active.start) - subteams[active.user.subteam] += now - correct_time_from_storage(active.start) - - for block in event.blocks: - for registration in block.registrations: - if registration.registered: - blocks[block].append((registration.user.name, registration.comment)) - # Sort user names - blocks[block] = sorted(blocks[block], key=lambda registration: registration[0]) - - blocks = sorted( - blocks.items(), - key=lambda block: block[0].start, - ) - - users = sorted(users.items(), key=lambda user: user[0].human_readable) - subteams = sorted( - ((s, t) for s, t in subteams.items() if s), key=lambda subteam: subteam[0].name - ) - registration_url = parse.urljoin( - request.host_url, url_for("events.register", event_id=event.id) - ) - return render_template( - "event_stats.html.jinja2", - event=event, - users=users, - subteams=subteams, - blocks=blocks, - registration_url=registration_url, - ) - - -@bp.route("/bulk", methods=["GET", "POST"]) -@mentor_required -def bulk(): - form = BulkEventForm() - - if form.validate_on_submit(): - start_time = datetime.combine(form.start_day.data, form.start_time.data) - end_time = datetime.combine(form.end_day.data, form.end_time.data) - days = rrule( - WEEKLY, byweekday=[WEEKDAYS.index(d) for d in form.days.data] - ).between(start_time, end_time, inc=True) - event_type = db.session.get(EventType, form.type_id.data) - for d in [d.date() for d in days]: - Event.create( - name=form.name.data, - description=form.description.data, - location=form.location.data, - start=datetime.combine(d, form.start_time.data), - end=datetime.combine(d, form.end_time.data), - event_type=event_type, - ) - db.session.commit() - - return redirect(url_for("events.upcoming")) - return render_template("form.html.jinja2", form=form, title="Bulk Event Add") - - -@bp.route("/new", methods=["GET", "POST"]) -@mentor_required -def new(): - form = EventForm() - if form.validate_on_submit(): - event_type = db.session.get(EventType, form.type_id.data) - ev = Event.create( - name=form.name.data, - description=form.description.data, - location=form.location.data, - start=form.start.data, - end=form.end.data, - event_type=event_type, - registration_open=form.registration_open.data, - ) - ev.cost = int(form.cost.data * 100) - ev.funds = int(form.funds.data * 100) - ev.overhead = int(form.overhead.data * 100) - db.session.commit() - - return redirect(url_for("events.upcoming")) - - return render_template("form.html.jinja2", form=form, title="New Event") - - -@bp.route("/edit", methods=["GET", "POST"]) -@mentor_required -def edit(): - event: Event = db.session.get(Event, request.args["event_id"]) - if not event: - flash("Event does not exist") - return redirect(url_for("events.list")) - - form = EventForm(obj=event) - # Only re-format the times when initialy sending the form. - if not form.is_submitted(): - form.start.data = correct_time_from_storage(form.start.data) - form.end.data = correct_time_from_storage(form.end.data) - - if form.validate_on_submit(): - form.start.data = correct_time_for_storage(form.start.data) - form.end.data = correct_time_for_storage(form.end.data) - form.populate_obj(event) - event.cost = int(form.cost.data * 100) - event.funds = int(form.funds.data * 100) - event.overhead = int(form.overhead.data * 100) - db.session.commit() - return redirect(url_for("events.list")) - - form.cost.process_data(form.cost.data / 100) - form.funds.process_data(form.funds.data / 100) - form.overhead.process_data(form.overhead.data / 100) - - form.type_id.process_data(event.type_id) - return render_template( - "form.html.jinja2", - form=form, - title=f"Edit Event {event.name}", - ) - - -@bp.route("/search", methods=["GET", "POST"]) -@mentor_required -def search(): - form = EventSearchForm() - - if form.validate_on_submit(): - event_type = db.session.get(EventType, form.category.data) - results = db.session.scalars(select(Event).where(Event.type_ == event_type)) - return render_template("search/events.html.jinja2", form=form, results=results) - - return render_template("search/hours.html.jinja2", form=form, results=None) - - -@bp.route("/register", methods=["GET", "POST"]) -@login_required -def register(): - event: list[Event] = db.session.get(Event, request.args["event_id"]) - if not event: - flash("Event does not exist, please double check the URL") - return redirect(url_for("index")) - - data = {"blocks": []} - for block in event.blocks: - registration = db.session.scalar( - select(EventRegistration).filter_by( - user_id=current_user.id, event_block=block - ) - ) - # If no registration exists for this block, or they've chosen not to register for this event block - registered = registration and registration.registered - comment = registration.comment if registration else "" - data["blocks"].append( - { - "data": { - "start": correct_time_from_storage(block.start), - "end": correct_time_from_storage(block.end), - "block_id": block.id, - "register": registered, - "comment": comment, - } - } - ) - form = EventRegistrationForm(data=data) - - if form.validate_on_submit(): - for block in form.blocks.data: - EventRegistration.upsert( - event_block_id=block["block_id"], - user=current_user, - comment=block["comment"], - registered=block["register"], - ) - db.session.commit() - flash(f"Successfully signed up for {event.name}") - return redirect(url_for("index")) - - return render_template( - "event_register.html.jinja2", - event=event, - form=form, - title=f"Register Event {event.name}", - ) - - -@bp.route("/delete", methods=["GET", "POST"]) -@mentor_required -def delete(): - ev = db.session.get(Event, request.args["event_id"]) - if not ev: - flash("Invalid event ID") - return redirect(url_for("events.list")) - - form = DeleteEventForm(obj=ev) - if form.validate_on_submit(): - db.session.delete(ev) - db.session.commit() - return redirect(url_for("events.list")) - - return render_template( - "form.html.jinja2", - form=form, - title=f"Delete Event {ev.name}", - ) - - -def init_app(app: Flask): - app.register_blueprint(bp) +from collections import defaultdict +from datetime import UTC, date, datetime, timedelta +from urllib import parse + +from dateutil.rrule import WEEKLY, rrule +from flask import Blueprint, Flask, flash, redirect, request, url_for +from flask.templating import render_template +from flask_login import current_user, login_required +from flask_wtf import FlaskForm +from sqlalchemy import func +from sqlalchemy.future import select +from wtforms import ( + BooleanField, + DateField, + DateTimeLocalField, + DecimalField, + FieldList, + Form, + FormField, + HiddenField, + SelectField, + SelectMultipleField, + StringField, + SubmitField, + TextAreaField, + TimeField, +) +from wtforms.validators import DataRequired, EqualTo, NumberRange, ValidationError + +from .model import ( + Event, + EventRegistration, + EventType, + db, + gen_code, + get_form_ids, + school_year_for_date, +) +from .util import correct_time_for_storage, correct_time_from_storage, mentor_required + +bp = Blueprint("events", __name__, url_prefix="/events") + +DATE_FORMATS = [ + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%dT%H:%M", +] + +WEEKDAYS = [ + "Monday", + "Tuesday", + "Wednesday", + "Thursday", + "Friday", + "Saturday", + "Sunday", +] + + +class EventForm(FlaskForm): + name = StringField(validators=[DataRequired()]) + description = StringField() + location = StringField() + code = StringField(validators=[DataRequired()], default=gen_code) + start = DateTimeLocalField(format=DATE_FORMATS) + end = DateTimeLocalField(format=DATE_FORMATS) + type_id = SelectField(label="Type", choices=lambda: get_form_ids(EventType)) + registration_open = BooleanField( + default=False, description="Whether this event shows up for users to register" + ) + funds = DecimalField(label="Funds Received", default=0) + cost = DecimalField(label="Event Cost", default=0) + overhead = DecimalField( + label="Overhead Portion", + validators=[NumberRange(min=0.0, max=1.0, message="Must be between 0.0 and 1.0")], + default=1.0, + ) + submit = SubmitField() + + def validate_end(self, field): + if self.start.data >= field.data: + raise ValidationError("End time must not be before start time") + + +class BulkEventForm(FlaskForm): + name = StringField(validators=[DataRequired()]) + description = StringField() + location = StringField() + start_day = DateField(validators=[DataRequired()]) + end_day = DateField(validators=[DataRequired()]) + days = SelectMultipleField(choices=WEEKDAYS, validators=[DataRequired()]) + start_time = TimeField(validators=[DataRequired()]) + end_time = TimeField(validators=[DataRequired()]) + type_id = SelectField(label="Type", choices=lambda: get_form_ids(EventType)) + # No funds or cost field because we don't know this amount up front + submit = SubmitField() + + def validate_end_time(self, field): + if self.start_time.data >= field.data: + raise ValidationError("End time must not be before start time") + + def validate_end_day(self, field): + if self.start_day.data >= field.data: + raise ValidationError("End day must not be before start day") + + +class EventSearchForm(FlaskForm): + category = SelectField(choices=lambda: get_form_ids(EventType)) + submit = SubmitField() + + +class EventBlockForm(Form): + start = DateTimeLocalField(render_kw={"readonly": True}) + end = DateTimeLocalField(render_kw={"readonly": True}) + register = BooleanField() + block_id = HiddenField() + comment = TextAreaField( + "Comment", + description="Comment on event signup", + render_kw={"placeholder": "Comment"}, + ) + + +class EventRegistrationForm(FlaskForm): + blocks = FieldList(FormField(EventBlockForm)) + submit = SubmitField() + + +class DeleteEventForm(FlaskForm): + name = StringField(validators=[DataRequired()], render_kw={"readonly": True}) + start = DateTimeLocalField(render_kw={"readonly": True}) + end = DateTimeLocalField(render_kw={"readonly": True}) + verify = StringField( + "Confirm Name", + validators=[DataRequired(), EqualTo("name", message="Enter the event's name")], + ) + submit = SubmitField() + + +@bp.route("/", endpoint="list") +@mentor_required +def list_events(): + events: list[Event] = list( + db.session.scalars( + select(Event) + .where(Event.school_year == school_year_for_date(date.today())) + .order_by(Event.start) + ) + ) + return render_template("events.html.jinja2", events=events) + + +@bp.route("/previous") +@mentor_required +def previous(): + events: list[Event] = list( + db.session.scalars(select(Event).order_by(Event.start).where(Event.end <= func.now())) + ) + return render_template("events.html.jinja2", prefix="Previous ", events=events) + + +@bp.route("/active") +@mentor_required +def active(): + events: list[Event] = list( + db.session.scalars(select(Event).order_by(Event.start).where(Event.is_active)) + ) + return render_template("events.html.jinja2", prefix="Active ", events=events) + + +@bp.route("/today") +@mentor_required +def todays(): + query = select(Event).order_by(Event.start) + if db.get_engine().name == "postgresql": + query = query.where( + Event.start < func.date_trunc("day", func.now()) + func.make_interval(0, 0, 0, 1), + Event.end > func.date_trunc("day", func.now()), + ) + elif db.get_engine().name == "sqlite": + query = query.where( + Event.start < func.datetime("now", "+1 day", "start of day"), + Event.end > func.datetime("now", "start of day"), + ) + events: list[Event] = list(db.session.scalars(query)) + return render_template("events.html.jinja2", prefix="Today's ", events=events) + + +@bp.route("/upcoming") +@mentor_required +def upcoming(): + events: list[Event] = list( + db.session.scalars( + select(Event).order_by(Event.start.desc()).where(Event.start > func.now()) + ) + ) + return render_template("events.html.jinja2", prefix="Upcoming ", events=events) + + +@bp.route("/open", endpoint="open") +def list_open(): + events: list[Event] = list( + db.session.scalars( + select(Event) + .filter_by(registration_open=True) + .order_by(Event.start.desc(), Event.end.desc()) + .where(Event.end > func.now()) + ) + ) + return render_template("open_events.html.jinja2", events=events) + + +@bp.route("/stats") +@mentor_required +def stats(): + event: Event = db.session.get(Event, request.args["event_id"]) + users = defaultdict(timedelta) + subteams = defaultdict(timedelta) + blocks = defaultdict(list) + for stamp in event.stamps: + users[stamp.user] += stamp.elapsed + subteams[stamp.user.subteam] += stamp.elapsed + now = datetime.now(tz=UTC) + for active in event.active: + users[active.user] += now - correct_time_from_storage(active.start) + subteams[active.user.subteam] += now - correct_time_from_storage(active.start) + + for block in event.blocks: + for registration in block.registrations: + if registration.registered: + blocks[block].append((registration.user.name, registration.comment)) + # Sort user names + blocks[block] = sorted(blocks[block], key=lambda registration: registration[0]) + + blocks = sorted( + blocks.items(), + key=lambda block: block[0].start, + ) + + users = sorted(users.items(), key=lambda user: user[0].human_readable) + subteams = sorted( + ((s, t) for s, t in subteams.items() if s), key=lambda subteam: subteam[0].name + ) + registration_url = parse.urljoin( + request.host_url, url_for("events.register", event_id=event.id) + ) + return render_template( + "event_stats.html.jinja2", + event=event, + users=users, + subteams=subteams, + blocks=blocks, + registration_url=registration_url, + ) + + +@bp.route("/bulk", methods=["GET", "POST"]) +@mentor_required +def bulk(): + form = BulkEventForm() + + if form.validate_on_submit(): + start_time = datetime.combine(form.start_day.data, form.start_time.data) + end_time = datetime.combine(form.end_day.data, form.end_time.data) + days = rrule(WEEKLY, byweekday=[WEEKDAYS.index(d) for d in form.days.data]).between( + start_time, end_time, inc=True + ) + event_type = db.session.get(EventType, form.type_id.data) + for d in [d.date() for d in days]: + Event.create( + name=form.name.data, + description=form.description.data, + location=form.location.data, + start=datetime.combine(d, form.start_time.data), + end=datetime.combine(d, form.end_time.data), + event_type=event_type, + ) + db.session.commit() + + return redirect(url_for("events.upcoming")) + return render_template("form.html.jinja2", form=form, title="Bulk Event Add") + + +@bp.route("/new", methods=["GET", "POST"]) +@mentor_required +def new(): + form = EventForm() + if form.validate_on_submit(): + event_type = db.session.get(EventType, form.type_id.data) + ev = Event.create( + name=form.name.data, + description=form.description.data, + location=form.location.data, + start=form.start.data, + end=form.end.data, + event_type=event_type, + registration_open=form.registration_open.data, + ) + ev.cost = int(form.cost.data * 100) + ev.funds = int(form.funds.data * 100) + ev.overhead = int(form.overhead.data * 100) + db.session.commit() + + return redirect(url_for("events.upcoming")) + + return render_template("form.html.jinja2", form=form, title="New Event") + + +@bp.route("/edit", methods=["GET", "POST"]) +@mentor_required +def edit(): + event: Event = db.session.get(Event, request.args["event_id"]) + if not event: + flash("Event does not exist") + return redirect(url_for("events.list")) + + form = EventForm(obj=event) + # Only re-format the times when initialy sending the form. + if not form.is_submitted(): + form.start.data = correct_time_from_storage(form.start.data) + form.end.data = correct_time_from_storage(form.end.data) + + if form.validate_on_submit(): + form.start.data = correct_time_for_storage(form.start.data) + form.end.data = correct_time_for_storage(form.end.data) + form.populate_obj(event) + event.cost = int(form.cost.data * 100) + event.funds = int(form.funds.data * 100) + event.overhead = int(form.overhead.data * 100) + db.session.commit() + return redirect(url_for("events.list")) + + form.cost.process_data(form.cost.data / 100) + form.funds.process_data(form.funds.data / 100) + form.overhead.process_data(form.overhead.data / 100) + + form.type_id.process_data(event.type_id) + return render_template( + "form.html.jinja2", + form=form, + title=f"Edit Event {event.name}", + ) + + +@bp.route("/search", methods=["GET", "POST"]) +@mentor_required +def search(): + form = EventSearchForm() + + if form.validate_on_submit(): + event_type = db.session.get(EventType, form.category.data) + results = db.session.scalars(select(Event).where(Event.type_ == event_type)) + return render_template("search/events.html.jinja2", form=form, results=results) + + return render_template("search/hours.html.jinja2", form=form, results=None) + + +@bp.route("/register", methods=["GET", "POST"]) +@login_required +def register(): + event: list[Event] = db.session.get(Event, request.args["event_id"]) + if not event: + flash("Event does not exist, please double check the URL") + return redirect(url_for("index")) + + data = {"blocks": []} + for block in event.blocks: + registration = db.session.scalar( + select(EventRegistration).filter_by(user_id=current_user.id, event_block=block) + ) + registered = registration and registration.registered + comment = registration.comment if registration else "" + data["blocks"].append( + { + "data": { + "start": correct_time_from_storage(block.start), + "end": correct_time_from_storage(block.end), + "block_id": block.id, + "register": registered, + "comment": comment, + } + } + ) + form = EventRegistrationForm(data=data) + + if form.validate_on_submit(): + for block in form.blocks.data: + EventRegistration.upsert( + event_block_id=block["block_id"], + user=current_user, + comment=block["comment"], + registered=block["register"], + ) + db.session.commit() + flash(f"Successfully signed up for {event.name}") + return redirect(url_for("index")) + + return render_template( + "event_register.html.jinja2", + event=event, + form=form, + title=f"Register Event {event.name}", + ) + + +@bp.route("/delete", methods=["GET", "POST"]) +@mentor_required +def delete(): + ev = db.session.get(Event, request.args["event_id"]) + if not ev: + flash("Invalid event ID") + return redirect(url_for("events.list")) + + form = DeleteEventForm(obj=ev) + if form.validate_on_submit(): + db.session.delete(ev) + db.session.commit() + return redirect(url_for("events.list")) + + return render_template( + "form.html.jinja2", + form=form, + title=f"Delete Event {ev.name}", + ) + + +def init_app(app: Flask): + app.register_blueprint(bp) diff --git a/signinapp/finance.py b/signinapp/finance.py index 77dc786..d985cb6 100644 --- a/signinapp/finance.py +++ b/signinapp/finance.py @@ -1,29 +1,26 @@ -import locale - -from flask import Blueprint, Flask, render_template -from sqlalchemy.future import select - -from .model import Event, Role, User, db -from .util import admin_required - -finance = Blueprint("finance", __name__) - - -@finance.route("/finance") -@admin_required -def overview(): - all_events: list[Event] = db.session.scalars(select(Event)) - all_users: list[User] = db.session.scalars( - select(User).join(Role).where(Role.visible == True, Role.receives_funds == True) - ) - total_overhead = ( - sum(((ev.funds - ev.cost) * ev.overhead) for ev in all_events) / 100.0 - ) - user_funds = sorted([(u.display_name, u.yearly_funds()) for u in all_users]) - return render_template( - "finance.html.jinja2", total_overhead=total_overhead, user_funds=user_funds - ) - - -def init_app(app: Flask): - app.register_blueprint(finance) + +from flask import Blueprint, Flask, render_template +from sqlalchemy.future import select + +from .model import Event, Role, User, db +from .util import admin_required + +finance = Blueprint("finance", __name__) + + +@finance.route("/finance") +@admin_required +def overview(): + all_events: list[Event] = db.session.scalars(select(Event)) + all_users: list[User] = db.session.scalars( + select(User).join(Role).where(Role.visible == True, Role.receives_funds == True) # noqa: E712 + ) + total_overhead = sum(((ev.funds - ev.cost) * ev.overhead) for ev in all_events) / 100.0 + user_funds = sorted([(u.display_name, u.yearly_funds()) for u in all_users]) + return render_template( + "finance.html.jinja2", total_overhead=total_overhead, user_funds=user_funds + ) + + +def init_app(app: Flask): + app.register_blueprint(finance) diff --git a/signinapp/forms.py b/signinapp/forms.py index 9dd2b7a..40773be 100644 --- a/signinapp/forms.py +++ b/signinapp/forms.py @@ -112,9 +112,7 @@ class UserForm(FlaskForm): validators=[Regexp(PHONE_RE)], render_kw={"placeholder": "555-555-5555"}, ) - address = StringField( - "Street Address", validators=[DataRequired(), Regexp(ADDRESS_RE)] - ) + address = StringField("Street Address", validators=[DataRequired(), Regexp(ADDRESS_RE)]) tshirt_size = SelectField( "T-Shirt Size", choices=ShirtSizes.get_size_names(), validators=[DataRequired()] ) diff --git a/signinapp/jobs.py b/signinapp/jobs.py index a26cc38..fd9d940 100644 --- a/signinapp/jobs.py +++ b/signinapp/jobs.py @@ -1,33 +1,32 @@ -from datetime import datetime -from typing import List -from zoneinfo import ZoneInfo - -from flask import current_app -from flask_apscheduler import APScheduler -from sqlalchemy.future import select - -from .model import Active, db - -# initialize scheduler -scheduler = APScheduler() - - -@scheduler.task("interval", id="UserSignOutJob", seconds=30) -def EventEndJob(): - """ - This monitor checks all of the entries in the active table - For each entry check if the adjusted end time has passed - """ - with scheduler.app.app_context(): - time = datetime.now(tz=ZoneInfo(current_app.config["TIME_ZONE"])) - active_entries: List[Active] = db.session.scalars(select(Active)) - - for active in active_entries: - if active.event.adjusted_end < time: - if current_app.config["AUTO_SIGNOUT_BEHAVIOR"] == "Credit": - # Expire the active session - active.convert_to_stamp(active.event.end) - elif current_app.config["AUTO_SIGNOUT_BEHAVIOR"] == "Discard": - # Delete active entry without crediting the user - db.session.delete(active) - db.session.commit() +from datetime import datetime +from zoneinfo import ZoneInfo + +from flask import current_app +from flask_apscheduler import APScheduler +from sqlalchemy.future import select + +from .model import Active, db + +# initialize scheduler +scheduler = APScheduler() + + +@scheduler.task("interval", id="UserSignOutJob", seconds=30) +def EventEndJob(): + """ + This monitor checks all of the entries in the active table + For each entry check if the adjusted end time has passed + """ + with scheduler.app.app_context(): + time = datetime.now(tz=ZoneInfo(current_app.config["TIME_ZONE"])) + active_entries: list[Active] = db.session.scalars(select(Active)) + + for active in active_entries: + if active.event.adjusted_end < time: + if current_app.config["AUTO_SIGNOUT_BEHAVIOR"] == "Credit": + # Expire the active session + active.convert_to_stamp(active.event.end) + elif current_app.config["AUTO_SIGNOUT_BEHAVIOR"] == "Discard": + # Delete active entry without crediting the user + db.session.delete(active) + db.session.commit() diff --git a/signinapp/model.py b/signinapp/model.py index 824489d..94c36af 100644 --- a/signinapp/model.py +++ b/signinapp/model.py @@ -1,816 +1,800 @@ -from __future__ import annotations - -import dataclasses -import enum -import locale -import secrets -from datetime import date, datetime, timedelta, timezone -from http import HTTPStatus -from typing import Annotated - -from flask import Response, current_app -from flask_login import UserMixin -from flask_sqlalchemy import SQLAlchemy -from sqlalchemy import and_, func, MetaData -from sqlalchemy.ext.associationproxy import AssociationProxy, association_proxy -from sqlalchemy.ext.hybrid import hybrid_property -from sqlalchemy.future import select -from sqlalchemy.orm import Mapped, mapped_column -from werkzeug.security import generate_password_hash -from wtforms import FieldList - -from .util import ( - correct_time_for_storage, - correct_time_from_storage, - generate_grade_choices, - normalize_phone_number_for_storage, - normalize_phone_number_from_storage, -) - - -convention = { - "ix": "ix_%(column_0_label)s", - "uq": "uq_%(table_name)s_%(column_0_name)s", - "ck": "ck_%(table_name)s_%(constraint_name)s", - "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", - "pk": "pk_%(table_name)s", -} - -metadata = MetaData(naming_convention=convention) - -# this variable, db, will be used for all SQLAlchemy commands -db = SQLAlchemy(metadata=metadata) - -intpk = Annotated[int, mapped_column(primary_key=True)] -NonNullBool = Annotated[bool, mapped_column(default=False)] - - -def school_year_for_date(d: date): - year = d.year - if d.month > 6: - year += 1 - return year - - -def gen_code(): - "Generate an event code" - return secrets.token_urlsafe(16) - - -def get_form_ids(model, add_null_id=False, filters=()): - prefix = [(0, "None")] if add_null_id else [] - stmt = select(model) - if filters: - stmt = stmt.where(*filters) - return prefix + [(x.id, x.name) for x in db.session.scalars(stmt)] - - -@dataclasses.dataclass -class StampEvent: - name: str - event: str - - -class ShirtSizes(enum.Enum): - Small = "Small" - Medium = "Medium" - Large = "Large" - X_Large = "X-Large" - XX_Large = "XX-Large" - XXX_Large = "XXX-Large" - - @classmethod - def get_size_names(cls): - return [(size.name, size.value) for size in cls] - - -class Pronoun(enum.Enum): - He_Him = "He/Him" - She_Her = "She/Her" - They_Them = "They/Them" - He_They = "He/They" - She_They = "She/They" - - @classmethod - def get_pronoun_options(cls): - return [(p.name, p.value) for p in cls] - - -class Badge(db.Model): - 'Represents an "achievement", accomplishment, or certification' - __tablename__ = "badges" - id: Mapped[intpk] - name: Mapped[str] - description: Mapped[str | None] - emoji: Mapped[str | None] - icon: Mapped[str | None] - color: Mapped[str] = mapped_column(default="black") - - awards: Mapped[list[BadgeAward]] = db.relationship(back_populates="badge") - - @staticmethod - def from_name(name) -> Badge: - "Get a badge by name" - return db.session.scalar(select(Badge).filter_by(name=name)) - - -class BadgeAward(db.Model): - "Represents a pairing of user to badge, with received date" - __tablename__ = "badge_awards" - user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id"), primary_key=True) - badge_id: Mapped[int] = mapped_column(db.ForeignKey("badges.id"), primary_key=True) - received: Mapped[datetime] = mapped_column(server_default=func.now()) - - owner: Mapped[User] = db.relationship(back_populates="awards", uselist=False) - badge: Mapped[Badge] = db.relationship() - - def __init__(self, badge=None, owner=None): - self.owner = owner - self.badge = badge - - -parent_child_association_table = db.Table( - "parent_child_association", - db.metadata, - db.Column("guardians", db.ForeignKey("guardians.id"), primary_key=True), - db.Column("user_id", db.ForeignKey("students.id"), primary_key=True), -) - - -class User(UserMixin, db.Model): - __tablename__ = "users" - id: Mapped[intpk] - email: Mapped[str] = mapped_column(unique=True) - name: Mapped[str] - preferred_name: Mapped[str | None] - password: Mapped[str | None] - subteam_id: Mapped[int | None] = mapped_column(db.ForeignKey("subteams.id")) - phone_number: Mapped[str | None] - address: Mapped[str | None] - tshirt_size: Mapped[ShirtSizes | None] - pronouns: Mapped[Pronoun | None] - - code: Mapped[str] = mapped_column(unique=True, default=gen_code) - role_id: Mapped[int] = mapped_column(db.ForeignKey("account_types.id")) - approved: Mapped[NonNullBool] - - stamps: Mapped[list[Stamps]] = db.relationship( - "Stamps", - back_populates="user", - cascade="all, delete, delete-orphan", - ) - role: Mapped[Role] = db.relationship(back_populates="users") - subteam: Mapped[Subteam] = db.relationship(back_populates="members") - - awards: Mapped[list[BadgeAward]] = db.relationship( - back_populates="owner", cascade="all, delete-orphan" - ) - badges: AssociationProxy[list[Badge]] = association_proxy("awards", "badge") - - # Guardian specific data - guardian_user_data: Mapped[Guardian | None] = db.relationship( - back_populates="user", - cascade="all, delete, delete-orphan", - ) - - # Student specific data - student_user_data: Mapped[Student | None] = db.relationship( - back_populates="user", - cascade="all, delete, delete-orphan", - ) - - @hybrid_property - def is_active(self) -> bool: - "Required by Flask-Login" - return self.approved - - @property - def total_time(self) -> timedelta: - "Total time for all stamps" - return self.yearly_time() - - def yearly_time(self, year: int | None = None) -> timedelta: - "Total time for all stamps in a year" - year = year or school_year_for_date(date.today()) - return sum( - (s.elapsed for s in self.stamps if s.event.school_year == year), - start=timedelta(), - ) - - @property - def formatted_phone_number(self) -> str: - return normalize_phone_number_from_storage(self.phone_number) - - def award_badge(self, badge: Badge): - "Assign a badge to a user" - if badge not in self.badges: - self.badges.append(badge) - db.session.commit() - - def remove_badge(self, badge: Badge): - "Remove a badge from a user" - if badge in self.badges: - self.badges.remove(badge) - db.session.commit() - - def stamps_for(self, type_: EventType, year: int | None = None): - "Get all stamps for an event type" - year = year or school_year_for_date(date.today()) - return [ - s - for s in self.stamps - if s.event.type_ == type_ and s.event.school_year == year - ] - - def total_stamps_for(self, type_: EventType, year: int | None = None) -> timedelta: - "Total time for an event type" - return sum( - (s.elapsed for s in self.stamps_for(type_, year)), - start=timedelta(), - ) - - def stamps_for_event(self, event: Event) -> list[Stamps]: - "Get all stamps for an event" - return [s for s in self.stamps if s.event == event] - - def can_view(self, user: User): - "Whether the user in question can view this user" - return ( - self.role.mentor - or self.role.admin - or (self == user) - or ( - self.guardian_user_data - and user.student_user_data - and user.student_user_data in self.guardian_user_data.students - ) - ) - - @property - def human_readable(self) -> str: - "Human readable string for display on a web page" - return f"{'*' if self.role.mentor else ''}{self.display_name}" - - @property - def display_name(self) -> str: - return self.preferred_name or self.name - - @property - def full_name(self) -> str: - if self.preferred_name and self.preferred_name != self.name.split(" ")[0]: - return f"{self.name} ({self.preferred_name})" - return self.name - - def is_signed_into(self, ev: str | Event) -> bool: - if isinstance(ev, str): - ev = Event.get_from_code(ev) - return bool(db.session.scalar(select(Active).filter_by(user=self, event=ev))) - - @property - def total_funds(self) -> str: - all_funds = [ev.raw_funds_for(self) for ev in db.session.scalars(select(Event))] - money = sum(all_funds, start=0.0) - return locale.currency(money) - - def yearly_funds(self, year: int | None = None) -> str: - year = year or school_year_for_date(date.today()) - event_funds = [ - ev.raw_funds_for(self) - for ev in db.session.scalars(select(Event).where(Event.school_year == year)) - ] - money = sum(event_funds, start=0.0) - return locale.currency(money) - - @staticmethod - def get_visible_users() -> list[User]: - return list(db.session.scalars(select(User).where(User.role.has(visible=True)))) - - @staticmethod - def make( - email: str, - name: str, - password: str, - role: Role | str, - approved=False, - subteam: Subteam | str = None, - **kwargs, - ) -> User: - "Make a user, with password and hash" - if "phone_number" in kwargs: - kwargs["phone_number"] = normalize_phone_number_for_storage( - kwargs["phone_number"] - ) - - if isinstance(role, str): - role = Role.from_name(role) - - if isinstance(subteam, str): - subteam = Subteam.from_name(subteam) - - user = User( - email=email, - name=name, - password=generate_password_hash(password), - role_id=role.id, - subteam_id=subteam.id if subteam else None, - approved=approved, - **kwargs, - ) - db.session.add(user) - db.session.flush() - return user - - @staticmethod - def make_guardian(name: str, phone_number: str, email: str): - role = Role.from_name("guardian_limited") - pn = normalize_phone_number_for_storage(phone_number) - guardian = User( - name=name, - email=email, - role_id=role.id, - phone_number=pn, - ) - db.session.add(guardian) - db.session.flush() - return guardian - - @staticmethod - def from_email(email: str) -> User | None: - "Look up user by email" - email = email.lower() - return db.session.scalar(select(User).filter(func.lower(User.email) == email)) - - @staticmethod - def from_code(user_code: str) -> User | None: - "Look up user by secret code" - return db.session.scalar(select(User).filter_by(code=user_code)) - - -class Guardian(db.Model): - """ - This table is a bit strange as it has a one to one link with a User (Parent) as well as - Many to Many links with User (Children). - - """ - - __tablename__ = "guardians" - id: Mapped[intpk] - user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) - contact_order: Mapped[int] - - # One to One: Links User to row in Guardian table - user: Mapped[User] = db.relationship(back_populates="guardian_user_data") - - # Many to Many: Links Guardian to Children - students: Mapped[list[Student]] = db.relationship( - secondary=parent_child_association_table, back_populates="guardians" - ) - - @staticmethod - def get_from( - name: str, phone_number: str, email: str, contact_order: int - ) -> Guardian: - guardian_user = db.session.scalar(select(User).where(User.email == email)) - if guardian_user: - # If we found the guardian user, then return the extra guardian data (This object/table) - return guardian_user.guardian_user_data - # Create the guardian user, and add the guardian user object - guardian = User.make_guardian(name=name, phone_number=phone_number, email=email) - guardian_user_data = Guardian(user_id=guardian.id, contact_order=contact_order) - guardian.guardian_user_data = guardian_user_data - db.session.add(guardian_user_data) - return guardian_user_data - - -class Student(db.Model): - __tablename__ = "students" - id: Mapped[intpk] - user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) - - # Extra student information - graduation_year: Mapped[int] - - # One to One: Links User to extra student information - user: Mapped[User] = db.relationship(back_populates="student_user_data") - - # Many to Many: Links Student to Guardian - guardians: Mapped[list[Guardian]] = db.relationship( - secondary=parent_child_association_table, back_populates="students" - ) - - def add_guardian(self, guardian: Guardian): - if guardian not in self.guardians: - self.guardians.append(guardian) - - def update_guardians(self, gs: FieldList): - self.guardians.clear() - # Don't use enumerate in case they skip entries for some reason - i = 0 - for guard in (guard.data for guard in gs): - if guard["name"] and guard["phone_number"] and guard["email"]: - i += 1 - self.add_guardian( - guardian=Guardian.get_from( - name=guard["name"], - phone_number=guard["phone_number"], - email=guard["email"], - contact_order=i, - ) - ) - - @property - def display_grade(self): - grades = generate_grade_choices() - if self.graduation_year in grades: - return grades[self.graduation_year] - else: - return f"Alumni (Graduated: {self.graduation_year})" - - @staticmethod - def make( - email: str, name: str, password: str, graduation_year: int, **kwargs - ) -> User: - role = Role.from_name("student") - student = User.make( - name=name, email=email, password=password, role=role, **kwargs - ) - student_user_data = Student(user_id=student.id, graduation_year=graduation_year) - - student.student_user_data = student_user_data - db.session.add(student_user_data) - return student - - -class Event(db.Model): - __tablename__ = "events" - id: Mapped[intpk] - # User-visible name - name: Mapped[str] - # Description of the event - description: Mapped[str] = mapped_column(default="") - # Unique code for tracking - code: Mapped[str] = mapped_column(unique=True, default=gen_code) - # Location the event takes place at - location: Mapped[str] - # Start time - start: Mapped[datetime] - # End time - end: Mapped[datetime] - # Event type - type_id: Mapped[int] = mapped_column(db.ForeignKey("event_types.id")) - # Whether users can register for the event - registration_open: Mapped[NonNullBool] - - # Total funds for event, in cents - funds: Mapped[int] = mapped_column(default=0) - # Total running cost for event, in cents - cost: Mapped[int] = mapped_column(default=0) - # Percentage of funds that go to the team - overhead: Mapped[float] = mapped_column(default=0.3) - - stamps: Mapped[list[Stamps]] = db.relationship( - back_populates="event", cascade="all, delete, delete-orphan" - ) - active: Mapped[list[Active]] = db.relationship( - back_populates="event", cascade="all, delete, delete-orphan" - ) - type_: Mapped[EventType] = db.relationship(back_populates="events") - blocks: Mapped[list[EventBlock]] = db.relationship( - back_populates="event", cascade="all, delete, delete-orphan" - ) - - @staticmethod - def get_from_code(event_code: str) -> Event | None: - return db.session.scalar(select(Event).filter_by(code=event_code)) - - @property - def start_local(self) -> str: - "Start time in local time zone" - return correct_time_from_storage(self.start).strftime("%c") - - @property - def end_local(self) -> str: - "End time in local time zone" - return correct_time_from_storage(self.end).strftime("%c") - - @property - def funds_human(self) -> str: - "Get the funds in a human readable format" - return locale.currency(self.funds / 100.0) - - @property - def cost_human(self) -> str: - "Get the cost in a human readable format" - return locale.currency(self.cost / 100.0) - - @hybrid_property - def net_funds(self) -> int: - "Get the net funds" - return self.funds - self.cost - - @property - def net_funds_human(self) -> str: - "Get the net funds in a human readable format" - return locale.currency(self.net_funds / 100.0) - - @property - def adjusted_start(self) -> datetime: - start = correct_time_from_storage(self.start) - return start - timedelta(minutes=current_app.config["PRE_EVENT_ACTIVE_TIME"]) - - @property - def adjusted_end(self) -> datetime: - end = correct_time_from_storage(self.end) - return end + timedelta(minutes=current_app.config["POST_EVENT_ACTIVE_TIME"]) - - @hybrid_property - def is_active(self) -> bool: - "Test for if the event is currently active" - now = datetime.now(tz=timezone.utc) - return (self.adjusted_start < now) & (now < self.adjusted_end) - - @is_active.expression - def is_active(cls): - "Usable in queries" - if db.get_engine().name == "postgresql": - pre_adj = cls.start - func.make_interval( - 0, 0, 0, 0, 0, current_app.config["PRE_EVENT_ACTIVE_TIME"] - ) - post_adj = cls.end + func.make_interval( - 0, 0, 0, 0, 0, current_app.config["POST_EVENT_ACTIVE_TIME"] - ) - elif db.get_engine().name == "sqlite": - pre_adj = func.datetime( - cls.start, - f"-{current_app.config['PRE_EVENT_ACTIVE_TIME']} minutes", - ) - post_adj = func.datetime( - cls.end, - f"+{current_app.config['POST_EVENT_ACTIVE_TIME']} minutes", - ) - return and_((pre_adj < func.now()), (post_adj > func.now())).label("is_active") - - @hybrid_property - def school_year(self) -> int: - return school_year_for_date(self.adjusted_start.date()) - - @school_year.expression - def school_year(cls): - "Usable in queries" - if db.get_engine().name == "postgresql": - adj_date = func.extract("year", cls.start + func.make_interval(0, 6)) - elif db.get_engine().name == "sqlite": - adj_date = func.datetime(cls.start, "+6 months", "year") - return adj_date.label("school_year") - - @property - def total_time(self) -> timedelta: - return sum((s.elapsed for s in self.stamps), start=timedelta()) - - def raw_funds_for(self, user: User) -> float: - "Calculate funds from an event for the given user" - if not user.role.receives_funds: - return 0.0 - user_stamps = user.stamps_for_event(self) - user_hours = sum((stamp.elapsed for stamp in user_stamps), start=timedelta()) - total_hours = sum( - (stamp.elapsed for stamp in self.stamps if stamp.user.role.receives_funds), - start=timedelta(), - ) - user_proportion = (user_hours / total_hours) if total_hours else 0.0 - return user_proportion * (1 - self.overhead) * self.net_funds / 100.0 - - def funds_for(self, user: User) -> str: - return locale.currency(self.raw_funds_for(user)) - - @property - def overhead_funds(self) -> str: - return locale.currency(self.net_funds * self.overhead / 100.0) - - def scan(self, user: User) -> StampEvent: - active: Active | None = db.session.scalar( - select(Active).filter_by(user=user, event=self) - ) - if active: - stamp = active.convert_to_stamp() - # Elapsed needs to be taken after committing to the DB - # otherwise it won't be populated - sign = f"out after {stamp.elapsed}" - return StampEvent(user.human_readable, sign) - else: - self.sign_in(user) - return StampEvent(user.human_readable, "in") - - def sign_in(self, user: User): - active = Active(user=user, event=self) - db.session.add(active) - db.session.commit() - - @staticmethod - def create( - name: str, - description: str, - location: str, - start: datetime, - end: datetime, - event_type: EventType | str, - code: int = None, - registration_open: bool = False, - ): - start = correct_time_for_storage(start) - end = correct_time_for_storage(end) - - if isinstance(event_type, str): - event_type = EventType.from_name(event_type) - - ev = Event( - name=name, - description=description, - location=location, - start=start, - end=end, - type_=event_type, - code=code, - registration_open=registration_open, - ) - db.session.add(ev) - db.session.flush() - - # Add default block for the entire event time - block = EventBlock(start=start, end=end, event_id=ev.id) - db.session.add(block) - return ev - - -class EventType(db.Model): - __tablename__ = "event_types" - id: Mapped[intpk] - name: Mapped[str] - description: Mapped[str] - autoload: Mapped[NonNullBool] - - events: Mapped[list[Event]] = db.relationship(back_populates="type_") - - @staticmethod - def from_name(name: str) -> EventType: - return db.session.scalar(select(EventType).filter_by(name=name)) - - -class Active(db.Model): - __tablename__ = "active" - id: Mapped[int] = mapped_column(primary_key=True) - user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) - event_id: Mapped[int] = mapped_column(db.ForeignKey("events.id")) - start: Mapped[datetime] = mapped_column(server_default=func.now()) - - user: Mapped[User] = db.relationship() - event: Mapped[Event] = db.relationship() - - @property - def start_local(self) -> str: - "Start time in local time zone" - return correct_time_from_storage(self.start).strftime("%c") - - def as_dict(self): - "Return a dictionary for sending to the web page" - return { - "user": self.user.human_readable, - "start": self.start, - "event": self.event.name, - } - - def convert_to_stamp(self: Active, end: datetime | None = None): - stamp = Stamps( - user=self.user, - event=self.event, - start=self.start, - ) - if end is not None: - stamp.end = end - db.session.delete(self) - db.session.add(stamp) - db.session.commit() - return stamp - - -class Stamps(db.Model): - __tablename__ = "stamps" - id: Mapped[intpk] - user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) - event_id: Mapped[int] = mapped_column(db.ForeignKey("events.id")) - start: Mapped[datetime] - end: Mapped[datetime] = mapped_column(server_default=func.now()) - - user: Mapped[User] = db.relationship(back_populates="stamps") - event: Mapped[Event] = db.relationship(back_populates="stamps") - - @hybrid_property - def elapsed(self) -> timedelta: - "Elapsed time for a stamp" - return self.end - self.start - - -class Role(db.Model): - __tablename__ = "account_types" - id: Mapped[intpk] - name: Mapped[str] - - admin: Mapped[NonNullBool] - mentor: Mapped[NonNullBool] - guardian: Mapped[NonNullBool] - can_display: Mapped[NonNullBool] - autoload: Mapped[NonNullBool] - can_see_subteam: Mapped[NonNullBool] - visible: Mapped[bool] = mapped_column(default=True) - receives_funds: Mapped[NonNullBool] - - users: Mapped[list[User]] = db.relationship(back_populates="role") - - @staticmethod - def from_name(name) -> Role: - "Get a role by name" - return db.session.scalar(select(Role).filter_by(name=name)) - - @staticmethod - def get_visible() -> list[Role]: - return db.session.scalars(select(Role).filter_by(visible=True)) - - -class Subteam(db.Model): - __tablename__ = "subteams" - id: Mapped[intpk] - name: Mapped[str] - - members: Mapped[list[User]] = db.relationship(back_populates="subteam") - - @staticmethod - def from_name(name) -> Subteam: - "Get a subteam by name" - return db.session.scalar(select(Subteam).filter_by(name=name)) - - -class EventRegistration(db.Model): - __tablename__ = "eventregistrations" - id: Mapped[intpk] - - # Link to event block - event_block_id: Mapped[int] = mapped_column(db.ForeignKey("eventblocks.id")) - event_block: Mapped[EventBlock] = db.relationship(back_populates="registrations") - # Link to user - user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) - user: Mapped[User] = db.relationship() - # User comment for event block - comment: Mapped[str] - - registered: Mapped[NonNullBool] - - @staticmethod - def upsert( - event_block_id: int, - user: User, - comment: str, - registered: bool, - ): - existing_registration = db.session.scalar( - select(EventRegistration).filter_by( - user=user, event_block_id=event_block_id - ) - ) - if existing_registration: - existing_registration.registered = registered - existing_registration.comment = comment - else: - registration = EventRegistration( - event_block_id=event_block_id, - user=user, - comment=comment, - registered=registered, - ) - db.session.add(registration) - - -class EventBlock(db.Model): - __tablename__ = "eventblocks" - id: Mapped[intpk] - - # Start Time for block - start: Mapped[datetime] - # End time for block - end: Mapped[datetime] - # Link to Event - event_id: Mapped[int] = mapped_column(db.ForeignKey("events.id")) - event: Mapped[Event] = db.relationship(back_populates="blocks") - - registrations: Mapped[list[EventRegistration]] = db.relationship( - back_populates="event_block", cascade="all, delete, delete-orphan" - ) - - @property - def start_local(self) -> str: - "Start time in local time zone" - return correct_time_from_storage(self.start).strftime("%c") - - @property - def end_local(self) -> str: - "End time in local time zone" - return correct_time_from_storage(self.end).strftime("%c") +from __future__ import annotations + +import dataclasses +import enum +import locale +import secrets +from datetime import UTC, date, datetime, timedelta +from typing import Annotated + +from flask import current_app +from flask_login import UserMixin +from flask_sqlalchemy import SQLAlchemy +from sqlalchemy import MetaData, and_, func +from sqlalchemy.ext.associationproxy import AssociationProxy, association_proxy +from sqlalchemy.ext.hybrid import hybrid_property +from sqlalchemy.future import select +from sqlalchemy.orm import Mapped, mapped_column +from werkzeug.security import generate_password_hash +from wtforms import FieldList + +from .util import ( + correct_time_for_storage, + correct_time_from_storage, + generate_grade_choices, + normalize_phone_number_for_storage, + normalize_phone_number_from_storage, +) + +convention = { + "ix": "ix_%(column_0_label)s", + "uq": "uq_%(table_name)s_%(column_0_name)s", + "ck": "ck_%(table_name)s_%(constraint_name)s", + "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", + "pk": "pk_%(table_name)s", +} + +metadata = MetaData(naming_convention=convention) + +# this variable, db, will be used for all SQLAlchemy commands +db = SQLAlchemy(metadata=metadata) + +intpk = Annotated[int, mapped_column(primary_key=True)] +NonNullBool = Annotated[bool, mapped_column(default=False)] + + +def school_year_for_date(d: date): + year = d.year + if d.month > 6: + year += 1 + return year + + +def gen_code(): + "Generate an event code" + return secrets.token_urlsafe(16) + + +def get_form_ids(model, add_null_id=False, filters=()): + prefix = [(0, "None")] if add_null_id else [] + stmt = select(model) + if filters: + stmt = stmt.where(*filters) + return prefix + [(x.id, x.name) for x in db.session.scalars(stmt)] + + +@dataclasses.dataclass +class StampEvent: + name: str + event: str + + +class ShirtSizes(enum.Enum): + Small = "Small" + Medium = "Medium" + Large = "Large" + X_Large = "X-Large" + XX_Large = "XX-Large" + XXX_Large = "XXX-Large" + + @classmethod + def get_size_names(cls): + return [(size.name, size.value) for size in cls] + + +class Pronoun(enum.Enum): + He_Him = "He/Him" + She_Her = "She/Her" + They_Them = "They/Them" + He_They = "He/They" + She_They = "She/They" + + @classmethod + def get_pronoun_options(cls): + return [(p.name, p.value) for p in cls] + + +class Badge(db.Model): + 'Represents an "achievement", accomplishment, or certification' + + __tablename__ = "badges" + id: Mapped[intpk] + name: Mapped[str] + description: Mapped[str | None] + emoji: Mapped[str | None] + icon: Mapped[str | None] + color: Mapped[str] = mapped_column(default="black") + + awards: Mapped[list[BadgeAward]] = db.relationship(back_populates="badge") + + @staticmethod + def from_name(name) -> Badge: + "Get a badge by name" + return db.session.scalar(select(Badge).filter_by(name=name)) + + +class BadgeAward(db.Model): + "Represents a pairing of user to badge, with received date" + + __tablename__ = "badge_awards" + user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id"), primary_key=True) + badge_id: Mapped[int] = mapped_column(db.ForeignKey("badges.id"), primary_key=True) + received: Mapped[datetime] = mapped_column(server_default=func.now()) + + owner: Mapped[User] = db.relationship(back_populates="awards", uselist=False) + badge: Mapped[Badge] = db.relationship() + + def __init__(self, badge=None, owner=None): + self.owner = owner + self.badge = badge + + +parent_child_association_table = db.Table( + "parent_child_association", + db.metadata, + db.Column("guardians", db.ForeignKey("guardians.id"), primary_key=True), + db.Column("user_id", db.ForeignKey("students.id"), primary_key=True), +) + + +class User(UserMixin, db.Model): + __tablename__ = "users" + id: Mapped[intpk] + email: Mapped[str] = mapped_column(unique=True) + name: Mapped[str] + preferred_name: Mapped[str | None] + password: Mapped[str | None] + subteam_id: Mapped[int | None] = mapped_column(db.ForeignKey("subteams.id")) + phone_number: Mapped[str | None] + address: Mapped[str | None] + tshirt_size: Mapped[ShirtSizes | None] + pronouns: Mapped[Pronoun | None] + + code: Mapped[str] = mapped_column(unique=True, default=gen_code) + role_id: Mapped[int] = mapped_column(db.ForeignKey("account_types.id")) + approved: Mapped[NonNullBool] + + stamps: Mapped[list[Stamps]] = db.relationship( + "Stamps", + back_populates="user", + cascade="all, delete, delete-orphan", + ) + role: Mapped[Role] = db.relationship(back_populates="users") + subteam: Mapped[Subteam] = db.relationship(back_populates="members") + + awards: Mapped[list[BadgeAward]] = db.relationship( + back_populates="owner", cascade="all, delete-orphan" + ) + badges: AssociationProxy[list[Badge]] = association_proxy("awards", "badge") + + # Guardian specific data + guardian_user_data: Mapped[Guardian | None] = db.relationship( + back_populates="user", + cascade="all, delete, delete-orphan", + ) + + # Student specific data + student_user_data: Mapped[Student | None] = db.relationship( + back_populates="user", + cascade="all, delete, delete-orphan", + ) + + @hybrid_property + def is_active(self) -> bool: + "Required by Flask-Login" + return self.approved + + @property + def total_time(self) -> timedelta: + "Total time for all stamps" + return self.yearly_time() + + def yearly_time(self, year: int | None = None) -> timedelta: + "Total time for all stamps in a year" + year = year or school_year_for_date(date.today()) + return sum( + (s.elapsed for s in self.stamps if s.event.school_year == year), + start=timedelta(), + ) + + @property + def formatted_phone_number(self) -> str: + return normalize_phone_number_from_storage(self.phone_number) + + def award_badge(self, badge: Badge): + "Assign a badge to a user" + if badge not in self.badges: + self.badges.append(badge) + db.session.commit() + + def remove_badge(self, badge: Badge): + "Remove a badge from a user" + if badge in self.badges: + self.badges.remove(badge) + db.session.commit() + + def stamps_for(self, type_: EventType, year: int | None = None): + "Get all stamps for an event type" + year = year or school_year_for_date(date.today()) + return [s for s in self.stamps if s.event.type_ == type_ and s.event.school_year == year] + + def total_stamps_for(self, type_: EventType, year: int | None = None) -> timedelta: + "Total time for an event type" + return sum( + (s.elapsed for s in self.stamps_for(type_, year)), + start=timedelta(), + ) + + def stamps_for_event(self, event: Event) -> list[Stamps]: + "Get all stamps for an event" + return [s for s in self.stamps if s.event == event] + + def can_view(self, user: User): + "Whether the user in question can view this user" + return ( + self.role.mentor + or self.role.admin + or (self == user) + or ( + self.guardian_user_data + and user.student_user_data + and user.student_user_data in self.guardian_user_data.students + ) + ) + + @property + def human_readable(self) -> str: + "Human readable string for display on a web page" + return f"{'*' if self.role.mentor else ''}{self.display_name}" + + @property + def display_name(self) -> str: + return self.preferred_name or self.name + + @property + def full_name(self) -> str: + if self.preferred_name and self.preferred_name != self.name.split(" ")[0]: + return f"{self.name} ({self.preferred_name})" + return self.name + + def is_signed_into(self, ev: str | Event) -> bool: + if isinstance(ev, str): + ev = Event.get_from_code(ev) + return bool(db.session.scalar(select(Active).filter_by(user=self, event=ev))) + + @property + def total_funds(self) -> str: + all_funds = [ev.raw_funds_for(self) for ev in db.session.scalars(select(Event))] + money = sum(all_funds, start=0.0) + return locale.currency(money) + + def yearly_funds(self, year: int | None = None) -> str: + year = year or school_year_for_date(date.today()) + event_funds = [ + ev.raw_funds_for(self) + for ev in db.session.scalars(select(Event).where(Event.school_year == year)) + ] + money = sum(event_funds, start=0.0) + return locale.currency(money) + + @staticmethod + def get_visible_users() -> list[User]: + return list(db.session.scalars(select(User).where(User.role.has(visible=True)))) + + @staticmethod + def make( + email: str, + name: str, + password: str, + role: Role | str, + approved=False, + subteam: Subteam | str = None, + **kwargs, + ) -> User: + "Make a user, with password and hash" + if "phone_number" in kwargs: + kwargs["phone_number"] = normalize_phone_number_for_storage(kwargs["phone_number"]) + + if isinstance(role, str): + role = Role.from_name(role) + + if isinstance(subteam, str): + subteam = Subteam.from_name(subteam) + + user = User( + email=email, + name=name, + password=generate_password_hash(password), + role_id=role.id, + subteam_id=subteam.id if subteam else None, + approved=approved, + **kwargs, + ) + db.session.add(user) + db.session.flush() + return user + + @staticmethod + def make_guardian(name: str, phone_number: str, email: str): + role = Role.from_name("guardian_limited") + pn = normalize_phone_number_for_storage(phone_number) + guardian = User( + name=name, + email=email, + role_id=role.id, + phone_number=pn, + ) + db.session.add(guardian) + db.session.flush() + return guardian + + @staticmethod + def from_email(email: str) -> User | None: + "Look up user by email" + email = email.lower() + return db.session.scalar(select(User).filter(func.lower(User.email) == email)) + + @staticmethod + def from_code(user_code: str) -> User | None: + "Look up user by secret code" + return db.session.scalar(select(User).filter_by(code=user_code)) + + +class Guardian(db.Model): + """ + This table is a bit strange as it has a one to one link with a User (Parent) as well as + Many to Many links with User (Children). + + """ + + __tablename__ = "guardians" + id: Mapped[intpk] + user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) + contact_order: Mapped[int] + + # One to One: Links User to row in Guardian table + user: Mapped[User] = db.relationship(back_populates="guardian_user_data") + + # Many to Many: Links Guardian to Children + students: Mapped[list[Student]] = db.relationship( + secondary=parent_child_association_table, back_populates="guardians" + ) + + @staticmethod + def get_from(name: str, phone_number: str, email: str, contact_order: int) -> Guardian: + guardian_user = db.session.scalar(select(User).where(User.email == email)) + if guardian_user: + # If we found the guardian user, then return the extra guardian data (This object/table) + return guardian_user.guardian_user_data + # Create the guardian user, and add the guardian user object + guardian = User.make_guardian(name=name, phone_number=phone_number, email=email) + guardian_user_data = Guardian(user_id=guardian.id, contact_order=contact_order) + guardian.guardian_user_data = guardian_user_data + db.session.add(guardian_user_data) + return guardian_user_data + + +class Student(db.Model): + __tablename__ = "students" + id: Mapped[intpk] + user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) + + # Extra student information + graduation_year: Mapped[int] + + # One to One: Links User to extra student information + user: Mapped[User] = db.relationship(back_populates="student_user_data") + + # Many to Many: Links Student to Guardian + guardians: Mapped[list[Guardian]] = db.relationship( + secondary=parent_child_association_table, back_populates="students" + ) + + def add_guardian(self, guardian: Guardian): + if guardian not in self.guardians: + self.guardians.append(guardian) + + def update_guardians(self, gs: FieldList): + self.guardians.clear() + # Don't use enumerate in case they skip entries for some reason + i = 0 + for guard in (guard.data for guard in gs): + if guard["name"] and guard["phone_number"] and guard["email"]: + i += 1 + self.add_guardian( + guardian=Guardian.get_from( + name=guard["name"], + phone_number=guard["phone_number"], + email=guard["email"], + contact_order=i, + ) + ) + + @property + def display_grade(self): + grades = generate_grade_choices() + if self.graduation_year in grades: + return grades[self.graduation_year] + else: + return f"Alumni (Graduated: {self.graduation_year})" + + @staticmethod + def make(email: str, name: str, password: str, graduation_year: int, **kwargs) -> User: + role = Role.from_name("student") + student = User.make(name=name, email=email, password=password, role=role, **kwargs) + student_user_data = Student(user_id=student.id, graduation_year=graduation_year) + + student.student_user_data = student_user_data + db.session.add(student_user_data) + return student + + +class Event(db.Model): + __tablename__ = "events" + id: Mapped[intpk] + # User-visible name + name: Mapped[str] + # Description of the event + description: Mapped[str] = mapped_column(default="") + # Unique code for tracking + code: Mapped[str] = mapped_column(unique=True, default=gen_code) + # Location the event takes place at + location: Mapped[str] + # Start time + start: Mapped[datetime] + # End time + end: Mapped[datetime] + # Event type + type_id: Mapped[int] = mapped_column(db.ForeignKey("event_types.id")) + # Whether users can register for the event + registration_open: Mapped[NonNullBool] + + # Total funds for event, in cents + funds: Mapped[int] = mapped_column(default=0) + # Total running cost for event, in cents + cost: Mapped[int] = mapped_column(default=0) + # Percentage of funds that go to the team + overhead: Mapped[float] = mapped_column(default=0.3) + + stamps: Mapped[list[Stamps]] = db.relationship( + back_populates="event", cascade="all, delete, delete-orphan" + ) + active: Mapped[list[Active]] = db.relationship( + back_populates="event", cascade="all, delete, delete-orphan" + ) + type_: Mapped[EventType] = db.relationship(back_populates="events") + blocks: Mapped[list[EventBlock]] = db.relationship( + back_populates="event", cascade="all, delete, delete-orphan" + ) + + @staticmethod + def get_from_code(event_code: str) -> Event | None: + return db.session.scalar(select(Event).filter_by(code=event_code)) + + @property + def start_local(self) -> str: + "Start time in local time zone" + return correct_time_from_storage(self.start).strftime("%c") + + @property + def end_local(self) -> str: + "End time in local time zone" + return correct_time_from_storage(self.end).strftime("%c") + + @property + def funds_human(self) -> str: + "Get the funds in a human readable format" + return locale.currency(self.funds / 100.0) + + @property + def cost_human(self) -> str: + "Get the cost in a human readable format" + return locale.currency(self.cost / 100.0) + + @hybrid_property + def net_funds(self) -> int: + "Get the net funds" + return self.funds - self.cost + + @property + def net_funds_human(self) -> str: + "Get the net funds in a human readable format" + return locale.currency(self.net_funds / 100.0) + + @property + def adjusted_start(self) -> datetime: + start = correct_time_from_storage(self.start) + return start - timedelta(minutes=current_app.config["PRE_EVENT_ACTIVE_TIME"]) + + @property + def adjusted_end(self) -> datetime: + end = correct_time_from_storage(self.end) + return end + timedelta(minutes=current_app.config["POST_EVENT_ACTIVE_TIME"]) + + @hybrid_property + def is_active(self) -> bool: + "Test for if the event is currently active" + now = datetime.now(tz=UTC) + return (self.adjusted_start < now) & (now < self.adjusted_end) + + @is_active.expression + def is_active(cls): + "Usable in queries" + if db.get_engine().name == "postgresql": + pre_adj = cls.start - func.make_interval( + 0, 0, 0, 0, 0, current_app.config["PRE_EVENT_ACTIVE_TIME"] + ) + post_adj = cls.end + func.make_interval( + 0, 0, 0, 0, 0, current_app.config["POST_EVENT_ACTIVE_TIME"] + ) + elif db.get_engine().name == "sqlite": + pre_adj = func.datetime( + cls.start, + f"-{current_app.config['PRE_EVENT_ACTIVE_TIME']} minutes", + ) + post_adj = func.datetime( + cls.end, + f"+{current_app.config['POST_EVENT_ACTIVE_TIME']} minutes", + ) + return and_((pre_adj < func.now()), (post_adj > func.now())).label("is_active") + + @hybrid_property + def school_year(self) -> int: + return school_year_for_date(self.adjusted_start.date()) + + @school_year.expression + def school_year(cls): + "Usable in queries" + if db.get_engine().name == "postgresql": + adj_date = func.extract("year", cls.start + func.make_interval(0, 6)) + elif db.get_engine().name == "sqlite": + adj_date = func.datetime(cls.start, "+6 months", "year") + return adj_date.label("school_year") + + @property + def total_time(self) -> timedelta: + return sum((s.elapsed for s in self.stamps), start=timedelta()) + + def raw_funds_for(self, user: User) -> float: + "Calculate funds from an event for the given user" + if not user.role.receives_funds: + return 0.0 + user_stamps = user.stamps_for_event(self) + user_hours = sum((stamp.elapsed for stamp in user_stamps), start=timedelta()) + total_hours = sum( + (stamp.elapsed for stamp in self.stamps if stamp.user.role.receives_funds), + start=timedelta(), + ) + user_proportion = (user_hours / total_hours) if total_hours else 0.0 + return user_proportion * (1 - self.overhead) * self.net_funds / 100.0 + + def funds_for(self, user: User) -> str: + return locale.currency(self.raw_funds_for(user)) + + @property + def overhead_funds(self) -> str: + return locale.currency(self.net_funds * self.overhead / 100.0) + + def scan(self, user: User) -> StampEvent: + active: Active | None = db.session.scalar(select(Active).filter_by(user=user, event=self)) + if active: + stamp = active.convert_to_stamp() + # Elapsed needs to be taken after committing to the DB + # otherwise it won't be populated + sign = f"out after {stamp.elapsed}" + return StampEvent(user.human_readable, sign) + else: + self.sign_in(user) + return StampEvent(user.human_readable, "in") + + def sign_in(self, user: User): + active = Active(user=user, event=self) + db.session.add(active) + db.session.commit() + + @staticmethod + def create( + name: str, + description: str, + location: str, + start: datetime, + end: datetime, + event_type: EventType | str, + code: int = None, + registration_open: bool = False, + ): + start = correct_time_for_storage(start) + end = correct_time_for_storage(end) + + if isinstance(event_type, str): + event_type = EventType.from_name(event_type) + + ev = Event( + name=name, + description=description, + location=location, + start=start, + end=end, + type_=event_type, + code=code, + registration_open=registration_open, + ) + db.session.add(ev) + db.session.flush() + + # Add default block for the entire event time + block = EventBlock(start=start, end=end, event_id=ev.id) + db.session.add(block) + return ev + + +class EventType(db.Model): + __tablename__ = "event_types" + id: Mapped[intpk] + name: Mapped[str] + description: Mapped[str] + autoload: Mapped[NonNullBool] + + events: Mapped[list[Event]] = db.relationship(back_populates="type_") + + @staticmethod + def from_name(name: str) -> EventType: + return db.session.scalar(select(EventType).filter_by(name=name)) + + +class Active(db.Model): + __tablename__ = "active" + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) + event_id: Mapped[int] = mapped_column(db.ForeignKey("events.id")) + start: Mapped[datetime] = mapped_column(server_default=func.now()) + + user: Mapped[User] = db.relationship() + event: Mapped[Event] = db.relationship() + + @property + def start_local(self) -> str: + "Start time in local time zone" + return correct_time_from_storage(self.start).strftime("%c") + + def as_dict(self): + "Return a dictionary for sending to the web page" + return { + "user": self.user.human_readable, + "start": self.start, + "event": self.event.name, + } + + def convert_to_stamp(self: Active, end: datetime | None = None): + stamp = Stamps( + user=self.user, + event=self.event, + start=self.start, + ) + if end is not None: + stamp.end = end + db.session.delete(self) + db.session.add(stamp) + db.session.commit() + return stamp + + +class Stamps(db.Model): + __tablename__ = "stamps" + id: Mapped[intpk] + user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) + event_id: Mapped[int] = mapped_column(db.ForeignKey("events.id")) + start: Mapped[datetime] + end: Mapped[datetime] = mapped_column(server_default=func.now()) + + user: Mapped[User] = db.relationship(back_populates="stamps") + event: Mapped[Event] = db.relationship(back_populates="stamps") + + @hybrid_property + def elapsed(self) -> timedelta: + "Elapsed time for a stamp" + return self.end - self.start + + +class Role(db.Model): + __tablename__ = "account_types" + id: Mapped[intpk] + name: Mapped[str] + + admin: Mapped[NonNullBool] + mentor: Mapped[NonNullBool] + guardian: Mapped[NonNullBool] + can_display: Mapped[NonNullBool] + autoload: Mapped[NonNullBool] + can_see_subteam: Mapped[NonNullBool] + visible: Mapped[bool] = mapped_column(default=True) + receives_funds: Mapped[NonNullBool] + + users: Mapped[list[User]] = db.relationship(back_populates="role") + + @staticmethod + def from_name(name) -> Role: + "Get a role by name" + return db.session.scalar(select(Role).filter_by(name=name)) + + @staticmethod + def get_visible() -> list[Role]: + return db.session.scalars(select(Role).filter_by(visible=True)) + + +class Subteam(db.Model): + __tablename__ = "subteams" + id: Mapped[intpk] + name: Mapped[str] + + members: Mapped[list[User]] = db.relationship(back_populates="subteam") + + @staticmethod + def from_name(name) -> Subteam: + "Get a subteam by name" + return db.session.scalar(select(Subteam).filter_by(name=name)) + + +class EventRegistration(db.Model): + __tablename__ = "eventregistrations" + id: Mapped[intpk] + + # Link to event block + event_block_id: Mapped[int] = mapped_column(db.ForeignKey("eventblocks.id")) + event_block: Mapped[EventBlock] = db.relationship(back_populates="registrations") + # Link to user + user_id: Mapped[int] = mapped_column(db.ForeignKey("users.id")) + user: Mapped[User] = db.relationship() + # User comment for event block + comment: Mapped[str] + + registered: Mapped[NonNullBool] + + @staticmethod + def upsert( + event_block_id: int, + user: User, + comment: str, + registered: bool, + ): + existing_registration = db.session.scalar( + select(EventRegistration).filter_by(user=user, event_block_id=event_block_id) + ) + if existing_registration: + existing_registration.registered = registered + existing_registration.comment = comment + else: + registration = EventRegistration( + event_block_id=event_block_id, + user=user, + comment=comment, + registered=registered, + ) + db.session.add(registration) + + +class EventBlock(db.Model): + __tablename__ = "eventblocks" + id: Mapped[intpk] + + # Start Time for block + start: Mapped[datetime] + # End time for block + end: Mapped[datetime] + # Link to Event + event_id: Mapped[int] = mapped_column(db.ForeignKey("events.id")) + event: Mapped[Event] = db.relationship(back_populates="blocks") + + registrations: Mapped[list[EventRegistration]] = db.relationship( + back_populates="event_block", cascade="all, delete, delete-orphan" + ) + + @property + def start_local(self) -> str: + "Start time in local time zone" + return correct_time_from_storage(self.start).strftime("%c") + + @property + def end_local(self) -> str: + "End time in local time zone" + return correct_time_from_storage(self.end).strftime("%c") diff --git a/signinapp/proxy.py b/signinapp/proxy.py index c8de547..ca5f3e7 100644 --- a/signinapp/proxy.py +++ b/signinapp/proxy.py @@ -1,68 +1,68 @@ -from flask import Blueprint, Flask, current_app, request, Response -from flask_login import current_user, login_required -import requests - -bp = Blueprint("kanboard", __name__, url_prefix="/kanboard") - - -def calculate_headers(headers: dict): - new_headers = dict() - for header_name in [ - "cookie", - "X-Requested-With", - ]: - if header_name in headers: - new_headers[header_name] = headers[header_name] - new_headers.update( - { - "REMOTE-USER": current_user.email, - "REMOTE-EMAIL": current_user.email, - "REMOTE-NAME": current_user.display_name, - } - ) - return new_headers - - -@bp.route("/", methods=["GET", "POST"]) -@bp.route("/", methods=["GET", "POST"]) -@login_required -def index(path=""): - url = current_app.config.get("PROXY_URL") + path - headers = calculate_headers(request.headers) - - if request.method == "GET": - resp = requests.get(url, params=dict(request.args), headers=headers) - elif request.method == "POST": - # If the content type isn't form-data then copy from data - # Need to use find here since the content type looks like: - # 'multipart/form-data; boundary=---------------------------5113784293436132453515092771' - if not request.content_type.startswith( - "multipart/form-data" - ) and not request.content_type.startswith("application/x-www-form-urlencoded"): - data = request.data - headers["Content-Type"] = request.content_type - else: - data = request.form - resp = requests.post( - url, - data=data, - params=dict(request.args), - headers=headers, - ) - excluded_headers = [ - "content-encoding", - "content-length", - "transfer-encoding", - "connection", - ] - headers = [ - (name, value) - for (name, value) in resp.raw.headers.items() - if name.lower() not in excluded_headers - ] - response = Response(resp.content, resp.status_code, headers) - return response - - -def init_app(app: Flask): - app.register_blueprint(bp) +import requests +from flask import Blueprint, Flask, Response, current_app, request +from flask_login import current_user, login_required + +bp = Blueprint("kanboard", __name__, url_prefix="/kanboard") + + +def calculate_headers(headers: dict): + new_headers = dict() + for header_name in [ + "cookie", + "X-Requested-With", + ]: + if header_name in headers: + new_headers[header_name] = headers[header_name] + new_headers.update( + { + "REMOTE-USER": current_user.email, + "REMOTE-EMAIL": current_user.email, + "REMOTE-NAME": current_user.display_name, + } + ) + return new_headers + + +@bp.route("/", methods=["GET", "POST"]) +@bp.route("/", methods=["GET", "POST"]) +@login_required +def index(path=""): + url = current_app.config.get("PROXY_URL") + path + headers = calculate_headers(request.headers) + + if request.method == "GET": + resp = requests.get(url, params=dict(request.args), headers=headers) + elif request.method == "POST": + # If the content type isn't form-data then copy from data + # Need to use find here since the content type looks like: + # 'multipart/form-data; boundary=---------------------------5113784293436132453515092771' + if not request.content_type.startswith( + "multipart/form-data" + ) and not request.content_type.startswith("application/x-www-form-urlencoded"): + data = request.data + headers["Content-Type"] = request.content_type + else: + data = request.form + resp = requests.post( + url, + data=data, + params=dict(request.args), + headers=headers, + ) + excluded_headers = [ + "content-encoding", + "content-length", + "transfer-encoding", + "connection", + ] + headers = [ + (name, value) + for (name, value) in resp.raw.headers.items() + if name.lower() not in excluded_headers + ] + response = Response(resp.content, resp.status_code, headers) + return response + + +def init_app(app: Flask): + app.register_blueprint(bp) diff --git a/signinapp/qr.py b/signinapp/qr.py index d033b49..7d01d1b 100644 --- a/signinapp/qr.py +++ b/signinapp/qr.py @@ -7,17 +7,13 @@ @qr.route("/register/qr") def register_qr(): url = request.host_url - return render_template( - "qr.html.jinja2", register_url=f"{url}{url_for('auth.register')}" - ) + return render_template("qr.html.jinja2", register_url=f"{url}{url_for('auth.register')}") @qr.route("/register/mentor/qr") def register_mentor_qr(): url = request.host_url - return render_template( - "qr.html.jinja2", register_url=f"{url}{url_for('auth.register_mentor')}" - ) + return render_template("qr.html.jinja2", register_url=f"{url}{url_for('auth.register_mentor')}") @qr.route("/register/guardian/qr") diff --git a/signinapp/search.py b/signinapp/search.py index 1da8aab..a1f0609 100644 --- a/signinapp/search.py +++ b/signinapp/search.py @@ -1,41 +1,41 @@ -from flask import Blueprint, Flask -from flask.templating import render_template -from flask_wtf import FlaskForm -from wtforms import BooleanField, SelectField, SubmitField - -from .model import Badge, EventType, Role, Subteam, User, db, get_form_ids -from .util import MultiCheckboxField, mentor_required - -search = Blueprint("search", __name__) - - -class HoursForm(FlaskForm): - role = MultiCheckboxField() - category = SelectField(choices=lambda: get_form_ids(EventType)) - submit = SubmitField() - - -@search.route("/search/hours", methods=["GET", "POST"]) -@mentor_required -def hours(): - form = HoursForm() - form.role.choices = [r.name for r in Role.get_visible()] - - if form.validate_on_submit(): - users = User.get_visible_users() - event_type = db.session.get(EventType, form.category.data) - roles = [Role.from_name(r).id for r in form.role.data] - results = sorted( - [ - (u.display_name, u.total_stamps_for(event_type)) - for u in users - if u.role_id in roles and u.total_time and u.approved - ] - ) - return render_template("search/hours.html.jinja2", form=form, results=results) - - return render_template("search/hours.html.jinja2", form=form, results=None) - - -def init_app(app: Flask): - app.register_blueprint(search) +from flask import Blueprint, Flask +from flask.templating import render_template +from flask_wtf import FlaskForm +from wtforms import SelectField, SubmitField + +from .model import EventType, Role, User, db, get_form_ids +from .util import MultiCheckboxField, mentor_required + +search = Blueprint("search", __name__) + + +class HoursForm(FlaskForm): + role = MultiCheckboxField() + category = SelectField(choices=lambda: get_form_ids(EventType)) + submit = SubmitField() + + +@search.route("/search/hours", methods=["GET", "POST"]) +@mentor_required +def hours(): + form = HoursForm() + form.role.choices = [r.name for r in Role.get_visible()] + + if form.validate_on_submit(): + users = User.get_visible_users() + event_type = db.session.get(EventType, form.category.data) + roles = [Role.from_name(r).id for r in form.role.data] + results = sorted( + [ + (u.display_name, u.total_stamps_for(event_type)) + for u in users + if u.role_id in roles and u.total_time and u.approved + ] + ) + return render_template("search/hours.html.jinja2", form=form, results=results) + + return render_template("search/hours.html.jinja2", form=form, results=None) + + +def init_app(app: Flask): + app.register_blueprint(search) diff --git a/signinapp/team.py b/signinapp/team.py index c6f8e4a..be8e20e 100644 --- a/signinapp/team.py +++ b/signinapp/team.py @@ -54,18 +54,14 @@ def list_students(): @team.route("/users/guardians") @mentor_required def list_guardians(): - users = db.session.scalars( - select(User).where(User.role.has(guardian=True)).order_by(User.name) - ) + users = db.session.scalars(select(User).where(User.role.has(guardian=True)).order_by(User.name)) return render_template("user_list.html.jinja2", role="Guardian", users=users) @team.route("/users/mentors") @mentor_required def list_mentors(): - users = db.session.scalars( - select(User).where(User.role.has(mentor=True)).order_by(User.name) - ) + users = db.session.scalars(select(User).where(User.role.has(mentor=True)).order_by(User.name)) return render_template("user_list.html.jinja2", role="Mentor", users=users) diff --git a/signinapp/user.py b/signinapp/user.py index e803c9c..8ffd58d 100644 --- a/signinapp/user.py +++ b/signinapp/user.py @@ -1,32 +1,32 @@ -from flask import Blueprint, Flask, redirect, url_for, flash -from flask.templating import render_template -from flask_login import current_user, login_required -from sqlalchemy.future import select - -from .model import EventType, User, db - -user = Blueprint("user", __name__) - - -@user.route("/profile/") -@login_required -def profile_self(): - return redirect(url_for("user.profile", email=current_user.email)) - - -@user.route("/profile/") -@login_required -def profile(email): - user = User.from_email(email) - if not user: - flash("Invalid user for profile") - return redirect(url_for("index")) - if not current_user.can_view(user): - flash(f"No access to view user data for {user.name}") - return redirect(url_for("index")) - event_types = db.session.scalars(select(EventType)) - return render_template("profile.html.jinja2", user=user, event_types=event_types) - - -def init_app(app: Flask): - app.register_blueprint(user) +from flask import Blueprint, Flask, flash, redirect, url_for +from flask.templating import render_template +from flask_login import current_user, login_required +from sqlalchemy.future import select + +from .model import EventType, User, db + +user = Blueprint("user", __name__) + + +@user.route("/profile/") +@login_required +def profile_self(): + return redirect(url_for("user.profile", email=current_user.email)) + + +@user.route("/profile/") +@login_required +def profile(email): + user = User.from_email(email) + if not user: + flash("Invalid user for profile") + return redirect(url_for("index")) + if not current_user.can_view(user): + flash(f"No access to view user data for {user.name}") + return redirect(url_for("index")) + event_types = db.session.scalars(select(EventType)) + return render_template("profile.html.jinja2", user=user, event_types=event_types) + + +def init_app(app: Flask): + app.register_blueprint(user) diff --git a/signinapp/util.py b/signinapp/util.py index 8f0e71b..a11c48a 100644 --- a/signinapp/util.py +++ b/signinapp/util.py @@ -1,82 +1,80 @@ -from datetime import datetime, timezone, date -from functools import wraps -import string -from zoneinfo import ZoneInfo - -from flask import current_app, redirect, request, url_for -from flask_login import current_user -from flask_login.config import EXEMPT_METHODS -from wtforms import SelectMultipleField, widgets - - -class MultiCheckboxField(SelectMultipleField): - widget = widgets.ListWidget(prefix_label=False) - option_widget = widgets.CheckboxInput() - - -def permission_required(perm): - def wrapper(func): - @wraps(func) - def decorated_view(*args, **kwargs): - if request.method in EXEMPT_METHODS or current_app.config.get( - "LOGIN_DISABLED" - ): - pass - elif not current_user.is_authenticated: - return current_app.login_manager.unauthorized() - elif not perm(current_user): - return redirect(url_for("auth.forbidden")) - try: - # current_app.ensure_sync available in Flask >= 2.0 - return current_app.ensure_sync(func)(*args, **kwargs) - except AttributeError: - return func(*args, **kwargs) - - return decorated_view - - return wrapper - - -admin_required = permission_required(lambda u: u.role.admin) -mentor_required = permission_required(lambda u: u.role.mentor) - - -def correct_time_for_storage(time: datetime) -> datetime: - # Check if datetime has a tz already: - if time.tzinfo is None: - # If not, set TZ to ET - time = time.replace(tzinfo=ZoneInfo(current_app.config["TIME_ZONE"])) - return time.astimezone(timezone.utc) - - -def correct_time_from_storage(time: datetime) -> datetime: - # Check if datetime has a tz already: - if time.tzinfo is None: - # If not, set TZ to ET - time = time.replace(tzinfo=timezone.utc) - return time.astimezone(ZoneInfo(current_app.config["TIME_ZONE"])) - - -def normalize_phone_number_for_storage(number: str): - """Remove all extra whitespace and characters from a phone number""" - return "".join(c for c in (number or "").strip() if c in string.digits) - - -def normalize_phone_number_from_storage(number: str): - """Format the phone number for display""" - return f"({number[0:3]}) {number[3:6]}-{number[6:10]}" if number else "" - - -def generate_grade_choices(): - today = date.today() - this_grad_year = today.year - # If it's past June then graduation is a year from now - if today.month > 6: - this_grad_year += 1 - - return { - this_grad_year + 3: f"Freshman ({this_grad_year+3})", - this_grad_year + 2: f"Sophomore ({this_grad_year+2})", - this_grad_year + 1: f"Junior ({this_grad_year+1})", - this_grad_year: f"Senior ({this_grad_year})", - } +import string +from datetime import UTC, date, datetime +from functools import wraps +from zoneinfo import ZoneInfo + +from flask import current_app, redirect, request, url_for +from flask_login import current_user +from flask_login.config import EXEMPT_METHODS +from wtforms import SelectMultipleField, widgets + + +class MultiCheckboxField(SelectMultipleField): + widget = widgets.ListWidget(prefix_label=False) + option_widget = widgets.CheckboxInput() + + +def permission_required(perm): + def wrapper(func): + @wraps(func) + def decorated_view(*args, **kwargs): + if request.method in EXEMPT_METHODS or current_app.config.get("LOGIN_DISABLED"): + pass + elif not current_user.is_authenticated: + return current_app.login_manager.unauthorized() + elif not perm(current_user): + return redirect(url_for("auth.forbidden")) + try: + # current_app.ensure_sync available in Flask >= 2.0 + return current_app.ensure_sync(func)(*args, **kwargs) + except AttributeError: + return func(*args, **kwargs) + + return decorated_view + + return wrapper + + +admin_required = permission_required(lambda u: u.role.admin) +mentor_required = permission_required(lambda u: u.role.mentor) + + +def correct_time_for_storage(time: datetime) -> datetime: + # Check if datetime has a tz already: + if time.tzinfo is None: + # If not, set TZ to ET + time = time.replace(tzinfo=ZoneInfo(current_app.config["TIME_ZONE"])) + return time.astimezone(UTC) + + +def correct_time_from_storage(time: datetime) -> datetime: + # Check if datetime has a tz already: + if time.tzinfo is None: + # If not, set TZ to ET + time = time.replace(tzinfo=UTC) + return time.astimezone(ZoneInfo(current_app.config["TIME_ZONE"])) + + +def normalize_phone_number_for_storage(number: str): + """Remove all extra whitespace and characters from a phone number""" + return "".join(c for c in (number or "").strip() if c in string.digits) + + +def normalize_phone_number_from_storage(number: str): + """Format the phone number for display""" + return f"({number[0:3]}) {number[3:6]}-{number[6:10]}" if number else "" + + +def generate_grade_choices(): + today = date.today() + this_grad_year = today.year + # If it's past June then graduation is a year from now + if today.month > 6: + this_grad_year += 1 + + return { + this_grad_year + 3: f"Freshman ({this_grad_year+3})", + this_grad_year + 2: f"Sophomore ({this_grad_year+2})", + this_grad_year + 1: f"Junior ({this_grad_year+1})", + this_grad_year: f"Senior ({this_grad_year})", + } diff --git a/signinapp/webapp.py b/signinapp/webapp.py index bd7e319..307e00c 100644 --- a/signinapp/webapp.py +++ b/signinapp/webapp.py @@ -54,9 +54,7 @@ def trim_stamps_command(): stamp.start = start_time end_time = stamp.event.adjusted_end if stamp.end > end_time: - click.echo( - f"Adjusting end stamp for event {stamp.id} from {stamp.end} to {end_time}" - ) + click.echo(f"Adjusting end stamp for event {stamp.id} from {stamp.end} to {end_time}") stamp.end = end_time db.session.commit()