diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a43bbc --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +config/bash/env.sh +**/.ipynb* +tools/ diff --git a/README.md b/README.md index 0422536..652a3f6 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,65 @@ -# insight-crystal-project -Crystal Database +# Crystal-Base +# Table of Contents +1. [Protein Crystallization Challenges](README.md#Protein-Crystallization-Challenges) +2. [Dataset](README.md#Dataset) +3. [Architecture](README.md#Architecture) +4. [Web App](README.md#Web-App) + +## Protein Crystallization Challenges + +Crystal-Base is an image classification pipeline that reports whether or not an image contains a protein crystal. Crystal-Base caters towards both academic and industrial researchers who are running large scale HTS protein crystallization projects who do not want to spend time on the mundane task of identifying possible protein crystals from their crystallization screens. + +![Image of Protein Crystal Screen](images/Crystal-Screen.png) + +## Dataset + +All protein crystal data was obtained from the [Marco Database](https://marco.ccr.buffalo.edu/) + +## Architecture +![Image of Pipeline](images/Pipeline.png) + +### Setting up AWS + +Crystal-base uses **pegasus** to setup AWS clusters with configurations in **yaml** files. + +Run `./main.sh --setup-pegasus` to install pegasus. + +Run `./main.sh --setup-config` to setup the bash environment + +Run `./main.sh --setup-database` to setup a Postgres database. + +Run `./main.sh --setup-hadoop` to setup a hadoop cluster. + +Run `./main.sh --setup-spark` to setup a spark cluster + +Run `./main.sh --setup-web-server` to setup a web server. + +### Ingestion + +Crystal base ingests files from the [Marco Database](https://marco.ccr.buffalo.edu/) using **bash** and an EC2 instance to an S3 bucket. 
+ +Run `source src/bash/ingestMarcoFiles.sh && ingestMarcosFiles` to ingest files + +### Training + +Crystal-base uses transfer learning with the [inceptionv3](https://www.tensorflow.org/tutorials/images/image_recognition) model to identify protein drop crystals from the [Marco Database](https://marco.ccr.buffalo.edu/). + +Run `python3 src/python/classifyImagesTrainer.py` to train the image classifier and write to a Postgres Database. + +### Distributed Image Classification + +Data is ingested with Spark from S3 buckets and batch processed on a distributed tensorflow cluster using executors running their own tensorflow instances. + +Run `./main.sh --classify-images simple` to use the simple test classifier. Results are expected to output to a Postgres database. + +## Web App + +Crystal-base has a web interface that runs its own instance of the trained tensorflow model. + +![Image of Web App](images/crystal-base-web-app.png) + +Run `./main.sh --run-web-server` to run this web-server instance. + +### Try it out! 
+ +Upload protein crystal jpeg images at [Crystal-Base](http://www.crystal-base.com) diff --git a/config/database-cluster/master.yml b/config/database-cluster/master.yml new file mode 100644 index 0000000..0ccc9ba --- /dev/null +++ b/config/database-cluster/master.yml @@ -0,0 +1,10 @@ +purchase_type: on_demand +subnet_id: subnet-01bc006215b4bfa36 +num_instances: 1 +key_name: insight-aws2 +security_group_ids: sg-08d81fc64ffc3f309 +instance_type: m4.large +tag_name: crystal-project-database-cluster +vol_size: 100 +role: master +use_eips: true diff --git a/config/database-cluster/pg_hba.conf b/config/database-cluster/pg_hba.conf new file mode 100644 index 0000000..d09ceee --- /dev/null +++ b/config/database-cluster/pg_hba.conf @@ -0,0 +1,100 @@ +# PostgreSQL Client Authentication Configuration File +# =================================================== +# +# Refer to the "Client Authentication" section in the PostgreSQL +# documentation for a complete description of this file. A short +# synopsis follows. +# +# This file controls: which hosts are allowed to connect, how clients +# are authenticated, which PostgreSQL user names they can use, which +# databases they can access. Records take one of these forms: +# +# local DATABASE USER METHOD [OPTIONS] +# host DATABASE USER ADDRESS METHOD [OPTIONS] +# hostssl DATABASE USER ADDRESS METHOD [OPTIONS] +# hostnossl DATABASE USER ADDRESS METHOD [OPTIONS] +# +# (The uppercase items must be replaced by actual values.) +# +# The first field is the connection type: "local" is a Unix-domain +# socket, "host" is either a plain or SSL-encrypted TCP/IP socket, +# "hostssl" is an SSL-encrypted TCP/IP socket, and "hostnossl" is a +# plain TCP/IP socket. +# +# DATABASE can be "all", "sameuser", "samerole", "replication", a +# database name, or a comma-separated list thereof. The "all" +# keyword does not match "replication". Access to replication +# must be enabled in a separate record (see example below). 
+# +# USER can be "all", a user name, a group name prefixed with "+", or a +# comma-separated list thereof. In both the DATABASE and USER fields +# you can also write a file name prefixed with "@" to include names +# from a separate file. +# +# ADDRESS specifies the set of hosts the record matches. It can be a +# host name, or it is made up of an IP address and a CIDR mask that is +# an integer (between 0 and 32 (IPv4) or 128 (IPv6) inclusive) that +# specifies the number of significant bits in the mask. A host name +# that starts with a dot (.) matches a suffix of the actual host name. +# Alternatively, you can write an IP address and netmask in separate +# columns to specify the set of hosts. Instead of a CIDR-address, you +# can write "samehost" to match any of the server's own IP addresses, +# or "samenet" to match any address in any subnet that the server is +# directly connected to. +# +# METHOD can be "trust", "reject", "md5", "password", "gss", "sspi", +# "ident", "peer", "pam", "ldap", "radius" or "cert". Note that +# "password" sends passwords in clear text; "md5" is preferred since +# it sends encrypted passwords. +# +# OPTIONS are a set of options for the authentication in the format +# NAME=VALUE. The available options depend on the different +# authentication methods -- refer to the "Client Authentication" +# section in the documentation for a list of which options are +# available for which authentication methods. +# +# Database and user names containing spaces, commas, quotes and other +# special characters must be quoted. Quoting one of the keywords +# "all", "sameuser", "samerole" or "replication" makes the name lose +# its special character, and just match a database or username with +# that name. +# +# This file is read on server startup and when the postmaster receives +# a SIGHUP signal. If you edit the file on a running system, you have +# to SIGHUP the postmaster for the changes to take effect. You can +# use "pg_ctl reload" to do that. 
+ +# Put your actual configuration here +# ---------------------------------- +# +# If you want to allow non-local connections, you need to add more +# "host" records. In that case you will also need to make PostgreSQL +# listen on a non-local interface via the listen_addresses +# configuration parameter, or via the -i or -h command line switches. + + + + +# DO NOT DISABLE! +# If you change this first entry you will need to make sure that the +# database superuser can access the database using some other method. +# Noninteractive access to all databases is required during automatic +# maintenance (custom daily cronjobs, replication, and similar tasks). +# +# Database administrative login by Unix domain socket +local all postgres peer + +# TYPE DATABASE USER ADDRESS METHOD + +# "local" is for Unix domain socket connections only +local all all peer +# IPv4 local connections: +host all all 127.0.0.1/32 md5 +# IPv6 local connections: +host all all ::1/128 md5 +# Allow replication connections from localhost, by a user with the +# replication privilege. +#local replication postgres peer +#host replication postgres 127.0.0.1/32 md5 +#host replication postgres ::1/128 md5 +host all all 0.0.0.0/0 md5 diff --git a/config/database-cluster/postgresql.conf b/config/database-cluster/postgresql.conf new file mode 100644 index 0000000..16f71e0 --- /dev/null +++ b/config/database-cluster/postgresql.conf @@ -0,0 +1,625 @@ +# ----------------------------- +# PostgreSQL configuration file +# ----------------------------- +# +# This file consists of lines of the form: +# +# name = value +# +# (The "=" is optional.) Whitespace may be used. Comments are introduced with +# "#" anywhere on a line. The complete list of parameter names and allowed +# values can be found in the PostgreSQL documentation. +# +# The commented-out settings shown in this file represent the default values. 
+# Re-commenting a setting is NOT sufficient to revert it to the default value; +# you need to reload the server. +# +# This file is read on server startup and when the server receives a SIGHUP +# signal. If you edit the file on a running system, you have to SIGHUP the +# server for the changes to take effect, or use "pg_ctl reload". Some +# parameters, which are marked below, require a server shutdown and restart to +# take effect. +# +# Any parameter can also be given as a command-line option to the server, e.g., +# "postgres -c log_connections=on". Some parameters can be changed at run time +# with the "SET" SQL command. +# +# Memory units: kB = kilobytes Time units: ms = milliseconds +# MB = megabytes s = seconds +# GB = gigabytes min = minutes +# TB = terabytes h = hours +# d = days + + +#------------------------------------------------------------------------------ +# FILE LOCATIONS +#------------------------------------------------------------------------------ + +# The default values of these variables are driven from the -D command-line +# option or PGDATA environment variable, represented here as ConfigDir. + +data_directory = '/var/lib/postgresql/9.5/main' # use data in another directory + # (change requires restart) +hba_file = '/etc/postgresql/9.5/main/pg_hba.conf' # host-based authentication file + # (change requires restart) +ident_file = '/etc/postgresql/9.5/main/pg_ident.conf' # ident configuration file + # (change requires restart) + +# If external_pid_file is not explicitly set, no extra PID file is written. 
+external_pid_file = '/var/run/postgresql/9.5-main.pid' # write an extra PID file + # (change requires restart) + + +#------------------------------------------------------------------------------ +# CONNECTIONS AND AUTHENTICATION +#------------------------------------------------------------------------------ + +# - Connection Settings - + +listen_addresses = '*' # what IP address(es) to listen on; + # comma-separated list of addresses; + # defaults to 'localhost'; use '*' for all + # (change requires restart) +port = 5432 # (change requires restart) +max_connections = 100 # (change requires restart) +#superuser_reserved_connections = 3 # (change requires restart) +unix_socket_directories = '/var/run/postgresql' # comma-separated list of directories + # (change requires restart) +#unix_socket_group = '' # (change requires restart) +#unix_socket_permissions = 0777 # begin with 0 to use octal notation + # (change requires restart) +#bonjour = off # advertise server via Bonjour + # (change requires restart) +#bonjour_name = '' # defaults to the computer name + # (change requires restart) + +# - Security and Authentication - + +#authentication_timeout = 1min # 1s-600s +ssl = true # (change requires restart) +#ssl_ciphers = 'HIGH:MEDIUM:+3DES:!aNULL' # allowed SSL ciphers + # (change requires restart) +#ssl_prefer_server_ciphers = on # (change requires restart) +#ssl_ecdh_curve = 'prime256v1' # (change requires restart) +ssl_cert_file = '/etc/ssl/certs/ssl-cert-snakeoil.pem' # (change requires restart) +ssl_key_file = '/etc/ssl/private/ssl-cert-snakeoil.key' # (change requires restart) +#ssl_ca_file = '' # (change requires restart) +#ssl_crl_file = '' # (change requires restart) +#password_encryption = on +#db_user_namespace = off +#row_security = on + +# GSSAPI using Kerberos +#krb_server_keyfile = '' +#krb_caseins_users = off + +# - TCP Keepalives - +# see "man 7 tcp" for details + +#tcp_keepalives_idle = 0 # TCP_KEEPIDLE, in seconds; + # 0 selects the system default 
+#tcp_keepalives_interval = 0 # TCP_KEEPINTVL, in seconds; + # 0 selects the system default +#tcp_keepalives_count = 0 # TCP_KEEPCNT; + # 0 selects the system default + + +#------------------------------------------------------------------------------ +# RESOURCE USAGE (except WAL) +#------------------------------------------------------------------------------ + +# - Memory - + +shared_buffers = 128MB # min 128kB + # (change requires restart) +#huge_pages = try # on, off, or try + # (change requires restart) +#temp_buffers = 8MB # min 800kB +#max_prepared_transactions = 0 # zero disables the feature + # (change requires restart) +# Caution: it is not advisable to set max_prepared_transactions nonzero unless +# you actively intend to use prepared transactions. +#work_mem = 4MB # min 64kB +#maintenance_work_mem = 64MB # min 1MB +#autovacuum_work_mem = -1 # min 1MB, or -1 to use maintenance_work_mem +#max_stack_depth = 2MB # min 100kB +dynamic_shared_memory_type = posix # the default is the first option + # supported by the operating system: + # posix + # sysv + # windows + # mmap + # use none to disable dynamic shared memory + # (change requires restart) + +# - Disk - + +#temp_file_limit = -1 # limits per-session temp file space + # in kB, or -1 for no limit + +# - Kernel Resource Usage - + +#max_files_per_process = 1000 # min 25 + # (change requires restart) +#shared_preload_libraries = '' # (change requires restart) + +# - Cost-Based Vacuum Delay - + +#vacuum_cost_delay = 0 # 0-100 milliseconds +#vacuum_cost_page_hit = 1 # 0-10000 credits +#vacuum_cost_page_miss = 10 # 0-10000 credits +#vacuum_cost_page_dirty = 20 # 0-10000 credits +#vacuum_cost_limit = 200 # 1-10000 credits + +# - Background Writer - + +#bgwriter_delay = 200ms # 10-10000ms between rounds +#bgwriter_lru_maxpages = 100 # 0-1000 max buffers written/round +#bgwriter_lru_multiplier = 2.0 # 0-10.0 multipler on buffers scanned/round + +# - Asynchronous Behavior - + +#effective_io_concurrency = 1 # 
1-1000; 0 disables prefetching +#max_worker_processes = 8 + + +#------------------------------------------------------------------------------ +# WRITE AHEAD LOG +#------------------------------------------------------------------------------ + +# - Settings - + +#wal_level = minimal # minimal, archive, hot_standby, or logical + # (change requires restart) +#fsync = on # turns forced synchronization on or off +#synchronous_commit = on # synchronization level; + # off, local, remote_write, or on +#wal_sync_method = fsync # the default is the first option + # supported by the operating system: + # open_datasync + # fdatasync (default on Linux) + # fsync + # fsync_writethrough + # open_sync +#full_page_writes = on # recover from partial page writes +#wal_compression = off # enable compression of full-page writes +#wal_log_hints = off # also do full page writes of non-critical updates + # (change requires restart) +#wal_buffers = -1 # min 32kB, -1 sets based on shared_buffers + # (change requires restart) +#wal_writer_delay = 200ms # 1-10000 milliseconds + +#commit_delay = 0 # range 0-100000, in microseconds +#commit_siblings = 5 # range 1-1000 + +# - Checkpoints - + +#checkpoint_timeout = 5min # range 30s-1h +#max_wal_size = 1GB +#min_wal_size = 80MB +#checkpoint_completion_target = 0.5 # checkpoint target duration, 0.0 - 1.0 +#checkpoint_warning = 30s # 0 disables + +# - Archiving - + +#archive_mode = off # enables archiving; off, on, or always + # (change requires restart) +#archive_command = '' # command to use to archive a logfile segment + # placeholders: %p = path of file to archive + # %f = file name only + # e.g. 'test ! 
-f /mnt/server/archivedir/%f && cp %p /mnt/server/archivedir/%f' +#archive_timeout = 0 # force a logfile segment switch after this + # number of seconds; 0 disables + + +#------------------------------------------------------------------------------ +# REPLICATION +#------------------------------------------------------------------------------ + +# - Sending Server(s) - + +# Set these on the master and on any standby that will send replication data. + +#max_wal_senders = 0 # max number of walsender processes + # (change requires restart) +#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables +#wal_sender_timeout = 60s # in milliseconds; 0 disables + +#max_replication_slots = 0 # max number of replication slots + # (change requires restart) +#track_commit_timestamp = off # collect timestamp of transaction commit + # (change requires restart) + +# - Master Server - + +# These settings are ignored on a standby server. + +#synchronous_standby_names = '' # standby servers that provide sync rep + # comma-separated list of application_name + # from standby(s); '*' = all +#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed + +# - Standby Servers - + +# These settings are ignored on a master server. 
+ +#hot_standby = off # "on" allows queries during recovery + # (change requires restart) +#max_standby_archive_delay = 30s # max delay before canceling queries + # when reading WAL from archive; + # -1 allows indefinite delay +#max_standby_streaming_delay = 30s # max delay before canceling queries + # when reading streaming WAL; + # -1 allows indefinite delay +#wal_receiver_status_interval = 10s # send replies at least this often + # 0 disables +#hot_standby_feedback = off # send info from standby to prevent + # query conflicts +#wal_receiver_timeout = 60s # time that receiver waits for + # communication from master + # in milliseconds; 0 disables +#wal_retrieve_retry_interval = 5s # time to wait before retrying to + # retrieve WAL after a failed attempt + + +#------------------------------------------------------------------------------ +# QUERY TUNING +#------------------------------------------------------------------------------ + +# - Planner Method Configuration - + +#enable_bitmapscan = on +#enable_hashagg = on +#enable_hashjoin = on +#enable_indexscan = on +#enable_indexonlyscan = on +#enable_material = on +#enable_mergejoin = on +#enable_nestloop = on +#enable_seqscan = on +#enable_sort = on +#enable_tidscan = on + +# - Planner Cost Constants - + +#seq_page_cost = 1.0 # measured on an arbitrary scale +#random_page_cost = 4.0 # same scale as above +#cpu_tuple_cost = 0.01 # same scale as above +#cpu_index_tuple_cost = 0.005 # same scale as above +#cpu_operator_cost = 0.0025 # same scale as above +#effective_cache_size = 4GB + +# - Genetic Query Optimizer - + +#geqo = on +#geqo_threshold = 12 +#geqo_effort = 5 # range 1-10 +#geqo_pool_size = 0 # selects default based on effort +#geqo_generations = 0 # selects default based on effort +#geqo_selection_bias = 2.0 # range 1.5-2.0 +#geqo_seed = 0.0 # range 0.0-1.0 + +# - Other Planner Options - + +#default_statistics_target = 100 # range 1-10000 +#constraint_exclusion = partition # on, off, or partition 
+#cursor_tuple_fraction = 0.1 # range 0.0-1.0 +#from_collapse_limit = 8 +#join_collapse_limit = 8 # 1 disables collapsing of explicit + # JOIN clauses + + +#------------------------------------------------------------------------------ +# ERROR REPORTING AND LOGGING +#------------------------------------------------------------------------------ + +# - Where to Log - + +#log_destination = 'stderr' # Valid values are combinations of + # stderr, csvlog, syslog, and eventlog, + # depending on platform. csvlog + # requires logging_collector to be on. + +# This is used when logging to stderr: +#logging_collector = off # Enable capturing of stderr and csvlog + # into log files. Required to be on for + # csvlogs. + # (change requires restart) + +# These are only used if logging_collector is on: +#log_directory = 'pg_log' # directory where log files are written, + # can be absolute or relative to PGDATA +#log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log' # log file name pattern, + # can include strftime() escapes +#log_file_mode = 0600 # creation mode for log files, + # begin with 0 to use octal notation +#log_truncate_on_rotation = off # If on, an existing log file with the + # same name as the new log file will be + # truncated rather than appended to. + # But such truncation only occurs on + # time-driven rotation, not on restarts + # or size-driven rotation. Default is + # off, meaning append to existing files + # in all cases. +#log_rotation_age = 1d # Automatic rotation of logfiles will + # happen after that time. 0 disables. +#log_rotation_size = 10MB # Automatic rotation of logfiles will + # happen after that much log output. + # 0 disables. 
+ +# These are relevant when logging to syslog: +#syslog_facility = 'LOCAL0' +#syslog_ident = 'postgres' + +# This is only relevant when logging to eventlog (win32): +# (change requires restart) +#event_source = 'PostgreSQL' + +# - When to Log - + +#client_min_messages = notice # values in order of decreasing detail: + # debug5 + # debug4 + # debug3 + # debug2 + # debug1 + # log + # notice + # warning + # error + +#log_min_messages = warning # values in order of decreasing detail: + # debug5 + # debug4 + # debug3 + # debug2 + # debug1 + # info + # notice + # warning + # error + # log + # fatal + # panic + +#log_min_error_statement = error # values in order of decreasing detail: + # debug5 + # debug4 + # debug3 + # debug2 + # debug1 + # info + # notice + # warning + # error + # log + # fatal + # panic (effectively off) + +#log_min_duration_statement = -1 # -1 is disabled, 0 logs all statements + # and their durations, > 0 logs only + # statements running at least this number + # of milliseconds + + +# - What to Log - + +#debug_print_parse = off +#debug_print_rewritten = off +#debug_print_plan = off +#debug_pretty_print = on +#log_checkpoints = off +#log_connections = off +#log_disconnections = off +#log_duration = off +#log_error_verbosity = default # terse, default, or verbose messages +#log_hostname = off +log_line_prefix = '%t [%p-%l] %q%u@%d ' # special values: + # %a = application name + # %u = user name + # %d = database name + # %r = remote host and port + # %h = remote host + # %p = process ID + # %t = timestamp without milliseconds + # %m = timestamp with milliseconds + # %i = command tag + # %e = SQL state + # %c = session ID + # %l = session line number + # %s = session start timestamp + # %v = virtual transaction ID + # %x = transaction ID (0 if none) + # %q = stop here in non-session + # processes + # %% = '%' + # e.g. 
'<%u%%%d> ' +#log_lock_waits = off # log lock waits >= deadlock_timeout +#log_statement = 'none' # none, ddl, mod, all +#log_replication_commands = off +#log_temp_files = -1 # log temporary files equal or larger + # than the specified size in kilobytes; + # -1 disables, 0 logs all temp files +log_timezone = 'UTC' + + +# - Process Title - + +#cluster_name = '' # added to process titles if nonempty + # (change requires restart) +#update_process_title = on + + +#------------------------------------------------------------------------------ +# RUNTIME STATISTICS +#------------------------------------------------------------------------------ + +# - Query/Index Statistics Collector - + +#track_activities = on +#track_counts = on +#track_io_timing = off +#track_functions = none # none, pl, all +#track_activity_query_size = 1024 # (change requires restart) +stats_temp_directory = '/var/run/postgresql/9.5-main.pg_stat_tmp' + + +# - Statistics Monitoring - + +#log_parser_stats = off +#log_planner_stats = off +#log_executor_stats = off +#log_statement_stats = off + + +#------------------------------------------------------------------------------ +# AUTOVACUUM PARAMETERS +#------------------------------------------------------------------------------ + +#autovacuum = on # Enable autovacuum subprocess? 'on' + # requires track_counts to also be on. +#log_autovacuum_min_duration = -1 # -1 disables, 0 logs all actions and + # their durations, > 0 logs only + # actions running at least this number + # of milliseconds. 
+#autovacuum_max_workers = 3 # max number of autovacuum subprocesses + # (change requires restart) +#autovacuum_naptime = 1min # time between autovacuum runs +#autovacuum_vacuum_threshold = 50 # min number of row updates before + # vacuum +#autovacuum_analyze_threshold = 50 # min number of row updates before + # analyze +#autovacuum_vacuum_scale_factor = 0.2 # fraction of table size before vacuum +#autovacuum_analyze_scale_factor = 0.1 # fraction of table size before analyze +#autovacuum_freeze_max_age = 200000000 # maximum XID age before forced vacuum + # (change requires restart) +#autovacuum_multixact_freeze_max_age = 400000000 # maximum multixact age + # before forced vacuum + # (change requires restart) +#autovacuum_vacuum_cost_delay = 20ms # default vacuum cost delay for + # autovacuum, in milliseconds; + # -1 means use vacuum_cost_delay +#autovacuum_vacuum_cost_limit = -1 # default vacuum cost limit for + # autovacuum, -1 means use + # vacuum_cost_limit + + +#------------------------------------------------------------------------------ +# CLIENT CONNECTION DEFAULTS +#------------------------------------------------------------------------------ + +# - Statement Behavior - + +#search_path = '"$user", public' # schema names +#default_tablespace = '' # a tablespace name, '' uses the default +#temp_tablespaces = '' # a list of tablespace names, '' uses + # only default tablespace +#check_function_bodies = on +#default_transaction_isolation = 'read committed' +#default_transaction_read_only = off +#default_transaction_deferrable = off +#session_replication_role = 'origin' +#statement_timeout = 0 # in milliseconds, 0 is disabled +#lock_timeout = 0 # in milliseconds, 0 is disabled +#vacuum_freeze_min_age = 50000000 +#vacuum_freeze_table_age = 150000000 +#vacuum_multixact_freeze_min_age = 5000000 +#vacuum_multixact_freeze_table_age = 150000000 +#bytea_output = 'hex' # hex, escape +#xmlbinary = 'base64' +#xmloption = 'content' +#gin_fuzzy_search_limit = 0 
+#gin_pending_list_limit = 4MB + +# - Locale and Formatting - + +datestyle = 'iso, mdy' +#intervalstyle = 'postgres' +timezone = 'UTC' +#timezone_abbreviations = 'Default' # Select the set of available time zone + # abbreviations. Currently, there are + # Default + # Australia (historical usage) + # India + # You can create your own file in + # share/timezonesets/. +#extra_float_digits = 0 # min -15, max 3 +#client_encoding = sql_ascii # actually, defaults to database + # encoding + +# These settings are initialized by initdb, but they can be changed. +lc_messages = 'en_US.UTF-8' # locale for system error message + # strings +lc_monetary = 'en_US.UTF-8' # locale for monetary formatting +lc_numeric = 'en_US.UTF-8' # locale for number formatting +lc_time = 'en_US.UTF-8' # locale for time formatting + +# default configuration for text search +default_text_search_config = 'pg_catalog.english' + +# - Other Defaults - + +#dynamic_library_path = '$libdir' +#local_preload_libraries = '' +#session_preload_libraries = '' + + +#------------------------------------------------------------------------------ +# LOCK MANAGEMENT +#------------------------------------------------------------------------------ + +#deadlock_timeout = 1s +#max_locks_per_transaction = 64 # min 10 + # (change requires restart) +#max_pred_locks_per_transaction = 64 # min 10 + # (change requires restart) + + +#------------------------------------------------------------------------------ +# VERSION/PLATFORM COMPATIBILITY +#------------------------------------------------------------------------------ + +# - Previous PostgreSQL Versions - + +#array_nulls = on +#backslash_quote = safe_encoding # on, off, or safe_encoding +#default_with_oids = off +#escape_string_warning = on +#lo_compat_privileges = off +#operator_precedence_warning = off +#quote_all_identifiers = off +#sql_inheritance = on +#standard_conforming_strings = on +#synchronize_seqscans = on + +# - Other Platforms and Clients - + 
+#transform_null_equals = off + + +#------------------------------------------------------------------------------ +# ERROR HANDLING +#------------------------------------------------------------------------------ + +#exit_on_error = off # terminate session on any error? +#restart_after_crash = on # reinitialize after backend crash? + + +#------------------------------------------------------------------------------ +# CONFIG FILE INCLUDES +#------------------------------------------------------------------------------ + +# These options allow settings to be loaded from files other than the +# default postgresql.conf. + +#include_dir = 'conf.d' # include files ending in '.conf' from + # directory 'conf.d' +#include_if_exists = 'exists.conf' # include file only if it exists +#include = 'special.conf' # include file + + +#------------------------------------------------------------------------------ +# CUSTOMIZED OPTIONS +#------------------------------------------------------------------------------ + +# Add settings for extensions here diff --git a/config/database-cluster/workers.yml b/config/database-cluster/workers.yml new file mode 100644 index 0000000..0579f1a --- /dev/null +++ b/config/database-cluster/workers.yml @@ -0,0 +1,10 @@ +purchase_type: on_demand +subnet_id: subnet-01bc006215b4bfa36 +num_instances: 3 +key_name: insight-aws2 +security_group_ids: sg-08d81fc64ffc3f309 +instance_type: m4.large +tag_name: crystal-project-database-cluster +vol_size: 100 +role: worker +use_eips: true diff --git a/master.yml b/config/spark-cluster/master.yml similarity index 73% rename from master.yml rename to config/spark-cluster/master.yml index c5d9e12..0f30452 100644 --- a/master.yml +++ b/config/spark-cluster/master.yml @@ -3,8 +3,8 @@ subnet_id: subnet-01bc006215b4bfa36 num_instances: 1 key_name: insight-aws2 security_group_ids: sg-04950952b61aa079b -instance_type: m4.large -tag_name: crystal-project +instance_type: m4.4xlarge +tag_name: crystal-project-spark-cluster 
vol_size: 100 role: master use_eips: true diff --git a/workers.yml b/config/spark-cluster/workers.yml similarity index 73% rename from workers.yml rename to config/spark-cluster/workers.yml index f43c25c..1e28dd0 100644 --- a/workers.yml +++ b/config/spark-cluster/workers.yml @@ -3,8 +3,8 @@ subnet_id: subnet-01bc006215b4bfa36 num_instances: 3 key_name: insight-aws2 security_group_ids: sg-04950952b61aa079b -instance_type: m4.large -tag_name: crystal-project +instance_type: m4.4xlarge +tag_name: crystal-project-spark-cluster vol_size: 100 role: worker use_eips: true diff --git a/config/web-server/master.yml b/config/web-server/master.yml new file mode 100644 index 0000000..3dba087 --- /dev/null +++ b/config/web-server/master.yml @@ -0,0 +1,10 @@ +purchase_type: on_demand +subnet_id: subnet-01bc006215b4bfa36 +num_instances: 1 +key_name: insight-aws2 +security_group_ids: sg-0a0422eec1ff30887 +instance_type: m4.4xlarge +tag_name: crystal-project-web-server +vol_size: 100 +role: master +use_eips: true diff --git a/images/Crystal-Screen.png b/images/Crystal-Screen.png new file mode 100644 index 0000000..6f4522e Binary files /dev/null and b/images/Crystal-Screen.png differ diff --git a/images/Pipeline.png b/images/Pipeline.png new file mode 100644 index 0000000..7cd5c26 Binary files /dev/null and b/images/Pipeline.png differ diff --git a/images/crystal-base-web-app.png b/images/crystal-base-web-app.png new file mode 100644 index 0000000..50abe87 Binary files /dev/null and b/images/crystal-base-web-app.png differ diff --git a/main.sh b/main.sh new file mode 100755 index 0000000..2216619 --- /dev/null +++ b/main.sh @@ -0,0 +1,165 @@ +#!/bin/bash + +#################### THIS IS WHERE THE CODE RUNS ############################# +## setup ssh-agent if needed +if ! [ $SSH_AGENT_PID ]; then + if ! [ $SSH_AUTH_SOCK ]; then + eval $(ssh-agent -s) + fi +else + echo "ssh-agent not found, launching ssh-agent..." 
+ eval $(ssh-agent -s) +fi + +## need to source bash environment +echo "sourcing config/bash/env.sh..." +if [ ! -e config/bash/env.sh ]; then + echo "Missing config/bash/env.sh. Please run main.sh --setup-config" +else + source config/bash/env.sh +fi + +## source all bash source files +for fun in src/bash/*.sh; do + source "$fun" +done; + +## main insertion function +function main { + ## setup pegasus + setupPegasus + ## setup environment variables + setupConfig + ## setup the hadoop cluster + launchHadoop + ## setup the spark cluster + launchSpark + ## setup the database cluster + launchDatabase + ## ingest marcos data files + ingestMarcosFiles + ## multiply the marcos images + multiplyImages + ## classify images and write results to database + classifyImages +} +############ AFTER THIS IS LIBRARY FUNCTIONS AND SWITCHES ##################### + +function setupPegasus { + if [ ! -d tools ]; then + mkdir tools + fi + + ## get needed tools + if [ ! -d tools/pegasus ]; then + echo "Couldn't find pegasus. Cloning from git repo..." + git clone https://github.com/bluerider/pegasus.git tools/pegasus + else + echo "Found pegasus" + fi +} + +function setupConfig { + ## generate the needed environment variables + ## setup config/bash/env.sh if it isn't already setup + if [ ! -e config/bash/env.sh ]; then + if [ ! -d config/bash ]; then + mkdir config/bash + fi + echo "Creating bash environment variables..." 
+ read -p "AWS Access Key: " -a AWS_ACCESS_KEY_ID + read -p "AWS Secret Access Key: " -a AWS_SECRET_ACCESS_KEY + read -p "AWS Default Region: " -a AWS_DEFAULT_REGION + read -p "Postgres User: " -a POSTGRES_USER + read -p "Postgres Password: " -a POSTGRES_PASSWORD + printf '%s\n' " + export AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID + export AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY + export AWS_DEFAULT_REGION=$AWS_DEFAULT_REGION + export REM_USER=ubuntu + export PEGASUS_HOME=\$PWD/tools/pegasus + export PATH=$PEGASUS_HOME:$PATH + export POSTGRES_USER=$POSTGRES_USER + export POSTGRES_PASSWORD=$POSTGRES_PASSWORD + " > config/bash/env.sh && + echo "Wrote conifg/bash/env.sh" + else + echo "Found config/bash/env.sh..." + fi +} + +## let's have some switches +case $1 in + --run) + main + ;; + --config) + setupPegasus + setupConfig + launchHadoop + launchSpark + launchDatabase + setupWebServer + ;; + --setup-config) + setupConfig + ;; + --setup-pegasus) + setupPegasus + ;; + --setup-hadoop) + launchHadoop + ;; + --setup-spark) + launchSpark + ;; + --setup-database) + launchDatabase + ;; + --classify-images|--run-pipeline) + case $2 in + marco) + echo "Running Marco classifier..." + classifyImages marco + ;; + Inceptionv3) + echo "Not fully supported yet!" + #classifyImages inception + ;; + simple|*) + echo "Running simple classifier..." 
+ classifyImages simple + ;; + esac + ;; + --multiply-images) + multiplyImages + ;; + --setup-web-server) + setupWebServer + ;; + --run-web-server) + runDashServer + ;; + --help|*) + cat < Classify images from S3; args : marco, Inceptionv3, simple + --multiply-images Transform and multiply images from S3 and save back to bucket + --run-pipeline Run the crystal-base pipeline + --help Print this help function + +EOF +esac \ No newline at end of file diff --git a/src/README.md b/src/README.md new file mode 100644 index 0000000..02acdd5 --- /dev/null +++ b/src/README.md @@ -0,0 +1 @@ +This folder contains the functions needed to run the crystal-base pipeline. I highly suggest interacting with crystal-base using `main.sh ` instead of calling these functions directly. \ No newline at end of file diff --git a/src/bash/README.md b/src/bash/README.md new file mode 100644 index 0000000..d9b100e --- /dev/null +++ b/src/bash/README.md @@ -0,0 +1 @@ +These are functions that are meant to be called by `src/main.sh`. Functions are designed to generally setup and submit pyspark functions found in `src/python/` via ssh. All bash functions source `config/bash/env.sh` for AWS, postgres setup. In addition, each bash function passes sensitive AWS and postgres keys over ssh ensuring there is no permanent key storage on cloud instances. \ No newline at end of file diff --git a/src/bash/classifyImages.sh b/src/bash/classifyImages.sh new file mode 100644 index 0000000..03677e4 --- /dev/null +++ b/src/bash/classifyImages.sh @@ -0,0 +1,23 @@ +## let's try creating a spark context for the S3 files +function classifyImages { + ## get the needed config + source config/bash/env.sh + ## get the needed hostnames + spark_hosts=($(cat ${PEGASUS_HOME}/tmp/crystal-project-spark-cluster/public_dns)) + postgresql_hosts=($(cat ${PEGASUS_HOME}/tmp/crystal-project-database-cluster/public_dns)) + ## copy the pyspark python script + echo "Copying python scripts to spark master..." 
+ for a in src/python/classifyImages{,{Marco,Simple}Partition}.py; do + scp "$a" ubuntu@${spark_hosts[0]}: + done + ## launch the command + echo "Running spark classification job..." + ssh ubuntu@${spark_hosts[0]} " + export PYSPARK_PYTHON=python3 + export LD_LIBRARY_PATH+=:/usr/local/hadoop/lib/native + export SPARK_HOME=/usr/local/spark + export HADOOP_HOME=/usr/local/hadoop + wget -N https://storage.googleapis.com/marco-168219-model/savedmodel.zip + spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.1,org.postgresql:postgresql:42.1.4 --master spark://${spark_hosts[0]}:7077 --driver-memory 63G --num-executors 6 --executor-cores 15 --executor-memory 51GB --py-files classifyImages.py,classifyImagesMarcoPartition.py,classifyImagesSimplePartition.py --files savedmodel.zip classifyImages.py $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY $AWS_DEFAULT_REGION ${postgresql_hosts[0]} $POSTGRES_USER $POSTGRES_PASSWORD $1 + " +} \ No newline at end of file diff --git a/src/bash/ingestMarcosFiles.sh b/src/bash/ingestMarcosFiles.sh new file mode 100644 index 0000000..54453e8 --- /dev/null +++ b/src/bash/ingestMarcosFiles.sh @@ -0,0 +1,38 @@ +## download database-instances +## need to download all files in MARCO database +function ingestMarcosFiles { + ## source the configuration file + source config/bash/env.sh + + ## let's temporarily use ssh command instead of pegasus + hadoop_master=$(cat ${PEGASUS_HOME}/tmp/crystal-project-spark-cluster/public_dns | head -1) + echo "Ingesting files using an ec2 instance..." 
+ ## we should do this remotely + ssh ubuntu@$hadoop_master " + ## got to install awscli since it is not included + dpkg -l awscli || sudo apt-get install awscli -y + ## set the needed keys + export AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID + export AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY + export AWS_DEFAULT_REGION=$AWS_DEFAULT_REGION + + ## need to get terf tool to extract images + terf_name=terf-0.0.3-0-gde97e3c-linux-amd64 + wget https://github.com/ubccr/terf/releases/download/v0.0.3/\${terf_name}.zip + unzip \${terf_name}.zip + ## there are issues with not enough storage space + ## we need to switch to a streaming method + for a in train test; do + ## download file and stream to tar extractor and stream to aws S3 + wget -O - "https://marco.ccr.buffalo.edu/data/archive/\$a-jpg-tfrecords.tar" | \ + tar xf - + done + ## extract the data set + mkdir marcos-data + for a in train-jpg test-jpg; do + \${terf_name}/terf extract --input "\$a" -o "marcos-data/\${a}" + done + ## send the marco data to amazon S3 + aws s3 sync {,s3://}marcos-data + " +} diff --git a/src/bash/launchDatabase.sh b/src/bash/launchDatabase.sh new file mode 100644 index 0000000..a8dbbd0 --- /dev/null +++ b/src/bash/launchDatabase.sh @@ -0,0 +1,53 @@ +## launch database instances +function launchDatabase { + echo "Launching ec2 instances..." + ## requires the use of peg + for a in config/database-cluster/*yml; do + peg up "$a" + done + + ## get the cluster information + echo "Getting cluster information..." + peg fetch crystal-project-database-cluster + + ## install needed pegasus dependencies + echo "Installing postgreSQL..." + for a in ssh aws environment; do + peg install crystal-project-database-cluster $a + done + + ## get the needed hostnames + database_hosts=($(cat ${PEGASUS_HOME}/tmp/crystal-project-database-cluster/public_dns)) + + ## copy the needed files + echo "Copying configuration files to database..." 
+ scp config/database-cluster/*.conf ubuntu@${database_hosts[0]}: + + ## install postgresql + echo "Setting up and launching postgreSQL..." + ssh ubuntu@${database_hosts[0]} ' + sudo apt-get install postgresql{,-contrib} + for file in pg_hba.conf postgresql.conf; do + sudo chown postgres:postgres "$file" + sudo chmod o-rw "$file" + sudo chmod g-w "$file" + sudo mv "$file" /etc/postgresql/9.5/main/ + done + sudo service postgresql start + sudo -u postgres createdb crystal-base + sudo -u postgres psql < +def getImages(sc, addr): + """ + Return an RDD of images from S3. + """ + crystal_imgs = sc.binaryFiles(addr+"/marcos-data.bak/test-jpg/*/*.jpeg") + return(crystal_imgs) + +if __name__ == '__main__': + """ + Setup Spark session and AWS, postgres access keys. + """ + sc = SparkContext(conf=SparkConf().setAppName("Crystal-Image-Classifier")) + os.environ["AWS_ACCESS_KEY_ID"] = sys.argv[1] + os.environ["AWS_SECRET_ACCESS_KEY"] = sys.argv[2] + os.environ["AWS_DEFAULT_REGION"] = sys.argv[3] + os.environ["POSTGRES_URL"] = sys.argv[4] + os.environ["POSTGRES_USER"] = sys.argv[5] + os.environ["POSTGRES_PASSWORD"] = sys.argv[6] + os.environ["CLASSIFIER_TYPE"] = sys.argv[7] + ## create spark session + spark_session = SparkSession.builder.appName("Crystal-Image-Classifier").getOrCreate() + sc = spark_session.sparkContext + ## run the main insertion function + main(sc) \ No newline at end of file diff --git a/src/python/classifyImagesMarcoPartition.py b/src/python/classifyImagesMarcoPartition.py new file mode 100644 index 0000000..6b9df30 --- /dev/null +++ b/src/python/classifyImagesMarcoPartition.py @@ -0,0 +1,33 @@ +## got to pass in the aws keys by arguments +import os +import tensorflow as tf +from zipfile import ZipFile + +## classify partitions of images to reduce writes +def classifyImagesMarcoPartition(partition): + """ + Classify images using marco predictor. + Returns a list of (url, boolean). 
+ """ + model = ZipFile('savedmodel.zip', 'r')\ + .extractall("") + predictor = tf.contrib.predictor.from_saved_model('savedmodel') + ## grab the byte arrays for the imgs + ## use zip since we are passed a list of + ## tuples : + urls, imgs = zip(*partition) + ## create a dictionary entry for the imgs for + ## tensorflow model + dictionary = {"image_bytes" : imgs} + ## classify images as a batch + prediction = predictor(dictionary) + ## get the predicted state of each image + ## the first reported class is the right one + test_string = bytes("Crystals", 'ascii') + bools = [test_string == array[0] for array in prediction["classes"]] + ## we want to return the values as key value tuples + ## + values = [(url, bool(crystal_bool)) for url, crystal_bool in zip(urls, bools)] + + ## return the values + return(values) \ No newline at end of file diff --git a/src/python/classifyImagesMarcoPartitionOneOff.py b/src/python/classifyImagesMarcoPartitionOneOff.py new file mode 100644 index 0000000..bb2c239 --- /dev/null +++ b/src/python/classifyImagesMarcoPartitionOneOff.py @@ -0,0 +1,45 @@ +import os +import tensorflow as tf +from zipfile import ZipFile + +## bring the model out of the function to speed up loading +## for dash instances +model = ZipFile('savedmodel.zip', 'r')\ + .extractall("") +predictor = tf.contrib.predictor.from_saved_model('savedmodel') + +## define the one off function, uses a preloaded predictor +def classifyImagesMarcoPartitionOneOff(partition): + """ + Classify images using a preloaded model. + Returns (url, boolean). + """ + ## return the values with the global predictor + values = classifyImagesMarcoPartition(partition, predictor) + return(values) + +## classify partitions of images to reduce writes +## pass in the predictor to reuse a predictor +def classifyImagesMarcoPartition(partition, predictor): + """ + Classify images using a preloaded model. + Returns (url, boolean). 
+ """ + ## use zip since we are passed a list of + ## tuples : + urls, imgs = zip(*partition) + ## create a dictionary entry for the imgs for + ## tensorflow model + dictionary = {"image_bytes" : imgs} + ## classify images as a batch + prediction = predictor(dictionary) + ## get the predicted state of each image + ## the first reported class is the right one + test_string = bytes("Crystals", 'ascii') + bools = [test_string == array[0] for array in prediction["classes"]] + ## we want to return the values as key value tuples + ## + values = [(url, bool(crystal_bool)) for url, crystal_bool in zip(urls, bools)] + + ## return the values + return(values) \ No newline at end of file diff --git a/src/python/classifyImagesSimplePartition.py b/src/python/classifyImagesSimplePartition.py new file mode 100644 index 0000000..3c3deab --- /dev/null +++ b/src/python/classifyImagesSimplePartition.py @@ -0,0 +1,18 @@ +## classify partitions of images to reduce writes +def classifyImagesSimplePartition(partition): + """ + Simple image classifier using url parsing. + Returns (urls, boolean). 
+ """ + ## get the urls and imgs byte data + urls, imgs = zip(*partition) + ## determine crystal boolean by url + bools = [lambda url: "Crystal" in url for url in urls] + ## we want to return + ## classify images as a batch + values = [(url, bool(crystal_bool)) for url, crystal_bool in zip(urls, bools)] + + ## return the values + ## we want to return the values as key value tuples + ## + return(values) \ No newline at end of file diff --git a/src/python/classifyImagesTrainer.py b/src/python/classifyImagesTrainer.py new file mode 100644 index 0000000..5ec6e83 --- /dev/null +++ b/src/python/classifyImagesTrainer.py @@ -0,0 +1,153 @@ +import os, sys +from pyspark import SparkContext, SparkConf, SQLContext +from sparkdl import DeepImageFeaturizer +from pyspark.ml.image import ImageSchema +from pyspark.sql.functions import lit +from pyspark.ml import Pipeline +from pyspark.ml.classification import LogisticRegression +from pyspark.ml.evaluation import MulticlassClassificationEvaluator + +## main insertion function +def main(sc): + """ + Main insertion function. + Train an inceptionv3 neuralnetwork to classify crystal images. + Refits the output layer to fit a binary classifier for crystal images. 
+ """ + ## we need to pass in the AWS keys + sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"]) + sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"]) + ## get the training and testing dataframes + train_df, test_df = genDataFrames("s3a://marcos-data/") + ## setup the transfer learner + p = transferLearner(20, 0.05, 0.3) + ## run the model + p_model = runModel(train_df, p) + ## get the predictions data frame + predictions_df = predictWithModel(test_df, p_model) + ## get the accuracy of the predictions + accuracy = validate(predictions_df) + print("Model was accurate to: "+str(accuracy)) + ## write the results to database + writeToPostgreSQL(predictions_df) + +## let's work on getting this classifier up +## returns a training and test dataframe +def genDataFrames(url): + """ + Return a train dataframe and a test dataframe for training. + """ + ## training set + ## let's get and combine the negative image data set + clear_train_imgs = ImageSchema.readImages(url+"train-jpg/Clear/*.jpeg").withColumn("label", lit(0)) + precipitate_train_imgs = ImageSchema.readImages(url+"train-jpg/Precipitate/*.jpeg").withColumn("label", lit(0)) + other_train_imgs = ImageSchema.readImages(url+"train-jpg/Other/*.jpeg").withColumn("label", lit(0)) + negative_train_imgs = clear_train_imgs.unionAll(precipitate_train_imgs).unionAll(other_train_imgs) + + ## let's get crystal data set + positive_train_imgs = ImageSchema.readImages(url+"train-jpg/Crystals/*.jpeg").withColumn("label", lit(1)) + + ## testing set + ## let's get and combine the negative image data set + clear_test_imgs = ImageSchema.readImages(url+"test-jpg/Clear/*.jpeg").withColumn("label", lit(0)) + precipitate_test_imgs = ImageSchema.readImages(url+"test-jpg/Precipitate/*.jpeg").withColumn("label", lit(0)) + other_test_imgs = ImageSchema.readImages(url+"test-jpg/Other/*.jpeg").withColumn("label", lit(0)) + negative_test_imgs = 
clear_test_imgs.unionAll(precipitate_test_imgs).unionAll(other_test_imgs) + + ## let's get crystal data set + positive_test_imgs = ImageSchema.readImages(url+"test-jpg/Crystals/*.jpeg").withColumn("label", lit(1)) + + ## let's combine them to create the training df + train_df = positive_train_imgs.unionAll(negative_train_imgs) + test_df = positive_test_imgs.unionAll(negative_test_imgs) + + ## return the dataframes + return(train_df, test_df) + +## let's setup the trainer +## returns a pipeline +def transferLearner(max_iter, reg_param, elastic_net_param): + """ + Setup the inceptionv3 transfer learner pipeline. + """ + ## let's setup some parameters + featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3") + lr = LogisticRegression(maxIter=max_iter, + regParam=reg_param, + elasticNetParam=elastic_net_param, + labelCol="label") + p = Pipeline(stages=[featurizer, lr]) + + ## return the pipeline + return(p) + +## run the model +## return a model +def runModel(train_df, pipeline): + """ + Run the model with a training dataframe and a pipeline. + """ + ## run the model + p_model = pipeline.fit(train_df) + + ## return the model + return(p_model) + +## get some predictions +## returns a dataframe +def predictWithModel(test_df, p_model): + """ + Predict classification results with test dataframe + and trained model. + """ + predictions = p_model.transform(test_df) + df = p_model.transform(test_df) + + ## return the dataframe + return(df) + +## validate +## returns the accuracy +def validate(df): + """ + Validate the model with a predicted dataframe. 
+ """ + predictionAndLabels = df.select("prediction", "label") + evaluator = MulticlassClassificationEvaluator(metricName="accuracy") + + ## return the accuracy + return(evaluator.evaluate(predictionAndLabels)) + +## write to postgres database +## THIS FUNCTION SHOULD BE SPLIT INTO ITS OWN FILE +## WHEN THE MODEL PARAMETERS CAN BE STORED ON DISK +def writeToPostgreSQL(sql): + """ + Write results to postgres. + """ + ## let's just write grab the crystal info here + postgres_df = sql.select("image_id", "label_text").selectExpr("image_id as id", "label_text as crystal") + postgres_df = postgres_df.withColumn('crystal_bool', when(postgres_df.crystal == "Crystals", True).otherwise(False)).drop(postgres_df.crystal).select(col("crystal_bool").alias("crystal"),col("id")) + postgres_df = postgres_df.select("id", "crystal") + postgres_df = postgres_df.withColumn("id", postgres_df.id.cast('integer')) + ## let's add the dataframe to Postgresql + postgres_df.write.jdbc(url = "jdbc:postgresql://"+os.environ["POSTGRES_URL"]+":5432/crystal-base", + table = "marcos", + mode = "append", + properties={"driver": 'org.postgresql.Driver', + "user": os.environ["POSTGRES_USER"], + "password": os.environ["POSTGRES_PASSWORD"]}) + +if __name__ == '__main__': + """ + Setup the spark instance and AWS, postgres keys. 
+ """ + sc = SparkContext(conf=SparkConf().setAppName("Crystal-Image-Classifier")) + os.environ["AWS_ACCESS_KEY_ID"]=sys.argv[1] + os.environ["AWS_SECRET_ACCESS_KEY"]=sys.argv[2] + os.environ["AWS_DEFAULT_REGION"]=sys.argv[3] + os.environ["POSTGRES_URL"]=sys.argv[4] + os.environ["POSTGRES_USER"]=sys.argv[5] + os.environ["POSTGRES_PASSWORD"]=sys.argv[6] + ## run the main insertion function + main(sc) \ No newline at end of file diff --git a/src/python/dash-bootstrap/README.md b/src/python/dash-bootstrap/README.md new file mode 100644 index 0000000..9b44b1c --- /dev/null +++ b/src/python/dash-bootstrap/README.md @@ -0,0 +1,3 @@ +This folder contains web app for the crystal-base frontend. The python function is designed to be called by its corresponding bash function in `src/bash/`. + +![Image of Web App](/images/crystal-base-web-app.png) diff --git a/src/python/dash-bootstrap/runDashServer.py b/src/python/dash-bootstrap/runDashServer.py new file mode 100644 index 0000000..d36fcd4 --- /dev/null +++ b/src/python/dash-bootstrap/runDashServer.py @@ -0,0 +1,129 @@ +import os, datetime, base64, sys +import dash +import dash_core_components as dcc +import dash_bootstrap_components as dbc +from dash.dependencies import Input, Output, State +import dash_html_components as html +import dash_table_experiments as dash_table +from sqlalchemy import create_engine +from sqlalchemy_utils import database_exists, create_database +import pandas as pd +from classifyImagesMarcoPartitionOneOff import classifyImagesMarcoPartitionOneOff + +## start the ap +app = dash.Dash(__name__, + static_folder = 'static', + external_stylesheets=[dbc.themes.BOOTSTRAP]) +app.title = "Crystal-Base" +app.scripts.config.serve_locally = True + +## setup the postgres connector +host = sys.argv[1] +user = sys.argv[2] +password = sys.argv[3] +dbname = sys.argv[4] +db = create_engine('postgresql://%s:%s@%s:5432/%s'%(user, password, host, dbname)) + +## setup the navigation bar +navbar = dbc.NavbarSimple( + 
children = [html.Div([dbc.NavItem(id = 'sql_count'), + dcc.Interval(id = 'interval-component', + interval = 1*1000, + n_intervals = 0)], + style = {'padding-left' : '10px', + 'padding-right' : '10px'}), + html.A([html.Img(src='/static/GitHub-Mark-32px.png')], + href = 'https://github.com/bluerider/crystal-base', + target = "_blank")], + brand = "Crystal-Base", + brand_href = '#', + sticky = "top", + color = "#6699ff") + +## setup the body of the document +body = dbc.Container( + [html.Div([dcc.Upload(id = 'upload-image', + children=html.Div(['Upload images']), + style = {'lineHeight': '60px', + 'borderStyle': 'dashed', + 'borderRadius': '5px', + 'textAlign': 'center'}, + ## allow uploading multiple files + multiple = True)]), + html.Div(id = 'output-image-upload') + ]) + +## set the app layout +app.layout = html.Div([navbar, body]) + +## parse the contents of passed images and +## return a html divider +def parse_contents(contents, values, date, ): + """ + Return a HTML divider for prediction results + and the uploaded image. + """ + name, crystal_bool = values + return html.Div([ + html.Hr(), + html.H5(name+"; Prediction: "+str(crystal_bool)), + # HTML images accept base64 encoded strings in the same format + # that is supplied by the upload + html.Img(src=contents), + html.Hr() + ]) + +## call back for upload images +@app.callback(Output('output-image-upload', 'children'), + [Input('upload-image', 'contents')], + [State('upload-image', 'filename'), + State('upload-image', 'last_modified')]) +def update_output(list_of_contents, list_of_names, list_of_dates): + """ + Classify uploaded images. + Returns classification of images as well as the image itself + for display on web app. 
+ """ + if list_of_contents is not None: + ## get the raw image byte date + imgs = [base64.b64decode(img[23:]) for img in list_of_contents] + ## classify the images + values = classifyImagesMarcoPartitionOneOff(zip(list_of_names, imgs)) + ## we got to resize the pictures to form a grid that will fit + resized_imgs = list_of_contents + children = [ + parse_contents(c, n, d) for c, n, d in + zip(resized_imgs, values, list_of_dates)] + return children + +## update text +@app.callback(Output('sql_count', 'children'), + [Input('interval-component', 'n_intervals')]) +def update_metrics(n): + """ + Connect to postgres and get the current number of classified images. + Return the count for negative and positive results for display + in the web app. + """ + with db.connect() as con: + sql_query = """ + SELECT count(*) FROM marcos where crystal is true; + """ + positive_count = str(con.execute(sql_query).fetchone()[0]) + sql_query = """ + SELECT count(*) FROM marcos where crystal is false; + """ + negative_count = str(con.execute(sql_query).fetchone()[0]) + return html.Div([dbc.Button("Crystals: "+positive_count, + color = "success"), + dbc.Button("Junk: "+negative_count, + color = "secondary")]) + +if __name__ == '__main__': + """ + Run the app. 
+ """ + ## run the app as a server + ## use 5001 to avoid screwing + ## with the currently running server + app.run_server(port=5000, host = '0.0.0.0') \ No newline at end of file diff --git a/src/python/dash-bootstrap/static/GitHub-Mark-32px.png b/src/python/dash-bootstrap/static/GitHub-Mark-32px.png new file mode 100644 index 0000000..8b25551 Binary files /dev/null and b/src/python/dash-bootstrap/static/GitHub-Mark-32px.png differ diff --git a/src/python/dash-bootstrap/static/sample.jpg b/src/python/dash-bootstrap/static/sample.jpg new file mode 100644 index 0000000..4cf7b8d Binary files /dev/null and b/src/python/dash-bootstrap/static/sample.jpg differ diff --git a/src/python/multiplyImages.py b/src/python/multiplyImages.py new file mode 100644 index 0000000..6172a88 --- /dev/null +++ b/src/python/multiplyImages.py @@ -0,0 +1,113 @@ +## got to pass in the aws keys by arguments +import sys, os, io +## context is labeled sc +from pyspark import SparkContext, SparkConf +## need this to do image classification +from pyspark.ml.image import ImageSchema +## use pillow +from PIL import Image +## use boto for AWS S3 file access +import boto3, s3fs + +## let's pass in the sc (if running within pyspark) +def main(sc): + """ + Main insertion function. + Multiply images from S3 using rotation and mirror operations. + Upload the images to S3. 
+ """ + ## we need to pass in the AWS keys + sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"]) + sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"]) + # use S3 streaming + addr = "s3a:/" + ## let's get the rdd for images + crystal_imgs = getImages(sc, addr) + ## create a broadcast variable containing AWS keys + aws_info = sc.broadcast((os.environ["AWS_ACCESS_KEY_ID"], + os.environ["AWS_SECRET_ACCESS_KEY"])) + ## let's generate some images and upload them to S3 + crystal_imgs.foreach(lambda row: genImagesToS3(row, aws_info)) + +## get the RDDs from images +## we have to switch to an RDD instead +## return an RDD +def getImages(sc, addr): + """ + Return RDDs for crystal images from S3. + """ + crystal_imgs= ImageSchema.readImages(addr+"/marcos-data/*/*/*.jpeg", numPartitions = 2000) + return(crystal_imgs) + +## generate variations of an image +def genImagesToS3(row, aws_info): + """ + Generate variations of an image from a dataframe row. + Upload images to S3. + """ + urls, imgs = transformImages(row) + aws_access_key, aws_secret_key = aws_info.value + s3 = s3fs.S3FileSystem(key = aws_access_key, + secret = aws_secret_key, + use_ssl = True, + anon = False) + addToS3(urls, imgs, s3) + +## generate images +## we will need to do several transform operations +## mirror -> rotate (90 degrees) +def transformImages(row): + """ + Transform images from a dataframe by rotating and mirroring. + Returns (url, jpeg). 
+ """ + ## name is stored in url + url_name = row.image.origin.split('.')[:-1][0] + ## get the byte array + byte_array = row.image.data + ## mirror the image + img = Image.frombytes("RGB", + (row.image.width, row.image.height), + bytes(byte_array)) + mirrored_img = img.transpose(Image.FLIP_LEFT_RIGHT) + ## create an array of rotated images + imgs = [a.rotate(deg) for deg in range(0, 360, 90) for a in [img, mirrored_img]] + ## let's give the images a new id + urls = [url_name+"-"+str(num)+".jpeg" for num in range(len(imgs))] + return(urls, imgs) + +## add images to S3 +def addToS3(urls, imgs, s3): + """ + Upload images to S3 using url scheme. + """ + ## strip urls to become buckets + buckets = [url.split("s3a://")[1] for url in urls] + ## loop for buckets and imgs + for pair in zip(buckets,imgs): + bucket = pair[0] + img = pair[1] + ## get a streaming byte array + byte_array = io.BytesIO() + ## save the image with compression + ## to byte array + img.save(byte_array, format = "JPEG") + with s3.open(bucket, 'wb') as f: + ## write the img + f.write(byte_array.getvalue()) + ## close the file + f.close() + + + +if __name__ == '__main__': + """ + Setup spark and AWS keys. + """ + ## pass the AWS access keys + os.environ["AWS_ACCESS_KEY_ID"]=sys.argv[1] + os.environ["AWS_SECRET_ACCESS_KEY"]=sys.argv[2] + os.environ["AWS_DEFAULT_REGION"]=sys.argv[3] + sc = SparkContext(conf=SparkConf().setAppName("Crystal-Image-Generator")) + ## run the main insertion function + main(sc) \ No newline at end of file diff --git a/test/data/Asprosin-Fluoride.jpg b/test/data/Asprosin-Fluoride.jpg new file mode 100644 index 0000000..7791c62 Binary files /dev/null and b/test/data/Asprosin-Fluoride.jpg differ diff --git a/test/data/crystal2.jpg b/test/data/crystal2.jpg new file mode 100644 index 0000000..eb4d95f Binary files /dev/null and b/test/data/crystal2.jpg differ