Kafka 를 사용하다보면 __TypeId__ 이라는 Header 를 마주하는 경우가 있다.

이 Header 를 보았다면 아마도 JsonSerializer 를 사용하는 Kafka Producer 에서 생성된 메세지의 Header 를 보았을 가능성이 매우 높다.

 

이 __TypeId__ 라는 헤더는 메세지를 보낼 때의 데이터 객체 타입 이름을 가지고 있다. 아래처럼 Value 타입이 Object 인 JsonSerializer 를 사용하는 Kafka Producer 에서 send 메서드를 통해 메세지를 보낼 때, 메세지 파라미터의 데이터 타입에 따라 해당 데이터의 타입 이름이 들어가게 된다.

@Override
public ListenableFuture<SendResult<String, Object>> sendQueue(@NonNull final PostKafkaProxyParam params) {
    return kafkaObjectToJsonTemplate.send(params.getTopic(), "asdfvasdfv"); // java.lang.String
}
@Override
public ListenableFuture<SendResult<String, Object>> sendQueue(@NonNull final PostKafkaProxyParam params) {
    HashMap<String, Object> obj = new Gson().fromJson(params.getMessage(), new TypeToken<HashMap<String, Object>>(){}.getType());
    return kafkaObjectToJsonTemplate.send(params.getTopic(), obj); // java.util.HashMap
}

 

메세지를 보내는 과정에서 DefaultJackson2JavaTypeMapper 클래스의 fromJavaType 이라는 메서드를 호출하게 되는데 이를 잘 살펴보면 해당 Header 에 값을 넣는 코드를 볼 수 있다.

HashMap 객체를 전달하면 java.util.HashMap 이라는 값으로 저장된다.

그리고 아래와 같이 바이트로 Header 에 저장한 후 메세지를 보내게 된다.

{
    "headers": {
        "__ContentTypeId__": "amF2YS5sYW5nLk9iamVjdA==",
        "b3": "MWNhODkwY2I0MDZkZjAzNC04NTEzNzI0MmMzNzhkMDQ0LTE=",
        "__KeyTypeId__": "amF2YS5sYW5nLk9iamVjdA==",
        "__TypeId__": "amF2YS51dGlsLkhhc2hNYXA="
    }
}

 

메세지가 나중에 Consumer 에서 JsonDeserializer 를 통해 Header 를 읽을 때는 AbstractJavaTypeMapper 라는 클래스에서 __TypeId__ 이라는 Header 가 있는지를 확인한다.

만약 여기서 위의 Header 가 존재하면 바이트를 String 으로 변환해서 어떤 데이터 타입의 클래스인지 확인하게 된다.

그 후, Consumer 으로부터 전달받은 메세지에서 위에서 확인한 데이터 타입의 클래스로 Deserialize 를 시도하게 된다.

JsonDeserializer 에서는 기본적으로 Header 안에 있는 __TypeId__ 를 읽도록 시도한다.

 

이곳에서 어떤 데이터 타입인지 확인한 후 메세지 역직렬화를 시도하게 된다.