/** * https://docs.awspring.io/spring-cloud-aws/docs/3.0.1/apidocs/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.html#maxMessagesPerPoll(int) * listener container bean 을 별도로 생성하고, 해당 bean 이 @SqsListener 메서드를 호출하는 방식 * queueNames(values): queue 이름 * factory: listener container bean 생성전략 * id: listener container bean id 지정 * maxConcurrentMessages: listener bean 내부 워커스레드 개수 지정, default 10, * pollTimeoutSeconds: SQS Long Polling 시간(초 단위) default 10 * maxMessagesPerPoll: polling 당 가져올 메세지 수 default 10 * messageVisibilitySeconds: 메세지 처리 보장시간 default 30, 시간내에 메세지를 확인처리하지 않으면 다시 소비가능한 상태로 돌아간다. * acknowledgementMode: 메세지 확인후 삭제 모드 default ON_SUCCESS * - ON_SUCCESS: 오류 없이 메서드 완료시 확인 및 삭제처리 * - ALWAYS: 오류 상관 없이 확인 및 삭제처리 * - MANUAL: 수동 확인 및 삭제처리 * */ @SqsListener( queueNames = ["\${demo.aws.sqs.queue-name}"], maxConcurrentMessages = "10", pollTimeoutSeconds = "20", maxMessagesPerPoll = "5", messageVisibilitySeconds = "10", acknowledgementMode = "MANUAL", ) funreceiveMessage( message: String, @Header("demoAttr") demoAttrValue: String, ack: Acknowledgement, ) { logger.info("header: ${demoAttrValue}, payload:${message}") ack.acknowledge() // 수동 확인 처리 }
간결하고 직관적이지만 어노테이션 AOP 로 동작되기 때문에 커스텀하기 힘들 수 있다.
kotlin 과 코루틴을 사용할 수 있다면 software.amazon.awssdk:sqs 라이브러리를 사용해 직접 Polling 처리를 하는것도 좋아보인다. 빠른 SQS 메세지 처리를 수행할 수 있고 실시간으로 maxNumberOfMessages, waitTimeSeconds 를 조절해 서버 부하를 조절할 수 도 있다.
/** * 과도한 흐름을 제어하기 위해 별도의 Thread Pool 적용 * 하나의 scope 로 관리할 경우 과도한 메세지가 들어오면 가장 밭깥의 while 문만 돌고 delete Message 는 되지 않아 악순환반복됨으로 스코프를 나눔 * 일종의 우선순위처럼 동작 가능 * */ privateval pollMessageScope = CoroutineScope( Executors.newFixedThreadPool(1).asCoroutineDispatcher() + SupervisorJob() ) privateval processMessageScope = CoroutineScope( Executors.newFixedThreadPool(2).asCoroutineDispatcher() + SupervisorJob() )
@PreDestroy funstopPolling() { logger.info("Stopping SQS polling...") running = false pollMessageScope.cancel() // 모든 코루틴 중단 processMessageScope.cancel() // 모든 코루틴 중단 }
@PostConstruct funstartPolling() { pollMessageScope.launch { while (running) { // 공통 루프에서 관리 pollMessages() } } }
privatesuspendfunpollMessages() { val receiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(sqsComponent.queueUrl) .maxNumberOfMessages(maxNumberOfMessages) // 한 번에 가져올 메시지 수 .waitTimeSeconds(waitTimeSeconds) // Long polling 시간 .build()
val messages = sqsClient.receiveMessage(receiveMessageRequest).messages() logger.info("${messages.size} messages received") var jobs = messages.mapIndexed() { idx, message -> processMessageScope.launch { val workerName = Thread.currentThread().name processMessage(workerName, idx, message) logger.info("Polling cycle complete, workerId: $workerName") } } }
privatesuspendfunprocessMessage(workerName: String, idx: Int, message: Message) { logger.info("Worker($workerName) processing index(${idx}) message: ${message.body()}") delay(10000L) // 메시지 처리 로직 추가 deleteMessage(message) logger.info("Worker($workerName) finished index(${idx}) message: ${message.body()}") }
DynamoDbSecondarySortKey getter, GSI 의 정렬키, LSI 의 정렬키
DynamoDbAttribute getter, 속성의 이름을 변경
DynamoDbIgnore getter, 속성 무시
DynamoDbFlatten getter, 중첩 클래스 평탄화
DynamoDbConvertedBy getter, 컨버터 클래스 지정
DynamoDbIgnoreNulls getter, not null 효과
DynamoDbImmutable class, update 불가한 데이터
DynamoDbPreserveEmptyObject getter, 중첩클래스의 경우 값을 가져올 때 null 대신 기본생성자로 데이터가 설정됨
DynamoDbUpdateBehavior getter, 업데이트시 동작 행동, [WRITE_ALWAYS, WRITE_IF_NOT_EXISTS] 설정 가능
enhanced.dynamodb.extensions.annotations
DynamoDbVersionAttribute getter, 항목 버전 번호를 증가시키고 추적 낙관적 잠금 기능을 제공
DynamoDbAtomicCounter getter, 레코드가 write 될 때마다 태그가 지정된 숫자 속성이 증가
DynamoDbAutoGeneratedTimestampAttribute getter, 레코드가 write 될 때마다 태그가 지정된 속성을 현재 타임스탬프로 지정
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Sort key for primary index and partition key for GSI "SubjectLastPostedDateIndex". @DynamoDbSortKey @DynamoDbSecondaryPartitionKey(indexNames = "SubjectLastPostedDateIndex") public String getSubject() { return Subject; }
public void setSubject(String subject) { Subject = subject; }
// Sort key for GSI "SubjectLastPostedDateIndex" and sort key for LSI "ForumLastPostedDateIndex". @DynamoDbSecondarySortKey(indexNames = {"SubjectLastPostedDateIndex", "ForumLastPostedDateIndex"}) public String getLastPostedDateTime() { return LastPostedDateTime; }
@DynamoDbBean classDemoEntity( @get:DynamoDbPartitionKey var pk: String, // DEMO#{demoId} @get:DynamoDbSortKey var sk: String, // DEMO#{demoId} @get:DynamoDbAttribute("attrKey") val internalKey: String, @get:DynamoDbIgnore val ignoreKey: String, @get:DynamoDbVersionAttribute// 항목 버전 번호, 낙관적 잠금 지원 val version: Long, @get:DynamoDbAtomicCounter(startValue = 0, delta = 1) // 1씩 증가 val updateCounter: Long,
@get:DynamoDbAutoGeneratedTimestampAttribute val updateTimestamp: Instant, @get:DynamoDbUpdateBehavior(UpdateBehavior.WRITE_ALWAYS) val updated: Instant, @get:DynamoDbUpdateBehavior(UpdateBehavior.WRITE_IF_NOT_EXISTS) val created: Instant, ) {
}
@DynamoDbBean classDemoFlatten {
}
Query
기본 키(Primary Key) 를 PartitionKey, SortKey 로 구성하였다면 해당 파티션 내에 Query(범위질의)가 가능하다.
@DynamoDbBean classCustomerEntity( @get:DynamoDbPartitionKey var pk: String, // CUSTOMER#{customerId} @get:DynamoDbSortKey var sk: String, // CUSTOMER#{customerId} var type: String, // CUSTOMER var username: String, var password: String, var expired: Long, // 유효기간 var updated: Instant, var created: Instant )
@DynamoDbBean classCustomerInfoEntity( @get:DynamoDbPartitionKey var pk: String, // CUSTOMER#{customerId} @get:DynamoDbSortKey var sk: String, // CUSTOMER_INFO#{customerId} var type: String, // CUSTOMER_INFO var age: Int, var email: String, var nickname: String, var intro: String, var expired: Long, // 유효기간 var updated: Instant, var created: Instant )
고객 테이블의 로그인 Entity 와 상세정보 Entity 2개를 Transactional 하게 저장
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
val customerTable = dynamodbComponent.generateDynamoDbTable(CustomerEntity::class) val customerInfoTable = dynamodbComponent.generateDynamoDbTable(customerInfoTable::class)
funcreate(request: CustomerAddRequest): CustomerDetailDto { val customerId = UUID.randomUUID().toString() val now = Instant.now() val customerEntity = mapper.toEntity(request, customerId, now) val customerInfoEntity = mapper.toCustomerInfoEntity(request, customerId, now) enhancedClient.transactWriteItems( TransactWriteItemsEnhancedRequest.builder() .addPutItem(customerTable, customerEntity) .addPutItem(customerInfoTable, customerInfoEntity) .build() ) return mapper.toDetailDto(customerEntity, customerInfoEntity) }