Spring Boot - AWS!

AWS Library

해당 포스팅에선 서비스의 설명은 문서로 대체하고 Spring 과의 연동을 위한 라이브러리 사용방법에 대해서만 설명한다.

https://awspring.io/
https://github.com/awspring/spring-cloud-aws
https://docs.aws.amazon.com/ko_kr/sdk-for-java/latest/developer-guide/get-started.html

Spring Boot3 부터는 AWS SDK for Java 2.x, Spring Cloud AWS 2023.0.x 버전 이상을 써야한다.

allprojects {
  // build.gradle.kts
  val springBootVersion="3.2.4"
  val springCloudAwsVersion="3.2.1"
  val softwareAwsSdkBomVersion="2.21.20"

  // ...

  dependencies {
      implementation(platform("software.amazon.awssdk:bom:${softwareAwsSdkBomVersion}"))
      implementation(platform("io.awspring.cloud:spring-cloud-aws-dependencies:${springCloudAwsVersion}"))
  }
}

LocalStack

https://www.localstack.cloud/

테스트는 LocalStack 사용, 각종 AWS 서비스를 로컬에서 테스트할 수 있게 도와준다.

services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}"
    image: localstack/localstack
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
    environment:
      # LocalStack configuration: https://docs.localstack.cloud/references/configuration/
      - DEBUG=${DEBUG:-0}
      # - LOCALSTACK_AUTH_TOKEN=${LOCALSTACK_AUTH_TOKEN- } # pro version 에서 요구
    volumes:
      - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"

GUI 사용은 LocalStack Desktop 혹은 아래 브라우저 기반 사이트에서 localhost 에서 동작중인 LocalStack 서비스에 접근 가능

https://app.localstack.cloud/inst/default/resources 회원가입 필수

CLI 에선 awslocal 툴을 사용하는것을 추천.

pip install awscli-local

awslocal configure
AWS Access Key ID [None]: test-access-key
AWS Secret Access Key [None]: test-secret-key
Default region name [None]: us-east-1
Default output format [None]: json

awslocal s3 mb s3://my-local-bucket
awslocal s3 ls

LocalStack 에 접근하기 위한 access key, secret key 는 아무값이나 입력하면 된다.

@Configuration
class AwsCredential {

    @Bean
    fun awsCredentialsProvider(): AwsCredentialsProvider {
        return StaticCredentialsProvider.create(
            AwsBasicCredentials.create("access-key", "secret-key")
        )
    }
}

http://localhost:4566 URL 을 Endpoint 로 사용해 각종 AWS 서비스 테스트 가능하다.

데모코드

아래 서비스 테스트 진행

  • SQS
  • S3
  • ParameterStore
  • Lambda
  • DynamoDB

https://github.com/Kouzie/spring-aws-demo

SQS

SQS 설명 https://docs.aws.amazon.com/ko_kr/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html

dependencies {
    implementation("software.amazon.awssdk:sqs")
    implementation("io.awspring.cloud:spring-cloud-aws-starter-sqs")
}

spring-cloud-aws-starter-sqs 를 사용하면 어노테이션, application.yml 을 통해 SQS 를 설정하고 메세지를 수신받을 수 있다.

/**
 * https://docs.awspring.io/spring-cloud-aws/docs/3.0.1/apidocs/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.html#maxMessagesPerPoll(int)
 * listener container bean 을 별도로 생성하고, 해당 bean 이 @SqsListener 메서드를 호출하는 방식
 * queueNames(values): queue 이름
 * factory: listener container bean 생성전략
 * id: listener container bean id 지정
 * maxConcurrentMessages: listener bean 내부 워커스레드 개수 지정, default 10,
 * pollTimeoutSeconds: SQS Long Polling 시간(초 단위) default 10
 * maxMessagesPerPoll: polling 당 가져올 메세지 수 default 10
 * messageVisibilitySeconds: 메세지 처리 보장시간 default 30, 시간내에 메세지를 확인처리하지 않으면 다시 소비가능한 상태로 돌아간다.
 * acknowledgementMode: 메세지 확인후 삭제 모드 default ON_SUCCESS
 *  - ON_SUCCESS: 오류 없이 메서드 완료시 확인 및 삭제처리
 *  - ALWAYS: 오류 상관 없이 확인 및 삭제처리
 *  - MANUAL: 수동 확인 및 삭제처리
 * */
@SqsListener(
    queueNames = ["\${demo.aws.sqs.queue-name}"],
    maxConcurrentMessages = "10",
    pollTimeoutSeconds = "20",
    maxMessagesPerPoll = "5",
    messageVisibilitySeconds = "10",
    acknowledgementMode = "MANUAL",
)
fun receiveMessage(
    message: String,
    @Header("demoAttr") demoAttrValue: String,
    ack: Acknowledgement,
) {
    logger.info("header: ${demoAttrValue}, payload:${message}")
    ack.acknowledge() // 수동 확인 처리
}

간결하고 직관적이지만 어노테이션 AOP 로 동작되기 때문에 커스텀하기 힘들 수 있다.

kotlin 과 코루틴을 사용할 수 있다면 software.amazon.awssdk:sqs 라이브러리를 사용해 직접 Polling 처리를 하는것도 좋아보인다.
빠른 SQS 메세지 처리를 수행할 수 있고 실시간으로 maxNumberOfMessages, waitTimeSeconds 를 조절해 서버 부하를 조절할 수 도 있다.

private var running = true // 작업 실행 상태 플래그

companion object {
    const val maxNumberOfMessages = 10;
    const val waitTimeSeconds = 10;
}

/**
    * 과도한 흐름을 제어하기 위해 별도의 Thread Pool 적용
    * 하나의 scope 로 관리할 경우 과도한 메세지가 들어오면 가장 밭깥의 while 문만 돌고 delete Message 는 되지 않아 악순환반복됨으로 스코프를 나눔
    * 일종의 우선순위처럼 동작 가능
    * */
private val pollMessageScope = CoroutineScope(
    Executors.newFixedThreadPool(1).asCoroutineDispatcher() + SupervisorJob()
)
private val processMessageScope = CoroutineScope(
    Executors.newFixedThreadPool(2).asCoroutineDispatcher() + SupervisorJob()
)

@PreDestroy
fun stopPolling() {
    logger.info("Stopping SQS polling...")
    running = false
    pollMessageScope.cancel() // 모든 코루틴 중단
    processMessageScope.cancel() // 모든 코루틴 중단
}

@PostConstruct
fun startPolling() {
    pollMessageScope.launch {
        while (running) { // 공통 루프에서 관리
            pollMessages()
        }
    }
}

private suspend fun pollMessages() {
    val receiveMessageRequest = ReceiveMessageRequest.builder()
        .queueUrl(sqsComponent.queueUrl)
        .maxNumberOfMessages(maxNumberOfMessages) // 한 번에 가져올 메시지 수
        .waitTimeSeconds(waitTimeSeconds) // Long polling 시간
        .build()

    val messages = sqsClient.receiveMessage(receiveMessageRequest).messages()
    logger.info("${messages.size} messages received")
    var jobs = messages.mapIndexed() { idx, message ->
        processMessageScope.launch {
            val workerName = Thread.currentThread().name
            processMessage(workerName, idx, message)
            logger.info("Polling cycle complete, workerId: $workerName")
        }
    }
}

private suspend fun processMessage(workerName: String, idx: Int, message: Message) {
    logger.info("Worker($workerName) processing index(${idx}) message: ${message.body()}")
    delay(10000L)
    // 메시지 처리 로직 추가
    deleteMessage(message)
    logger.info("Worker($workerName) finished index(${idx}) message: ${message.body()}")
}

private fun deleteMessage(message: Message) {
    sqsClient.deleteMessage(
        DeleteMessageRequest.builder()
            .queueUrl(sqsComponent.queueUrl)
            .receiptHandle(message.receiptHandle())
            .build()
    )
}

ParameterStore

/demo/param/ 을 prefix 로 ParameterStorekey-value 지정

awslocal ssm put-parameter \
    --type String \
    --name "/demo/param/DEMO_PROPERTIES_ID" \
    --value "aws_id"
    
awslocal ssm put-parameter \
    --type String \
    --name "/demo/param/DEMO_PROPERTIES_KEY" \
    --value "aws_key"

awslocal ssm put-parameter \
    --type String \
    --name "/demo/param/DEMO_PROPERTIES_URL" \
    --value "aws_url"

spring.config.import 속성을 통해 Parameter Store 에 저장된 값을 Spring Properties 값으로 사용할 수 있다.

dependencies {
    implementation("io.awspring.cloud:spring-cloud-aws-starter-parameter-store")
}
spring:
  application:
    name: param-demo
  profiles:
    active: paramstore
demo:
  properties:
    id: ${DEMO_PROPERTIES_ID:default_id}
    key: ${DEMO_PROPERTIES_KEY:default_key}
    url: ${DEMO_PROPERTIES_URL:default_url}
---
spring:
  config:
    activate:
      on-profile: paramstore
    import: 'optional:aws-parameterstore:/demo/param/'
  cloud:
    aws:
      region:
        static: us-east-1
      credentials:
        secret-key: test-access-key
        access-key: test-secret-key
      parameterstore:
        enabled: true
        endpoint: http://localhost:4566

비슷한 역할을 수행하는 서비스로 Secret Manager 가 있으며 key-value 이외에도 인증서, 암호화 키등을 관리할 수 있으며 수명주기로 관리할 수 있다.

Lambda

dependencies {
    implementation("software.amazon.awssdk:lambda")
}

비동기로 호출할것인지 동기로 호출할 것인지 지정, Event 로 호출할 경우 HttpStatus.ACCEPTED(202) 가 반환된다.

/**
    * @param functionName 등록한 함수명
    * @param payload lambda 에 전달할 페이로드
    * @param type 호출 타입
    *  Event: 비동기
    *  RequestResponse: 동기
    *  DryRun: 테스트호출, 실행하지 않고 request 가 유효한지만 확인
    *
    * */
fun invokeLambda(functionName: String, payload: String, type: InvocationType): String {
    var request = InvokeRequest.builder()
        .invocationType(type)
        .functionName(functionName)
        .payload(SdkBytes.fromUtf8String(objectMapper.writeValueAsString(mapOf("body" to payload))))
        .build()
    var response: InvokeResponse = lambdaClient.invoke(request)
    if (response.statusCode() in listOf(HttpStatus.OK.value(), HttpStatus.ACCEPTED.value())) {
        return response.payload().asUtf8String()
    } else {
        throw RuntimeException("failed to invoke lambda:${functionName}, status:${response.statusCode()}")
    }
}

DynamoDB

참고: 한번씩 읽어볼만한 문서들 https://kouzie.github.io/database/DynamoDB
https://docs.aws.amazon.com/ko_kr/sdk-for-java/latest/developer-guide/java_dynamodb_code_examples.html
https://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/DynamoDBEnhanced.html
https://github.com/aws/aws-sdk-java-v2/tree/master/services-custom/dynamodb-enhanced

Srping Boot 에서 java sdk v2 dynamodb 를 사용하는 예제 https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2/usecases/creating_dynamodb_web_app
https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/javav2/usecases/creating_first_project
https://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/ProgrammingWithJava.html

noArg {
    annotation("software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean") // @DynamoDbBean 기본생성자 생성
}

dependencies {
    implementation("software.amazon.awssdk:dynamodb")
    implementation("software.amazon.awssdk:dynamodb-enhanced")
}

해당 예제에선 싱글 테이블 패턴을 사용한다.

@Component
class DynamodbComponent(
    val dynamoDbClient: DynamoDbClient,
    val dynamoDbEnhancedClient: DynamoDbEnhancedClient
) {
    @Value("\${ddb.sigle-table-name}")
    private lateinit var tableName: String

    /**
     * single table 전략
     * pk sk 필수
     * expired 가 있는 데이터는 자동 삭제됨
     * */
    @PostConstruct
    private fun createTableIfNotExists() {
        val tableNames = dynamoDbClient.listTables().tableNames()
        if (!tableNames.contains(tableName)) {
            dynamoDbClient.createTable {
                it.tableName(tableName)
                it.keySchema(
                    KeySchemaElement.builder().attributeName("pk").keyType(KeyType.HASH).build(),
                    KeySchemaElement.builder().attributeName("sk").keyType(KeyType.RANGE).build()
                )
                it.attributeDefinitions(
                    AttributeDefinition.builder().attributeName("pk").attributeType(ScalarAttributeType.S).build(),
                    AttributeDefinition.builder().attributeName("sk").attributeType(ScalarAttributeType.S).build()
                )
                it.provisionedThroughput {
                    it.readCapacityUnits(5)
                    it.writeCapacityUnits(5)
                }
            }
            println("Created table $tableName")
        }
        val describeTTLRequest = DescribeTimeToLiveRequest.builder()
            .tableName(tableName)
            .build()
        val describeTTLResponse: DescribeTimeToLiveResponse = dynamoDbClient.describeTimeToLive(describeTTLRequest)
        val currentTTLStatus: TimeToLiveStatus = describeTTLResponse.timeToLiveDescription().timeToLiveStatus()

        if (currentTTLStatus == TimeToLiveStatus.DISABLED) {
            // TTL 설정
            val ttlAttributeName = "expired"
            val ttlReq = UpdateTimeToLiveRequest.builder()
                .tableName(tableName)
                .timeToLiveSpecification(
                    TimeToLiveSpecification.builder()
                        .attributeName(ttlAttributeName)
                        .enabled(true)
                        .build()
                )
                .build()
            dynamoDbClient.updateTimeToLive(ttlReq)
            println("Updated TTL Config $tableName $ttlAttributeName")
        }
    }
    /**
     * mapping 만 해줄뿐이지 single table pattern 에서 타입별 필터링까진 해주지 않는다.
     * */
    fun <T : Any> generateDynamoDbTable(entityClass: KClass<T>): DynamoDbTable<T> {
        return dynamoDbEnhancedClient.table(tableName, TableSchema.fromBean(entityClass.java))
    }
}

어노테이션

https://docs.aws.amazon.com/ko_kr/sdk-for-java/latest/developer-guide/ddb-en-client-anno-index.html

  • enhanced.dynamodb.mapper.annotations
    • DynamoDbBean
      class, 데이터 클래스 지정
    • DynamoDbPartitionKey
      getter, 기본 파티션키 지정
    • DynamoDbSortKey
      getter, 기본 정렬키 지정

위에 3가지는 필수적으로 사용하고 나머지 어노테이션은 선택사항

  • enhanced.dynamodb.mapper.annotations
    • DynamoDbSecondaryPartitionKey
      getter, GSI 의 파티션키
    • DynamoDbSecondarySortKey
      getter, GSI 의 정렬키, LSI 의 정렬키
    • DynamoDbAttribute
      getter, 속성의 이름을 변경
    • DynamoDbIgnore
      getter, 속성 무시
    • DynamoDbFlatten
      getter, 중첩 클래스 평탄화
    • DynamoDbConvertedBy
      getter, 컨버터 클래스 지정
    • DynamoDbIgnoreNulls
      getter, not null 효과
    • DynamoDbImmutable
      class, update 불가한 데이터
    • DynamoDbPreserveEmptyObject
      getter, 중첩클래스의 경우 값을 가져올 때 null 대신 기본생성자로 데이터가 설정됨
    • DynamoDbUpdateBehavior
      getter, 업데이트시 동작 행동, [WRITE_ALWAYS, WRITE_IF_NOT_EXISTS] 설정 가능
  • enhanced.dynamodb.extensions.annotations
    • DynamoDbVersionAttribute getter, 항목 버전 번호를 증가시키고 추적 낙관적 잠금 기능을 제공
    • DynamoDbAtomicCounter
      getter, 레코드가 write 될 때마다 태그가 지정된 숫자 속성이 증가
    • DynamoDbAutoGeneratedTimestampAttribute
      getter, 레코드가 write 될 때마다 태그가 지정된 속성을 현재 타임스탬프로 지정
// Sort key for primary index and partition key for GSI "SubjectLastPostedDateIndex".
@DynamoDbSortKey
@DynamoDbSecondaryPartitionKey(indexNames = "SubjectLastPostedDateIndex")
public String getSubject() {
    return Subject;
}

public void setSubject(String subject) {
    Subject = subject;
}

// Sort key for GSI "SubjectLastPostedDateIndex" and sort key for LSI "ForumLastPostedDateIndex".
@DynamoDbSecondarySortKey(indexNames = {"SubjectLastPostedDateIndex", "ForumLastPostedDateIndex"})
public String getLastPostedDateTime() {
    return LastPostedDateTime;
}

이 외의 어노테이션은 위 aws 공식문서에서 확인

@DynamoDbBean
class DemoEntity(
    @get:DynamoDbPartitionKey
    var pk: String, // DEMO#{demoId}
    @get:DynamoDbSortKey
    var sk: String, // DEMO#{demoId}
    @get:DynamoDbAttribute("attrKey")
    val internalKey: String,
    @get:DynamoDbIgnore
    val ignoreKey: String,
    @get:DynamoDbVersionAttribute // 항목 버전 번호, 낙관적 잠금 지원
    val version: Long,
    @get:DynamoDbAtomicCounter(startValue = 0, delta = 1) // 1씩 증가
    val updateCounter: Long,

    @get:DynamoDbAutoGeneratedTimestampAttribute
    val updateTimestamp: Instant,
    @get:DynamoDbUpdateBehavior(UpdateBehavior.WRITE_ALWAYS)
    val updated: Instant,
    @get:DynamoDbUpdateBehavior(UpdateBehavior.WRITE_IF_NOT_EXISTS)
    val created: Instant,
) {

}

@DynamoDbBean
class DemoFlatten {

}

Query

기본 키(Primary Key)PartitionKey, SortKey 로 구성하였다면 해당 파티션 내에 Query(범위질의)가 가능하다.

fun getByCustomerId(customerId: String, beginDate: Long, endingDate: Long): List<OrderDto> {
    val key = Key.builder().partitionValue("CUSTOMER#$customerId").build()
    val request: QueryConditional = QueryConditional.keyEqualTo(key)

    val query = QueryConditional.sortBetween(
        Key.builder().partitionValue("CUSTOMER#$customerId").sortValue("ORDER#${beginDate}#${MIN_UUID}").build(),
        Key.builder().partitionValue("CUSTOMER#$customerId").sortValue("ORDER#${endingDate}#${MAX_UUID}").build()
    )
    orderTable.query(request)
    val result: PageIterable<CustomerOrderEntity> = customerOrderTable.query(query)
    val keysToGet: List<String> = result.items()
        .map { "ORDER#${it.sk.split("#")[2]}" }

    val readBatchBuilder = ReadBatch.builder(OrderEntity::class.java)
        .mappedTableResource(orderTable)
    for (orderId in keysToGet) {
        readBatchBuilder.addGetItem(Key.builder().partitionValue(orderId).sortValue(orderId).build())
    }
    var resultPages: BatchGetResultPageIterable =
        enhancedClient.batchGetItem { b -> b.readBatches(readBatchBuilder.build()) }
    return resultPages.resultsForTable(orderTable).map { mapper.toDto(it) }
}

Scan

fun getByCreateTimeBetween(startTime: Instant, endTime: Instant): List<CustomerDto> {
    val scanRequest = ScanEnhancedRequest.builder()
        .filterExpression(
            Expression.builder()
                .expression("created BETWEEN :startTime AND :endTime")
                .expressionValues(
                    mapOf(
                        ":startTime" to AttributeValue.builder().s(startTime.toString()).build(),
                        ":endTime" to AttributeValue.builder().s(endTime.toString()).build()
                    )
                )
                .build()
        )
        .build()
    val result: PageIterable<CustomerEntity> = customerTable.scan(scanRequest)
    return result.items()
        .filter { it.type == "CUSTOMER"}
        .map { mapper.toDto(it) }
}

트랜잭션

@DynamoDbBean
class CustomerEntity(
    @get:DynamoDbPartitionKey
    var pk: String, // CUSTOMER#{customerId}
    @get:DynamoDbSortKey
    var sk: String, // CUSTOMER#{customerId}
    var type: String, // CUSTOMER
    var username: String,
    var password: String,
    var expired: Long, // 유효기간
    var updated: Instant,
    var created: Instant
)


@DynamoDbBean
class CustomerInfoEntity(
    @get:DynamoDbPartitionKey
    var pk: String, // CUSTOMER#{customerId}
    @get:DynamoDbSortKey
    var sk: String, // CUSTOMER_INFO#{customerId}
    var type: String, // CUSTOMER_INFO
    var age: Int,
    var email: String,
    var nickname: String,
    var intro: String,
    var expired: Long, // 유효기간
    var updated: Instant,
    var created: Instant
)

고객 테이블의 로그인 Entity상세정보 Entity 2개를 Transactional 하게 저장

val customerTable = dynamodbComponent.generateDynamoDbTable(CustomerEntity::class)
val customerInfoTable = dynamodbComponent.generateDynamoDbTable(customerInfoTable::class)

fun create(request: CustomerAddRequest): CustomerDetailDto {
    val customerId = UUID.randomUUID().toString()
    val now = Instant.now()
    val customerEntity = mapper.toEntity(request, customerId, now)
    val customerInfoEntity = mapper.toCustomerInfoEntity(request, customerId, now)
    enhancedClient.transactWriteItems(
        TransactWriteItemsEnhancedRequest.builder()
            .addPutItem(customerTable, customerEntity)
            .addPutItem(customerInfoTable, customerInfoEntity)
            .build()
    )
    return mapper.toDetailDto(customerEntity, customerInfoEntity)
}

배치 Read

두개 이상의 기본 키(Primary Key) 를 한번에 조회할 때 사용한다.

SortKey 를 사용한 Query배치 Read 에선 사용할 수 없다.

fun getById(customerId: String): CustomerDetailDto {
    val pk = "CUSTOMER#${customerId}"
    val infoSk = "CUSTOMER_INFO#${customerId}"
    val customerRead: ReadBatch = ReadBatch.builder(CustomerEntity::class.java)
        .mappedTableResource(customerTable)
        .addGetItem(Key.builder().partitionValue(pk).sortValue(pk).build())
        .build()

    val customerInfoRead: ReadBatch = ReadBatch.builder(CustomerInfoEntity::class.java)
        .mappedTableResource(customerInfoTable)
        .addGetItem(Key.builder().partitionValue(pk).sortValue(infoSk).build())
        .build()

    val resultPages: BatchGetResultPageIterable = enhancedClient.batchGetItem(
        BatchGetItemEnhancedRequest.builder()
            .readBatches(customerRead, customerInfoRead)
            .build()
    )

    // results 를 반복하면 일치하는 값 필터링
    val entity: CustomerEntity = resultPages.resultsForTable(customerTable)
        .first { it.type == "CUSTOMER" }
    val customerInfoEntity: CustomerInfoEntity = resultPages.resultsForTable(customerInfoTable)
        .first { it.type == "CUSTOMER_INFO" }
    return mapper.toDetailDto(entity, customerInfoEntity)
}

카테고리:

업데이트: