Spring Cloud - Spring Cloud Stream
Message Driven System
이벤트 기반 마이크로서비스의 일종
서비스간 소비 가능한 이벤트를 주고받으며 서로 비동기 통신을 진행하는 시스템을 뜻한다.

이 이벤트를 Message System 을 사용해 구현할 경우 Message Driven System 이라 할 수 있다.
Message System 구성을 사용하려면 Message Broker 를 주로 사용하는데
Spring Boot 에서 대표적으로 사용하는 Message Broker 로 아래와 같은 프로젝트들이 있는데
- RabbitMQ
- Apache Kafka
- Kafka Streams
- Amazon Kinesis
- Google PubSub (partner maintained)
- Solace PubSub+ (partner maintained)
- Azure Event Hubs (partner maintained)
- Azure Service Bus (partner maintained)
- AWS SQS (partner maintained)
- AWS SNS (partner maintained)
- Apache RocketMQ (partner maintained)
엄밀히
Kafka분산 스트리밍 플랫폼이지만 여기선 단순Message Broker용도로 소개
RabbitMQ는 같은 VMWare 에서 관리하는 오픈소스 프로젝트이다 보니 서로 많은 지원을 해주는 듯
모든 프로젝트에서 Java Client 라이브러리를 제공하고 프로젝트에서 직접제공하는 라이브러리를 사용해도 되지만
여기선 Spring Cloud Stream 라이브러리를 사용해 위 두 Message Broker 를 쉽게 Spring Cloud 환경에 통합시킬 수 있다.
Spring Cloud Stream
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
https://spring.io/projects/spring-cloud-stream
Spring Cloud Stream 를 사용하면 어떤 Message Broker 를 사용하던 상관없이
동일한 코드를 통해 어플리케이션과 통합시킬 수 있다.
Kafka 는 Topic, Partition Consumer Group 을 사용하고
RabbitMQ 는 Exchange, Queue, Consumer Group 을 사용한다.
Kafka RabbitMQ 외에도 각종 클라우드 서비스(SQS, Google PusSub) 를 Spring Cloud Strema 으로 추상화 할 수 있다.
프로젝트의 모든 기능을 사용하고 싶다면
Spring Cloud Stream가 아닌 개별적으로 제공되는 java client 라이브러리 사용을 권장
각종 메세지, 이벤트 시스템을 하나의 코드로 통합하기 위해 Spring Cloud Stream 에선 아래 3가지 추상화 개념을 사용한다.
-
Destination Binders
외부 메시징 시스템 종류 상관없이 통합을 담당하는 구성 요소 -
Bindings 메시지의 생산자(Output Binding) 와 소비자(Input Binding)
-
Message 생산자와 소비자가 사용하는 표준 데이터 구조


여기선 Destination Binders 중 RabbitMQ 를 사용해본다.
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
단일 바인딩 - Account Service
Account service 는 Order service 와만 Message 통신하고
Order service 는 Account service, Product Service 두개 서비스와 Message 통신한다.
먼저 하나의 서비스와만 단일 바인딩 하여 통신하는 Account service 에 대해 구현한다.
아래와 같이 @EnableBinding 어노테이션을 통해 Message System 사용을 명시
@EnableDiscoveryClient
@SpringBootApplication
@EnableBinding(Processor.class)
public class AccountApplication {
public static void main(String[] args) {
SpringApplication.run(AccountApplication.class, args);
}
}
아래 3가지 인터페이스를 제공한다.
Sink- 메세지inbound채널을 생성하고 메세지 수신 서비스를 표시하는데 사용Source- 메세지outbound채널을 생성하고 메시지 송신 처리Processor- 메세지 송/수신 모두 필요한 경우 사용
메세지 처리 여부에 따라 원하는 인터페이스를 지정하면 된다.
대부분의 서비스가 송/수신을 모두 처리하기에 Processor 가 주로 쓰인다.
단일 바인딩에서 메세시 송신을 위한 컴포넌트는 아래와 같이 구현한다.
우리는 위에서 Processor 를 사용하기로 하였기에 Processor 를 주입받는다.
@Component
@RequiredArgsConstructor
public class AccountMessageSender {
private final Processor processor;
public boolean send(String payload) {
Message<String> message = MessageBuilder.withPayload(payload).build();
return processor.output().send(message);
}
}
Processor 는 Spring Cloud 의 메세지 송/수신을 위한 인터페이스로 아래와 같다.
기본적으로 output input 이름의 channel 을 생성해서 각각 송신, 수신시에 사용하게된다.
package org.springframework.cloud.stream.messaging;
public interface Processor extends Source, Sink {
}
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
반대로 생성된 메세지 수신 을 위한 listener 코드를 구성해보자.
@Slf4j
@Component
@RequiredArgsConstructor
public class AccountMessageHandler {
private final ObjectMapper mapper;
private final AccountService accountService;
@StreamListener(Processor.INPUT)
public void receiveOrder(Order order) throws JsonProcessingException {
log.info("Order received: {}", mapper.writeValueAsString(order));
Account account = accountService.findById(order.getAccountId());
log.info("Account found: {}", mapper.writeValueAsString(account));
order.setStatus(OrderStatus.ACCEPTED);
}
}
@StreamListener 어노테이션으로 핸들링 처리하고 @SendTo 어노테이션으로 다시 바인딩으로 메세지를 전달 가능하다.
기본적으로 rabbitmq 연결을 위한 설정과 기본 바인딩인 output 과 input 에 대한 이름설정을 해주어야 한다.
spring.application.name=account-service
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# for binding
spring.cloud.stream.bindings.output.destination=from-account
spring.cloud.stream.bindings.input.destination=to-account
# for rabbitmq custom
# default topic
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.input.consumer.exchange-type=direct
# default '#'
spring.cloud.stream.rabbit.bindings.input.consumer.binding-routing-key=to-account
Account Service 를 실행해보면 아래와 같이 Exchange 와 Queue 가 생성된다.


rabbitmq 를 Destination Binders 로 사용할 경우 기본 exchange topic 이다.
그래서 binding-routing-key 역시 모든 메세지를 수신할 수 있는 # 을 사용하는데
exchange direct 는 # 같은 와일드카드를 사용하지 않고 정확히 일치한 routing key 의 경우에만 메세지를 라우팅하기 때문에 binding-routing-key 를 별도 문자열로 지정해줘야 한다.
exchange direct 메세지 송신시 별도의 설정을 하지 않으면 destination 이름을 사용기에 to-account 로 설정했다.
멀티 바인딩 - Order Service
Order service 의 경우 Account Service 와 Product Service, 2개의 서비스와 연결되어야 함으로 단순 Producer 사용은 불가능하다.
아래와 같이 별개의 Producer(Sink, Source) 바인딩이 구현될 수 있도록 @Input, @Output 어노테이션을 사용한다.
// AccountProducer.java
public interface AccountProducer {
// account -> order
String INPUT = "to-order-from-account";
// order -> account
String OUTPUT = "from-order-to-account";
@Input(INPUT)
SubscribableChannel subscribableChannel();
@Output(OUTPUT)
MessageChannel messageChannel();
}
// ProductProducer.java
public interface ProductProducer {
// product -> order
String INPUT = "to-order-from-product";
// order -> product
String OUTPUT = "from-order-to-product";
@Input(INPUT)
SubscribableChannel subscribableChannel();
@Output(OUTPUT)
MessageChannel messageChannel();
}
더이상 Producer 인터페이스를 사용하지 않고 별도의 커스텀 클래스를 통해 바인인처리 됨으로
main 의 @EnableBinding 어노테이션 속성도 변경되어야 한다.
@EnableDiscoveryClient
@SpringBootApplication
@EnableBinding({AccountProducer.class, ProductProducer.class})
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
메세지 송신을 위한 컴포넌트는 아래와 같다.
@Service
public class OrderMessageSender {
@Autowired
@Qualifier(AccountProducer.OUTPUT)
private MessageChannel accountMessageChannel;
@Autowired
@Qualifier(ProductProducer.OUTPUT)
private MessageChannel productMessageChannel;
public boolean sendToAccount(String payload) {
Message<String> msg = MessageBuilder.withPayload(payload).build();
return accountMessageChannel.send(msg);
}
public boolean sendToProduct(String payload) {
Message<String> msg = MessageBuilder.withPayload(payload).build();
return productMessageChannel.send(msg);
}
}
메세지 수신을 위한 컴포넌트는 아래와 같다.
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderMessageHandler {
private final ObjectMapper mapper;
@StreamListener(ProductProducer.INPUT)
public void receiveProductOrder(Order order) throws JsonProcessingException {
log.info("Order receiveProductOrder: {}", mapper.writeValueAsString(order));
}
@StreamListener(AccountProducer.INPUT)
public void receiveAccountOrder(Order order) throws JsonProcessingException {
log.info("Order receiveAccountOrder: {}", mapper.writeValueAsString(order));
}
}
Account Service 와 마찬가지로 생성된 바인딩에 대한 설정을 진행
spring.application.name=order-service
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# for account service binding
spring.cloud.stream.bindings.to-order-from-account.destination=from-account
spring.cloud.stream.bindings.from-order-to-account.destination=to-account
# for product service binding
spring.cloud.stream.bindings.to-order-from-product.destination=from-product
spring.cloud.stream.bindings.from-order-to-product.destination=to-product
# for rabbitmq custom
spring.cloud.stream.rabbit.bindings.to-order-from-product.consumer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.from-order-to-product.producer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.to-order-from-account.consumer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.from-order-to-account.producer.exchange-type=direct
# default '#'
spring.cloud.stream.rabbit.bindings.to-order-from-product.consumer.binding-routing-key=from-product
spring.cloud.stream.rabbit.bindings.to-order-from-account.consumer.binding-routing-key=from-account
실제 direct exchange 가 생성된것을 확인

product-service 와 account-service 역시 order-service 에게 destination 이름으로 메세지를 보내기에 binding-routing-key 를 설정해야 한다.
Consumer Group
다수의 서비스와 다수의 인스턴스를 운영하려면 메세지 중복처리를 설계해야 한다.
대부분의 Message Broker 가 메세지 로드벨런싱 기능을 지원하며 rabbitmq 의 경우 하나의 Queue 에 클라이언트들이 연결되면 된다.
Spring Cloud 에선 이를 추상화하여 Consumer Group 기능을 제공한다.
Consumer Group 설정없이 Product Service 를 2개 실행시키면 아래와 같이 2개의 Queue 가 생성된다.

두개의 Product Service 가 모두 메세지를 받게된다.
Product Service 에 Concumer Group 설정
# for binding
spring.cloud.stream.bindings.output.destination=from-product
spring.cloud.stream.bindings.input.destination=to-product
# set consumer group
spring.cloud.stream.bindings.input.group=my-product-cg
# for rabbitmq custom
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.input.consumer.exchange-type=direct
# default '#'
spring.cloud.stream.rabbit.bindings.input.consumer.binding-routing-key=to-product

기타 메세지 기능
Poller
Poller 매초에 하나씩 메세지를 지속석으로 보낼 수 있다.
MessageSource 라는 빈 객체를 생성해야 하며 fixedDelay 밀리초에 메세지를 전송한다.
@Bean
@InboundChannelAdapter(value = AccountProducer.OUTPUT, poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1"))
// org.springframework.integration.core.MessageSource
public MessageSource orderSource() {
log.info("orderSource invoked");
return new MessageSource() {
@Override
public Message receive() {
String result = "";
Order order = Order.builder()
.status(OrderStatus.NEW)
.accountId((long) random.nextInt(3))
.customerId((long) random.nextInt(3))
.productIds(Collections.singletonList((long) random.nextInt(3)))
.build();
try {
result = mapper.writeValueAsString(order);
} catch (JsonProcessingException e) {
log.error(e.getMessage());
}
return new GenericMessage(result);
}
};
}
@Transformer
@StreamListener 어노테이션으로 바인딩으로부터 메세지를 핸들링하고
@SendTo 어노테이션을 통해 바인딩으로 메세지 반환값을 전달한다.
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Order receiveOrder(Order order) throws JsonProcessingException {
...
...
return order;
}
@Transformer 어노테이션으로도 대체 가능하다.
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Order receiveOrder(Order order) throws JsonProcessingException {
...
...
return order;
}
condition
Order Service 의 경우 멀티바인딩을 통해 2개의 exchange 를 만들어 메세지 처리를 진행했다.
@StreamListener(ProductProducer.INPUT)
public void receiveProductOrder(Order order) {
...
}
@StreamListener(AccountProducer.INPUT)
public void receiveAccountOrder(Order order) {
...
}
@StreamListener 의 condition 속성을 사용하면
하나의 Sink 객체만 Bean 으로 등록해서 사용할 수 있다.
@StreamListener(value = Order.INPUT, condition = "headers['processor']=='product'")
public void receiveProductOrder(Order order) throws JsonProcessingException {
...
}
@StreamListener(value = Order.INPUT, condition = "headers['processor']=='account'")
public void receiveAccountOrder(Order order) throws JsonProcessingException {
...
}
Spring Cloud Stream 3.x
공식 데모: https://github.com/spring-cloud/spring-cloud-stream-samples
@EnableBinding, @Input, @Out, @StreamListener, @StreamMessageConverter 등이 모두 deprecated 되었다.
2023 년 기준
Spring Cloud Stream은 최신버전은4.0.2
기존의 Destination Binders Bindings Message 개념, Sink Source Processor 개념은 동일하지만
spring-cloud-stream-reactive 가 합쳐지면서 함수형 프로그래밍 방식으로 바인딩 처리를 진행하도록 변경되었다.
아무런 설정 없이 Spring Cloud 의존성만 설정하고 @Bean 으로 함수형 객체를 등록하면 바인딩처리가 완료된다.
@Bean name속성을 별도로 설정하지 않을경우method name이 사용된다.
@Bean(name = "account-producer")
public Function<String, Order> inputAndOutput() {
return new Function<String, Order>() {
@Override
public Order apply(String msg) {
log.info("received msg:{}", msg);
try {
Order order = mapper.readValue(msg, Order.class);
log.info("Order received: {}", (order));
Account account = accountService.findById(order.getAccountId());
log.info("Account found: {}", mapper.writeValueAsString(account));
order.setStatus(OrderStatus.ACCEPTED);
return order;
} catch (Exception e) {
log.error("input error invoked, error type:{}, msg:{}", e.getClass().getSimpleName(), e.getMessage());
return null;
}
}
};
}

바인딩 이름 규칙은 아래와 같다.
<functionName> + -in- + <index><functionName> + -out- + <index>
해당 바인딩의 rabbitmq 커스터마이징을 하고싶다면 기존 방식대로 진행하면 된다.
spring.application.name=account-service
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# for binding
spring.cloud.stream.bindings.account-producer-in-0.destination=to-account
spring.cloud.stream.bindings.account-producer-out-0.destination=from-account
spring.cloud.stream.bindings.account-producer-in-0.group=my-account-cg
spring.cloud.stream.bindings.account-producer-in-0.binder=rabbit
## for rabbitmq custom
spring.cloud.stream.rabbit.bindings.account-producer-in-0.consumer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.account-producer-out-0.producer.exchange-type=direct
#spring.cloud.stream.rabbit.bindings.account-sink-0.consumer.exchange-type=direct
## default '#'
spring.cloud.stream.rabbit.bindings.account-producer-in-0.consumer.binding-routing-key=to-account
기본적으로 생성되는 account-producer-in-0, account-producer-out-0 형식의 바인딩이름이 마음에 안들 수 있다.
3.x 이전에는 @Input, @Output 어노테이션으로 바인딩 이름을 지정했었는데
3.x 이후부턴 properties 를 통해 바인딩 이름을 지정할 수 있다.
spring.cloud.stream.function.bindings 설정 참고
가독성을 좋아질 수 있지만 변경으로 인해 에러를 유발할 수 있음으로 사용하지 않기를 권장한다
# for naming binding
spring.cloud.stream.function.bindings.account-producer-in-0=to-account
spring.cloud.stream.function.bindings.account-producer-out-0=from-account
# for binding
spring.cloud.stream.bindings.to-account.destination=to-account
spring.cloud.stream.bindings.from-account.destination=from-account
spring.cloud.stream.bindings.to-account.group=my-account-cg
spring.cloud.stream.bindings.to-account.binder=rabbit
## for rabbitmq custom
spring.cloud.stream.rabbit.bindings.to-account.consumer.exchange-type=direct
spring.cloud.stream.rabbit.bindings.frmo-account.producer.exchange-type=direct
#spring.cloud.stream.rabbit.bindings.account-sink-0.consumer.exchange-type=direct
## default '#'
spring.cloud.stream.rabbit.bindings.to-account.consumer.binding-routing-key=to-account
Sink 생성을 원한다면 Consumer 함수객체를,
Source 생성을 원한다면 Supplier 함수객체를 Bean 으로 등록하면 된다.
@Bean(name = "account-sink")
public Consumer<String> input() {
return new Consumer<String>() {
@Override
public void accept(String msg) {
System.out.println("Received: " + msg);
}
};
}
멀티 바인딩
두개 이상의 함수형 객체를 Bean 으로 등록해서 바인딩으로 사용하고 싶다면 아래와 같이 ; 구분자와 Function Name 을 spring.cloud.function.definition 에 지정해줘야 한다.
@Bean
public Consumer<Order> fromProduct() {
return order -> log.info("Order fromProduct: {}", order);
}
@Bean
public Consumer<Order> fromAccount() {
return order -> log.info("Order fromAccount: {}", order);
}
# function def for binding
spring.cloud.function.definition=fromAccount;fromProduct
# for account service binding
spring.cloud.stream.bindings.fromAccount-in-0.destination=from-account
# for product service binding
spring.cloud.stream.bindings.fromProduct-in-0.destination=from-product
Functional Composition
spring.cloud.function.definition=toUpperCase|wrapInQuotes;
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}

생성된 바인딩명이 난해하다.
여기서 바인딩명을 명시하는 spring.cloud.stream.function.bindings 속성이 가독성에 도움될 수 있다.
spring.cloud.function.definition=toUpperCase|wrapInQuotes;account-producer
spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=upperAndWrapIn
spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-out-0=upperAndWrapOut
메세지 송신
Spring Cloud Stream 3.x 에서 메세지 송신만 하는 방법
StreamBridge
StreamBridge 프레임워크를 사용해 직접 메세지를 전송할 수 있다.
먼저 생성하지 못한 Output 용 바인딩을 생성해야 한다.
함수형 메서드를 Bean 으로 등록해서 바인딩을 생성하는게 아니기 때문에 spring.cloud.stream.source 속성으로 직접 명시해줘야 한다.
spring.cloud.stream.source=toAccount;toProduct
spring.cloud.stream.bindings.toAccount-out-0.destination=to-account
spring.cloud.stream.bindings.toProduct-out-0.destination=to-product
생성한 바인딩 이름을 기반으로 send 메서드를 호출한다.
@Service
public class OrderMessageSender {
@Autowired
private StreamBridge streamBridge;
public boolean sendToAccount(String payload) {
Message<String> msg = MessageBuilder
.withPayload(payload)
.build();
return streamBridge.send("toAccount-out-0", msg);
}
public boolean sendToProduct(String payload) {
Message<String> msg = MessageBuilder.withPayload(payload).build();
return streamBridge.send("toProduct-out-0", msg);
}
@Bean
@GlobalChannelInterceptor(patterns = "toProduct-*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
}
@GlobalChannelInterceptor 어노테이션으로 ChannelInterceptor 을 Bean 으로 등록하면
send 하기전에 Message 인터셉터 처리가 가능하다.
poller - Supplier
지속적인 poller 형식의 메세지 전송의 경우 Supplier 를 사용한다.
Supplier 객체를 바인딩으로 등록하면 get() 메서드가 1초에 한번씩 호출되고 문자열이 전송된다.
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
설정을 바꾸고 싶다면 spring.cloud.stream.poller 속성을 통해 default 값을 변경하면 된다.
- fixedDelay: Fixed delay for default poller in milliseconds. Default:
1000L - maxMessagesPerPoll: Maximum messages for each polling event of the default poller. Default:
1L - cron: Cron expression value for the Cron Trigger. Default:
none - initialDelay: Initial delay for periodic triggers. Default:
0 - timeUnit: The TimeUnit to apply to delay values. Default:
MILLISECONDS
spring.cloud.stream.poller.fixed-delay=2000
Supplier 의 경우 데이터 원본을 가지고 있는 서비스가 직접 호출하는 것이기 때문에 데이터 원본관리와 트리거 시점을 정해야 한다.
다중 입력, 다중 출력
지금까지 바인딩 이름 뒤에 붙는 <index> 는 Spring Cloud Stream 3.x 이상부터 제공하는 다중 입력, 다중 출력 지원을 위한 것으로
Project Reactor(Flux, Mono) 에서 제공하는 추상화에 의존하고 있다.
spring.cloud.function.definition=gather;scatter
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
return new Function<>() {
@Override
public Flux<String> apply(Tuple2<Flux<String>, Flux<Integer>> tuple) {
Flux<String> stringStream = tuple.getT1()
.doOnNext(str -> log.info("first flux:{}", str));
Flux<String> intStream = tuple.getT2()
.doOnNext(num -> log.info("second flux:{}", num))
.map(i -> String.valueOf(i));
return Flux.merge(stringStream, intStream);
}
};
}
@Bean
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
return new Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>>() {
@Override
public Tuple2<Flux<String>, Flux<String>> apply(Flux<Integer> integerFlux) {
Flux<Integer> connectedFlux = integerFlux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux
.filter(number -> number % 2 == 0)
.doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux
.filter(number -> number % 2 != 0)
.doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(
Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()),
Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe())
);
}
};
}
위 함수메서드가 Bean 으로 등록되면 아래 그림과 같은 입출력 바인딩이 생성된다.

다중 입력, 다중 출력이라고 크게 다를건 없다.
gather 의 경우 두개의 입력 스트림을 한개의 출력 스트림으로 그대로 전송할 뿐이고
scatter 의 경우 한개의 입력 스트림을 두개의 출력 스트림으로 나눠 전송한다.