해당 튜토리얼에서 사용하는 `route_guide.proto` 파일의 데이터형식과 서비스코드는 아래와 같다.
**전송을 위핸 객체**
- `Point`: 위경도객체 - `Feature`: 지역객체(Point + name) - `Rectangle`: 사각형객체(Point + Point) - `FeatureDatabase`: 지역 데이터베이스객체(리스트) - `RouteNote`: Point 설명 객체(Point + message) - `RouteSummary`: 경로객체(수신받은 포인트, 해당되는 지역, 지역간 거리합)
**전송시 사용되는 서비스**
```js service RouteGuide { // Point 에 해당하는 Feature 반환 rpc GetFeature(Point) returns (Feature) {} // Rectangle 안의 Feature List 반환, server side streaming rpc ListFeatures(Rectangle) returns (stream Feature) {} // Point List 로 생성한 RouteSummary 반환, client side streaming rpc RecordRoute(stream Point) returns (RouteSummary) {} // bi streaming rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} }
한번에 여러개를 보내는것이 아닌 스트림 형식으로 데이터를 계속 보내기 때문에 server side streaming, client side streaming 이라는 단어를 사용한다.
for (Feature feature : features) { if (!RouteGuideUtil.exists(feature)) { continue; }
intlat= feature.getLocation().getLatitude(); intlon= feature.getLocation().getLongitude(); // 해당 Rectangle 범위안에 부합하는 Feature 모두 전달 if (lon >= left && lon <= right && lat >= bottom && lat <= top) { responseObserver.onNext(feature); } } responseObserver.onCompleted(); }
return 이 별도로 존재하지 않고 호출자에게 반환값을 전달할 때 StreamObserver 객체를 사용한다. onNext 로 값을 저장하고 onCompleted 를 호출하면 rpc 가 종료되는 구조이다.
이번엔 client side streaming 형식으로 데이터를 보내는 recordRoute, routeChat 서비스를 구현해보자.
// rpc RecordRoute(stream Point) returns (RouteSummary) {} @Override public StreamObserver<Point> recordRoute(StreamObserver<RouteSummary> responseObserver) { returnnewStreamObserver<RouteSummary>() { int pointCount; int featureCount; int distance; Point previous; longstartTime= System.nanoTime();
@Override publicvoidonNext(Point point) { pointCount++; if (RouteGuideUtil.exists(checkFeature(point))) featureCount++; // For each point after the first, add the incremental distance from the previous point // to the total distance value. if (previous != null) distance += calcDistance(previous, point); previous = point; }
@Override publicvoidonError(Throwable t) { log.warn("Encountered error in recordRoute", t); }
@Override publicvoidonCompleted() { longseconds= NANOSECONDS.toSeconds(System.nanoTime() - startTime); responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount) .setFeatureCount(featureCount).setDistance(distance) .setElapsedTime((int) seconds).build()); responseObserver.onCompleted(); } }; } // rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} @Override public StreamObserver<RouteNote> routeChat(StreamObserver<RouteNote> responseObserver) { returnnewStreamObserver<RouteNote>() { @Override publicvoidonNext(RouteNote note) { List<RouteNote> notes = getOrCreateNotes(note.getLocation()); // Respond with all previous notes at this location. for (RouteNote prevNote : notes.toArray(newRouteNote[0])) { responseObserver.onNext(prevNote); } // Now add the new note to the list notes.add(note); }
스프링 부트를 통해 grpc 서버를 실행하다 보니 튜토리얼 코드와 다르다. 자세한 내용은 데모코드 참고
grpc 스텁종류
클라이언트 코드를 작성하기 전에 grpc 의 서비스를 호출하기 위한 스텁을 알아야한다. grpc 클라이언트는 서비스를 호출할 때 스텁을 사용하는데 3가지 종류가 있다.
비동기 스텁: 끝나면 StreamObserver 에 콜백하는 스텁
블록킹 스텁: 응답이 올때까지 기다리는 스텁
퓨처 스텁: GrpcFuture<T>을 반환하는 스텁
그리고 4개의 통신방식을 지원한다.
unary (1개 request , 1개 respone)
server stream (1개 request, n개 response)
client stream (n개 request, 1개 response)
bi stream (n개 request, n개 response)
스텁종류와 통신방식에 따라 12가지 호출방식을 지원해야 하지만 몇몇 통신방식에서 특정 스텁을 지원하지 않는경우가 있으니 주의
unary
server stream
client stream
bi stream
asyn
o
o
o
o
blocking
o
o
x
x
future
o
x
x
x
클라이언트 - RouteGuideClient 작성
스텁 사용도 마찬가지로 자동으로 생성된 코드를 사용한다.
route_guide.proto 로 인해 생성된 코드를 보면 위에서 설명한 3가지 스텁별로 클래스가 생성된것을 확인 가능하다.
asyn: RouteGuideStub
blocking: RouteGuideBlockingStub
future: RouteGuideFutureStub
내부를 살펴보면 통신방식 지원별로 메서드가 생성되어 있다.
GetFeature, ListFeatures 는 RouteGuideBlockingStub 클래스를 사용하고 RecordRoute, RouteChat 는 RouteGuideStub 클래스를 사용하여 구성한다.
blocking 스텁을 사용하는 GetFeature, ListFeatures 메서드는 코드가 간결하다. return 문이 정의되어 있음으로 받아서 사용하면 된다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14
public Feature getFeature(int lat, int lon) { Pointrequest= Point.newBuilder().setLatitude(lat).setLongitude(lon).build(); Featurefeature= blockingStub.getFeature(request); return feature; }
public Iterator<Feature> listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) { Rectanglerequest= Rectangle.newBuilder() .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build()) .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()) .build(); Iterator<Feature> features = blockingStub.listFeatures(request); return features; }
반면 RecordRoute, RouteChat 함수는 client stream, bi stream 이다 보니 async 스텁을 사용해야 한다.
단순 요청, 응답 형식의 API 가 아니고 stream 통로를 통해 데이터를 주고 받다보니 return 문이 없고 요청, 응답에 사용할 핸들러 객체 StreamObserver 를 사전 정의하고 호출해야 한다.
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver); for (Point point : pointList) { requestObserver.onNext(point); Thread.sleep(1500); } // Mark the end of requests requestObserver.onCompleted(); }