3 #include <kafka/Project.h>
5 #include <kafka/Error.h>
6 #include <kafka/Header.h>
7 #include <kafka/Timestamp.h>
8 #include <kafka/Types.h>
10 #include <librdkafka/rdkafka.h>
15 namespace KAFKA_API {
namespace clients {
namespace consumer {
25 explicit ConsumerRecord(rd_kafka_message_t* msg): _rk_msg(msg, rd_kafka_message_destroy) {}
30 Topic
topic()
const {
return _rk_msg->rkt ? rd_kafka_topic_name(_rk_msg->rkt):
""; }
35 Partition
partition()
const {
return _rk_msg->partition; }
40 Offset
offset()
const {
return _rk_msg->offset; }
45 Key
key()
const {
return Key(_rk_msg->key, _rk_msg->key_len); }
50 Value
value()
const {
return Value(_rk_msg->payload, _rk_msg->len); }
57 rd_kafka_timestamp_type_t tstype{};
58 const Timestamp::Value tsValue = rd_kafka_message_timestamp(_rk_msg.get(), &tstype);
59 return {tsValue, tstype};
// Shared-ownership smart-pointer type for a librdkafka message handle.
90 using rd_kafka_message_shared_ptr = std::shared_ptr<rd_kafka_message_t>;
// The wrapped message; the constructor installs rd_kafka_message_destroy as its deleter.
91 rd_kafka_message_shared_ptr _rk_msg;
99 rd_kafka_headers_t* hdrs =
nullptr;
100 if (rd_kafka_message_headers(_rk_msg.get(), &hdrs) != RD_KAFKA_RESP_ERR_NO_ERROR)
105 headers.reserve(rd_kafka_header_cnt(hdrs));
107 const char* name =
nullptr;
108 const void* valuePtr =
nullptr;
109 std::size_t valueSize = 0;
110 for (std::size_t i = 0; !rd_kafka_header_get_all(hdrs, i, &name, &valuePtr, &valueSize); i++)
112 headers.emplace_back(name, Header::Value(valuePtr, valueSize));
121 rd_kafka_headers_t* hdrs =
nullptr;
122 if (rd_kafka_message_headers(_rk_msg.get(), &hdrs) != RD_KAFKA_RESP_ERR_NO_ERROR)
124 return Header::Value();
127 const void* valuePtr =
nullptr;
128 std::size_t valueSize = 0;
129 return (rd_kafka_header_get_last(hdrs,
key.c_str(), &valuePtr, &valueSize) == RD_KAFKA_RESP_ERR_NO_ERROR) ?
130 Header::Value(valuePtr, valueSize) : Header::Value();
136 std::ostringstream oss;
140 << (
key().size() ? (
key().toString() +
"/") :
"") <<
value().toString();
142 else if (
error().
value() == RD_KAFKA_RESP_ERR__PARTITION_EOF)
Unified error type.
Definition: Error.h:32
std::string message() const
Readable error string.
Definition: Error.h:89
A key/value pair to be received from Kafka.
Definition: ConsumerRecord.h:22
Value value() const
The value.
Definition: ConsumerRecord.h:50
Offset offset() const
The position of this record in the corresponding Kafka partition.
Definition: ConsumerRecord.h:40
Partition partition() const
The partition from which this record is received.
Definition: ConsumerRecord.h:35
Key key() const
The key (or null if no key is specified).
Definition: ConsumerRecord.h:45
Topic topic() const
The topic this record is received from.
Definition: ConsumerRecord.h:30
std::string toString() const
Obtains explanatory string.
Definition: ConsumerRecord.h:134
Headers headers() const
The headers of the record.
Definition: ConsumerRecord.h:95
Timestamp timestamp() const
The timestamp of the record.
Definition: ConsumerRecord.h:55
Error error() const
The error.
Definition: ConsumerRecord.h:82
Header::Value lastHeaderValue(const Header::Key &key)
Return just one (the very last) header's value for the given key.
Definition: ConsumerRecord.h:119
The time point together with the type.
Definition: Timestamp.h:21