Okay, here's a comprehensive description suitable for a pull request, covering the changes related to the adjustment trigger endpoint, event generation, and Kafka integration. I've structured it to be clear, concise, and informative for reviewers. **Pull Request Description:** **Title:** Implement Adjustment Trigger Endpoint and Asynchronous Event Processing **Description:** This pull request implements the core functionality for triggering adjustment requests (UNDO and RETRY) within the CGFS Retry Service. It introduces an asynchronous, message-driven architecture using an internal Kafka topic to decouple request handling from event processing, significantly improving resilience and scalability. **Key Changes:** 1. **API Endpoint (`/api/v1/adjustments/adjustment` - POST):** * A new REST endpoint is introduced to accept adjustment requests. * The endpoint accepts a JSON payload (`AdjustmentRequestDto`) containing: * `requestId` (client-generated UUID, for idempotency) * `eventIds` (list of business event IDs to adjust) * `eventType` (ORDER_ID or SHIPMENT_ID) * `business` (business unit) * `countryCode` (region code) * `type` (UNDO or RETRY) * `createdBy` * The endpoint returns a `202 Accepted` response immediately, with the `requestId` in the response body (`AdjustmentResponseDto`). This signifies that the request has been *accepted for processing*, not that the adjustment is complete. * The endpoint has request validations to verify if events are already in progress. 2. **`AdjustmentService` and `AdjustmentServiceImpl`:** * **`processAdjustment(AdjustmentRequestDto)`:** This is the main entry point for handling adjustment requests. * It first validates the request using `validateAdjustmentRequest()`, to check for the events which are already under process. * It checks for existing, completed adjustments with the same `requestId` (idempotency). * It creates a new `AdjustmentRequest` entity (or retrieves an existing one) and sets its initial state to `PENDING`. * It delegates to the `EventGenerationService` to create the initial `ProcessingEvent` records and publish them to the internal Kafka topic. 3. **`EventGenerationService` and `EventGenerationServiceImpl`:** * **`generateUndoEvents(AdjustmentRequest)`:** This method is now the *single* entry point for generating initial events. It *always* creates `UNDO` type events, regardless of the overall request type (`RETRY` or `UNDO`). * It calls BEP via Pylon framework to get Finance Event Ids. * It creates `ProcessingEvent` records with `type = UNDO`, `state = PENDING`, and `parentProcessingEventId = null`. * It saves these `ProcessingEvent` records to the database. * **Crucially**, it publishes a message to the *internal* Kafka topic (`cgfs_retry.processing_events.1`) for *each generated UNDO event*. The message contains the `processingEventId` and `requestId`. * **`generateReprocessEvent(List completedUndoEvents)`:** This method creates `REPROCESS` events *after* successful completion of the corresponding `UNDO` events. It sets the `parentProcessingEventId` to link the REPROCESS event to its corresponding UNDO event. It also publishes a message to the *internal* Kafka topic. 4. **`EventOrchestrator` and `EventOrchestratorImpl`:** * **`EventOrchestratorImpl`** is now a *consumer* of two Kafka topics: * **`cgfs_retry.processing_events.1` (Internal Topic):** Receives messages containing a `processingEventId`. This triggers the processing of a *single* `ProcessingEvent`. * **`fcs_undo` (FCS Feedback Topic):** Receives feedback from FCS about the success or failure of individual event processing (UNDO or REPROCESS). * **`processProcessingEventMessage(ProcessingEventMessage)`:** Handles messages from the internal topic. It: * Retrieves the `ProcessingEvent` from the database. * Checks the `state` and `parentProcessingEventId` to ensure correct processing order and idempotency. * Updates the `ProcessingEvent` state to `IN_PROGRESS`. * Constructs a `AdjustmentBepEvent` and publishes it to BEP (via `CounectProducerService` and the `RetryUpstream` topic). * **`processFeedback(FCSFeedbackMessage)`:** Handles feedback from FCS. It: * Retrieves the corresponding `ProcessingEvent`. * Updates the `ProcessingEvent` status (`COMPLETED` or `FAILED`). * **Decision Logic:** Based on the `ProcessingEvent` type (`UNDO` or `REPROCESS`), the feedback status (`SUCCESS` or `FAILED`), and the *overall* request type (`RETRY` or `UNDO` retrieved from AdjustmentRequest), it determines whether to: * Create `REPROCESS` events (if `UNDO` succeeded and it's a `RETRY` request). * Update the overall `AdjustmentRequest` status (`checkAndUpdateRequestStatus`). * **`checkAndCreateReprocessEvents(String requestId)`:** This method checks if *all* UNDO events for a given `requestId` are `COMPLETED`. If so, it calls `eventGenerationService.generateReprocessEvent()` to create the corresponding REPROCESS events. * **`checkAndUpdateRequestStatus`:** Updates the state of adjustment request to COMPLETED/FAILED/PROCESSING based on processing events states. 5. **Internal Kafka Topic (`cgfs_retry.processing_events.1`):** * This new topic is used for *internal* communication within the `cgfs-retry` service. It decouples event generation from event processing, making the system more resilient. * Producer: `EventGenerationServiceImpl`. * Consumer: `EventOrchestratorImpl` (via `AdjustmentProcessingEventHandler`). * Message Type: `ProcessingEventMessage` (containing `processingEventId` and `requestId`). 6. **Counect Integration:** * **`CounectProducerService`:** Used for sending messages to both the internal topic and the `RetryUpstream` topic (for BEP). * **`CounectConsumerService`:** Used for consuming messages from both `cgfs_retry.processing_events.1` and `fcs_undo`. * **`AdjustmentEventPublisher`:** An interface that abstracts the publishing of messages, hiding the Counect details from the domain layer. * **`CounectAdjustmentEventPublisher`:** The implementation of `AdjustmentEventPublisher` that uses `CounectProducerService`. * **`RetryEventHandlerFactory`:** Registers the appropriate handlers for the internal topic (`AdjustmentProcessingEventHandler`) and the FCS feedback topic (`AdjustmentFcsFeedbackEventHandler`). * **`RetryConsumer` and `RetrySubscriber`:** Counect consumer classes for processing events. * **`CounectJsonSerializer` and `CounectJsonDeserializer`**: for serialization and deserialization. 7. **Database Interactions:** * Added port interfaces `ProcessingEventRepository` and `AdjustmentRequestRepository`. * Implemented the delegator classes which implements port and injects corresponding JPA repositories `ProcessingEventJpaRepository` and `AdjustmentJpaRepository`. 8. **Exception Handling** * Custom `RetryServiceException` is added for handling retry service level exceptions. 9. **DTOs (Data Transfer Objects):** * `ProcessingEventMessage`: For internal Kafka topic messages. * `RetryUpstreamMessage`: For sending messages to BEP. * `FCSFeedbackMessage`: For receiving feedback from FCS. 10. **Entity and Model Classes:** * The model and entity classes are added and updated with necessary class and fields with proper annotations. 11. **Mappers:** * Added mappers for mapping data between entity classes and domain model. **Key Architectural Improvements:** * **Asynchronous Processing:** The use of the internal Kafka topic makes the adjustment process fully asynchronous. The API call returns immediately, and the actual processing happens in the background. * **Decoupling:** The `EventGenerationService` and `EventOrchestrator` are completely decoupled, communicating only via messages. This makes the system more resilient and easier to scale. * **Resiliency:** If the `EventOrchestrator` or any part of the processing pipeline fails, messages will remain in the Kafka topic and will be processed when the service recovers. * **Idempotency:** The design incorporates checks to ensure idempotency, preventing duplicate processing. * **Scalability:** The asynchronous, message-driven architecture allows for horizontal scaling by adding more consumer instances. * **Maintainability:** The code is well-structured, follows SOLID principles, and uses clear interfaces and abstractions. * **Testability:** The use of interfaces and dependency injection. * **Error Handling:** Proper error handling. This comprehensive description provides a clear and detailed overview of the implemented changes and their impact on the CGFS Retry Service, emphasizing the architectural improvements and adherence to best practices. This should be suitable for a detailed pull request review.