RDKit
Open-source cheminformatics and machine learning.
Loading...
Searching...
No Matches
MultithreadedMolSupplier.h
Go to the documentation of this file.
1//
2// Copyright (C) 2020 Shrey Aryan
3//
4// @@ All Rights Reserved @@
5// This file is part of the RDKit.
6// The contents are covered by the terms of the BSD license
7// which is included in the file license.txt, found at the root
8// of the RDKit source tree.
9//
10#ifdef RDK_BUILD_THREADSAFE_SSS
11#ifndef MULTITHREADED_MOL_SUPPLIER
12#define MULTITHREADED_MOL_SUPPLIER
13
18#include <RDGeneral/RDLog.h>
19#include <RDGeneral/RDThreads.h>
20#include <RDGeneral/StreamOps.h>
21
22#include <functional>
23#include <atomic>
24#include <boost/tokenizer.hpp>
25
26#include "FileParsers.h"
27#include "MolSupplier.h"
28
29typedef boost::tokenizer<boost::char_separator<char>> tokenizer;
30
31namespace RDKit {
32namespace v2 {
33namespace FileParsers {
34class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
35 //! this is an abstract base class to concurrently supply molecules one at a
36 //! time
37 public:
38 struct Parameters {
39 unsigned int numWriterThreads = 1;
40 size_t sizeInputQueue = 5;
41 size_t sizeOutputQueue = 5;
42 };
43
44 MultithreadedMolSupplier() {}
45
46
47 // Derived classes MUST have a destructor that calls close
48 // to properly end threads while the instance is alive
49 virtual ~MultithreadedMolSupplier() {close();}
50
51 //! shut down the supplier
52 virtual void close() override;
53 //! pop elements from the output queue
54 std::unique_ptr<RWMol> next() override;
55
56 //! returns true when all records have been read from the supplier
57 bool atEnd() override;
58
59 //! included for the interface, always returns false
60 bool getEOFHitOnRead() const { return false; }
61
62 //! returns the record id of the last extracted item
63 //! Note: d_LastRecordId = 0, initially therefore the value 0 is returned
64 //! if and only if the function is called before extracting the first
65 //! record
66 unsigned int getLastRecordId() const;
67 //! returns the text block for the last extracted item
68 std::string getLastItemText() const;
69
70 //! sets the callback to be applied to molecules before they are returned by
71 ///! the next() function
72 /*!
73 \param cb: a function that takes a reference to an RWMol and a const
74 reference to the MultithreadedMolSupplier. This can modify the molecule in
75 place
76
77 */
78 template <typename T>
79 void setNextCallback(T cb) {
80 nextCallback = cb;
81 }
82 //! sets the callback to be applied to molecules after they are processed, but
83 ///! before they are written to the output queue
84 /*!
85 \param cb: a function that takes a reference to an RWMol, a const reference
86 to the string record, and an unsigned int record id. This can modify the
87 molecule in place
88 */
89 template <typename T>
90 void setWriteCallback(T cb) {
91 writeCallback = cb;
92 }
93 //! sets the callback to be applied to input text records before they are
94 ///! added to the input queue
95 /*!
96 \param cb: a function that takes a const reference to the string record and
97 an unsigned int record id and returns the modified string record
98 */
99 template <typename T>
100 void setReadCallback(T cb) {
101 readCallback = cb;
102 }
103
104 protected:
105 //! Close down any external streams
106 virtual void closeStreams() {}
107
108 //! starts reader and writer threads
109 void startThreads();
110 //! finalizes the reader and writer threads
111 void endThreads();
112
113 private:
114 //! reads lines from input stream to populate the input queue
115 void reader();
116 //! parses lines from the input queue converting them to RWMol objects
117 //! populating the output queue
118 void writer();
119 //! disable automatic copy constructors and assignment operators
120 //! for this class and its subclasses. They will likely be
121 //! carrying around stream pointers and copying those is a recipe
122 //! for disaster.
123 MultithreadedMolSupplier(const MultithreadedMolSupplier &);
124 MultithreadedMolSupplier &operator=(const MultithreadedMolSupplier &);
125 //! not yet implemented
126 void reset() override;
127 void init() override = 0;
128 virtual bool getEnd() const = 0;
129 //! extracts next record from the input file or stream
130 virtual bool extractNextRecord(std::string &record, unsigned int &lineNum,
131 unsigned int &index) = 0;
132 //! processes the record into an RWMol object
133 virtual RWMol *processMoleculeRecord(const std::string &record,
134 unsigned int lineNum) = 0;
135
136 std::mutex d_threadCounterMutex;
137 std::atomic<unsigned int> d_threadCounter{1}; //!< thread counter
138 std::vector<std::thread> d_writerThreads; //!< vector writer threads
139 std::thread d_readerThread; //!< single reader thread
140
141 protected:
142 std::atomic<bool> df_started = false;
143 std::atomic<bool> df_forceStop = false;
144
145 std::atomic<unsigned int> d_lastRecordId =
146 0; //!< stores last extracted record id
147 std::string d_lastItemText; //!< stores last extracted record
148 const unsigned int d_numReaderThread = 1; //!< number of reader thread
149
150 std::unique_ptr<
151 ConcurrentQueue<std::tuple<std::string, unsigned int, unsigned int>>>
152 d_inputQueue; //!< concurrent input queue
153 std::unique_ptr<
154 ConcurrentQueue<std::tuple<RWMol *, std::string, unsigned int>>>
155 d_outputQueue; //!< concurrent output queue
156 Parameters d_params;
157 std::function<void(RWMol &, const MultithreadedMolSupplier &)> nextCallback =
158 nullptr;
159 std::function<void(RWMol &, const std::string &, unsigned int)>
160 writeCallback = nullptr;
161 std::function<std::string(const std::string &, unsigned int)> readCallback =
162 nullptr;
163
164};
165} // namespace FileParsers
166} // namespace v2
167
168inline namespace v1 {
169class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
170 //! this is an abstract base class to concurrently supply molecules one at a
171 //! time
172 public:
173 using ContainedType = v2::FileParsers::MultithreadedMolSupplier;
174 MultithreadedMolSupplier() {}
175
176 //! included for the interface, always returns false
177 bool getEOFHitOnRead() const {
178 if (dp_supplier) {
179 return static_cast<ContainedType *>(dp_supplier.get())->getEOFHitOnRead();
180 }
181 return false;
182 }
183
184 //! returns the record id of the last extracted item
185 //! Note: d_LastRecordId = 0, initially therefore the value 0 is returned
186 //! if and only if the function is called before extracting the first
187 //! record
188 unsigned int getLastRecordId() const {
189 PRECONDITION(dp_supplier, "no supplier");
190 return static_cast<ContainedType *>(dp_supplier.get())->getLastRecordId();
191 }
192 //! returns the text block for the last extracted item
193 std::string getLastItemText() const {
194 PRECONDITION(dp_supplier, "no supplier");
195 return static_cast<ContainedType *>(dp_supplier.get())->getLastItemText();
196 }
197};
198} // namespace v1
199} // namespace RDKit
200#endif
201#endif
#define PRECONDITION(expr, mess)
Definition Invariant.h:108
boost::tokenizer< boost::char_separator< char > > tokenizer
Definition LinkNode.h:18
#define RDKIT_FILEPARSERS_EXPORT
Definition export.h:177
Std stuff.