Modern C++ Kafka API
ProducerCommon.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/ProducerRecord.h>
6 #include <kafka/RdKafkaHelper.h>
7 #include <kafka/Timestamp.h>
8 #include <kafka/Types.h>
9 
10 #include <librdkafka/rdkafka.h>
11 
12 #include <functional>
13 #include <memory>
14 
15 
16 namespace KAFKA_API { namespace clients { namespace producer {
17 
22 {
23 public:
24  enum class PersistedStatus { Not, Possibly, Done };
25 
26  RecordMetadata() = default;
27 
28  RecordMetadata(const RecordMetadata& another) { *this = another; }
29 
30  // This is only called by the KafkaProducer::deliveryCallback (with a valid rkmsg pointer)
31  RecordMetadata(const rd_kafka_message_t* rkmsg, Optional<ProducerRecord::Id> recordId)
32  : _rkmsg(rkmsg), _recordId(recordId) {}
33 
34  RecordMetadata& operator=(const RecordMetadata& another)
35  {
36  if (this != &another)
37  {
38  const auto offsetOption = another.offset();
39  _cachedInfo = std::make_unique<CachedInfo>(another.topic(),
40  another.partition(),
41  offsetOption ? *offsetOption : RD_KAFKA_OFFSET_INVALID,
42  another.keySize(),
43  another.valueSize(),
44  another.timestamp(),
45  another.persistedStatus());
46  _recordId = another._recordId;
47  _rkmsg = nullptr;
48  }
49 
50  return *this;
51  }
52 
56  std::string topic() const
57  {
58  return _rkmsg ? (_rkmsg->rkt ? rd_kafka_topic_name(_rkmsg->rkt) : "") : _cachedInfo->topic;
59  }
60 
64  Partition partition() const
65  {
66  return _rkmsg ? _rkmsg->partition : _cachedInfo->partition;
67  }
68 
72  Optional<Offset> offset() const
73  {
74  auto offset = _rkmsg ? _rkmsg->offset : _cachedInfo->offset;
75  return (offset != RD_KAFKA_OFFSET_INVALID) ? Optional<Offset>(offset) : Optional<Offset>();
76  }
77 
81  Optional<ProducerRecord::Id> recordId() const
82  {
83  return _recordId;
84  }
85 
89  KeySize keySize() const
90  {
91  return _rkmsg ? _rkmsg->key_len : _cachedInfo->keySize;
92  }
93 
97  ValueSize valueSize() const
98  {
99  return _rkmsg ? _rkmsg->len : _cachedInfo->valueSize;
100  }
101 
106  {
107  return _rkmsg ? getMsgTimestamp(_rkmsg) : _cachedInfo->timestamp;
108  }
109 
113  PersistedStatus persistedStatus() const
114  {
115  return _rkmsg ? getMsgPersistedStatus(_rkmsg) : _cachedInfo->persistedStatus;
116  }
117 
118  std::string persistedStatusString() const
119  {
120  return getPersistedStatusString(persistedStatus());
121  }
122 
123  std::string toString() const
124  {
125  const auto offsetOption = offset();
126  const auto recordIdOption = recordId();
127 
128  return topic() + "-" + std::to_string(partition()) + "@" + (offsetOption ? std::to_string(*offsetOption) : "NA")
129  + (recordIdOption ? (":id[" + std::to_string(*recordIdOption) + "],") : ",")
130  + timestamp().toString() + "," + persistedStatusString();
131  }
132 
133 private:
134  static Timestamp getMsgTimestamp(const rd_kafka_message_t* rkmsg)
135  {
136  rd_kafka_timestamp_type_t tstype{};
137  const Timestamp::Value tsValue = rd_kafka_message_timestamp(rkmsg, &tstype);
138  return {tsValue, tstype};
139  }
140 
141  static PersistedStatus getMsgPersistedStatus(const rd_kafka_message_t* rkmsg)
142  {
143  const rd_kafka_msg_status_t status = rd_kafka_message_status(rkmsg);
144  return status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED ? PersistedStatus::Not : (status == RD_KAFKA_MSG_STATUS_PERSISTED ? PersistedStatus::Done : PersistedStatus::Possibly);
145  }
146 
147  static std::string getPersistedStatusString(PersistedStatus status)
148  {
149  return status == PersistedStatus::Not ? "NotPersisted" :
150  (status == PersistedStatus::Done ? "Persisted" : "PossiblyPersisted");
151  }
152 
153  struct CachedInfo
154  {
155  CachedInfo(Topic t, Partition p, Offset o, KeySize ks, ValueSize vs, Timestamp ts, PersistedStatus pst)
156  : topic(std::move(t)),
157  partition(p),
158  offset(o),
159  keySize(ks),
160  valueSize(vs),
161  timestamp(ts),
162  persistedStatus(pst)
163  {
164  }
165 
166  CachedInfo(const CachedInfo&) = default;
167 
168  std::string topic;
169  Partition partition;
170  Offset offset;
171  KeySize keySize;
172  ValueSize valueSize;
173  Timestamp timestamp;
174  PersistedStatus persistedStatus;
175  };
176 
177  std::unique_ptr<CachedInfo> _cachedInfo;
178  const rd_kafka_message_t* _rkmsg = nullptr;
179  Optional<ProducerRecord::Id> _recordId;
180 };
181 
186 using Callback = std::function<void(const RecordMetadata& metadata, const Error& error)>;
187 
188 } } } // end of KAFKA_API::clients::producer
189 
The metadata for a record that has been acknowledged by the server.
Definition: ProducerCommon.h:22
Optional< ProducerRecord::Id > recordId() const
The recordId could be used to identify the acknowledged message.
Definition: ProducerCommon.h:81
ValueSize valueSize() const
The size of the value in bytes.
Definition: ProducerCommon.h:97
Optional< Offset > offset() const
The offset of the record in the topic/partition.
Definition: ProducerCommon.h:72
PersistedStatus persistedStatus() const
The persisted status of the record.
Definition: ProducerCommon.h:113
Timestamp timestamp() const
The timestamp of the record in the topic/partition.
Definition: ProducerCommon.h:105
std::string topic() const
The topic the record was appended to.
Definition: ProducerCommon.h:56
KeySize keySize() const
The size of the key in bytes.
Definition: ProducerCommon.h:89
Partition partition() const
The partition the record was sent to.
Definition: ProducerCommon.h:64
The time point together with the type.
Definition: Timestamp.h:21