Modern C++ Kafka API
KafkaRecoverableProducer.h
1 #pragma once
2 
3 #include <kafka/Project.h>
4 
5 #include <kafka/KafkaProducer.h>
6 
7 #include <atomic>
8 #include <memory>
9 #include <mutex>
10 #include <thread>
11 
12 
13 namespace KAFKA_API { namespace clients { namespace producer {
14 
15 class KafkaRecoverableProducer
16 {
17 public:
18  explicit KafkaRecoverableProducer(const Properties& properties)
19  : _properties(properties), _running(true)
20  {
21  _properties.put(Config::ENABLE_MANUAL_EVENTS_POLL, "true");
22  _properties.put(Config::ERROR_CB, [this](const Error& error) { if (error.isFatal()) _fatalError = std::make_unique<Error>(error); });
23 
24  _producer = createProducer();
25 
26  _pollThread = std::thread([this]() { keepPolling(); });
27  }
28 
29  ~KafkaRecoverableProducer()
30  {
31  if (_running) close();
32  }
33 
37  const std::string& clientId() const
38  {
39  const std::lock_guard<std::mutex> lock(_producerMutex);
40 
41  return _producer->clientId();
42  }
43 
47  const std::string& name() const
48  {
49  const std::lock_guard<std::mutex> lock(_producerMutex);
50 
51  return _producer->name();
52  }
53 
57  void setLogLevel(int level)
58  {
59  const std::lock_guard<std::mutex> lock(_producerMutex);
60 
61  _properties.put(Config::LOG_LEVEL, std::to_string(level));
62  _producer->setLogLevel(level);
63  }
64 
68  const Properties& properties() const
69  {
70  const std::lock_guard<std::mutex> lock(_producerMutex);
71 
72  return _producer->properties();
73  }
74 
78  Optional<std::string> getProperty(const std::string& name) const
79  {
80  const std::lock_guard<std::mutex> lock(_producerMutex);
81 
82  return _producer->getProperty(name);
83  }
84 
89  Optional<BrokerMetadata> fetchBrokerMetadata(const std::string& topic,
90  std::chrono::milliseconds timeout = std::chrono::milliseconds(KafkaClient::DEFAULT_METADATA_TIMEOUT_MS),
91  bool disableErrorLogging = false)
92  {
93  const std::lock_guard<std::mutex> lock(_producerMutex);
94 
95  return _producer->fetchBrokerMetadata(topic, timeout, disableErrorLogging);
96  }
97 
104  Error flush(std::chrono::milliseconds timeout = InfiniteTimeout)
105  {
106  const std::lock_guard<std::mutex> lock(_producerMutex);
107 
108  return _producer->flush(timeout);
109  }
110 
114  Error purge()
115  {
116  const std::lock_guard<std::mutex> lock(_producerMutex);
117 
118  return _producer->purge();
119  }
120 
124  void close(std::chrono::milliseconds timeout = InfiniteTimeout)
125  {
126  const std::lock_guard<std::mutex> lock(_producerMutex);
127 
128  _running = false;
129  if (_pollThread.joinable()) _pollThread.join();
130 
131  _producer->close(timeout);
132  }
133 
145  producer::RecordMetadata syncSend(const producer::ProducerRecord& record)
146  {
147  const std::lock_guard<std::mutex> lock(_producerMutex);
148 
149  return _producer->syncSend(record);
150  }
151 
170  void send(const producer::ProducerRecord& record,
171  const producer::Callback& deliveryCb,
172  KafkaProducer::SendOption option = KafkaProducer::SendOption::NoCopyRecordValue,
173  KafkaProducer::ActionWhileQueueIsFull action = KafkaProducer::ActionWhileQueueIsFull::Block)
174  {
175  const std::lock_guard<std::mutex> lock(_producerMutex);
176 
177  _producer->send(record, deliveryCb, option, action);
178  }
179 
199  void send(const producer::ProducerRecord& record,
200  const producer::Callback& deliveryCb,
201  Error& error,
202  KafkaProducer::SendOption option = KafkaProducer::SendOption::NoCopyRecordValue,
203  KafkaProducer::ActionWhileQueueIsFull action = KafkaProducer::ActionWhileQueueIsFull::Block)
204  {
205  const std::lock_guard<std::mutex> lock(_producerMutex);
206 
207  _producer->send(record, deliveryCb, error, option, action);
208  }
209 
213  void initTransactions(std::chrono::milliseconds timeout = InfiniteTimeout)
214  {
215  const std::lock_guard<std::mutex> lock(_producerMutex);
216 
217  _producer->initTransactions(timeout);
218  }
219 
223  void beginTransaction()
224  {
225  const std::lock_guard<std::mutex> lock(_producerMutex);
226 
227  _producer->beginTransaction();
228  }
229 
233  void commitTransaction(std::chrono::milliseconds timeout = InfiniteTimeout)
234  {
235  const std::lock_guard<std::mutex> lock(_producerMutex);
236 
237  _producer->commitTransaction(timeout);
238  }
239 
243  void abortTransaction(std::chrono::milliseconds timeout = InfiniteTimeout)
244  {
245  const std::lock_guard<std::mutex> lock(_producerMutex);
246 
247  _producer->abortTransaction(timeout);
248  }
249 
253  void sendOffsetsToTransaction(const TopicPartitionOffsets& topicPartitionOffsets,
254  const consumer::ConsumerGroupMetadata& groupMetadata,
255  std::chrono::milliseconds timeout)
256  {
257  const std::lock_guard<std::mutex> lock(_producerMutex);
258 
259  _producer->sendOffsetsToTransaction(topicPartitionOffsets, groupMetadata, timeout);
260  }
261 
262 #ifdef KAFKA_API_ENABLE_UNIT_TEST_STUBS
263  void mockFatalError()
264  {
265  _fatalError = std::make_unique<Error>(RD_KAFKA_RESP_ERR__FATAL, "fake fatal error", true);
266  }
267 #endif
268 
269 private:
270  void keepPolling()
271  {
272  while (_running)
273  {
274  _producer->pollEvents(std::chrono::milliseconds(1));
275  if (_fatalError)
276  {
277  const std::string errStr = _fatalError->toString();
278  KAFKA_API_LOG(Log::Level::Notice, "met fatal error[%s], will re-initialize the internal producer", errStr.c_str());
279 
280  const std::lock_guard<std::mutex> lock(_producerMutex);
281 
282  if (!_running) return;
283 
284  _producer->purge();
285  _producer->close();
286 
287  _fatalError.reset();
288 
289  _producer = createProducer();
290  }
291  }
292  }
293 
294  std::unique_ptr<KafkaProducer> createProducer()
295  {
296  return std::make_unique<KafkaProducer>(_properties);
297  }
298 
299  // Configurations for producer
300  Properties _properties;
301 
302  std::unique_ptr<Error> _fatalError;
303 
304  std::atomic<bool> _running;
305  std::thread _pollThread;
306 
307  mutable std::mutex _producerMutex;
308  std::unique_ptr<KafkaProducer> _producer;
309 };
310 
311 } } } // end of KAFKA_API::clients::producer
312 
static constexpr const char * ERROR_CB
Log callback.
Definition: ClientConfig.h:38
static constexpr const char * ENABLE_MANUAL_EVENTS_POLL
To poll the events manually (otherwise, it would be done with a background polling thread).
Definition: ClientConfig.h:26
static constexpr const char * LOG_LEVEL
Log level (syslog(3) levels).
Definition: ClientConfig.h:72
ActionWhileQueueIsFull
Choose the action while the sending buffer is full.
Definition: KafkaProducer.h:71
SendOption
Options for sending messages.
Definition: KafkaProducer.h:66