// src/main/java/com/coupang/cgfs/retry/adapter/mysql/config/KirinDatasourceConfig.java package com.coupang.cgfs.retry.adapter.mysql.config; import com.coupang.storageservice.client.KirinDataSourceImportRegistrar; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Profile; @Profile("!local") // Use Kirin in non-local environments @Configuration @Import({KirinDataSourceImportRegistrar.class}) public class KirinDatasourceConfig { // No explicit DataSource bean definitions are needed here. // KirinDataSourceImportRegistrar handles this based on application.yml. } # src/main/resources/application-develop.yml (Example - Adapt to your settings) spring: application: name: cgfs-retry-service datasource: #We are not defining the datasource, url here. we will define it in Kirin # url: jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=UTF8 driver-class-name: com.mysql.cj.jdbc.Driver # MySQL driver # Add hikari configurations if needed jpa: hibernate: ddl-auto: validate # Or 'update' in dev, but NEVER 'create' or 'create-drop' in production properties: hibernate: dialect: org.hibernate.dialect.MySQLDialect # Or MySQL8Dialect, etc. 
show_sql: true # Log SQL queries (for debugging - disable in production) format_sql: true jdbc: time_zone: UTC kirin: vault: authentication: APPROLE appRole: roleId: ${VAULT_ROLE_ID} # Get from environment variable secretId: ${VAULT_SECRET_ID} # Get from environment variable datasources: readDataSource: # Name of your read datasource type: DBCP2 # Use DBCP2 connection pooling vaultCreds: credsType: DATABASE_STATIC databaseStatic: roles: [fcs_db_s_fcsdev_1, fcs_db_s_fcsdev_2] # Your read-only DB roles rotation: intervalMinInDev: 1 # Rotation interval url: jdbc:mysql://aurora-mysql-fcs-dev.cluster-ro-cgz4ugkbn4yc.ap-northeast-2.rds.amazonaws.com:3306/fcs_db?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&roundRobinLoadBalance=true&connectTimeout=3000&socketTimeout=60000&useSSL=false pool: # DBCP2 connection pool settings (tune these!) driverClassName: com.mysql.cj.jdbc.Driver #make sure of driver class initialSize: 5 maxTotal: 60 # Maximum number of connections minIdle: 4 # Minimum idle connections maxIdle: 20 testOnBorrow: true # Validate connection before borrowing testWhileIdle: true # Validate idle connections validationQuery: SELECT 1 timeBetweenEvictionRunsMillis: 60000 # Check for idle connections every 60 seconds minEvictableIdleTimeMillis: 300000 # Minimum idle time before eviction (5 minutes) writeDataSource: #name of write datasource type: DBCP2 vaultCreds: credsType: DATABASE_STATIC databaseStatic: roles: [ fcs_db_s_fcsdev_1, fcs_db_s_fcsdev_2 ] # Your write-capable DB roles rotation: intervalMinInDev: 1 url: jdbc:mysql://aurora-mysql-fcs-dev.cluster-cgz4ugkbn4yc.ap-northeast-2.rds.amazonaws.com:3306/fcs_db?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&roundRobinLoadBalance=true&connectTimeout=3000&socketTimeout=60000&useSSL=false pool: driverClassName: com.mysql.cj.jdbc.Driver #make sure of driver class name initialSize: 5 maxTotal: 60 minIdle: 4 maxIdle: 20 testOnBorrow: true testWhileIdle: true validationQuery: SELECT 1 
timeBetweenEvictionRunsMillis: 60000 minEvictableIdleTimeMillis: 300000 //ProcessingEventRepository package com.coupang.cgfs.retry.adapter.mysql.repository; import com.coupang.cgfs.retry.adapter.mysql.entity.ProcessingEventEntity; import com.coupang.cgfs.retry.domain.model.ProcessingEvent; import com.coupang.cgfs.retry.enums.AdjustmentType; import com.coupang.cgfs.retry.enums.ProcessingEventState; import java.util.List; import java.util.Map; import java.util.Optional; import com.coupang.cgfs.retry.domain.port.ProcessingEventPort; import com.coupang.cgfs.retry.adapter.mysql.mapper.ProcessingEventMapper; import lombok.RequiredArgsConstructor; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import org.springframework.stereotype.Repository; @Repository @RequiredArgsConstructor public class ProcessingEventRepository implements ProcessingEventPort { private final ProcessingEventJpaRepository processingEventJpaRepository; @Override public ProcessingEvent save(ProcessingEvent processingEvent) { ProcessingEventEntity entity = ProcessingEventMapper.INSTANCE.toEntity(processingEvent); return ProcessingEventMapper.INSTANCE.toDomain(processingEventJpaRepository.save(entity)); } @Override public List saveAll(List processingEvents) { List entities = processingEvents.stream().map(ProcessingEventMapper.INSTANCE::toEntity).toList(); return processingEventJpaRepository.saveAll(entities).stream() .map(ProcessingEventMapper.INSTANCE::toDomain) .collect(Collectors.toList()); } @Override public Optional findById(Long id) { return processingEventJpaRepository.findById(id).map(ProcessingEventMapper.INSTANCE::toDomain); } @Override public List findByRequestId(String requestId) { return processingEventJpaRepository.findByRequestId(requestId).stream() .map(ProcessingEventMapper.INSTANCE::toDomain) .collect(Collectors.toList()); } @Override public List 
findByParentProcessingEventId(Long parentProcessingEventId) { return processingEventJpaRepository.findByParentProcessingEventId(parentProcessingEventId).stream() .map(ProcessingEventMapper.INSTANCE::toDomain) .collect(Collectors.toList()); } @Override public List findByParentProcessingEventIdAndState( Long parentProcessingEventId, ProcessingEventState state) { return processingEventJpaRepository .findByParentProcessingEventIdAndState(parentProcessingEventId, state) .stream() .map(ProcessingEventMapper.INSTANCE::toDomain) .collect(Collectors.toList()); } @Override public Map countByRequestIdAndState(String requestId) { return processingEventJpaRepository.countByRequestIdAndState(requestId); } @Override public List findByEventIdInAndStateIn( List eventIds, List states) { return processingEventJpaRepository.findByEventIdInAndStateIn(eventIds,states).stream() .map(ProcessingEventMapper.INSTANCE::toDomain) .collect(Collectors.toList());; } } // src/main/java/com/coupang/cgfs/retry/adapter/mysql/repository/ProcessingEventJpaRepository.java package com.coupang.cgfs.retry.adapter.mysql.repository; import com.coupang.cgfs.retry.adapter.mysql.entity.ProcessingEventEntity; import com.coupang.cgfs.retry.enums.AdjustmentType; import com.coupang.cgfs.retry.enums.ProcessingEventState; import java.util.List; import java.util.Map; import java.util.Optional; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; public interface ProcessingEventJpaRepository extends JpaRepository { List findByRequestId(String requestId); List findByParentProcessingEventId(Long parentProcessingEventId); List findByParentProcessingEventIdAndState(Long parentProcessingEventId, ProcessingEventState state); // Custom query to get counts grouped by state, for the status API @Query("SELECT pe.state, COUNT(pe) FROM ProcessingEventEntity pe WHERE pe.requestId = :requestId GROUP BY pe.state") 
Map countByRequestIdAndState(@Param("requestId") String requestId); List findByRequestIdAndType(String requestId, AdjustmentType adjustmentType); // Added method for Search API validation: List findByEventIdInAndStateIn(List eventIds, List states); Optional findById(Long id); // Added findById method } //RequestRepository // src/main/java/com/coupang/cgfs/retry/adapter/mysql/repository/RequestRepository.java package com.coupang.cgfs.retry.adapter.mysql.repository; import com.coupang.cgfs.retry.adapter.mysql.entity.RequestEntity; import com.coupang.cgfs.retry.domain.model.Request; import com.coupang.cgfs.retry.domain.port.RequestPort; import com.coupang.cgfs.retry.adapter.mysql.mapper.RequestMapper; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Repository; import java.util.Optional; @Repository @RequiredArgsConstructor public class RequestRepository implements RequestPort { private final RequestJpaRepository requestJpaRepository; @Override public Request save(Request request) { RequestEntity entity = RequestMapper.INSTANCE.toEntity(request); return RequestMapper.INSTANCE.toDomain(requestJpaRepository.save(entity)); } @Override public Optional findByRequestId(String requestId){ return Optional.ofNullable(RequestMapper.INSTANCE.toDomain(requestJpaRepository.findByRequestId(requestId))); } @Override public Request update(Request request){ RequestEntity entity = RequestMapper.INSTANCE.toEntity(request); return RequestMapper.INSTANCE.toDomain(requestJpaRepository.save(entity)); } } // src/main/java/com/coupang/cgfs/retry/adapter/mysql/mapper/ProcessingEventMapper.java package com.coupang.cgfs.retry.adapter.mysql.mapper; import com.coupang.cgfs.retry.adapter.mysql.entity.ProcessingEventEntity; import com.coupang.cgfs.retry.domain.model.ProcessingEvent; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @Mapper public interface ProcessingEventMapper { ProcessingEventMapper INSTANCE = 
Mappers.getMapper(ProcessingEventMapper.class); ProcessingEventEntity toEntity(ProcessingEvent processingEvent); ProcessingEvent toDomain(ProcessingEventEntity entity); } // src/main/java/com/coupang/cgfs/retry/adapter/mysql/mapper/RequestMapper.java package com.coupang.cgfs.retry.adapter.mysql.mapper; import com.coupang.cgfs.retry.adapter.mysql.entity.RequestEntity; import com.coupang.cgfs.retry.domain.model.Request; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @Mapper public interface RequestMapper { RequestMapper INSTANCE = Mappers.getMapper(RequestMapper.class); RequestEntity toEntity(Request request); Request toDomain(RequestEntity entity); } // src/main/java/com/coupang/cgfs/retry/domain/model/Request.java package com.coupang.cgfs.retry.domain.model; import com.coupang.cgfs.retry.enums.RequestState; import java.time.LocalDateTime; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; @Getter @Builder @NoArgsConstructor @AllArgsConstructor public class Request { private Long id; private String requestId; private RequestState requestState; private LocalDateTime createdAt; private LocalDateTime updatedAt; public void updateState(RequestState newState) { this.requestState = newState; } } //src/main/java/com/coupang/cgfs/retry/domain/port/RequestPort.java package com.coupang.cgfs.retry.domain.port; import com.coupang.cgfs.retry.domain.model.Request; import com.coupang.cgfs.retry.enums.RequestState; import java.util.List; import java.util.Optional; public interface RequestPort { Request save(Request request); Optional findByRequestId(String requestId); Request update(Request request); } // src/main/java/com/coupang/cgfs/retry/adapter/mysql/entity/ProcessingEventEntity.java package com.coupang.cgfs.retry.adapter.mysql.entity; import com.coupang.cgfs.retry.enums.AdjustmentType; import com.coupang.cgfs.retry.enums.ProcessingEventState; import java.time.LocalDateTime; import 
javax.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.hibernate.annotations.CreationTimestamp;
import org.hibernate.annotations.UpdateTimestamp;

/**
 * JPA entity mapped to the {@code processing_event} table.
 *
 * <p>Accessors, the builder and both constructors are generated by Lombok.
 */
@Entity
@Table(name = "processing_event")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProcessingEventEntity {

  /** Primary key, generated with the IDENTITY strategy. */
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;

  @Column(name = "request_id", nullable = false)
  private String requestId;

  @Column(name = "event_id", nullable = false)
  private String eventId;

  /** Optional reference to another processing event; the column is nullable. */
  @Column(name = "parent_processing_event_id")
  private Long parentProcessingEventId;

  /** Stored by enum name rather than by ordinal. */
  @Column(name = "type", nullable = false)
  @Enumerated(EnumType.STRING)
  private AdjustmentType type;

  /** Stored by enum name rather than by ordinal. */
  @Column(name = "state", nullable = false)
  @Enumerated(EnumType.STRING)
  private ProcessingEventState state;

  @Column(name = "retry_count", nullable = false)
  private Integer retryCount;

  @Column(name = "detail", columnDefinition = "TEXT")
  private String detail;

  /** Filled through {@code @CreationTimestamp}; the column is excluded from updates. */
  @CreationTimestamp
  @Column(name = "created_at", updatable = false, nullable = false)
  private LocalDateTime createdAt;

  /** Filled through {@code @UpdateTimestamp}. */
  @UpdateTimestamp
  @Column(name = "updated_at", nullable = false)
  private LocalDateTime updatedAt;
}

// src/main/java/com/coupang/cgfs/retry/adapter/mysql/entity/RequestEntity.java
package com.coupang.cgfs.retry.adapter.mysql.entity;

import com.coupang.cgfs.retry.enums.RequestState;
import java.time.LocalDateTime;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.hibernate.annotations.CreationTimestamp;
import
org.hibernate.annotations.UpdateTimestamp;

/**
 * JPA entity mapped to the {@code request} table.
 *
 * <p>Accessors, the builder and both constructors are generated by Lombok.
 */
@Entity
@Table(name = "request")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RequestEntity {

  /** Primary key, generated with the IDENTITY strategy. */
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;

  /** Business identifier of the request; declared unique and non-null. */
  @Column(name = "request_id", unique = true, nullable = false)
  private String requestId;

  /** Stored by enum name rather than by ordinal. */
  @Column(name = "request_state", nullable = false)
  @Enumerated(EnumType.STRING)
  private RequestState requestState;

  /** Filled through {@code @CreationTimestamp}; the column is excluded from updates. */
  @CreationTimestamp
  @Column(name = "created_at", updatable = false, nullable = false)
  private LocalDateTime createdAt;

  /** Filled through {@code @UpdateTimestamp}. */
  @UpdateTimestamp
  @Column(name = "updated_at", nullable = false)
  private LocalDateTime updatedAt;
}

// src/main/java/com/coupang/cgfs/retry/adapter/mysql/config/CgfsRetryServiceBeansRegistry.java
package com.coupang.cgfs.retry.adapter.mysql.config;

import com.coupang.cgfs.retry.adapter.mysql.repository.ProcessingEventRepository;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
import org.hibernate.boot.model.naming.CamelCaseToUnderscoresNamingStrategy;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.dialect.MySQLDialect;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import
org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * Wires the read/write routing DataSource, the JPA EntityManagerFactory and the transaction
 * manager for all non-local profiles.
 *
 * <p>The {@code writeDataSource} and {@code readDataSource} beans are injected by qualifier and
 * are not defined in this class.
 */
@Configuration
@DependsOn("kirinDatasourceConfig") // Ensure Kirin config is loaded first
@EnableTransactionManagement
@Profile("!local") // Use this configuration in non-local environments
public class CgfsRetryServiceBeansRegistry {

  /** Package that holds the {@code @Entity} classes (ProcessingEventEntity, RequestEntity). */
  private static final String ENTITY_PACKAGE = "com.coupang.cgfs.retry.adapter.mysql.entity";

  /**
   * Builds the routing DataSource. The lookup keys are the {@link Boolean} values returned by
   * {@link ReplicationRoutingDataSource#determineCurrentLookupKey()}: {@code true} selects the
   * read DataSource, {@code false} the write DataSource, and the write DataSource is the default.
   */
  @Bean("cgfsRetryRoutingDataSource")
  public DataSource replicationRoutingDataSource(
      @Qualifier("writeDataSource") DataSource writeDataSource, // From Kirin
      @Qualifier("readDataSource") DataSource readDataSource // From Kirin
      ) {
    Map<Object, Object> targetDataSources = new HashMap<>();
    targetDataSources.put(Boolean.FALSE, writeDataSource); // Key: false = write
    targetDataSources.put(Boolean.TRUE, readDataSource); // Key: true = read

    ReplicationRoutingDataSource replicationRoutingDataSource = new ReplicationRoutingDataSource();
    replicationRoutingDataSource.setTargetDataSources(targetDataSources);
    replicationRoutingDataSource.setDefaultTargetDataSource(writeDataSource); // Default to write
    return replicationRoutingDataSource;
  }

  /**
   * Primary DataSource. Wrapping the routing DataSource in a {@link LazyConnectionDataSourceProxy}
   * defers fetching the physical connection until it is actually needed.
   */
  @Bean("cgfsRetryDataSource")
  @Primary // This is the main DataSource bean Spring will use
  public DataSource cgfsRetryDataSource(
      @Qualifier("cgfsRetryRoutingDataSource") DataSource replicationRoutingDataSource) {
    return new LazyConnectionDataSourceProxy(replicationRoutingDataSource); // For lazy connection
  }

  /**
   * Exposes an EntityManager created directly from the factory.
   *
   * <p>NOTE(review): this is a single EntityManager instance shared as a bean; confirm that no
   * caller uses it from multiple threads, or replace it with a container-managed shared proxy.
   */
  @Bean
  public EntityManager entityManager(
      @Qualifier("entityManagerFactory") EntityManagerFactory entityManagerFactory) {
    return entityManagerFactory.createEntityManager();
  }

  /**
   * Configures Hibernate on top of the primary DataSource: no DDL generation, schema validation
   * at startup, the MySQL dialect and camelCase-to-underscore physical naming.
   */
  @Primary
  @Bean(name = "entityManagerFactory")
  public LocalContainerEntityManagerFactoryBean entityManagerFactory(
      @Qualifier("cgfsRetryDataSource") DataSource dataSource) {
    HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
    vendorAdapter.setGenerateDdl(false); // Don't let Hibernate auto-create tables
    vendorAdapter.setShowSql(true); // Log SQL (for debugging)

    LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
    factory.setJpaVendorAdapter(vendorAdapter);
    // VERY IMPORTANT: this must be the package of the @Entity classes. The repository package
    // contains no entities, so scanning it would leave the persistence unit empty.
    factory.setPackagesToScan(ENTITY_PACKAGE);
    factory.setDataSource(dataSource);

    Properties props = new Properties();
    props.put(AvailableSettings.HBM2DDL_AUTO, "validate"); // Validate schema, don't create
    props.put(AvailableSettings.DIALECT, MySQLDialect.class.getName()); // Or MySQL8Dialect
    props.put(
        AvailableSettings.PHYSICAL_NAMING_STRATEGY,
        CamelCaseToUnderscoresNamingStrategy.class.getName());
    factory.setJpaProperties(props);
    return factory;
  }

  /** Primary JPA transaction manager bound to the EntityManagerFactory above. */
  @Primary
  @Bean(name = "transactionManager")
  public PlatformTransactionManager transactionManager(
      @Qualifier("entityManagerFactory") EntityManagerFactory entityManagerFactory) {
    JpaTransactionManager txManager = new JpaTransactionManager();
    txManager.setEntityManagerFactory(entityManagerFactory);
    return txManager;
  }
}

// src/main/java/com/coupang/cgfs/retry/adapter/mysql/config/ReplicationRoutingDataSource.java
// (Same as in the reference code - routes based on transaction read-only status)
package com.coupang.cgfs.retry.adapter.mysql.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/** Routing DataSource whose lookup key is the read-only flag of the current transaction. */
@Slf4j
public class ReplicationRoutingDataSource extends AbstractRoutingDataSource {

  /**
   * Returns {@code true} (boxed) when the current transaction is marked read-only and
   * {@code false} otherwise, and logs the transaction name together with the chosen side.
   */
  @Override
  protected Object determineCurrentLookupKey() {
    boolean readonly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
    String name = TransactionSynchronizationManager.getCurrentTransactionName();
    log.info("transaction={}, datasource={}", name, readonly ? "READ_DB" : "WRITE_DB");
    return readonly;
  }
}

// src/main/java/com/coupang/cgfs/retry/application/web/ApiApplication.java
package com.coupang.cgfs.retry.application.web;

import com.coupang.cgfs.retry.adapter.mysql.config.CgfsRetryServiceBeansRegistry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

/** Spring Boot entry point; scans the retry-service and api-gateway base packages. */
@Import({CgfsRetryServiceBeansRegistry.class}) // Import the bean registry
@SpringBootApplication(scanBasePackages = {"com.coupang.cgfs.retry", "com.coupang.apigateway"})
public class ApiApplication {

  public static void main(String[] args) {
    SpringApplication.run(ApiApplication.class, args);
  }
}

// Example in SearchServiceImpl
package com.coupang.cgfs.retry.domain.service;

import com.coupang.cgfs.retry.adapter.mysql.repository.ProcessingEventRepository;
import com.coupang.cgfs.retry.domain.model.ProcessingEvent;
import com.coupang.cgfs.retry.dto.request.SearchEventsRequest;
import com.coupang.cgfs.retry.dto.response.EventDetails;
import com.coupang.cgfs.retry.dto.response.SearchEventsResponse;
import com.coupang.cgfs.retry.enums.ProcessingEventState;
import com.coupang.cgfs.retry.exception.RetryServiceException;
import java.util.List;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/** Search use case: looks up processing events by event id and state. */
@Slf4j
@Service
@RequiredArgsConstructor
public class SearchServiceImpl implements SearchService {

  private final ProcessingEventRepository processingEventRepository;

  /**
   * Finds the events matching the ids and states in {@code request}.
   *
   * <p>Returns a response that carries only a message when nothing matches. Otherwise the
   * response carries the request id of the first matching event plus one {@link EventDetails}
   * per event.
   *
   * @throws RetryServiceException wrapping any exception raised during the lookup or mapping
   */
  @Override
  @Transactional(readOnly = true) // VERY IMPORTANT: read-only transactions are routed to the read replica.
  public SearchEventsResponse searchEvents(SearchEventsRequest request) {
    try {
      List<ProcessingEvent> events =
          processingEventRepository.findByEventIdInAndStateIn(
              request.getEventIds(), request.getStates());

      if (events.isEmpty()) {
        return SearchEventsResponse.builder()
            .message("No events found matching the criteria.")
            .build();
      }

      List<EventDetails> eventDetailsList =
          events.stream().map(SearchServiceImpl::toEventDetails).collect(Collectors.toList());

      return SearchEventsResponse.builder()
          .requestId(events.get(0).getRequestId())
          .events(eventDetailsList)
          .build();
    } catch (DataAccessException e) {
      log.error("Database error during event search", e);
      throw new RetryServiceException("Database error during event search", e);
    } catch (Exception e) {
      log.error("Unexpected error during event search", e);
      throw new RetryServiceException("Unexpected error during event search", e);
    }
  }

  /** Copies the fields exposed by the search API from a domain event into a response DTO. */
  private static EventDetails toEventDetails(ProcessingEvent event) {
    return EventDetails.builder()
        .eventId(event.getEventId())
        .state(event.getState().toString())
        .retryCount(event.getRetryCount())
        .detail(event.getDetail())
        .createdAt(event.getCreatedAt())
        .updatedAt(event.getUpdatedAt())
        .build();
  }
}