10#ifdef RDK_BUILD_THREADSAFE_SSS
11#ifndef MULTITHREADED_MOL_SUPPLIER
12#define MULTITHREADED_MOL_SUPPLIER
24#include <boost/tokenizer.hpp>
29typedef boost::tokenizer<boost::char_separator<char>>
tokenizer;
39 unsigned int numWriterThreads = 1;
40 size_t sizeInputQueue = 5;
41 size_t sizeOutputQueue = 5;
44 MultithreadedMolSupplier() {}
49 virtual ~MultithreadedMolSupplier() {close();}
52 virtual void close()
override;
54 std::unique_ptr<RWMol> next()
override;
57 bool atEnd()
override;
60 bool getEOFHitOnRead()
const {
return false; }
66 unsigned int getLastRecordId()
const;
68 std::string getLastItemText()
const;
79 void setNextCallback(T cb) {
90 void setWriteCallback(T cb) {
100 void setReadCallback(T cb) {
106 virtual void closeStreams() {}
123 MultithreadedMolSupplier(
const MultithreadedMolSupplier &);
124 MultithreadedMolSupplier &operator=(
const MultithreadedMolSupplier &);
126 void reset()
override;
127 void init()
override = 0;
128 virtual bool getEnd()
const = 0;
130 virtual bool extractNextRecord(std::string &record,
unsigned int &lineNum,
131 unsigned int &index) = 0;
133 virtual RWMol *processMoleculeRecord(
const std::string &record,
134 unsigned int lineNum) = 0;
136 std::mutex d_threadCounterMutex;
137 std::atomic<unsigned int> d_threadCounter{1};
138 std::vector<std::thread> d_writerThreads;
139 std::thread d_readerThread;
142 std::atomic<bool> df_started =
false;
143 std::atomic<bool> df_forceStop =
false;
145 std::atomic<unsigned int> d_lastRecordId =
147 std::string d_lastItemText;
148 const unsigned int d_numReaderThread = 1;
151 ConcurrentQueue<std::tuple<std::string, unsigned int, unsigned int>>>
154 ConcurrentQueue<std::tuple<RWMol *, std::string, unsigned int>>>
157 std::function<void(RWMol &,
const MultithreadedMolSupplier &)> nextCallback =
159 std::function<void(RWMol &,
const std::string &,
unsigned int)>
160 writeCallback =
nullptr;
161 std::function<std::string(
const std::string &,
unsigned int)> readCallback =
173 using ContainedType = v2::FileParsers::MultithreadedMolSupplier;
174 MultithreadedMolSupplier() {}
177 bool getEOFHitOnRead()
const {
179 return static_cast<ContainedType *
>(dp_supplier.get())->getEOFHitOnRead();
188 unsigned int getLastRecordId()
const {
190 return static_cast<ContainedType *
>(dp_supplier.get())->getLastRecordId();
193 std::string getLastItemText()
const {
195 return static_cast<ContainedType *
>(dp_supplier.get())->getLastItemText();
#define PRECONDITION(expr, mess)
boost::tokenizer< boost::char_separator< char > > tokenizer
#define RDKIT_FILEPARSERS_EXPORT