Modern C++ Kafka API
ConsumerRecord.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/Error.h>
6 #include <kafka/Header.h>
7 #include <kafka/Timestamp.h>
8 #include <kafka/Types.h>
9 
10 #include <librdkafka/rdkafka.h>
11 
12 #include <sstream>
13 
14 
15 namespace KAFKA_API { namespace clients { namespace consumer {
16 
22 {
23 public:
24  // ConsumerRecord will take the ownership of msg (rd_kafka_message_t*)
25  explicit ConsumerRecord(rd_kafka_message_t* msg): _rk_msg(msg, rd_kafka_message_destroy) {}
26 
30  Topic topic() const { return _rk_msg->rkt ? rd_kafka_topic_name(_rk_msg->rkt): ""; }
31 
35  Partition partition() const { return _rk_msg->partition; }
36 
40  Offset offset() const { return _rk_msg->offset; }
41 
45  Key key() const { return Key(_rk_msg->key, _rk_msg->key_len); }
46 
50  Value value() const { return Value(_rk_msg->payload, _rk_msg->len); }
51 
56  {
57  rd_kafka_timestamp_type_t tstype{};
58  const Timestamp::Value tsValue = rd_kafka_message_timestamp(_rk_msg.get(), &tstype);
59  return {tsValue, tstype};
60  }
61 
65  Headers headers() const;
66 
70  Header::Value lastHeaderValue(const Header::Key& key);
71 
82  Error error() const { return Error{_rk_msg->err}; }
83 
87  std::string toString() const;
88 
89 private:
90  using rd_kafka_message_shared_ptr = std::shared_ptr<rd_kafka_message_t>;
91  rd_kafka_message_shared_ptr _rk_msg;
92 };
93 
94 inline Headers
96 {
97  Headers headers;
98 
99  rd_kafka_headers_t* hdrs = nullptr;
100  if (rd_kafka_message_headers(_rk_msg.get(), &hdrs) != RD_KAFKA_RESP_ERR_NO_ERROR)
101  {
102  return headers;
103  }
104 
105  headers.reserve(rd_kafka_header_cnt(hdrs));
106 
107  const char* name = nullptr;
108  const void* valuePtr = nullptr;
109  std::size_t valueSize = 0;
110  for (std::size_t i = 0; !rd_kafka_header_get_all(hdrs, i, &name, &valuePtr, &valueSize); i++)
111  {
112  headers.emplace_back(name, Header::Value(valuePtr, valueSize));
113  }
114 
115  return headers;
116 }
117 
118 inline Header::Value
119 ConsumerRecord::lastHeaderValue(const Header::Key& key)
120 {
121  rd_kafka_headers_t* hdrs = nullptr;
122  if (rd_kafka_message_headers(_rk_msg.get(), &hdrs) != RD_KAFKA_RESP_ERR_NO_ERROR)
123  {
124  return Header::Value();
125  }
126 
127  const void* valuePtr = nullptr;
128  std::size_t valueSize = 0;
129  return (rd_kafka_header_get_last(hdrs, key.c_str(), &valuePtr, &valueSize) == RD_KAFKA_RESP_ERR_NO_ERROR) ?
130  Header::Value(valuePtr, valueSize) : Header::Value();
131 }
132 
133 inline std::string
135 {
136  std::ostringstream oss;
137  if (!error())
138  {
139  oss << topic() << "-" << partition() << ":" << offset() << ", " << timestamp().toString() << ", "
140  << (key().size() ? (key().toString() + "/") : "") << value().toString();
141  }
142  else if (error().value() == RD_KAFKA_RESP_ERR__PARTITION_EOF)
143  {
144  oss << "EOF[" << topic() << "-" << partition() << ":" << offset() << "]";
145  }
146  else
147  {
148  oss << "ERROR[" << error().message() << ", " << topic() << "-" << partition() << ":" << offset() << "]";
149  }
150  return oss.str();
151 }
152 
153 } } } // end of KAFKA_API::clients::consumer
154 
Unified error type.
Definition: Error.h:32
std::string message() const
Readable error string.
Definition: Error.h:89
A key/value pair to be received from Kafka.
Definition: ConsumerRecord.h:22
Value value() const
The value.
Definition: ConsumerRecord.h:50
Offset offset() const
The position of this record in the corresponding Kafka partition.
Definition: ConsumerRecord.h:40
Partition partition() const
The partition from which this record is received.
Definition: ConsumerRecord.h:35
Key key() const
The key (or null if no key is specified).
Definition: ConsumerRecord.h:45
Topic topic() const
The topic this record is received from.
Definition: ConsumerRecord.h:30
std::string toString() const
Obtains explanatory string.
Definition: ConsumerRecord.h:134
Headers headers() const
The headers of the record.
Definition: ConsumerRecord.h:95
Timestamp timestamp() const
The timestamp of the record.
Definition: ConsumerRecord.h:55
Error error() const
The error.
Definition: ConsumerRecord.h:82
Header::Value lastHeaderValue(const Header::Key &key)
Return just one (the very last) header's value for the given key.
Definition: ConsumerRecord.h:119
The time point together with the type.
Definition: Timestamp.h:21