Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 41 additions & 201 deletions src/adapters/postgres/cohortMembers-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { FormSubmissionStatus } from 'src/forms/entities/form-submission.entity'
import { FormSubmissionSearchDto } from 'src/forms/dto/form-submission-search.dto';
import { FormsService } from 'src/forms/forms.service';
import { isElasticsearchEnabled } from 'src/common/utils/elasticsearch.util';
import { UserElasticsearchService } from 'src/elasticsearch/user-elasticsearch.service';
import { ElasticsearchSyncService } from 'src/elasticsearch/elasticsearch-sync.service';
@Injectable()
export class PostgresCohortMembersService {
constructor(
Expand All @@ -55,7 +55,7 @@ export class PostgresCohortMembersService {
private readonly userService: PostgresUserService,
private readonly formsService: FormsService,
private readonly formSubmissionService: FormSubmissionService,
private readonly userElasticsearchService: UserElasticsearchService
private readonly elasticsearchSyncService: ElasticsearchSyncService
) {}

//Get cohort member
Expand Down Expand Up @@ -719,50 +719,12 @@ export class PostgresCohortMembersService {
// Update Elasticsearch with cohort member status
if (isElasticsearchEnabled()) {
try {
// First get the existing user document from Elasticsearch
const userDoc = await this.userElasticsearchService.getUser(
cohortMembers.userId
);
// Extract the application array if present
const source =
userDoc && userDoc._source
? (userDoc._source as { applications?: any[] })
: undefined;
let applications = Array.isArray(source?.applications)
? [...source.applications]
: [];

const appIndex = applications.findIndex(
(app) => app.cohortId === cohortMembers.cohortId
);

if (appIndex !== -1) {
// Update the existing application object for this cohortId
applications[appIndex] = {
...applications[appIndex],
cohortId: cohortMembers.cohortId,
cohortmemberstatus: savedCohortMember.status ?? 'active',
// Optionally merge other fields here if needed
};
} else {
// Add a new application object for this cohortId
applications.push({
cohortId: cohortMembers.cohortId,
cohortmemberstatus: savedCohortMember.status ?? 'active',
// Add other default fields as needed
});
}

// Now update the user document in Elasticsearch with the merged applications array
const baseDoc =
typeof userDoc?._source === 'object' ? userDoc._source : {};
await this.userElasticsearchService.updateUser(
// Use the optimized sync service for application updates
await this.elasticsearchSyncService.updateApplicationData(
cohortMembers.userId,
{ doc: { ...baseDoc, applications } },
async (userId: string) => {
return await this.formSubmissionService.buildUserDocumentForElasticsearch(
userId
);
cohortMembers.cohortId,
{
cohortmemberstatus: savedCohortMember.status ?? 'active',
}
);
} catch (elasticError) {
Expand Down Expand Up @@ -1122,55 +1084,18 @@ export class PostgresCohortMembersService {
// Update Elasticsearch with updated cohort member status
if (isElasticsearchEnabled()) {
try {
// First get the existing user document from Elasticsearch
const userDoc = await this.userElasticsearchService.getUser(
cohortMembershipToUpdate.userId
);

// Extract the application array if present
const source =
userDoc && userDoc._source
? (userDoc._source as { applications?: any[] })
: undefined;
const existingApplication =
source && Array.isArray(source.applications)
? source.applications.find(
(app) => app.cohortId === cohortMembershipToUpdate.cohortId
)
: undefined;

if (!existingApplication) {
// If application is missing, build and upsert the full user document (with progress pages)
const fullUserDoc =
await this.formSubmissionService.buildUserDocumentForElasticsearch(
cohortMembershipToUpdate.userId
);
if (fullUserDoc) {
await this.userElasticsearchService.updateUser(
cohortMembershipToUpdate.userId,
{ doc: fullUserDoc },
async (userId: string) => {
return await this.formSubmissionService.buildUserDocumentForElasticsearch(
userId
);
}
);
// Use the optimized sync service to update user application in Elasticsearch
// Update user application in Elasticsearch with cohort member status changes
await this.elasticsearchSyncService.syncUserApplication(
cohortMembershipToUpdate.userId,
cohortMembershipToUpdate.cohortId,
{
cohortmemberstatus: result.status ?? 'active',
statusReason: cohortMembersUpdateDto.statusReason,
// Note: 'status' is not part of the application update interface
// Using cohortmemberstatus to track the cohort member status
}
} else {
// SOLUTION 3: Use field-specific update to preserve existing data
// This prevents the deletion of application data (progress, formData, etc.)
// by only updating specific fields instead of replacing the entire application object
await this.updateElasticsearchWithFieldSpecificChanges(
cohortMembershipToUpdate.userId,
cohortMembershipToUpdate.cohortId,
{
cohortmemberstatus: result.status ?? 'active',
statusReason: cohortMembersUpdateDto.statusReason,
status: cohortMembersUpdateDto.status,
},
existingApplication
);
}
);
} catch (elasticError) {
// Log Elasticsearch error but don't fail the request
LoggerUtil.error(
Expand Down Expand Up @@ -1412,123 +1337,38 @@ export class PostgresCohortMembersService {
* @param updateData - Object containing only the fields to update
* @param existingApplication - Existing application data (for reference)
*/
/**
* Update Elasticsearch with field-specific changes for cohort member updates.
* This method uses the optimized ElasticsearchSyncService for better performance and maintainability.
*
* @param userId - User ID to update
* @param cohortId - Cohort ID for the application
* @param updateData - Data to update in the application
* @param existingApplication - Existing application data (optional)
*/
private async updateElasticsearchWithFieldSpecificChanges(
userId: string,
cohortId: string,
updateData: any,
existingApplication: any
): Promise<void> {
try {
// Create a Painless script that only updates specific fields that have changed
// This prevents data loss by preserving all existing fields not being updated
const script = {
source: `
// Initialize applications array if it doesn't exist
if (ctx._source.applications == null) {
ctx._source.applications = [];
}

boolean found = false;
// Search for existing application with matching cohortId
for (int i = 0; i < ctx._source.applications.length; i++) {
if (ctx._source.applications[i].cohortId == params.cohortId) {
// CRITICAL: Only update specific fields that are provided in updateData
// This prevents deletion of existing data like progress, formData, etc.
if (params.updateData.cohortmemberstatus != null) {
ctx._source.applications[i].cohortmemberstatus = params.updateData.cohortmemberstatus;
}
if (params.updateData.statusReason != null) {
ctx._source.applications[i].statusReason = params.updateData.statusReason;
}

if (params.updateData.completionPercentage != null) {
ctx._source.applications[i].completionPercentage = params.updateData.completionPercentage;
}
if (params.updateData.updatedAt != null) {
ctx._source.applications[i].updatedAt = params.updateData.updatedAt;
}

// KEY IMPROVEMENT: Preserve all existing fields that are not being updated
// This ensures we don't lose any existing data like progress, formData, etc.
// The previous method was replacing the entire application object, causing data loss

found = true;
break;
}
}

if (!found) {
// If application doesn't exist, create a new one with minimal data
// This maintains backward compatibility for new applications
Map newApplication = new HashMap();
newApplication.cohortId = params.cohortId;
newApplication.cohortmemberstatus = params.updateData.cohortmemberstatus;
newApplication.statusReason = params.updateData.statusReason;
newApplication.updatedAt = params.updateData.updatedAt;
newApplication.createdAt = params.updateData.updatedAt;

// Initialize empty structures to preserve existing pattern
// This ensures new applications have the expected structure
newApplication.progress = [:];
newApplication.progress.pages = [:];
newApplication.progress.overall = [:];
newApplication.progress.overall.completed = 0;
newApplication.progress.overall.total = 0;
newApplication.formData = [:];

ctx._source.applications.add(newApplication);
}

// Update the document's updatedAt timestamp to reflect the change
ctx._source.updatedAt = params.updateData.updatedAt;
`,
lang: 'painless',
params: {
cohortId,
updateData: {
cohortmemberstatus: updateData.cohortmemberstatus,
statusReason: updateData.statusReason,
completionPercentage: updateData.completionPercentage,
updatedAt: new Date().toISOString(),
},
// Use the optimized sync service for application updates
await this.elasticsearchSyncService.updateApplicationData(
userId,
cohortId,
{
cohortmemberstatus: updateData.cohortmemberstatus,
statusReason: updateData.statusReason,
completionPercentage: updateData.completionPercentage,
},
};

// Check if user document exists by trying to get it
// This is safer than using private methods and provides the same functionality
const userDoc = await this.userElasticsearchService.getUser(userId);

if (userDoc) {
// Update existing document with field-specific changes using the script
// This ensures only specific fields are updated while preserving existing data
await this.userElasticsearchService.updateUser(
userId,
{ script },
async (userId: string) => {
return await this.formSubmissionService.buildUserDocumentForElasticsearch(
userId
);
}
);
} else {
// If user document doesn't exist, create it with the new application
// This maintains backward compatibility for new users
const fullUserDoc =
await this.formSubmissionService.buildUserDocumentForElasticsearch(
userId
);
if (fullUserDoc) {
await this.userElasticsearchService.updateUser(
userId,
{ doc: fullUserDoc },
async (userId: string) => {
return await this.formSubmissionService.buildUserDocumentForElasticsearch(
userId
);
}
);
async (userId: string, cohortId: string) => {
// Application data provider function
return await this.formSubmissionService.buildUserDocumentForElasticsearch(userId)
.then(userDoc => userDoc?.applications?.find(app => app.cohortId === cohortId))
.catch(() => null);
}
}
);
} catch (error) {
// Log the error but don't fail the entire request
// This ensures that database updates succeed even if Elasticsearch fails
Expand Down
Loading