[Server] grpc stream 서버 및 webRTC 서버 구축하기

2023. 12. 30. 22:11Developers 공간 [Basic]/Backend

728x90
반응형

어플리케이션을 설계하다보면 Client간의 P2P(Peer To Peer) 연결이 필요할 수도있고, 서버와의 streaming 연결이 필요할 수도 있는 등 다양한 Communication이 필요합니다.

 

이럴때 최근 많이 사용되는 Communication Protocol에는 HTTP, WebSocket, gRPC, WebRTC 등이 있고, 이중에 gRPCWebRTC를 위한 서버를 이번 글에서 구축해보고자 합니다.

 

개념을 먼저 살펴 본 뒤에, 직접 gRPC를 활용한 streaming서버를 구축해보고, WebRTC를 활용하기 위한 서버를 직접 구현해보고자 합니다.

<구성>
1.  개념정리 
    a. gRPC란?
    b. WebRTC란?
2. gRPC를 활용한 서버 구축
    a. 환경 구축  
    b. example 실행해보기
    c. stream 서버 만들기
3. WebRTC를 활용한 서버 구축
    a. 환경 구축
    b. example을 통한 서버 구현

글효과 분류1 : 코드

글효과 분류2 : 폴더/파일

글효과 분류3 : 용어설명

글효과 분류4 : 글 내 참조

글효과 분류5 : 글 내 참조2

글효과 분류6 : 글 내 참조3


1.  개념정리

앞서 많이 사용되는 Communication Protocol에는 HTTP, WebSocket, gRPC, WebRTC 등이 있다고 말했습니다. 간단한게 이런 프로토콜에 대해 정리하면 아래와 같습니다.

  • HTTP API : HTTP라는 Server-Client 프로토콜을 활용한 API이며, 이 중 REST API는 단순히 네트워크 자원을 어떻게 처리할지에 대한 방법을 정리해놓은 스타일이자 HTTP의 우수성을 최대한 활용하기 위한 방법입니다.
    기본적으로 비연결성의 특징을 가지고 있으므로, 실시간 통신을 하기 위해 Polling 혹은 Streaming 방식으로 대체가 가능하긴합니다. 프로토콜에는 HTTP와 HTTPS(데이터 보안을 위해 SSL을 적용)가 있습니다.
    ** REST(Representational State Transfer) : URI로 자원을 관리하며, Method는 네가지(GET, POST, PUT, DELETE)로 표현합니다.
  • WebSocket : TCP연결을 통해 전이중 양방향 통신을 하는 프로토콜입니다. 최초 Handshaking시 HTTP 프로토콜을 활용해 Server와 Client간의 상태를 저장함으로써 connection을 유지하고, 이후에는 WebSocket 프로토콜(TCP/IP기반)을 통해 양방향 통신을 합니다.
    ** 반이중 (Half-Duplex) : 양방향 통신이지만, 송수신을 동시에 하지 않고 무전기와 같이 합니다.
    ** 전이중(Full-Duplex) : 동시에 송수신이 가능합니다.
    예를 들어 streaming 중계나 게임 등 서버와의 실시간 정보교환이 필요할 때 주로 사용하며, 대부분의 브라우저들이 제공합니다. 프로토콜에는 WS와 WSS(데이터 보안을 위해 SSL을 적용)가 있습니다.

[WebSocket 통신의 과정]

  • gRPC : Protocol BufferHTTP/2를 사용한 RPC 프로토콜이자 프레임워크입니다. RestAPI가 Json 데이터 교환 형식을 따르는 것과 다르게 Protocol Buffer 형식을 활용하며, 하나의 TCP연결이 여러개의 양방향 스트리밍을 지원하기 때문에 고성능 또는 데이터 사용량이 많은 마이크로서비스 아키텍처에서 활용합니다.
  • WebRTC : Web Browser 및 Application 간의 실시간 통신이 가능하게 하는 프레임워크이며, 다양한 프로토콜로 구성되어있습니다. 실시간 음성, 영상 및 데이터 통신을 제공하며, 브라우저에서 플러그인이나 어플리케이션 설치 없이 다른 사용자와 실시간 연결하여 채팅, 화상 회의, 파일 공유 등의 서비스를 제공할 수 있습니다
더보기

---------------------------------------------------

<네트워크 OSI 7계층이란>

OSI(Open Systems Interconnection Reference) Model은 국제표준화기구(ISO)에서 개발한 모델로, 일반적으로 OSI 7 계층이라고 한다.

[OSI 7계층]

7. 응용(Application) : 프로세스와 관계하여 일반적인 서비스를 수행합니다.

6. 표현(Presentation) : 데이터 표현이 상이한 프로세스들의 독립성을 제공하고 암호화합니다.

5. 세션(Session) : 통신을 위한 논리적인 연결을 생성합니다. 

4. 전송(Transport) : Packet을 생성하여 효율적인 데이터를 전송합니다.

3. 네트워크(Network) : IP주소를 가지고 데이터를 목적지까지 안전하게 전달합니다. ex) 라우터, 스위치

2. 데이터 링크(Data Link) : MAC주소를 가지고 릴레이, 에러검출 등을 합니다. ex) 브릿지, 스위치

1. 물리(Physical) : 물리적 특성을 통해 통신케이블로 데이터를 전송합니다. ex) 통신 케이블, 리피터, 허브

 

다음은 계층 별 프로토콜의 종류의 예시입니다. 주로 많이 알려진 프로토콜은 빨간색으로 표시했습니다.

 

7. 응용(Application) : HTTP, SMTP, FTP, Telnet, POP3, IMAP, WebSocket, SRTP, SCTP

6. 표현(Presentation) : ASCII, MPEG, JPEG, MIDI

5. 세션(Session) : NetBIOS, SAP, SDP, NWLink, TLS, DTLS

4. 전송(Transport) : TCPUDP, SPX, WebSocket (+ ICE,STUN,TURN)

3. 네트워크(Network) : IP, IPX

2. 데이터 링크(Data Link) : Ethernet, Token Ring, FDDI, AppleTalk

1. 물리(Physical) : X

 

아래는 앞으로 설명할 WebRTC의 프로토콜 스택 입니다.

[WebRTC 프로토콜 스택]

---------------------------------------------------

 

gRPCWebRTC의 자세한 개념에 대해서는 아래에서 더 자세히 살펴보겠습니다.


a. gRPC란?

 

gRPC(google RPC)란 google에서 2016년에 발표한 오픈소스 RPC 프레임워크입니다. Protocol Buffer를 인터페이스 정의 언어로 사용하여 HTTP/2.0으로 통신하고, 이외에도 양방향 스트리밍 제어 & 차단 & 타임아웃 등의 기능을 제공합니다. 클라이언트와 서버와의 통신을 단순화하기 때문에, 마이크로서비스나 분산시스템 등에서 자주 사용합니다.

 

그럼 RPC(RemoteProcedure Call)란 무엇일까요? 원격 제어를 위해서 코딩 없이 다른 주소공간에서 함수와 프로시저를 실행하는 IPC(Inter-Process Communication) 기술입니다.

즉, 일반적으로 Process는 "자신의 주소공간 안에 존재하는 함수"만 호출해 실행이 가능한데, RPC가 "다른 Process의 함수"를 실행할 수 있도록 해주는 것입니다. 

이는 Client와 Server간 communication시에 필요한 정보는 최대한 감추고, 언어와 환경에 구애받지 않고 remote의 메소드를 호출할 수 있습니다. 대표적으로, Google의 ProtocolBuffer, Facebook의 Thrift, 트위터의 Finalge등이 있습니다. 

더보기

-----------------------------------------------------------------

<RPC의 동작 방식>

 

위에서 설명한 RPC가 구체적으로 어떻게 동작하는지 가볍게 살펴보도록 합니다.

[RPC의 기본 동작 구조]

0. 서버의 호출 규약을 정의해놓은 IDL를 사용해 function의 name, arguments, return을 저장하고, RPCGen컴파일러를 이용해 Stub코드(C,Java 등의 원시코드 형태)를 생성하고 Client와 Server에 모두 빌드합니다.

** IDL(Interface Definition Language) : C,C++, Java등과 같은 구현언어가 아닌 인터페이스 “정의 언어”로, 중립적으로 인터페이스를 지원하기 위한 호출 규약 언어입니다.

** Stub : remote 객체에 대한 method 호출을 해당 객체가 있는 서버로 전송하는 일을 담당하는 대리자입니다. 원하는 파라미터를 가진 객체를 메시지로로 Marshalling/Unmarshalling해줍니다.

** Marshalling/Unmarshalling : Serialization/Deserialization이 구조화된 객체를 byte stream으로 변환하는 과정이라면, Marshalling/Unmarshalling은 비슷하지만 다른 언어 혹은 다른 플랫폼 간의 데이터를 주고 받을 때 변환하는 과정입니다.

** Manage/Unmanaged Data : C# Compiler가 생성한 코드인 Managed Code와 다르게 C/C++의 pointer로 구현된 코드들을 Unmanaged code라고 하는데, Unmanaged code에서 작업해야하는 경우 기존의 managed App 도메인의 데이터를 unmanaged 영역으로 Marshalling해야합니다. 

 

1. 일반적인 함수 호출처럼 실행하면 Stub은 파라미터를 가진 객체를 가지고 XDR 형식으로 변환해 메시지로 넣어줍니다.

** XDR (External Data Representation) : 컴퓨터별로 메모리 저장방식(Little Endian, Big-Endian)과 같은 x86,arm등 CPU아키텍쳐 벌로 다른 부분을 맞춰 byte 전송 순서를 네트워크 전송과정에서 보장하기 위해 사용하는 데이터형입니다.

 

2. Transport Layer는 패킷을 Server Stub으로 전달해주면, 이를 해석해 기존에 정의된 함수를 호출하고 결과를 위와 같이 메시지로 넣어 Transport Layer로 반환해줍니다.

 

3. Client 프로그램은 서버의 결과값을 받습니다.

-----------------------------------------------------------------

 

다음으로 gRPC는 IDL로 Protobuf를 활용하므로, Protobuf를 살펴보겠습니다.
** IDL 개념을 알기 위해서는 위 더보기를 참조하세요

 

Protobuf(Protocol Buffer)는 google에서 2008년 공개한, 객체 데이터 직렬화(Serialization) 방식입니다. 유선이나 데이터 저장을 목적으로 서로 통신할 프로그램을 개발할 때 유용하며, C++, C#, Go, Java, Python, Object C, Javascript, Ruby 등 다양한 언어를 지원합니다.

** 직렬화(Serialization) : 데이터를 저장하고 싶으면 당연히 Bit 형태로 혹은 Byte Stream으로 표현해야하는데, 주어진 데이터를 이렇게 표현하는 방식이 직렬화입니다. 단순히 문자열 Encoding을 통해 직렬화 할 수 있겠지만, 구조화된 형식으로 표현하는 방법이 있습니다.

 

다른 직렬화 방법은 아래와 같습니다.

  • JSON, CSV, XML : 텍스트기반으로, 사람이 읽을 수 있는 형태인 장점이 있지만, 파싱이 오래걸리고 효율이 떨어집니다.
  • BSON(Binary JSON) : JSON을 이진 형식으로 직렬화/역직렬화 하는 형식으로, MongoDB 등의 NoSQL DB에서 사용되며, JSON보다 빠르고 효율이 높습니다.
  • MessagePack : JSON과 유사한 이진 기반의 경량 직렬화/역직렬화 방식입니다.
  • Protocol Buffer 
  • Pickle : 파이썬 객체를 직렬화/역직렬화 하는데 사용되는 형식.

Protobuf가 어떻게 직렬화하는지 자세히 보시려면 아래 더보기를 참조하세요.

더보기

-----------------------------------------------------------------

<예시로 살펴보는 Protobuf의 특징>

 

아래와 같은 데이터가 있다고 생각합시다. 문자열 형태로 직렬화한다면 공백을 제외하고  82bytes가 될 것입니다.

{
    "userName":"Martin",
    "favoriteNumber":1337,
    "interests":["daydreaming", "hacking"]
}

 

이때, Protobuf를 활용한다면 아래 그림의 왼쪽과 같이 "1byte(field_tag+type)"형태로 저장합니다. 이때 각각의 field_tag에 대한 데이터는 따로 오른쪽과 같이 "Nbytes(length+data)"형태로 저장합니다. 즉, 결과적으로 type과 name에 대한 부분들을 공유해서 쓰도록 만들어 버린 것입니다.

[Protobuf의 사용 예시]

 

이렇게 Protobuf를 활용하면 당연히 통신 할 데이터가 줄어들며, byte순서 그대로 사용하기 때문에 parsing이 필요하지 않게 됩니다.

-----------------------------------------------------------------


b. WebRTC란?

 

WebRTC(Web Real-Time Communication)란 Google에서 2011년 오픈소스화한 프로젝트에서 기원해 표준화되었습니다. 이는 웹 어플리케이션과 사이트가 중간자 없이 실시간으로 음성,비디오, 영상 등의 데이터 통신을 P2P(Peer-to-Peer)로 가능하게하는 API및 프로토콜 입니다. 이를 활용하면 화상통화 등 고화질의 멀티미디어 정보를 주고 받을 수 있습니다.

 

기존에는 서버가 Communication을 위한 중개 서버 역할을 하면서 서버의 부하가 과중되거나, 성능이 떨어지는 문제가 있었습니다. 이와 다르게 WebRTC는 P2P연결을 사용하기 때문에 속도가 빠르며, 기본적으로 HTTPS를 강제하기 때문에 보안이 보장됩니다.

 

WebRTC는 크게 네 단계를 거쳐 이루어지며, 각각의 단계마다 사용하는 프로토콜이 다릅니다.

 

1. Signaling

  • SDP (Session Description Protocol.)

2. Connection

  • ICE (Interactive Connectivity Establishment)

3. Security

  • DTLS (Datagram Transport Layer Security)
  • SRTP (Secure Real-time Transport Protocol)

4. Communicating

  • RTP (Real-time transport Protocol)
  • RTCP (Real-time Transort Control Protocol)
  • SCTP (Stream Control Transmission Protocol)

 

이중에 서버와 관련된  1.Signaling2.Connection을 살펴보겠습니다.


 

1. Signaling

 

아래와 같은 상황을 가정해보겠습니다. 두 User가 통신을 하고 싶은데, 서로의 IP나 어떤 종류의 통신을 원하는지 등에 대한 정보를 알수가 없습니다. 따라서 이를 중재하는 서버가 Signaling 서버입니다. 

 

[Signaling Server]

 

그럼 이때 사용하는 SDP(Session Description Protocol)는 무엇일까요? Session의 Description을 담당하는 프로토콜로, 생성할 세션의 정보 타입을 정해진 {key=value} 형태로 전달하는 것을 의미합니다.

 

위 예에서, User1이 요청하는 offer에는 '어떤 종류의 통신인지', 'encoding은 어떤것인지', '세션에 속할 IP주소값은 어떤 것인지' 등을 전달하며, 이렇게 주어진 포맷에 맞추어 전달하면, 이후에 해상도, 형식, 코덱, 암호화 등의 공식화된 규격으로 음성, 데이터를 교환할 수 있습니다.

더보기

---------------------------------------------------------

<Session Description 내용>

Optional한 것은 * 및 회색으로 표현했습니다. 단, 일반적으로 꽤 사용되는 것들은 빨간색으로 표시해보았습니다.

 

Session description
         v=  (protocol version)
         o=  (originator and session identifier)
         s=  (session name)
         i=* (session information)
         u=* (URI of description)
         e=* (email address)
         p=* (phone number)
         c=* (connection information -- not required if included in all media)
         b=* (zero or more bandwidth information lines) One or more time descriptions ("t=" and "r=" lines; see below)
         z=* (time zone adjustments)
         k=* (encryption key)
         a=* (zero or more session attribute lines)  Zero or more media descriptions

Time description
         t=  (time the session is active)
         r=* (zero or more repeat times)

Media description, if present
         m=  (media name and transport address)
         i=* (media title)
         c=* (connection information -- optional if included at session level)
         b=* (zero or more bandwidth information lines)
         k=* (encryption key)
         a=* (zero or more media attribute lines)

---------------------------------------------------------


 

2. Connection

 

ICE (Interactive Connectivity Establishment)란 peer 간의 connection을 가능하게 하고 최적의 경로를 탐색하는 프레임워크 및 프로토콜입니다. 다양한 상황에서 이러한 작업들을 위해 어떤 서버를 사용하는지 살펴보겠습니다.

 

일반적으로 Peer Connection이 필요한 상황은 크게 세 가지라고 합니다. 

1. 동일한 라우터를 공유하는 상황

2. 서로 다른 라우터를 가지고 있는 상황

3. 어떤 이유에서건 P2P 통신이 불가능한 상황

 

각각의 상황에 대해 하나씩 살펴보겠습니다. 


1. 동일한 라우터를 공유하는 경우

 

상대방의 IP를 알기 쉬우니 연결하기 쉬울 것입니다.

 

2. 서로 다른 라우터를 가지고 있는 경우.

 

라우터에 연결되어있는 나 자신은 아래 그림과 같이 나자신의 private IP만 알지, public IP를 알기 어려울 것입니다. 이를 알아내기 위한 STUN서버가 필요합니다. 즉, STUN(Session Traversal Utilities for NAT)이란 라우터가 분리되어있기 때문에 이 서버를 통해 자신의 public IP를 알아낼 수 있는 프로토콜입니다.

** NAT(Network Address Translation) : IP 패킷의 TCP/UDP소스 및 목적지의 IP주소와 포트를 Router를 통해  private IP로 다시 기록해 네트워크트래픽을 주고 받기 위한 기술입니다. 즉, Private Network에 속한 여러개의 호스트가 하나의 공인 IP를 사용해 인터넷에 접속하기 위해 IP를 Translation하겠다는 뜻입니다. (공인 IP주소를 절약가능합니다)

 

아래 그림의 경우, User1은 STUN서버를 통해 자신의 public IP를 알아내야합니다.

[STUN Server]


3. P2P 통신 중 connection이 불가능해진 경우


연결이 완성되면 통신을 위해 P2P통신이 진행될 것입니다. 하지만 중간에 connection이 끊어지거나 불가피한 상황이 생긴경우 연결을 중재해줄 서버가 필요한데 이때 필요한 백업서버가 TURN 서버입니다. 

 

위에서 한번 언급한 NATCone NATSymmetric NAT으로 나뉩니다.

Cone NAT은 라우터에 연결된 어떤 내부망이 (public_IP_A, PORT_A)로 트래픽을 보내면, (public_IP_B, PORT_B)로 보내도 해당 내부망의 Destination인 (public_IP_A, PORT_A)(public_IP_B, PORT_B)는 항상 같습니다.

하지만 Symmetric NAT는 내부망 입장에서 Destination인 (public_IP_A, PORT_A)(public_IP_B, PORT_B)가 다릅니다. 게다가 패킷을 수신한적이 있는 외부 호스트만이 내부 호스트에게 패킷을 전송할 수 있습니다.

 

몇개의 라우터들은 Symmetric NAT를 채용하고 있는데, 이런 경우 위와 같은 특성때문에 Peer들이 서로 연결된 적 있는 연결만 허용을 하겠죠? 그래서 이것을 우회하기 위해 사용하는 것이 TURN(Traversal Using Relays around NAT)입니다.

 

아래 그림에서 User1은 자신의 private IP를 포함해 TURN서버에 패킷을 보내면, 서버는 각각의 모든 peer들에게 해당 패킷을 포워딩해 Symmetric NAT를 우회할 수 있습니다. 

 

[TURN Server]
[Total Process]


2. gRPC를 활용한 서버 구축

 

그럼 먼저 grpc 서버를 구축해보겠습니다. python으로 구현할 예정이며, google에서 제공하는 gRPC 프레임워크(https://github.com/grpc/grpc)를 활용해서 구현하겠습니다. 


a. 환경 구축

 

먼저 python에서 필요한 패키지를 아래와 같이 설치해줍니다.

pip3 install grpcio grpcio-tools google google-api-core

 

설치 중에 아래와 같은 에러가 난다면 protocol buffer가 설치가 안되었거나, 버전이 맞지 않기 때문이므로 아래와 같이 설치혹은 재설치 해줍니다.

ImportError: cannot import name 'builder' from 'google.protobuf.internal'
pip3 install protobuf==4.21
pip3 install --force-reinstall "protobuf<4.0"

 

예시로 보일 stream 서버에 부가적으로 필요한 패키지들도 설치해봅니다.

pip3 install dataclasses pydub
apt-get install ffmpeg

b. example 실행해보기

 

본격적으로 stream 서버를 구현하기에 앞서, 이미 만들어져 있는 예시를 실행해보겠습니다. 공식 사이트에서 제공하는 예시(https://github.com/grpc/grpc/tree/master/examples/python/helloworld)를 실행해보고자 합니다. 관련 튜토리얼 링크(https://grpc.io/docs/languages/python/basics/)에서도 방법이 나와있습니다.

 

위 언급한 github 레포지토리 링크에서 greeter_server.py, greeter_client.py 두 파일을 사용하겠습니다.

또한 protos가 모여있는 위치(https://github.com/grpc/grpc/tree/master/examples/protos)에서 helloworld.proto 파일도 가져 오겠습니다.

 

자 이제 세개의 파일을 포함해 우리가 작업할 프로젝트 트리는 아래와 같습니다. 위 greeter_server.pyserver.py라는 파일로 작업할 것이고, greeter_client.pyclient.py라는 파일로 작업할 계획입니다. 

 

MyProject

├── start_script.sh
── protos
      ├── helloworld.proto
      └── my_protobuf.proto

└── test

      ├── helloworld_pb2.py, helloworld_pb2_grpc.py (will-be-made)

      ├──  my_protobuf_pb2.py, my_protobuf_pb2_grpc.py (will-be-made)

      ├── client.py
      └── server.py

 

먼저 server.py를 살펴보겠습니다. grpc 패키지에 있는 서버를 선언한뒤, 어떤 helloworld_pb2_grpc.py라는 파일의 어떤 GreeterServicer 클래스를 상속해 만들고, grpc 서버에 추가해주는 것으로 보입니다.

from concurrent import futures
import logging

import grpc
import helloworld_pb2
import helloworld_pb2_grpc


class Greeter(helloworld_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)


def serve():
    port = "50051"
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port("[::]:" + port)
    server.start()
    print("Server started, listening on " + port)
    server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig()
    serve()

 

어떤 helloworld_pb2_grpc.py라는 파일은 어떻게 만들까요?

 

바로 아까 가져온 helloworld.proto 의 파일을 통해 만들 수 있습니다. 해당 파일을 살펴보면 프로세스간 메시지 인터페이스를 정의하는 것으로 보입니다. 즉, 아래와 같이 Request와 Reply의 인터페이스를 정의한 파일을 만듭니다.

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}

  rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

 

이제 helloworld.proto 의 파일을 만들었다면, 아래와 같은 명령어로 test 폴더 내부에 helloworld_pb2.py(python_out)와 helloworld_pb2_grpc.py(grpc_python_out)를 생성할 것입니다. 

python3 -m grpc_tools.protoc \
    -I./protos \
    --python_out=./test \
    --grpc_python_out=./test \
    ./protos/helloworld.proto

 

자 이제 무엇이 생성되었는지 살펴보겠습니다. 생성된 helloworld_pb2.py를 먼저 살펴보겠습니다. 조금 복잡해서 알아보기는 쉽지 않지만, 우리가 정의했던 Request와 Reply에 대한 정의를 담고 있습니다.

더보기

----------------------------------------------------------

<helloworld_pb2.py 파일 내용>

# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: helloworld.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3')

_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'helloworld_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  DESCRIPTOR._serialized_options = b'\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW'
  _HELLOREQUEST._serialized_start=32
  _HELLOREQUEST._serialized_end=60
  _HELLOREPLY._serialized_start=62
  _HELLOREPLY._serialized_end=91
  _GREETER._serialized_start=93
  _GREETER._serialized_end=166
# @@protoc_insertion_point(module_scope)

----------------------------------------------------------

 

다음으로 생성된 helloworld_pb2_grpc.py 파일을 살펴보겠습니다. 파이썬으로 되어있어 알아보기 쉬우니 조금 자세히 보겠습니다.

  • GreeterStub : client측에서 서버에 요청을 하기 위한 stub 객체입니다. 위에서 설명한 바와 같이 RPC의핵심 개념이며 파라미터 객체를 메세지로 Marshalling/Unmarshalling하는 레이어입니다.
    ** Marshalling/Unmarshalling : Serialization/Deserialization이 구조화된 객체를 byte stream으로 변환하는 과정이라면, Marshalling/Unmarshalling은 비슷하지만 다른 언어 혹은 다른 플랫폼 간의 데이터를 주고 받을 때 변환하는 과정입니다.
    ** Manage/Unmanaged Data : C# Compiler가 생성한 코드인 Managed Code와 다르게 C/C++의 pointer로 구현된 코드들을 Unmanaged code라고 하는데, Unmanaged code에서 작업해야하는 경우 기존의 managed App 도메인의 데이터를 unmanaged 영역으로 Marshalling해야합니다. 
  • add_GreeterServicer_to_server : 우리가 만든 서비스를 외부에서 선언한 grpc서버에 추가하는 함수입니다.
  • GreeterServicer : 상속해서 만들 서비스의 부모 클래스 입니다. 
  • Greeter : 위 GreeterServicer를 상속해서 만든 서비스의 예시입니다. 사용되지 않습니다.
더보기

----------------------------------------------------------

<helloworld_pb2_grpc.py 파일 내용>

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import helloworld_pb2 as helloworld__pb2


class GreeterStub(object):
    """The greeting service definition.
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.SayHello = channel.unary_unary(
                '/helloworld.Greeter/SayHello',
                request_serializer=helloworld__pb2.HelloRequest.SerializeToString,
                response_deserializer=helloworld__pb2.HelloReply.FromString,
                )


class GreeterServicer(object):
    """The greeting service definition.
    """

    def SayHello(self, request, context):
        """Sends a greeting
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_GreeterServicer_to_server(servicer, server):
    rpc_method_handlers = {
            'SayHello': grpc.unary_unary_rpc_method_handler(
                    servicer.SayHello,
                    request_deserializer=helloworld__pb2.HelloRequest.FromString,
                    response_serializer=helloworld__pb2.HelloReply.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'helloworld.Greeter', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


 # This class is part of an EXPERIMENTAL API.
class Greeter(object):
    """The greeting service definition.
    """

    @staticmethod
    def SayHello(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        return grpc.experimental.unary_unary(request, target, '/helloworld.Greeter/SayHello',
            helloworld__pb2.HelloRequest.SerializeToString,
            helloworld__pb2.HelloReply.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

----------------------------------------------------------

 

자이제 위에서 정의했던 서버를 이해했으니, 실행해 보겠습니다.

python3 server.py

 

이번엔 client에서 서버에 요청하는 과정을 살펴보겠습니다. 아래는 client.py 코드 입니다.

위에서 살펴봤던 GreeterStub을 선언해준 뒤, stub에 내장된 요청 함수를 통해 서버에 요청할 수 있게 되었네요.

from __future__ import print_function

import logging

import grpc
import helloworld_pb2
import helloworld_pb2_grpc


def run():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    print("Will try to greet world ...")
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        response = stub.SayHello(helloworld_pb2.HelloRequest(name="you"))
    print("Greeter client received: " + response.message)


if __name__ == "__main__":
    logging.basicConfig()
    run()

 

이제 파일을 실행해보겠습니다.

python3 client.py

 

아래와 같은 결과를 보니 서버가 제 역할을 잘 해주고 있는 것 같습니다.

Will try to greet world ...
Greeter client received: Hello, you!

c. stream 서버 만들기

 

기본 예시를 살펴보았으니, 참조해 나만의 서버를 만들어보겠습니다. 앞서 언급한 바와 같이 stream서버를 만들어 보는 것이 목적이지만, 두가지 모드 모두 만들어 보겠습니다. Batch 모드는 대량의 데이터를 순차적으로 처리하는 반면, Streaming 모드는 지속적으로 흐르는 데이터를 처리하게 만들 것입니다.

 

먼저 proto 파일을 작성해주어야 겠죠 ? proto파일은 nvidia riva에서 제공하는 proto파일(https://github.com/nvidia-riva/common/blob/main/riva/proto/riva_asr.proto)을 참조했습니다.

**Nvidia Riva : 음성 AI App을 개발하기 위한 GPU 가속 SDK로, 사전 훈련된 음성/언어 모델을 활용해 Resource를 효율적으로 활용하고 정확도 및 Performance까지도 개선해 서비스를 제공할 수 있습니다.

 

아래는 작성한 my_protobuf.proto 입니다. BatchStreaming 두가지 Mode에 대한 각각의 Request와 Response를 정의해 두었습니다.

syntax = "proto3";

package nvidia.riva.asr;

option cc_enable_arenas = true;

/* =============API Overview=========== */
service RivaSpeechRecognition {
    rpc Recognize(RecognizeRequest) 
    	returns (RecognizeResponse) {}
    rpc StreamingRecognize(stream StreamingRecognizeRequest) 
    	returns (stream StreamingRecognizeResponse) {}
}

/* =============Batch Mode============== */
message RecognizeRequest {
    bytes audio = 2;
}
message RecognizeResponse {
    SpeechRecognitionResult results = 1;
}
// A speech recognition result corresponding to the latest transcript
message SpeechRecognitionResult {
  bool success = 1;
  int32 channel_tag = 2;
  float audio_processed = 3;
  string text = 4;
}

/* ===========Streaming Mode============ */
message StreamingRecognizeRequest {
    oneof streaming_request {
        StreamingRecognitionConfig streaming_config = 1;
        bytes audio_content = 2;
    }
}
enum AudioEncoding {
    ENCODING_UNSPECIFIED = 0;
    LINEAR_PCM = 1;
    MULAW=2;
    ALAW=3;
    FLAC=4;
}
message StreamingRecognitionConfig {
    AudioEncoding encoding = 1;
    int32 sample_rate_hertz = 2;
    bool interim_results = 3;
}
message StreamingRecognizeResponse {
    repeated StreamingRecognitionResult results = 1;
}
message StreamingRecognitionResult {
  repeated SpeechRecognitionAlternative alternatives = 1;
  bool is_final = 2;
  int32 channel_tag = 5;
  float audio_processed = 6;
}
message SpeechRecognitionAlternative {
  string transcript = 1;
  float confidence = 2;
  repeated WordInfo words = 3;
}
message WordInfo {
  int32 start_time = 1;
  int32 end_time = 2;
}
/* ================================ */

 

위에서 예를 보셨다시피, 생성되는 메시지와 관련된 파이썬 코드는 한눈에 알아보기 어려운 구조를 가지고 있으므로, 위에서 작성한 proto파일을  참조해 코딩하는 경우가 많습니다. 

 

자 이제 위와 같이 두개의 파일을 생성해보겠습니다.

python3 -m grpc_tools.protoc \
    -I./protos \
    --python_out=./out \
    --grpc_python_out=./out \
    ./protos/my_protobuf.proto

 

앞서의 예시와 같이 my_protobuf_pb2.pymy_protobuf_pb2_grpc.py가 생성되었습니다. 내용은 보지 않겠습니다.

 

이번에 서버 코드를 작성해보겠습니다.

 

서버코드를 작성 전에 필자의 편의성을 위해 만든 HypothesisMyObject()를 소개하겠습니다.

Hypothesis는 최종적으로 grpc의 output으로 만들어지기 전의 데이터구조로, 위 proto파일을 참조하면, 해당 response에 들어 있는 모든 구조체들을 담아둔 것을 알 수 있습니다. batch 결과와 streaming 결과를 모두 담아 두었습니다.

 

MyObject()는 Request가 왔을 때 실제로 처리하는 역할을 할 객체입니다. 사실 원하는 어떠한 처리방식을 구현해두면 좋지만, 이 글의 목적과 다르니 데이터를 임의의 Hypothesis에 담아서 return만 하겠습니다. stream일 때와 batch일 때 다르게 처리하도록 했으며, stop_event라는 객체를 가지고 있어 해당 object가 동작중인지를 외부에서 확인할 수 있도록 했습니다.

# Refer to proto
@dataclass
class Hypothesis:
    success: bool
    channel_tag : int
    audio_processed : float 
    text : str
    
    transcript : str
    confidence : float
    start_time : int = -1
    end_time : int = -1
    is_final : bool = True

class MyObject():
    def __init__(self):
        self.total_length = 0
        self.stop_event=Event()
    def with_stream(self, chunk_iter):
        last_chunk=False
        while True:
            try :  
                if self.stop_event.is_set():
                    raise StopIteration
                chunk_bytes =next(chunk_iter)
                self.total_length += len(chunk_bytes)
            except StopIteration:
                last_chunk=True
            if last_chunk :
                self.total_length = 0
                hyp = Hypothesis(
                    True,32,0.0,"Mystring",
                    "Nothing",0.0,-1,-1,True
                ) 
                yield hyp
                break
            else : 
                hyp = Hypothesis(
                    True,32,0.0,"Mystring",
                    "Nothing",0.0,-1,-1,False
                ) 
                yield hyp
    def with_batch(self, chunk):
        last_chunk=True
        hyp = Hypothesis(
            True,32,0.0,"Mystring",
            "Nothing",0.0,-1,-1,True
        ) 
        return hyp

 

자 그럼 이제 본격적으로 server.py를 작성해보겠습니다. 위 greeter 서버 예시를 활용해 수정했으며, 위 server.py와 많이 다르지 않습니다. 하지만 아래 두가지가 다릅니다.

  • 생성된 my_protobuf_pb2_grpc.py 를 참조해서 해당 파일에 포함되어 있는 함수를 활용해 이름을 바꾸어 주었습니다.
  • private key를 활용해서 포트를 추가해주는 방법을 추가해보았습니다.
from concurrent import futures
import logging

import grpc
import my_protobuf_pb2
import my_protobuf_pb2_grpc

def serve():
    SECURE_MODE=False
    port = "50051"
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
    my_protobuf_pb2_grpc.add_RivaSpeechRecognitionServicer_to_server(Greeter(), server)
    if SECURE_MODE:
        with open('cert.key', 'r') as fd:
            private_key = fd.read()
        with open('chain.pem', 'r') as fd:
            certificate_chain = fd.read()
        server_credentials = grpc.ssl_serve_credentials( ((private_key,certificate_chain),) )
        server.add_secure_port("0.0.0.0:50051", server_credentials)
    else:
        #server.add_insecure_port("0.0.0.0:50051")
        server.add_insecure_port("[::]:" + port)
    server.start()
        
    print("Server started, listening on " + port)
    server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig()
    serve()

 

그럼 이제 위에서 서버에 추가한 Greeter() 클래스를 바꾼 내용을 살펴보겠습니다. (이름을 바꾸면 좋지만 그대로 두겠습니다)

  • 생성되었던 my_protobuf_pb2_grpc.py의 내용 중 일부는 아래와 같습니다. RivaSpeechRecognitionServicer을 상속해서 클래스를 만들어야 하기 때문에 상속한 Class의 껍질 함수를 보면 Recognition() 함수와 StreamingRecognition() 함수를 구현해야 하는 것을 알 수 있습니다.
...
class RivaSpeechRecognitionServicer(object):
    def Recognize(self, request, context):
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def StreamingRecognize(self, request_iterator, context):
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')
...
  • 위에서 설명한 MyObject()를 Queue형태로 유지하면서 작동하도록 만들어주었습니다. (이는 여러개의 요청이 들어올 때 각각 다른 object가 Request를 처리 할 수 있도록 만들어 보았습니다.)
  • Recognize() 함수를 먼저 보면, 결과를 얻어 Response로 만들어주기 위해 my_protobuf_pb2.py파일을 참조합니다.
    • 즉, 데이터구조를 참조하고 싶을때 참조하는데 해당 파일은 참조하기 상당히 복잡하므로 위의 my_protobuf.proto 파일을 보면서 작성하는 것을  추천합니다.
    • 해당 response를 만들어 return해주면 끝납니다.
  • StreamingRecognize() 함수를 이번엔 살펴보겠습니다. 역시나 결과를 얻어 Response로 만들어주기 위해 my_protobuf_pb2.py파일을 참조합니다.
    1. 먼저 iterator를 만들어 주었는데, timeout_iterator은 아래 더보기를 클릭해서 참조하세요.
    2. input 중 context에는 어떤 행동을 callback으로 넣어 줄 수 있습니다.
    3. 데이터가 들어올 때 configuration이 먼저 들어오고, data가 들어오게 됩니다. 따라서 순차적으로 처리하도록 구현했습니다.
# TK New Import
from queue import Queue
import timeout_iterator as iterators
from dataclasses import dataclass
from threading import Event


class Greeter(my_protobuf_pb2_grpc.RivaSpeechRecognitionServicer):
    def __init__(self):
        self.recognizers=Queue(maxsize=8) # max_workers
        for i in range(8):
            self.recognizers.put(MyObject())
            
    def Recognize(self, request, context) : 
        object = self.recognizers.get(block=True, timeout=None)
        result = object.with_batch(request.audio)
        res = my_protobuf_pb2.SpeechRecognitionResult(
            success=result.success,
            channel_tag=result.channel_tag,
            audio_processed=result.audio_processed,
            text=result.text
        ) 
        resp = my_protobuf_pb2.RecognizeResponse(results=res)
        self.recognizers.put(object)
        yield resp
        
    def StreamingRecognize(self, request_iterator, context) : 
        req_iter = iterators.TimeoutIterator(request_iterator, timeout=3)
        object = self.recognizers.get(block=True, timeout=None)
        object.stop_event.clear()
        def on_rpc_done(): 
            object.stop_event.set()
        context.add_callback(on_rpc_done)
        
        #============= Get Configuration ===============
        try :
            req = next(req_iter)
            if req is req_iter.get_sentinel():
                print("Timeout has been reached, close stream")
                context.cancel()
            if req.streaming_config:
                myconfig = req.streaming_config
                config_done = True
            else:
                config_done = False
        except StopIteration:
            config_done = False
        except Exception:
            config_done = False
            

        #================ Get Data ==================
        if config_done:
            for result in object.with_stream(mystream(req_iter, context)): # WOW while function loop??
                if object.stop_event.is_set():
                    continue
                #------------making result--------------
                res = my_protobuf_pb2.StreamingRecognitionResult(
                    alternatives = [my_protobuf_pb2.SpeechRecognitionAlternative(
                        transcript=result.transcript,
                        confidence=result.confidence,
                        words=[my_protobuf_pb2.WordInfo(
                                    start_time=result.start_time, 
                                    end_time=result.end_time  
                                )]
                    )],
                    is_final=result.is_final,
                    channel_tag=result.channel_tag,
                    audio_processed=result.audio_processed,
                ) 
                resp = my_protobuf_pb2.StreamingRecognizeResponse(results=[res])
                yield resp
        else:
            pass;
        self.recognizers.put(object)

 

위 내용 중 for loop에 iterative한 mystream()을 넣어주었는데, 이는 연속된 stream data를 모아 하나의 chunk로 만들어 주기 위한 함수입니다. 어떤 식으로든 구현이 가능하지만 참조하시려면 아래 더보기를 참조하세요.

** 이렇게 계속 iterator로 만들어 주는 이유는, loop iterator를 제공해야 channel이 연결되고 어떤 return으로 끝나는것이 아니라 계속해서 stream을 받아 처리할 수 있기 때문입니다.

더보기

----------------------------------------------------------------------------

 <stream을 chunk로 만들기>

 

stream함수는 연속된 데이터를 모아 하나의 chunk로 만들어주는 역할을 하는 함수입니다. next()함수를 사용하기 위해 iterator를 만드는 함수를 사용해 인자로 넣어주었는데, 내용은 아래 더보기를 참조하시면 됩니다.

#================ Get Data(Function) ============
def mystream(req_iter, context):
    while True : 
        try :
            req = next(req_iter)
            if req is req_iter.get_sentinel():
                print("Timeout has been reached, close stream")
                context.cancel()
                break;
        except StopIteration:
            break
        if not req.audio_content:
            myconfig = req.streaming_config
            yield myconfig
        else:
            mydata = req.audio_content
            yield mydata

----------------------------------------------------------------------------

더보기

----------------------------------------------------------------------------

 <Timeout Iterator의 구현>

 

해당 클래스는 기존에 있는 소스(https://github.com/leangaurav/pypi_iterator/blob/main/iterators/timeout_iterator.py)를 사용했습니다. 코드는 아래와 같습니다.

import queue
import asyncio
import threading

class TimeoutIterator:
    """
    Wrapper class to add timeout feature to synchronous iterators
    - timeout: timeout for next(). Default=ZERO_TIMEOUT i.e. no timeout or blocking calls to next. Updated using set_timeout() 
    - sentinel: the object returned by iterator when timeout happens
    - reset_on_next: if set to True, timeout is reset to the value of ZERO_TIMEOUT on each iteration

    TimeoutIterator uses a thread internally.
    The thread stops once the iterator exhausts or raises an exception during iteration.

    Any excptions raised within the wrapped iterator are popagated as it is.
    Exception is raised when all elements geenerated by the actual iterator before exception have been consumed
    Timeout can be set dynamically before going for iteration
    """
    ZERO_TIMEOUT = 0.0

    def __init__(self, iterator, timeout=0.0, sentinel=object(), reset_on_next=False):
        self._iterator = iterator
        self._timeout = timeout
        self._sentinel= sentinel
        self._reset_on_next = reset_on_next

        self._interrupt = False
        self._done = False
        self._buffer = queue.Queue()
        self._thread = threading.Thread(target=self.__lookahead)
        self._thread.start()

    def get_sentinel(self):
        return self._sentinel

    def set_reset_on_next(self, reset_on_next):
        self._reset_on_next = reset_on_next

    def set_timeout(self, timeout: float):
        """
        Set timeout for next iteration
        """
        self._timeout = timeout

    def interrupt(self):
        """
        interrupt and stop the underlying thread.
        the thread acutally dies only after interrupt has been set and
        the underlying iterator yields a value after that.
        """
        self._interrupt = True

    def __iter__(self):
        return self

    def __next__(self):
        """
        yield the result from iterator
        if timeout > 0:
            yield data if available.
            otherwise yield sentinal
        """
        if self._done:
            raise StopIteration

        data = self._sentinel
        try:
            if self._timeout > self.ZERO_TIMEOUT:
                data = self._buffer.get(timeout=self._timeout)
            else:
                data = self._buffer.get()
        except queue.Empty:
            pass
        finally:
            # see if timeout needs to be reset
            if self._reset_on_next:
                self._timeout = self.ZERO_TIMEOUT

        # propagate any exceptions including StopIteration
        if isinstance(data, BaseException):
            self._done = True
            raise data

        return data

    def __lookahead(self):
        try:
            while True:
                self._buffer.put(next(self._iterator))
                if self._interrupt:
                    raise StopIteration()
        except BaseException as e:
            self._buffer.put(e)

class AsyncTimeoutIterator:
    """
    Async version of TimeoutIterator. See method documentation of TimeoutIterator
    """
    ZERO_TIMEOUT = 0.0

    def __init__(self, iterator, timeout=0.0, sentinel=object(), reset_on_next=False):
        self._iterator = iterator
        self._timeout = timeout
        self._sentinel= sentinel
        self._reset_on_next = reset_on_next

        self._interrupt = False
        self._done = False
        self._buffer = asyncio.Queue()
        self._task = asyncio.get_event_loop().create_task(self.__lookahead())

    def get_sentinel(self):
        return self._sentinel

    def set_reset_on_next(self, reset_on_next):
        self._reset_on_next = reset_on_next

    def set_timeout(self, timeout: float):
        self._timeout = timeout

    def interrupt(self):
        self._interrupt = True

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._done:
            raise StopAsyncIteration

        data = self._sentinel
        try:
            if self._timeout > self.ZERO_TIMEOUT:
                data = await asyncio.wait_for(self._buffer.get(), self._timeout)
            else:
                data = await self._buffer.get()
        except asyncio.TimeoutError:
            pass
        finally:
            # see if timeout needs to be reset
            if self._reset_on_next:
                self._timeout = self.ZERO_TIMEOUT

        # propagate any exceptions including StopIteration
        if isinstance(data, BaseException):
            self._done = True
            raise data

        return data

    async def __lookahead(self):
        try:
            while True:
                data = await self._iterator.__anext__()
                await self._buffer.put(data)
                if self._interrupt:
                    raise StopAsyncIteration()
        except BaseException as e:
            await self._buffer.put(e)

----------------------------------------------------------------------------

** return 과 yield의 차이 : https://tkayyoo.tistory.com/162를 참조하세요.

 


 

이제 서버를 만들었으니, 요청할 client.py를 구현해보겠습니다.

 

먼저 batch 요청을 만들어 내기 위한 client.py를 만들어보겠습니다. my_protobuf_pb2_grpc.py 를 참조해서 해 stub 객체를 선언하고 이 객체를 해 Recognize() 함수를 서버에 요청할 수 있습니다.

 

또한 my_protobuf_pb.py를 참조해 Request를 만들어야하는데 역시나 my_protobuf.proto를 참조해서 작성하는 것을 추천합니다. 

from __future__ import print_function

import logging
import grpc

import my_protobuf_pb2
import my_protobuf_pb2_grpc

def run():
    print("Will try to greet world ...")
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = my_protobuf_pb2_grpc.RivaSpeechRecognitionStub(channel)

        print("waitning...")
        response = stub.Recognize(my_protobuf_pb2.RecognizeRequest(audio=bytes("you",'utf-8')))
        print("waitning...done")
        print("Greeter client received (text) : {}".format(response.results.text))

if __name__ == "__main__":
    logging.basicConfig()
    run()

 


다음으로 streaming 요청을 만들어 내기 위한 client.py는 아래와 같습니다. streaming_recognize()라는 함수를 통해 오디오 데이터를 처리할 iterator를 만들어 주었습니다.

from __future__ import print_function

import logging
import grpc

import my_protobuf_pb2
import my_protobuf_pb2_grpc

def run():
    print("Will try to greet world ...")
    
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = my_protobuf_pb2_grpc.RivaSpeechRecognitionStub(channel)

        #=========================================
        from pydub import AudioSegment
        import wave

        wav_file = "./CantinaBand3.wav"
        data = AudioSegment.from_file(wav_file)
        #=========================================
        
        recognizer_outputs = streaming_recognize(wav_iter(data), stub)
        
        #========================================= Result check
        idx = 0
        while True:
            try:
                result = next(recognizer_outputs)
                print("Transcript : {}".format(result.results[0].alternatives[0].transcript))
                print("StartTime : {}".format(result.results[0].alternatives[0].words[0].start_time))
            except StopIteration:
                break;
        #print("result" + result)

        #=========================================


if __name__ == "__main__":
    logging.basicConfig()
    run()

 

streaming_recognize함수를 살펴보면 위에서의 방법과 같이 stub을 활용해 StreamingRecognize()를 호출했습니다. 호출할 때 iterator를 만들어주어야하기 때문에 request_iter()라는 함수를 따로 만들어 넣어주었습니다. 

from typing import Iterator        
def streaming_recognize(chunks:Iterator[bytes], stub):
    config = my_protobuf_pb2.StreamingRecognitionConfig()
    def requests_iter():
        req = my_protobuf_pb2.StreamingRecognizeRequest(streaming_config=config)
        for chunk in chunks:
            req = my_protobuf_pb2.StreamingRecognizeRequest(audio_content=chunk)
            yield req
    for res in stub.StreamingRecognize(requests_iter()):
        yield res
더보기

---------------------------------------------------------------

<데이터를 분리해서 처리하는 wav_iter함수>

 

단순히 오디오 데이터를 chunk로 분리해 byte데이터를 넘겨주는 iterator입니다.

pydub 패키지의 make_chunks함수를 활용했으며, 뒤에 20은 chunk의 길이로 20ms길이의 오디오 segment로 나누고 싶다는 뜻입니다.

또한 실제 현실과 비슷하게 넣어주기 위해 20ms마다 sleep을 걸어주었습니다.

from pydub.utils import make_chunks
from time import sleep
def wav_iter(data):
    # chunk_size=20
    for chunk in make_chunks(data,20):
        sleep(0.02)
        yield chunk.raw_data

---------------------------------------------------------------


3. WebRTC를 활용한 서버 구축

 

이번엔 WebRTC를 위한 Signaling, STUN, TURN을 포함한 서버를 구축해보려고합니다. 


a. 환경 구축

 

우리가 활용할 소스코드는 go언어로 구현이 되어 있어 공식 홈페이지(https://go.dev/doc/install)를 참조해 설치했습니다.

wget https://go.dev/dl/go1.21.5.linux-amd64.tar.gz

rm -rf /usr/local/go && tar -C /usr/local -xzf go1.21.5.linux-amd64.tar.gz

export PATH=$PATH:/usr/local/go/bin

go version

 

Go 언어에 대한 간단한 설명은 아래 더보기를 참조하세요.

더보기

----------------------------------------------------

<Go 언어 (golang)이란>

 

Go 프로그래밍 언어는 2007년 구글(Google)에서 개발을 시작하여 2012년 Go 1.0이 나왔습니다. 이는 시스템 프로그래밍에 적합하도록 설계되었으며, C++의 복잡함이 싫어서 Go를 만들었다고 하지만 구문은 C언어와 비슷합니다. 컴파일 언어이지만 컴파일러의 컴파일 속도가 매우 빨라 인터프리터 언어처럼 쓸 수 있습니다.

 

생소할 만한 go언어의 특징 몇가지만 소개하겠습니다.

  • 기본
    • := assign
    • 구조체(struct)가 class와 상응 합니다.
      ex) type User struct{
          userSN int64
          name string
          age int16
      }
    • 슬라이스 : golang의 동적배열입니다.
      ex) var slice1 []int
      slice2 := [][]int{{6, 7, 8}, {1, 2, 3, 4, 5}}
  • 패키지 관리
    • go는 package저장소가 없어 git주소를 이름으로 합니다.
      ex) import ( "github.com/golang/example/stringutil" )
    • go 프로젝트를 시작할 때 go mod init MYPROJECT을 실행하면 go.mod파일이 생성되며 해당 프로젝트의 package명이 정의됩니다. 로컬에 있는 다른 디렉토리의 파일을 import하고 싶을 때는 해당 프로젝트 명을 기준으로 import를 해야합니다. (메인 go파일에는 상위에 package main을 넣어주어야합니다.)
      ex) import ("MYPROJECT/local/path")
    • 위처럼 로컬 파일을 import 할때는 기준이 폴더 입니다.
      ex) 위는 ../local/path/path.go
    • import 한 후에 go mod tidy를 실행하면 현재 없는 모듈이 받아집니다.
  • 함수
    • 어떤 구조체(struct)의 method 정의시, 해당 메서드가 어떤 구조체의 것인지 명시 해줄 수 있습니다. 
      ex) type MYTYPE struct {a int64 b string}
      func (u *MYTYPE) method1(name string){u.b = name}
    • 함수의 input 파라미터는 순서가 type이 뒤로 갑니다.
      ex) func function(a int, b int) (ret int){}
    • method 정의시 리턴의 타입을 명시할 수 있습니다. (오른쪽아래2),
      ex) func function(a int, b int) (ret int){}
  • 기본 패키지
    • fmt : print를 하는데 주로 사용하는 기본 패키지입니다.
      ex) fmt.Println("Hello, World")
    • strconv : 데이터 타입과 string 간의 변환을 도와주는 패키지입니다.
      ex) strconv.Itoa('11111')
    • gopkg.in/ini.v1 : 파일을 읽고 쓸때 사용합니다.
      ex) ini.Load("config.ini")
  • 실행
    • 실행시 go run main.go혹은 해당 프로젝트의 top에서 go run .을 실행해줍니다. 후자는 현재 위치에 go파일이 존재해야합니다.
    • go build . 명령어를 활용해 빌드를 해 바이너리 파일을 만들 수도 있습니다. 

----------------------------------------------------


b. example을 통한 서버 구현

 

webRTC의 경우 오픈된 예시 서버 코드(https://github.com/flutter-webrtc/flutter-webrtc-server)를 통해 서버를 실행해볼 것입니다. 사실 gRPC와 다르게 기능이 정해져 있는 서버이므로 해당 코드에서 어떤 부분을 바꾸면 Customize 할 수 있을지 코드를 설명하며 살펴보겠습니다. 

 

flutter-webrtc-server

── cmd
      └── server

                  └── main.go

── pkg

      ├── logger

                  └── logger.go

      ├── signaler

                  └── signaler.go

      ├── turn

                  └── turn.go

      ├── util

                  └── expire.go
      └── websocket

                  ├── conn.go

                  └── server.go
└── configs

      ├── certs

                 ├── cert.pem
                └── key.pem

      └── config.ini

 

아래는 위 코드 트리를 가시화해 보였으며, 빨간 선은 조금 복잡한 참조 경로를 가시화해 나타내기 위해 보였습니다.

[프로젝트 구조]

 

먼저 가장 기본적인 파일의 설명은 아래와 같습니다. 

  • cmd/server/main.go : 메인 
  • configs/configs.ini : 옆에 설명 
  • pkg/websocket, signaler, turn : 메인에서 사용할 핵심 패키지들 입니다.

 

아래 명령어를 통해 프로그램을 실행할 수 있으며

go run cmd/server/main.go

 

이제 어떤 식으로 동작하는 지를 cmd/server/main.go를 자세히 살펴보겠습니다. 

package main

import (
	"os"

	"github.com/flutter-webrtc/flutter-webrtc-server/pkg/logger"
	"github.com/flutter-webrtc/flutter-webrtc-server/pkg/signaler"
	"github.com/flutter-webrtc/flutter-webrtc-server/pkg/turn"
	"github.com/flutter-webrtc/flutter-webrtc-server/pkg/websocket"
	"gopkg.in/ini.v1"
)

func main() {

 

main패키지는 항상 선언되어야 합니다. 또한 우리가 보고 있는 패키지의 이름은 "github.com/flutter-webrtc/flutter-webrtc-server" 입니다. 따라서 해당 이름을 기준으로 다른 패키지들을 import하고있는 모습을 볼 수 있습니다. ini 패키지config.ini파일을 읽기 위해 사용하는 패키지입니다.

 

이제 config파일의 정보를 얻어내는 단순한 과정을 살펴볼텐데, 어떤 값이 들어있는지 configs/config.ini 파일을 먼저 살펴보겠습니다.

[general] 
domain=demo.cloudwebrtc.com 
cert=configs/certs/cert.pem 
key=configs/certs/key.pem 
bind=0.0.0.0 
port=8086 
html_root=web 

[turn] 
public_ip=127.0.0.1 
port=19302 
realm=flutter-webrtc

 

내용을 보면 general에는 서버 자체의 SSL인증을 위한 정보, port등이 들어있고, turn에는 turn 서버의 정보가 들어있습니다. 사실 앞서 언급한 바와 같이 Customization을 위해서 이 config 파일만 수정을 해주면 원하는 기능을 하는 서버를 만들 수 있습니다. 하지만 뒤에 코드를 보며 왜 바꾸어야 하는지 살펴보겠습니다. 

 

1. 위와 같이 Config파일의 정보를 얻어낸 뒤, 해당 정보로 turn서버를 만들어 내는 main.go 의 내용은 아래와 같습니다.

따라서 PublicIP는 127.0.0.1, Port는 19302, Realm은 flutter-webrtc로 되어있네요. 외부에서 요청하는 IP를 허용하고 싶은 경우 PublicIP를 자신의 public ip로 바꾸고 Port를 원하는 UDP port로 바꿔주면 됩니다.

cfg, err := ini.Load("configs/config.ini")
if err != nil {
    logger.Errorf("Fail to read file: %v", err)
    os.Exit(1)
}

publicIP := cfg.Section("turn").Key("public_ip").String()
stunPort, err := cfg.Section("turn").Key("port").Int()
if err != nil {
    stunPort = 3478
}
realm := cfg.Section("turn").Key("realm").String()

turnConfig := turn.DefaultConfig()
turnConfig.PublicIP = publicIP
turnConfig.Port = stunPort
turnConfig.Realm = realm
turn := turn.NewTurnServer(turnConfig)

 

자 그럼 turn서버를 선언한 NewTurnServer 메서드를 살펴보기에 앞서 잠시 pkg/turn/turn.go에 어떤 구조체가 선언되었는지 살펴보겠습니다.

package turn

type TurnServerConfig struct {
    PublicIP string
    Port     int
    Realm    string
}

func DefaultConfig() TurnServerConfig {
    return TurnServerConfig{
    PublicIP: "127.0.0.1",
    Port:     19302,
    Realm:    "flutter-webrtc",
    }
}

type TurnServer struct {
    udpListener net.PacketConn
    turnServer  *turn.Server
    Config      TurnServerConfig
    AuthHandler func(username string, realm string, srcAddr net.Addr) (string, bool)
}

 

그럼 살펴보기로했던 NewTurnServer를 살펴보겠습니다. 객체가 명시되지 않았으므로, 메서드가 아닌 함수 입니다.

func  NewTurnServer(config TurnServerConfig) *TurnServer {
	server := &TurnServer{
		Config:      config,
		AuthHandler: nil,
	}
	 if   len (config.PublicIP) == 0 {
		logger.Panicf( "'public-ip' is required" )
	}
    /* Declare UDP Listener with net package */
	udpListener, err := net.ListenPacket( "udp4" ,  "0.0.0.0:" +strconv.Itoa(config.Port))
	 if  err != nil {
		logger.Panicf( "Failed to create TURN server listener: %s" , err)
	}
	server.udpListener = udpListener
 
    /* Declare UDP Listener with turn package */
	turnServer, err := turn.NewServer(turn.ServerConfig{
		Realm:       config.Realm,
		AuthHandler: server.HandleAuthenticate,
		PacketConnConfigs: []turn.PacketConnConfig{
			{
				PacketConn: udpListener,
				RelayAddressGenerator: &turn.RelayAddressGeneratorStatic{
					RelayAddress: net.ParseIP(config.PublicIP),
					Address:       "0.0.0.0" ,
				},
			},
		},
	})
	 if  err != nil {
		logger.Panicf( "%v" , err)
	}
	server.turnServer = turnServer
	 return  server
}

 

먼저 정의를 보니 위에서 정의했던 TurnServerConfig를 받아 위에서 정의했던 TurnServer객체를 return하도록 되어있네요.

 

먼저 객체를 선언한 후에,  0:0:0:0:19302를 통해 udp4 프로토콜로 listening 할 udpListener를 만들어 넣어주었습니다. 이 때  net이라는 패키지를 사용했습니다.

 

마지막으로 turn 패키지를 활용해 "flutter-webrtc"라는 realm에 HandleAuthenticate라는 정의된 메서드와 Packet Connection 정보를 넣어주고 turnServer 객체를 만들어주었습니다.

이때 앞서 선언한 udpListnerRelayAddressGeneratorStatic이라는 객체를 넣어주는데, 이는 우리가 정의한 public IP(위에서는 127.0.0.1)으로 relay하고 싶을때 만들어줍니다. 

 

정리하면 User에게 "나는 이런 IP에 존재하는 turn서버야"라고 알려주기 위한 정보들을 정리했다고 보시면 됩니다.

 

 

2. 자 다시 main으로 돌아옵시다. signaler라는 로컬에 있는 패키지를 통해 NewSignaler메서드를 실행해주었으니, 다시 또  signaler.go 파일을살펴보겠습니다. 해당메서드에 앞서 선언했던 turn 객체를 넣어주었네요 

signaler := signaler.NewSignaler(turn)

 

자 그럼 signaler서버를 선언한 NewSignaler 메서드를 살펴보기에 앞서 잠시 pkg/signaler/signaler.go에 있는 구조체 중 중요한 부분만 살펴보겠습니다.

  • TurnCredential : User에게 제공할 정보들을 담을 객체입니다.
  • Method : Request, PeerInfo, Negotiation, Byebye, Error 등의 인터페이스를 정의합니다.
  • Signaler : 최종적으로 만들 실제 signaler서버 객체입니다.
package signaler 

const  (
	sharedKey =  `flutter-webrtc-turn-server-shared-key` 
)
 
type  TurnCredentials  struct  {
	Username string    `json:"username"` 
	Password string    `json:"password"` 
	TTL      int       `json:"ttl"` 
	Uris     []string  `json:"uris"` 
}
 
type  Method string
const  (
	New       Method =  "new" 
	Bye       Method =  "bye" 
	Offer     Method =  "offer" 
	Answer    Method =  "answer" 
	Candidate Method =  "candidate" 
	Leave     Method =  "leave" 
	Keepalive Method =  "keepalive" 
)
 
type  Signaler  struct  {
	peers      map [string]Peer
	sessions   map [string]Session
	turn      *turn.TurnServer
	expresMap *util.ExpiredMap
}

 

자이제 살펴보기로 했던 NewSignaler 메서드를살펴보겠습니다. 역시나 객체가 명시되지 않았으므로 메서드가 아니라 함수 입니다.

 

결국 signaler객체를 만들어주는 것이 목표인데, 이전에 만들었던 turn서버와 자체 메서드인 authHandler를 넣어주었네요.

func NewSignaler(turn *turn.TurnServer) *Signaler {
	var signaler = &Signaler{
		peers:     make(map[string]Peer),
		sessions:  make(map[string]Session),
		turn:      turn,
		expresMap: util.NewExpiredMap(),
	}
	signaler.turn.AuthHandler = signaler.authHandler
	return signaler
}

 

이외에 Signaler객체에는 HandleNewWebSocketHandleTurnServerCredentials라는 메서드가 정의되어있습니다. 이 두 함수가 Signaler 객체의 핵심 기능 함수이므로 자세히 살펴보겠습니다.

  • HandleNewWebSocket : websocket connection을 만들고, New, Leave, Offer, Answer, Candidate, Bye, Keepalive라는 request에 맞게 message를 unmarshal해서 처리하는 프로세스를 정의해두었습니다.
    request별 처리방법 중에 NewNotifyPeersUpdate라는 자체 메서드 활용했으며, Candidate, Bye, KeepaliveSend라는 자체 메서드를 활용했습니다.
    Send 메서드는 주어진 websocket connection으로 message를 marshal해서 보내는 역할을 하는 메서드입니다.
  • HandleTurnServerCredentials : http Request를 받으면 CORS 옵션을 포함해 http response에 turn서버의 정보를 return 합니다. turn 서버의 정보를 제공해주면, 이 정보를 토대로 iceServer객체를 만들어낼 것입니다.
    이렇게 만들어진 iceServer 객체는 이후에 해당 turn 서버의 기능을 활용해 cloud-based relay service를 제공받아 한쪽이 incapable이 되더라도 connection이 설계될 수 있게 해줄 것입니다. request에 service로 제공할 수 있는 값은 'turn'만 가능합니다.
    • ttl : TTL을 위해 입력해주는 값이며, 입력해줄 86400sec은 하루를 의미합니다.
      ** TTL(Time To Live) : Network Layer에서 패킷이 무한으로 순환하는 것을 막아주는 역할입니다. TTL이 없으면 패킷이 네트워크를 통해 목적지를 찾지 못하고 떠돌아다닐 것이라 라우터와 같은 장비를 거칠때마다 TTL이 1씩 감소하게 해서 패킷이 드랍되도록 하는 것입니다.
    • uris : turn 서버에 도달하는데 사용될 주소이며 turn서버의 (public address:port)형태를 제공합니다. 넣어줄 주소는 public IP(위에서는 127.0.0.1)와 Port(위에서는 19302)를 넣어줍니다.
더보기

---------------------------------------------------------

<Access Control Allow Origin 이란>

 

먼저 CORS(Cross Origin Resource Sharing)의 개념에 대해서 살명하면, CORS는 웹앱에서 현재 사용하고 있는 하나의 출처 외에 다른 자원에 접근할 수 있는 권한을 브라우저에 알려주는 체제입니다.

 

즉, 웹앱은 HTTP를 사용해서 서버로부터 main page를 제공받았을텐데, 브라우저는 기본적으로 자신의 출처(main page)와 다른 서버의 리소스를 불러오는 교차 출처(Cross-Site)를 보안상의 이유로 제한합니다. 따라서, 다른 출처에서 리소스를 불러오려면 애초에 main 페이지를 불러올 때 CROS 헤더에 "해당 페이지는 가능하다"는 응답을 반환받아야 합니다.


예를 들어 A서버에서 처리중인 웹페이지가 B서버에 요청을 하면, B서버는 도착한 HTTP header에 보고  A서버에서 실행중인 웹페이지 임을 알 수 있습니다.

 

이때 A서버가 응답시 "Access-Control-Allow-Origin :*"로 응답을 하면, 모든 도메인에서 cross-site방식으로 리소스에 접근이 가능함을 알릴 수 있습니다.

 

하지만 "Access-Control-Allow-Origin :A"로 응답을하면 A 이외의 도메인은 교차 출처방식으로 리소스에 접근할 수 없습니다.

---------------------------------------------------------

정리하면, 실질적으로 서버에서 동작할 Signaling서버와 Turn서버의 기능들을 만들어준 것입니다.

 

 

3. 다음으로 다시 main으로 돌아와보겠습니다. websocket패키지의 NewWebSocketServer라는 메서드를 불렀는데, 해당 메서드에 위에서 선언한 signaler 객체의 HandleNewWebSocket 메서드와 HandleTurnServerCredentials 메서드를 넣어주었습니다. 

wsServer := websocket.NewWebSocketServer(signaler.HandleNewWebSocket, signaler.HandleTurnServerCredentials)

 

객체에 대한 설명은 생략하고 NewWebSocketServer 메서드를살펴보겠습니다. 역시나 객체가 명시되지 않았으므로 메서드가 아니라 함수 입니다.

func NewWebSocketServer(
	wsHandler func(ws *WebSocketConn, request *http.Request),
	turnServerHandler func(writer http.ResponseWriter, request *http.Request)) *WebSocketServer {
    
	var server = &WebSocketServer{
		handleWebSocket:  wsHandler,
		handleTurnServer: turnServerHandler,
	}
	server.upgrader = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
	return server
}

 

wsHandlerturnServerHandler를 받아 WebSocketServer라는 새로운 객체에 넣어주었네요. 

 

이외에 WebSocketServer 객체에는 handleWebSockerRequest와 handleTurnServerRequest가 메서드로 선언되어있습니다.

  • handleWebSockerRequest : 이전에 넣어주었던 signaler의 HandleNewWebSocket를 실행
  • handleTurnServerRequest : 이전에 넣어주었던 signaler의 HandleTurnServerCredentials를 실행

정리하면 위에서 만든 함수들을 활용해 WebSocket 서버를 만들었습니다.

 

4. 마지막으로 다시 main으로 돌아와보겠습니다. 위에서 살펴보았던 configs/config.ini 파일의 [general]에 해당하는 부분은 아래와 같습니다.

[general]
domain=demo.cloudwebrtc.com
cert=configs/certs/cert.pem
key=configs/certs/key.pem
bind=0.0.0.0
port=8086
html_root=web

 

위 내용들을 참고로 위에서 선언한 websocket 객체의 Bind메서드를 실행해주었네요.

sslCert := cfg.Section("general").Key("cert").String()
sslKey := cfg.Section("general").Key("key").String()
bindAddress := cfg.Section("general").Key("bind").String()

port, err := cfg.Section("general").Key("port").Int()
if err != nil {
    port = 8086
}

htmlRoot := cfg.Section("general").Key("html_root").String()

config := websocket.DefaultConfig()
config.Host = bindAddress
config.Port = port
config.CertFile = sslCert
config.KeyFile = sslKey
config.HTMLRoot = htmlRoot

wsServer.Bind(config)

 

자이제 websocet에 선언 되어있던 Bind 메서드를 살펴보겠습니다. 

// Bind .
func (server *WebSocketServer) Bind(cfg WebSocketServerConfig) {
	// Websocket handle func
	http.HandleFunc(cfg.WebSocketPath, server.handleWebSocketRequest)
	http.HandleFunc(cfg.TurnServerPath, server.handleTurnServerRequest)
	http.Handle("/", http.FileServer(http.Dir(cfg.HTMLRoot)))
	logger.Infof("Flutter WebRTC Server listening on: %s:%d", cfg.Host, cfg.Port)
	// http.ListenAndServe(cfg.Host+":"+strconv.Itoa(cfg.Port), nil)
	panic(http.ListenAndServeTLS(cfg.Host+":"+strconv.Itoa(cfg.Port), cfg.CertFile, cfg.KeyFile, nil))
}

 

http에 등록해두었던 두개의 함수 call back 함수로 등록해주고 ListenAndServeTLS로 동작시킵니다.

 

ListenAndServeTLS는 HTTPS를 매우 간단하게 적용할 수 있는 함수로, incoming connection을 listen하고 HTTP response를 만들어내는 HTTPS서버를 만들때 사용됩니다. 

** http.ListenAndServeTLS(Host_IP:Host_Port, CertFile, KeyFile) 형식

** https://www.techieindoor.com/go-http-listenandservetls-function-in-go/

 

더보기

------------------------------------------------------

 

<TLS란 (Transport Layer Security)란>

 

인터넷 상의 커뮤니케이션을 위한 개인정보와 데이터보안을 용이하게하기 위해 설계되어 널리 채택된 보안 프로토콜입니다. Netscape가 개발한 SSL(Secure Sockets Layer)라는 이전의 암호화 프로토콜에서 발전한 것으로 SSL 3.1이 TLS1.0이며, 용어는 혼용되기도 합니다. HTTPS는 HTTP프로토콜 상위에서 TLS암호화를 구현한 것으로, HTTPS를 사용한 웹사이트는 TLS 암호화를 이용합니다.

 

<SSL 프로토콜 certificate 과정>

 

1. 기본 개념
- 클라이언트와 서버가 주고받는 실제 정보는 대칭키 방식으로 암호화합니다. 
** 대칭키 방식 : 동일한 키(public key)로 암호화와 복호화를 같이 할 수 있는 방식으로, 빠르지만 위험합니다.

 

- 위와 같은 방식 진행할 대칭키를 공개키 방식으로 암호화합니다.

** 공개키(비대칭키) 방식 : A키(public key)로 암호화를 하면 B키(private key)로 복호화(ex. RSA)하거나 , B키를 가지고 암호화를 하면 A키로 복호화하는 방식(ex. 전자서명)입니다. 상대적으로 안전하지만 느립니다.

 

-  CA(Certificate authority) 혹은 Root Certificate
CA가 발급된 인증서를 이용하면 클라이언트가 접속한 서버가 의도한 안전한 서버가 맞는지를 보장할 수 있습니다. 인증서는 아래와 같이 여러가지 경우가 있으며, 공인 CA 인증서의 경우 CA가 private key를 가지고 암호화 한 것을 브라우저는 이미 CA의 public key를 가지고 있어 복호화할 수 있습니다.

  • 공인CA 인증서 : 유명한 사이트들이 주로 사용하며, 유료입니다.
  • 사설CA 인증서 : 안전하지 않다는 경고 메시지가 뜨며, keytool과 openssl등을 활용하면 무료입니다.
  • 자체서명된 인증서 : CA없이 발급된 인증서이며, 인증서 자체가 CA역할을 합니다. 보안이 약하지만 테스트에 사용합니다.

2. 인증 절차 : Handshake 

 

<그림넣기>

 

(1) client hello : (브라우저는) 랜덤데이터와 "클라이언트가 지원가능한 암호화 방식"을 보냅니다.
(2) server hello : (서버는) 랜덤데이터와 "후보중 서버가 선택한 암호화 방식", 그리고 인증서를 보냅니다.

** 인증서(SSL Certificate) :  CA의 정보, 서비스의 도메인, 서버가 발행한 공개키 등의 정보가 들어있으며, 이 인증서는 CA에서 이미 private 키로 암호화되어있습니다.
(3) (브라우저는) CA의 public key를 가지고 있으므로 CA인증서를 확인해보고 맞으면 신뢰하기로 합니다. 
(4)(5) (브라우저는) 위에서 주고 받은 두 랜덤데이터를 조합해 대칭키를 생성합니다. 그리고 이 대칭키서버가 발행한 공개키로 암호화해 보냅니다.
(6) (서버는) 받은 대칭키서버가 발행한 private key로 복호화해서 확인합니다. 이 대칭키는 임시키라고 하며, 이후에 Session Key라는 대칭키를 만들어내는데 활용합니다. 

 

3. 세션 통신

위에서 만든 Session Key를 활용해 암호화 복호화를 통해 서버와 클라이언트가 통신합니다.

 

<해당 소스에서 사용하는 mkcert(맥서트)>

 

인증서의 종류에는 PKCS12(업계표준)과 JKS(Java에서 활용) 등이 있는데, 현재 서버인 로컬을 인증된 발급기관(CA)로 추가하고 PKCS12 사설 인증서를 발급하는 것이 openssl, mkcert입니다. 이런 로컬 CA는 로컬에서만 신뢰할 수 있으므로 해당 소스에서는 로컬 내에서만 인증이 가능합니다.

 

설치는 아래와 같이 합니다.

sudo apt install libnss3-tools
curl -JLO "https://dl.filippo.io/mkcert/latest?for=linux/amd64"
chmod +x mkcert-v*-linux-amd64
cp mkcert-v*-linux-amd64 /usr/local/bin/mkcert

 

그럼이제 mkcert를 실행해서 만들어보겠습니다.

mkdir -p configs/certs
mkcert -key-file configs/certs/key.pem -cert-file configs/certs/cert.pem  localhost 127.0.0.1 ::1 0.0.0.0

 

이렇게 만들어진 *.cert는 공개해도 되는 (공개키, 인증기관의 서명을 포함하고 있는 인증서)이고, *.key는 공개해서는 안되는 privacy key입니다.

 

위 명령어를 통해 localhost, 127.0.0.1(IPv4)  , ::1(IPv6) 0.0.0.0(IPv4)에서 사용할 수 있는 인증서가 만들어집니다.

------------------------------------------------------

 

객체지향이다 보니 조금 복잡하죠..? 그럼 동작 순서를 정리해보겠습니다.

  • / 요청  로컬의 ./web위치를 참조해 HTTP의 response로 index.html를 제공
  • /api/turn 요청 → WebSocketServer.handleTurnServerRequstWebSocketServer.handleTurnServersignaler.HandleTurnServerCredendentials → http Request를 받아 http response를 return 합니다. return 내용은 turn서버의 정보를 제공합니다.
  • /ws 요청 → WebSocketServer.handleWebSocketRequestWebSocketServer.handleWebSocketsignaler.HandleNewWebSocket → http Request를 받아 websocket connection을 return 합니다. 어떤 request인지에 따라 다양한 response를 제공 가능합니다.

이제 서버를 실행해보겠습니다.

go run cmd/server/main.go

 

그럼 서버를 실행했으니 Flutter에서 turn 서버를 등록하고 시그널링 서버를 통해 peer의 정보를 받아 등록하는 과정을 살펴보겠습니다. 해당 소스코드는 관련 링크(https://github.com/flutter-webrtc/flutter-webrtc-demo)를 참조하시면 좋습니다.

 

먼저 main 함수를 lib/main.dart 에서 살펴보면 call_sample로 이동하는데 이부분이 우리가 살펴볼 WebRTC를 통한 P2P 통신을 만드는 과정입니다. 해당 코드는 lib/src/call_sample/call_sample.dart 를 참조하면 되는데, 아래와 같이 State를 초기화하는 과정에서 _connect라는 함수를 부르고 이때 Signaling 서버를 등록하는 것을 볼 수 있습니다.

  @override
  initState() {
    super.initState();
    initRenderers();
    _connect(context);
  }
  
  void _connect(BuildContext context) async {
    _signaling ??= Signaling(widget.host, context)..connect();
    
    ...
  }

 

그럼 이제 Signaling서버를 등록하는 과정을 lib/src/call_sample/signaling.dart 에서 살펴보겠습니다. 위와 같이 Signaling 객체를 만들면서 connect 함수를 실행했으므로 해당함수를 아래와 같이 살펴보겠습니다.

Future<void> connect() async {
    var url = 'https://$_host:$_port/ws';
    _socket = SimpleWebSocket(url);

    print('connect to $url');

    if (_turnCredential == null) {
      try {
        _turnCredential = await getTurnCredential(_host, _port);
        /*{
            "username": "1584195784:mbzrxpgjys",
            "password": "isyl6FF6nqMTB9/ig5MrMRUXqZg",
            "ttl": 86400,
            "uris": ["turn:127.0.0.1:19302?transport=udp"]
          }
        */
        _iceServers = {
          'iceServers': [
            {
              'urls': _turnCredential['uris'][0],
              'username': _turnCredential['username'],
              'credential': _turnCredential['password']
            },
          ]
        };
      } catch (e) {}
    }

    _socket?.onOpen = () {
      print('onOpen');
      onSignalingStateChange?.call(SignalingState.ConnectionOpen);
      _send('new', {
        'name': DeviceInfo.label,
        'id': _selfId,
        'user_agent': DeviceInfo.userAgent
      });
    };

    _socket?.onMessage = (message) {
      print('Received data: ' + message);
      onMessage(_decoder.convert(message));
    };

    _socket?.onClose = (int? code, String? reason) {
      print('Closed by server [$code => $reason]!');
      onSignalingStateChange?.call(SignalingState.ConnectionClosed);
    };

    await _socket?.connect();
  }

 

위에서 순서대로 살펴보면, websocket 연결을 형성한 다음에 turnCredential 정보를 서버에 요청합니다. 그럼 우리가 위에서 만든 것과 같이 turnCredential정보를 받을 수 있을 것이고, 이정보를 통해 _iceServers를 만들어 놓습니다. 이는 이후에 Turn, Stun서버로 활용하게 됩니다. 아래는 서버에서 제공받은 정보의 예시입니다.

{
 username : 1234567:flutter-webrtc,
 password: abcd/w389n2h3nf98h239f,
 ttl:86400,
 uris:[turn:127.0.0.1:19302?transport=udp]
}

 

그 다음 위에서 만든 웹소켓 객체에 대해 onOpen, onMessage, onClose등의 callback함수들을 등록하고 connect()함수를 통해 연결합니다.

 

이 과정에서 소켓의 onOpen 콜백이 실행되며 "new" request를 보내고, 우리가 서버에서 정의한 바와 같이 "peers"가 답장이 오면 이때 온 peers들을 초기화한 상태로 서버에 연결된 peer의 리스트를 저장해 둡니다. 아래는 서버에 요청한 request의 예시입니다.

{
 "type" : "new"
 "data" : {"name":"Flutter Web", "id":"12345", "user_agent":"flutter-webrtc/web-plugin 0.0.1 (Mozilla/5.0 (Macintosh; Intel Mac OS X 1.1.1) AppleWebKit/123.45 (KHTML, like Gecko) Chrome/12.0.123 Safari/123.12)"}
}

 

다음으로 서버에서 제공받은 정보의 예시입니다.

{
 "type" : "peers",
 "data" : [
    {"id":"12345", "name":"Flutter Web", "user_agent":"flutter-webrtc/web-plugin 0.0.1 (Mozilla/5.0 (Macintosh; Intel Mac OS X 1.1.1) AppleWebKit/123.45 (KHTML, like Gecko) Chrome/12.0.123 Safari/123.12)"}
 ]
}

 

이후 peers의 정보 중에 유저가 등록된 아이콘을 통해 연결을 누르면 lib/src/call_sample/call_sample.dart 내용과 같이 _invitePeer을 하며 lib/src/call_sample/signaling.dart 내용과 같이 signaling 서버가 _createSession을 통해 세션을 만드는것을 진행합니다.

 

** STUN서버의 기능은 구현하지 않은 것으로 보이는데, HTTPS의 인증서가 local에만 한정되기 때문에 필요가 없어 소스코드의 내용에 포함시키지 않은 것 같습니다.



https://github.com/grpc/grpc/blob/master/examples/python/helloworld/greeter_server.py

https://inspirit941.tistory.com/346

https://datatracker.ietf.org/doc/html/rfc4566

https://github.com/flutter-webrtc/flutter-webrtc-server

 

728x90
반응형

'Developers 공간 [Basic] > Backend' 카테고리의 다른 글

[Docker] 컨테이너의 기초와 운영  (0) 2023.04.16