10#ifdef RDK_BUILD_THREADSAFE_SSS
11#ifndef MULTITHREADED_MOL_SUPPLIER
12#define MULTITHREADED_MOL_SUPPLIER
24#include <boost/tokenizer.hpp>
29typedef boost::tokenizer<boost::char_separator<char>>
tokenizer;
33namespace FileParsers {
39 unsigned int numWriterThreads = 1;
40 size_t sizeInputQueue = 5;
41 size_t sizeOutputQueue = 5;
44 MultithreadedMolSupplier() {}
45 ~MultithreadedMolSupplier()
override;
47 std::unique_ptr<RWMol> next()
override;
50 bool atEnd()
override;
53 bool getEOFHitOnRead()
const {
return false; }
59 unsigned int getLastRecordId()
const;
61 std::string getLastItemText()
const;
72 void setNextCallback(T cb) {
83 void setWriteCallback(T cb) {
93 void setReadCallback(T cb) {
113 MultithreadedMolSupplier(
const MultithreadedMolSupplier &);
114 MultithreadedMolSupplier &operator=(
const MultithreadedMolSupplier &);
116 void reset()
override;
117 void init()
override = 0;
118 virtual bool getEnd()
const = 0;
120 virtual bool extractNextRecord(std::string &record,
unsigned int &lineNum,
121 unsigned int &index) = 0;
123 virtual RWMol *processMoleculeRecord(
const std::string &record,
124 unsigned int lineNum) = 0;
126 std::atomic<unsigned int> d_threadCounter{1};
127 std::vector<std::thread> d_writerThreads;
128 std::thread d_readerThread;
131 std::atomic<bool> df_started =
false;
132 std::atomic<unsigned int> d_lastRecordId =
134 std::string d_lastItemText;
135 const unsigned int d_numReaderThread = 1;
138 ConcurrentQueue<std::tuple<std::string, unsigned int, unsigned int>>>
141 ConcurrentQueue<std::tuple<RWMol *, std::string, unsigned int>>>
144 std::function<void(RWMol &,
const MultithreadedMolSupplier &)> nextCallback =
146 std::function<void(RWMol &,
const std::string &,
unsigned int)>
147 writeCallback =
nullptr;
148 std::function<std::string(
const std::string &,
unsigned int)> readCallback =
159 using ContainedType = v2::FileParsers::MultithreadedMolSupplier;
160 MultithreadedMolSupplier() {}
163 bool getEOFHitOnRead()
const {
165 return static_cast<ContainedType *
>(dp_supplier.get())->getEOFHitOnRead();
174 unsigned int getLastRecordId()
const {
176 return static_cast<ContainedType *
>(dp_supplier.get())->getLastRecordId();
179 std::string getLastItemText()
const {
181 return static_cast<ContainedType *
>(dp_supplier.get())->getLastItemText();
#define PRECONDITION(expr, mess)
boost::tokenizer< boost::char_separator< char > > tokenizer
#define RDKIT_FILEPARSERS_EXPORT
bool rdvalue_is(const RDValue_cast_t)