RDKit
Open-source cheminformatics and machine learning.
Loading...
Searching...
No Matches
MultithreadedMolSupplier.h
Go to the documentation of this file.
1//
2// Copyright (C) 2020 Shrey Aryan
3//
4// @@ All Rights Reserved @@
5// This file is part of the RDKit.
6// The contents are covered by the terms of the BSD license
7// which is included in the file license.txt, found at the root
8// of the RDKit source tree.
9//
10#ifdef RDK_BUILD_THREADSAFE_SSS
11#ifndef MULTITHREADED_MOL_SUPPLIER
12#define MULTITHREADED_MOL_SUPPLIER
13
18#include <RDGeneral/RDLog.h>
19#include <RDGeneral/RDThreads.h>
20#include <RDGeneral/StreamOps.h>
21
22#include <functional>
23#include <atomic>
24#include <boost/tokenizer.hpp>
25
26#include "FileParsers.h"
27#include "MolSupplier.h"
28
29typedef boost::tokenizer<boost::char_separator<char>> tokenizer;
30
31namespace RDKit {
32namespace v2 {
33namespace FileParsers {
34class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
35 //! this is an abstract base class to concurrently supply molecules one at a
36 //! time
37 public:
38 struct Parameters {
39 unsigned int numWriterThreads = 1;
40 size_t sizeInputQueue = 5;
41 size_t sizeOutputQueue = 5;
42 };
43
44 MultithreadedMolSupplier() {}
45 ~MultithreadedMolSupplier() override;
46 //! pop elements from the output queue
47 std::unique_ptr<RWMol> next() override;
48
49 //! returns true when all records have been read from the supplier
50 bool atEnd() override;
51
52 //! included for the interface, always returns false
53 bool getEOFHitOnRead() const { return false; }
54
55 //! returns the record id of the last extracted item
56 //! Note: d_LastRecordId = 0, initially therefore the value 0 is returned
57 //! if and only if the function is called before extracting the first
58 //! record
59 unsigned int getLastRecordId() const;
60 //! returns the text block for the last extracted item
61 std::string getLastItemText() const;
62
63 //! sets the callback to be applied to molecules before they are returned by
64 ///! the next() function
65 /*!
66 \param cb: a function that takes a reference to an RWMol and a const
67 reference to the MultithreadedMolSupplier. This can modify the molecule in
68 place
69
70 */
71 template <typename T>
72 void setNextCallback(T cb) {
73 nextCallback = cb;
74 }
75 //! sets the callback to be applied to molecules after they are processed, but
76 ///! before they are written to the output queue
77 /*!
78 \param cb: a function that takes a reference to an RWMol, a const reference
79 to the string record, and an unsigned int record id. This can modify the
80 molecule in place
81 */
82 template <typename T>
83 void setWriteCallback(T cb) {
84 writeCallback = cb;
85 }
86 //! sets the callback to be applied to input text records before they are
87 ///! added to the input queue
88 /*!
89 \param cb: a function that takes a const reference to the string record and
90 an unsigned int record id and returns the modified string record
91 */
92 template <typename T>
93 void setReadCallback(T cb) {
94 readCallback = cb;
95 }
96
97 protected:
98 //! starts reader and writer threads
99 void startThreads();
100
101 private:
102 //! reads lines from input stream to populate the input queue
103 void reader();
104 //! parses lines from the input queue converting them to RWMol objects
105 //! populating the output queue
106 void writer();
107 //! finalizes the reader and writer threads
108 void endThreads();
109 //! disable automatic copy constructors and assignment operators
110 //! for this class and its subclasses. They will likely be
111 //! carrying around stream pointers and copying those is a recipe
112 //! for disaster.
113 MultithreadedMolSupplier(const MultithreadedMolSupplier &);
114 MultithreadedMolSupplier &operator=(const MultithreadedMolSupplier &);
115 //! not yet implemented
116 void reset() override;
117 void init() override = 0;
118 virtual bool getEnd() const = 0;
119 //! extracts next record from the input file or stream
120 virtual bool extractNextRecord(std::string &record, unsigned int &lineNum,
121 unsigned int &index) = 0;
122 //! processes the record into an RWMol object
123 virtual RWMol *processMoleculeRecord(const std::string &record,
124 unsigned int lineNum) = 0;
125
126 std::atomic<unsigned int> d_threadCounter{1}; //!< thread counter
127 std::vector<std::thread> d_writerThreads; //!< vector writer threads
128 std::thread d_readerThread; //!< single reader thread
129
130 protected:
131 std::atomic<bool> df_started = false;
132 std::atomic<unsigned int> d_lastRecordId =
133 0; //!< stores last extracted record id
134 std::string d_lastItemText; //!< stores last extracted record
135 const unsigned int d_numReaderThread = 1; //!< number of reader thread
136
137 std::unique_ptr<
138 ConcurrentQueue<std::tuple<std::string, unsigned int, unsigned int>>>
139 d_inputQueue; //!< concurrent input queue
140 std::unique_ptr<
141 ConcurrentQueue<std::tuple<RWMol *, std::string, unsigned int>>>
142 d_outputQueue; //!< concurrent output queue
143 Parameters d_params;
144 std::function<void(RWMol &, const MultithreadedMolSupplier &)> nextCallback =
145 nullptr;
146 std::function<void(RWMol &, const std::string &, unsigned int)>
147 writeCallback = nullptr;
148 std::function<std::string(const std::string &, unsigned int)> readCallback =
149 nullptr;
150};
151} // namespace FileParsers
152} // namespace v2
153
154inline namespace v1 {
155class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
156 //! this is an abstract base class to concurrently supply molecules one at a
157 //! time
158 public:
159 using ContainedType = v2::FileParsers::MultithreadedMolSupplier;
160 MultithreadedMolSupplier() {}
161
162 //! included for the interface, always returns false
163 bool getEOFHitOnRead() const {
164 if (dp_supplier) {
165 return static_cast<ContainedType *>(dp_supplier.get())->getEOFHitOnRead();
166 }
167 return false;
168 }
169
170 //! returns the record id of the last extracted item
171 //! Note: d_LastRecordId = 0, initially therefore the value 0 is returned
172 //! if and only if the function is called before extracting the first
173 //! record
174 unsigned int getLastRecordId() const {
175 PRECONDITION(dp_supplier, "no supplier");
176 return static_cast<ContainedType *>(dp_supplier.get())->getLastRecordId();
177 }
178 //! returns the text block for the last extracted item
179 std::string getLastItemText() const {
180 PRECONDITION(dp_supplier, "no supplier");
181 return static_cast<ContainedType *>(dp_supplier.get())->getLastItemText();
182 }
183};
184} // namespace v1
185} // namespace RDKit
186#endif
187#endif
#define PRECONDITION(expr, mess)
Definition Invariant.h:109
boost::tokenizer< boost::char_separator< char > > tokenizer
Definition LinkNode.h:18
#define RDKIT_FILEPARSERS_EXPORT
Definition export.h:161
Std stuff.
bool rdvalue_is(const RDValue_cast_t)