RDKit
Open-source cheminformatics and machine learning.
Loading...
Searching...
No Matches
MultithreadedMolSupplier.h
Go to the documentation of this file.
1//
2// Copyright (C) 2020 Shrey Aryan
3//
4// @@ All Rights Reserved @@
5// This file is part of the RDKit.
6// The contents are covered by the terms of the BSD license
7// which is included in the file license.txt, found at the root
8// of the RDKit source tree.
9//
10#ifdef RDK_BUILD_THREADSAFE_SSS
11#ifndef MULTITHREADED_MOL_SUPPLIER
12#define MULTITHREADED_MOL_SUPPLIER
13
18#include <RDGeneral/RDLog.h>
19#include <RDGeneral/RDThreads.h>
20#include <RDGeneral/StreamOps.h>
21
22#include <atomic>
23#include <boost/tokenizer.hpp>
24
25#include "FileParsers.h"
26#include "MolSupplier.h"
27
28typedef boost::tokenizer<boost::char_separator<char>> tokenizer;
29
30namespace RDKit {
31namespace v2 {
32namespace FileParsers {
33class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
34 //! this is an abstract base class to concurrently supply molecules one at a
35 //! time
36 public:
37 struct Parameters {
38 unsigned int numWriterThreads = 1;
39 size_t sizeInputQueue = 5;
40 size_t sizeOutputQueue = 5;
41 };
42
43 MultithreadedMolSupplier() {}
44 ~MultithreadedMolSupplier() override;
45 //! pop elements from the output queue
46 std::unique_ptr<RWMol> next() override;
47 //! returns true when all records have been read from the supplier
48 bool atEnd() override;
49
50 //! included for the interface, always returns false
51 bool getEOFHitOnRead() const { return false; }
52
53 //! returns the record id of the last extracted item
54 //! Note: d_LastRecordId = 0, initially therefore the value 0 is returned
55 //! if and only if the function is called before extracting the first
56 //! record
57 unsigned int getLastRecordId() const;
58 //! returns the text block for the last extracted item
59 std::string getLastItemText() const;
60
61 protected:
62 //! starts reader and writer threads
63 void startThreads();
64
65 private:
66 //! reads lines from input stream to populate the input queue
67 void reader();
68 //! parses lines from the input queue converting them to RWMol objects
69 //! populating the output queue
70 void writer();
71 //! finalizes the reader and writer threads
72 void endThreads();
73 //! disable automatic copy constructors and assignment operators
74 //! for this class and its subclasses. They will likely be
75 //! carrying around stream pointers and copying those is a recipe
76 //! for disaster.
77 MultithreadedMolSupplier(const MultithreadedMolSupplier &);
78 MultithreadedMolSupplier &operator=(const MultithreadedMolSupplier &);
79 //! not yet implemented
80 void reset() override;
81 void init() override = 0;
82 virtual bool getEnd() const = 0;
83 //! extracts next record from the input file or stream
84 virtual bool extractNextRecord(std::string &record, unsigned int &lineNum,
85 unsigned int &index) = 0;
86 //! processes the record into an RWMol object
87 virtual RWMol *processMoleculeRecord(const std::string &record,
88 unsigned int lineNum) = 0;
89
90 std::atomic<unsigned int> d_threadCounter{1}; //!< thread counter
91 std::vector<std::thread> d_writerThreads; //!< vector writer threads
92 std::thread d_readerThread; //!< single reader thread
93
94 protected:
95 std::atomic<unsigned int> d_lastRecordId =
96 0; //!< stores last extracted record id
97 std::string d_lastItemText; //!< stores last extracted record
98 const unsigned int d_numReaderThread = 1; //!< number of reader thread
99
100 ConcurrentQueue<std::tuple<std::string, unsigned int, unsigned int>>
101 *d_inputQueue; //!< concurrent input queue
102 ConcurrentQueue<std::tuple<RWMol *, std::string, unsigned int>>
103 *d_outputQueue; //!< concurrent output queue
104 Parameters d_params;
105};
106} // namespace FileParsers
107} // namespace v2
108
109inline namespace v1 {
110class RDKIT_FILEPARSERS_EXPORT MultithreadedMolSupplier : public MolSupplier {
111 //! this is an abstract base class to concurrently supply molecules one at a
112 //! time
113 public:
114 using ContainedType = v2::FileParsers::MultithreadedMolSupplier;
115 MultithreadedMolSupplier() {}
116
117 //! included for the interface, always returns false
118 bool getEOFHitOnRead() const {
119 if (dp_supplier) {
120 return static_cast<ContainedType *>(dp_supplier.get())->getEOFHitOnRead();
121 }
122 return false;
123 }
124
125 //! returns the record id of the last extracted item
126 //! Note: d_LastRecordId = 0, initially therefore the value 0 is returned
127 //! if and only if the function is called before extracting the first
128 //! record
129 unsigned int getLastRecordId() const {
130 PRECONDITION(dp_supplier, "no supplier");
131 return static_cast<ContainedType *>(dp_supplier.get())->getLastRecordId();
132 }
133 //! returns the text block for the last extracted item
134 std::string getLastItemText() const {
135 PRECONDITION(dp_supplier, "no supplier");
136 return static_cast<ContainedType *>(dp_supplier.get())->getLastItemText();
137 }
138};
139} // namespace v1
140} // namespace RDKit
141#endif
142#endif
#define PRECONDITION(expr, mess)
Definition Invariant.h:109
boost::tokenizer< boost::char_separator< char > > tokenizer
Definition LinkNode.h:18
#define RDKIT_FILEPARSERS_EXPORT
Definition export.h:161
Std stuff.
bool rdvalue_is(const RDValue_cast_t)