1
2
3
4 """ defines class _DbResultSet_ for lazy interactions with Db query results
5
6 **Note**
7
8 This uses the Python iterator protocol, so Python 2.2 or later is required.
9
10 """
11 import sys
12 from Dbase import DbInfo
13
15 - def __init__(self,cursor,conn,cmd,removeDups=-1,transform=None,extras=None):
25
""" (Re)run the stored command on the cursor.

``self.cmd`` is executed on ``self.cursor``; ``self.extras`` is handed to
``execute()`` as a second argument only when it is truthy.

Errors are reported rather than raised: the offending command and a
traceback are written to stderr and the method then returns normally.

The original note here read "implement in subclasses", i.e. subclasses
are expected to add their own reset logic on top of this.

"""
try:
    if self.extras:
        self.cursor.execute(self.cmd, self.extras)
    else:
        self.cursor.execute(self.cmd)
except Exception:
    # Still best-effort, but a bare ``except:`` would also trap
    # KeyboardInterrupt and SystemExit (on Python >= 2.5), which should
    # be allowed to propagate.
    import traceback
    sys.stderr.write('the command "%s" generated errors:\n' % (self.cmd))
    traceback.print_exc()
39
40
42 self.Reset()
43 return self
44
# Cache the cursor's column metadata as two parallel tuples:
# names in self.colNames, types in self.colTypes.
# Each item produced by GetColumnInfoFromCursor() is unpacked as a
# (name, type) pair.
self.colNames, self.colTypes = [], []
for columnInfo in DbInfo.GetColumnInfoFromCursor(self.cursor):
    columnName, columnType = columnInfo
    self.colNames.append(columnName)
    self.colTypes.append(columnType)
# Freeze both sequences once they are complete.
self.colNames, self.colTypes = tuple(self.colNames), tuple(self.colTypes)
53
59 res = [None]*len(self.colNames)
60 for i in range(len(self.colNames)):
61 res[i] = self.colNames[i],self.colTypes[i]
62 return tuple(res)
63
64
66 """ Only supports forward iteration
67
68 """
76
78 if self._stopped:
79 raise StopIteration
80 r = None
81 while r is None:
82 r = self.cursor.fetchone()
83 if not r:
84 self._stopped = 1
85 raise StopIteration
86 if self.transform is not None:
87 r = self.transform(r)
88 if self.removeDups>=0:
89 v = r[self.removeDups]
90 if v in self.seen:
91 r = None
92 else:
93 self.seen.append(v)
94 return r
95
96
97
99 """ Supports random access
100
101 """
# Base-class initialisation first; it receives all arguments unchanged.
DbResultBase.__init__(self, *args, **kwargs)
# Position of the row most recently handed out by next(); -1 means that
# iteration has not started yet.
self._pos = -1
# Values of the de-duplication column that have been encountered so far.
self.seen = []
# Rows fetched so far, in the order in which they were accepted.
self.results = []
107
112
""" Drain whatever is left on the cursor into ``self.results``.

Each remaining row is passed through ``self.transform`` (when set) and,
when ``self.removeDups`` is >= 0, kept only if its value in that column
has not been seen before.  The cursor reference is dropped afterwards.

"""
if self.cursor:
    row = self.cursor.fetchone()
    while row:
        if self.transform is not None:
            row = self.transform(row)
        if self.removeDups < 0:
            # No de-duplication requested: keep every row.
            self.results.append(row)
        else:
            key = row[self.removeDups]
            if key not in self.seen:
                self.seen.append(key)
                self.results.append(row)
        row = self.cursor.fetchone()
# A cursor of None marks the result set as fully loaded.
self.cursor = None
130 if idx < 0: raise IndexError,"negative indices not supported"
131 if self.cursor is None:
132 if len(self.results):
133 if idx >= len(self.results):
134 raise IndexError,'index %d too large (%d max)'%(idx,len(self.results))
135 else:
136 raise ValueError,'Invalid cursor'
137
138 while idx >= len(self.results):
139 r = None
140 while r is None:
141 r = self.cursor.fetchone()
142 if not r:
143 self.cursor = None
144 raise IndexError,'index %d too large (%d max)'%(idx,len(self.results))
145
146 if self.transform is not None:
147 r = self.transform(r)
148 if self.removeDups>=0:
149 v = r[self.removeDups]
150 if v in self.seen:
151 r = None
152 else:
153 self.results.append(r)
154 self.seen.append(v)
155 else:
156 self.results.append(r)
157
158 return self.results[idx]
159
161 if self.results is None:
162 raise ValueError,"len() not supported for noMemory Results Sets"
163 self._finish()
164 return len(self.results)
165
167 self._pos += 1
168 res = None
169 if self._pos < len(self):
170 res = self.results[self._pos]
171 else:
172 raise StopIteration
173 return res
174
if __name__ == '__main__':
    # Ad-hoc smoke test.  It needs the TEST.GDB database with the tables
    # ten_elements and ten_elements_dups.
    #
    # The result sets are built with (cursor, conn, cmd): the __init__
    # visible in this file requires all three, and the random-access class
    # forwards its arguments to it unchanged, so passing only the cursor
    # would fail with a TypeError.
    # NOTE(review): DbResultSet's own __init__ is not visible here; it is
    # assumed to accept the same arguments -- confirm.
    from Dbase.DbConnection import DbConnect

    conn = DbConnect('TEST.GDB')
    plainCmd = 'select * from ten_elements'
    dupsCmd = 'select * from ten_elements_dups'

    # Random access by index, including two indices past the end.
    curs = conn.GetCursor()
    print('curs: %s' % repr(curs))
    curs.execute(plainCmd)
    resultSet = RandomAccessDbResultSet(curs, conn, plainCmd)
    for i in range(12):
        try:
            val = resultSet[i]
        except IndexError:
            assert i >= 10

    print('use len')
    curs = conn.GetCursor()
    curs.execute(plainCmd)
    resultSet = RandomAccessDbResultSet(curs, conn, plainCmd)
    for i in range(len(resultSet)):
        val = resultSet[i]

    print('use iter')
    curs = conn.GetCursor()
    curs.execute(plainCmd)
    resultSet = DbResultSet(curs, conn, plainCmd)
    for thing in resultSet:
        rowId, val = thing

    print('dups')
    # Without de-duplication all 20 rows come back ...
    curs = conn.GetCursor()
    curs.execute(dupsCmd)
    resultSet = DbResultSet(curs, conn, dupsCmd)
    rows = []
    for thing in resultSet:
        rows.append(thing)
    assert len(rows) == 20

    # ... and de-duplicating on column 0 leaves 10.
    curs = conn.GetCursor()
    curs.execute(dupsCmd)
    resultSet = DbResultSet(curs, conn, dupsCmd, removeDups=0)
    rows = []
    for thing in resultSet:
        rows.append(thing)
    assert len(rows) == 10

    curs = conn.GetCursor()
    curs.execute(dupsCmd)
    resultSet = RandomAccessDbResultSet(curs, conn, dupsCmd, removeDups=0)
    assert len(resultSet) == 10
    assert resultSet[0] == (0, 11)

    curs = conn.GetCursor()
    curs.execute(dupsCmd)
    resultSet = RandomAccessDbResultSet(curs, conn, dupsCmd, removeDups=0)
    assert resultSet[0] == (0, 11)
    assert resultSet[1] == (2, 21)
    assert resultSet[5] == (10, 61)

    curs = conn.GetCursor()
    curs.execute(dupsCmd)
    resultSet = RandomAccessDbResultSet(curs, conn, dupsCmd)
    assert resultSet[0] == (0, 11)
    assert resultSet[1] == (0, 11)
237