3 #include <kafka/Project.h>
5 #include <kafka/KafkaClient.h>
6 #include <kafka/Types.h>
11 namespace KAFKA_API {
namespace clients {
namespace consumer {
17 bool empty()
const {
return data.empty(); }
18 std::size_t size()
const {
return data.size(); }
20 const T& front()
const {
return data[0]; }
// Sift-up: start at the last slot and walk towards slot 0, swapping the element with
// its parent for as long as it compares less than that parent.
// NOTE(review): `data.size() - 1` wraps around if `data` is empty; presumably the new
// element is appended just before this loop (that line is not visible here) -- confirm.
26 for (std::size_t indexCurrent = data.size() - 1; indexCurrent > 0;)
// For a 0-based slot i, the parent slot is (i + 1) / 2 - 1.
28 const std::size_t indexParent = (indexCurrent + 1) / 2 - 1;
// Done as soon as the element is not smaller than its parent.
30 if (!(data[indexCurrent] < data[indexParent]))
return;
// Otherwise exchange the two and continue from the parent's slot.
32 std::swap(data[indexCurrent], data[indexParent]);
33 indexCurrent = indexParent;
// Overwrite slot 0 with the last element.
// NOTE(review): `data.back()` is read before the emptiness check below, so this assumes
// the container is non-empty on entry; the statement between this line and the check
// is not visible in this listing (presumably it drops the last element) -- confirm.
39 data[0] = data.back();
// Nothing left to reorder.
42 if (data.empty())
return;
// Sift-down: starting at slot 0, keep exchanging the element with its smaller child
// until neither child compares less than it.
44 for (std::size_t indexCurrent = 0;;)
// For a 0-based slot i, the children are at (i + 1) * 2 - 1 (left) and (i + 1) * 2 (right).
46 const std::size_t indexRightChild = (indexCurrent + 1) * 2;
47 const std::size_t indexLeftChild = indexRightChild - 1;
// No left child means no children at all: the element has reached a leaf.
49 if (indexLeftChild >= data.size())
return;
// Pick the left child when there is no right child or when left < right; otherwise the right child.
51 const std::size_t indexMinChild = (indexRightChild >= data.size() || data[indexLeftChild] < data[indexRightChild]) ? indexLeftChild : indexRightChild;
// Done as soon as the smaller child is not smaller than the current element.
53 if (!(data[indexMinChild] < data[indexCurrent]))
return;
// Otherwise exchange them and continue from the child's slot.
55 std::swap(data[indexCurrent], data[indexMinChild]);
56 indexCurrent = indexMinChild;
78 : _partitionInfo(std::string(
"topic[").append(topic).append(
"], paritition[").append(std::to_string(partition)).append(
"]"))
86 std::size_t
size()
const {
return _offsetsReceived.size(); }
// Reject a negative offset, or one that is not strictly greater than the most recently
// recorded offset; such a call is reported through the error log.
94 if (offset < 0 || (!_offsetsReceived.empty() && offset <= _offsetsReceived.back()))
97 KAFKA_API_LOG(Log::Level::Err,
"Got invalid offset to wait[%lld]! %s", offset, (_partitionInfo.empty() ?
"" : _partitionInfo.c_str()));
// Record the offset at the back of the received list.
// NOTE(review): the braces / early return that separate the error path from this line
// are not visible in this listing -- confirm against the full source.
101 _offsetsReceived.emplace_back(offset);
// An acked offset larger than the newest recorded offset is reported as an error.
// NOTE(review): `_offsetsReceived.back()` is read with no visible emptiness check, so this
// appears to assume at least one offset has been recorded first -- confirm.
110 const Offset maxOffsetReceived = _offsetsReceived.back();
111 if (offset > maxOffsetReceived)
114 KAFKA_API_LOG(Log::Level::Err,
"Got invalid ack offset[%lld]! Even larger than all offsets received[%lld]! %s", offset, maxOffsetReceived, (_partitionInfo.empty() ?
"" : _partitionInfo.c_str()));
// Queue the acked offset, then repeatedly compare the front of the acked offsets with the
// front of the recorded offsets.
117 _offsetsToCommit.push(offset);
// NOTE(review): `_offsetsReceived.front()` is read on every pass while the loop condition
// only checks `_offsetsToCommit`; confirm the recorded list cannot run empty first
// (e.g. after a duplicated ack).
120 const Offset minOffsetToCommit = _offsetsToCommit.front();
121 const Offset expectedOffset = _offsetsReceived.front();
// Match: the committable position advances to one past this offset, and both fronts are consumed.
122 if (minOffsetToCommit == expectedOffset)
124 _toCommit = expectedOffset + 1;
125 _offsetsToCommit.pop_front();
126 _offsetsReceived.pop_front();
// An acked offset below the expected one is logged as an error and discarded.
128 else if (minOffsetToCommit < expectedOffset)
131 KAFKA_API_LOG(Log::Level::Err,
"Got invalid ack offset[%lld]! Even smaller than expected[%lld]! %s", minOffsetToCommit, expectedOffset, (_partitionInfo.empty() ?
"" : _partitionInfo.c_str()));
132 _offsetsToCommit.pop_front();
// NOTE(review): the remaining branch (acked offset above the expected one) is not visible
// in this listing -- confirm how it leaves the loop.
138 }
while (!_offsetsToCommit.empty());
// Starts out as a default-constructed Optional; it is assigned only when the committable
// position differs from the one last handed out.
146 Optional<Offset> ret;
147 if (_committed != _toCommit)
// Remember the new position as handed out, and return it.
149 ret = _committed = _toCommit;
// Starts out as a default-constructed Optional; the branch below is taken only when
// `_committed` has moved away from INVALID_OFFSET.
// NOTE(review): the body of the branch is not visible in this listing -- presumably it
// assigns `_committed` to `ret`; confirm.
159 Optional<Offset> ret;
160 if (_committed != INVALID_OFFSET)
168 std::deque<Offset> _offsetsReceived;
169 Heap<Offset> _offsetsToCommit;
170 Offset _toCommit = {INVALID_OFFSET};
171 Offset _committed = {INVALID_OFFSET};
172 std::string _partitionInfo;
174 static constexpr Offset INVALID_OFFSET = -1;
The queue can be used to determine the right offset to commit.
Definition: UnorderedOffsetCommitQueue.h:75
void ackOffset(Offset offset)
Acknowledge that the record has been handled and is ready to be committed.
Definition: UnorderedOffsetCommitQueue.h:108
void waitOffset(Offset offset)
Add an offset (for a ConsumerRecord) to the waiting list, where it stays until it is acked (with ackOffset).
Definition: UnorderedOffsetCommitQueue.h:92
std::size_t size() const
Return how many received offsets have not been popped to commit (with popOffsetToCommit()).
Definition: UnorderedOffsetCommitQueue.h:86
Optional< Offset > popOffsetToCommit()
Pop the offset which is ready for the consumer (if any).
Definition: UnorderedOffsetCommitQueue.h:144
Optional< Offset > lastPoppedOffset()
Return the offset last popped.
Definition: UnorderedOffsetCommitQueue.h:157