3 #include <kafka/Project.h>
5 #include <kafka/KafkaProducer.h>
13 namespace KAFKA_API {
namespace clients {
namespace producer {
15 class KafkaRecoverableProducer
18 explicit KafkaRecoverableProducer(
const Properties& properties)
19 : _properties(properties), _running(true)
22 _properties.put(
Config::ERROR_CB, [
this](
const Error& error) {
if (error.isFatal()) _fatalError = std::make_unique<Error>(error); });
24 _producer = createProducer();
26 _pollThread = std::thread([
this]() { keepPolling(); });
29 ~KafkaRecoverableProducer()
31 if (_running) close();
37 const std::string& clientId()
const
39 const std::lock_guard<std::mutex> lock(_producerMutex);
41 return _producer->clientId();
47 const std::string& name()
const
49 const std::lock_guard<std::mutex> lock(_producerMutex);
51 return _producer->name();
57 void setLogLevel(
int level)
59 const std::lock_guard<std::mutex> lock(_producerMutex);
62 _producer->setLogLevel(level);
68 const Properties& properties()
const
70 const std::lock_guard<std::mutex> lock(_producerMutex);
72 return _producer->properties();
78 Optional<std::string> getProperty(
const std::string& name)
const
80 const std::lock_guard<std::mutex> lock(_producerMutex);
82 return _producer->getProperty(name);
89 Optional<BrokerMetadata> fetchBrokerMetadata(
const std::string& topic,
90 std::chrono::milliseconds timeout = std::chrono::milliseconds(KafkaClient::DEFAULT_METADATA_TIMEOUT_MS),
91 bool disableErrorLogging =
false)
93 const std::lock_guard<std::mutex> lock(_producerMutex);
95 return _producer->fetchBrokerMetadata(topic, timeout, disableErrorLogging);
104 Error flush(std::chrono::milliseconds timeout = InfiniteTimeout)
106 const std::lock_guard<std::mutex> lock(_producerMutex);
108 return _producer->flush(timeout);
116 const std::lock_guard<std::mutex> lock(_producerMutex);
118 return _producer->purge();
124 void close(std::chrono::milliseconds timeout = InfiniteTimeout)
126 const std::lock_guard<std::mutex> lock(_producerMutex);
129 if (_pollThread.joinable()) _pollThread.join();
131 _producer->close(timeout);
145 producer::RecordMetadata syncSend(
const producer::ProducerRecord& record)
147 const std::lock_guard<std::mutex> lock(_producerMutex);
149 return _producer->syncSend(record);
170 void send(
const producer::ProducerRecord& record,
171 const producer::Callback& deliveryCb,
175 const std::lock_guard<std::mutex> lock(_producerMutex);
177 _producer->send(record, deliveryCb, option, action);
199 void send(
const producer::ProducerRecord& record,
200 const producer::Callback& deliveryCb,
205 const std::lock_guard<std::mutex> lock(_producerMutex);
207 _producer->send(record, deliveryCb, error, option, action);
213 void initTransactions(std::chrono::milliseconds timeout = InfiniteTimeout)
215 const std::lock_guard<std::mutex> lock(_producerMutex);
217 _producer->initTransactions(timeout);
223 void beginTransaction()
225 const std::lock_guard<std::mutex> lock(_producerMutex);
227 _producer->beginTransaction();
233 void commitTransaction(std::chrono::milliseconds timeout = InfiniteTimeout)
235 const std::lock_guard<std::mutex> lock(_producerMutex);
237 _producer->commitTransaction(timeout);
243 void abortTransaction(std::chrono::milliseconds timeout = InfiniteTimeout)
245 const std::lock_guard<std::mutex> lock(_producerMutex);
247 _producer->abortTransaction(timeout);
253 void sendOffsetsToTransaction(
const TopicPartitionOffsets& topicPartitionOffsets,
254 const consumer::ConsumerGroupMetadata& groupMetadata,
255 std::chrono::milliseconds timeout)
257 const std::lock_guard<std::mutex> lock(_producerMutex);
259 _producer->sendOffsetsToTransaction(topicPartitionOffsets, groupMetadata, timeout);
262 #ifdef KAFKA_API_ENABLE_UNIT_TEST_STUBS
263 void mockFatalError()
265 _fatalError = std::make_unique<Error>(RD_KAFKA_RESP_ERR__FATAL,
"fake fatal error",
true);
274 _producer->pollEvents(std::chrono::milliseconds(1));
277 const std::string errStr = _fatalError->toString();
278 KAFKA_API_LOG(Log::Level::Notice,
"met fatal error[%s], will re-initialize the internal producer", errStr.c_str());
280 const std::lock_guard<std::mutex> lock(_producerMutex);
282 if (!_running)
return;
289 _producer = createProducer();
294 std::unique_ptr<KafkaProducer> createProducer()
296 return std::make_unique<KafkaProducer>(_properties);
300 Properties _properties;
302 std::unique_ptr<Error> _fatalError;
304 std::atomic<bool> _running;
305 std::thread _pollThread;
307 mutable std::mutex _producerMutex;
308 std::unique_ptr<KafkaProducer> _producer;
static constexpr const char * ERROR_CB
Log callback.
Definition: ClientConfig.h:38
static constexpr const char * ENABLE_MANUAL_EVENTS_POLL
To poll the events manually (otherwise, it would be done with a background polling thread).
Definition: ClientConfig.h:26
static constexpr const char * LOG_LEVEL
Log level (syslog(3) levels).
Definition: ClientConfig.h:72
ActionWhileQueueIsFull
Choose the action while the sending buffer is full.
Definition: KafkaProducer.h:71
SendOption
Options for sending messages.
Definition: KafkaProducer.h:66