3 #include <kafka/Project.h>
5 #include <kafka/ProducerRecord.h>
6 #include <kafka/RdKafkaHelper.h>
7 #include <kafka/Timestamp.h>
8 #include <kafka/Types.h>
10 #include <librdkafka/rdkafka.h>
16 namespace KAFKA_API {
namespace clients {
namespace producer {
24 enum class PersistedStatus { Not, Possibly, Done };
32 : _rkmsg(rkmsg), _recordId(
recordId) {}
38 const auto offsetOption = another.
offset();
39 _cachedInfo = std::make_unique<CachedInfo>(another.
topic(),
41 offsetOption ? *offsetOption : RD_KAFKA_OFFSET_INVALID,
46 _recordId = another._recordId;
58 return _rkmsg ? (_rkmsg->rkt ? rd_kafka_topic_name(_rkmsg->rkt) :
"") : _cachedInfo->topic;
66 return _rkmsg ? _rkmsg->partition : _cachedInfo->partition;
74 auto offset = _rkmsg ? _rkmsg->offset : _cachedInfo->offset;
75 return (
offset != RD_KAFKA_OFFSET_INVALID) ? Optional<Offset>(
offset) : Optional<Offset>();
91 return _rkmsg ? _rkmsg->key_len : _cachedInfo->keySize;
99 return _rkmsg ? _rkmsg->len : _cachedInfo->valueSize;
107 return _rkmsg ? getMsgTimestamp(_rkmsg) : _cachedInfo->timestamp;
115 return _rkmsg ? getMsgPersistedStatus(_rkmsg) : _cachedInfo->persistedStatus;
118 std::string persistedStatusString()
const
123 std::string toString()
const
125 const auto offsetOption =
offset();
126 const auto recordIdOption =
recordId();
128 return topic() +
"-" + std::to_string(
partition()) +
"@" + (offsetOption ? std::to_string(*offsetOption) :
"NA")
129 + (recordIdOption ? (
":id[" + std::to_string(*recordIdOption) +
"],") :
",")
130 +
timestamp().toString() +
"," + persistedStatusString();
134 static Timestamp getMsgTimestamp(
const rd_kafka_message_t* rkmsg)
136 rd_kafka_timestamp_type_t tstype{};
137 const Timestamp::Value tsValue = rd_kafka_message_timestamp(rkmsg, &tstype);
138 return {tsValue, tstype};
141 static PersistedStatus getMsgPersistedStatus(
const rd_kafka_message_t* rkmsg)
143 const rd_kafka_msg_status_t status = rd_kafka_message_status(rkmsg);
144 return status == RD_KAFKA_MSG_STATUS_NOT_PERSISTED ? PersistedStatus::Not : (status == RD_KAFKA_MSG_STATUS_PERSISTED ? PersistedStatus::Done : PersistedStatus::Possibly);
147 static std::string getPersistedStatusString(PersistedStatus status)
149 return status == PersistedStatus::Not ?
"NotPersisted" :
150 (status == PersistedStatus::Done ?
"Persisted" :
"PossiblyPersisted");
155 CachedInfo(Topic t, Partition p, Offset o, KeySize ks, ValueSize vs, Timestamp ts, PersistedStatus pst)
156 :
topic(std::move(t)),
166 CachedInfo(
const CachedInfo&) =
default;
177 std::unique_ptr<CachedInfo> _cachedInfo;
178 const rd_kafka_message_t* _rkmsg =
nullptr;
179 Optional<ProducerRecord::Id> _recordId;
186 using Callback = std::function<void(
const RecordMetadata& metadata,
const Error& error)>;
The time point together with the type.
Definition: Timestamp.h:21