Package VLib :: Package NodeLib :: Module DbPickleSupplier
[hide private]
[frames] | no frames]

Source Code for Module VLib.NodeLib.DbPickleSupplier

  1  #  $Id: DbPickleSupplier.py 2 2006-05-06 22:54:39Z glandrum $ 
  2  # 
  3  #  Copyright (C) 2004 Rational Discovery LLC 
  4  #     All Rights Reserved 
  5  # 
  6  import RDConfig 
  7  import sys,os.path 
  8  from VLib.Supply import SupplyNode 
  9  import cPickle 
 10   
 11  if RDConfig.usePgSQL: 
 12    from pyPgSQL import PgSQL as sql 
13 - class _lazyDataSeq:
14 """ 15 These classes are used to speed up (a lot) the process of 16 pulling pickled objects from PostgreSQL databases. Instead of 17 having to use all of PgSQL's typechecking, we'll make a lot of 18 assumptions about what's coming out of the Db and its layout. 19 The results can lead to drastic improvements in perfomance. 20 21 """
22 - def __init__(self,cursor,cmd,pickleCol=1,depickle=1,klass=None):
23 self.cursor = cursor 24 self.cmd = cmd 25 self._first=0 26 self._pickleCol=pickleCol 27 self._depickle=depickle 28 self._klass=klass
29 - def _validate(self):
30 curs = self.cursor 31 if not curs or \ 32 curs.closed or \ 33 curs.conn is None or \ 34 (curs.res.resultType != sql.RESULT_DQL and curs.closed is None): 35 raise ValueError,'bad cursor' 36 if curs.res.nfields and curs.res.nfields < 2: 37 raise ValueError,\ 38 'invalid number of results returned (%d), must be at least 2'%curs.res.nfields 39 desc1 = curs.description[self._pickleCol] 40 ftv = desc1[self._pickleCol].value 41 if ftv != sql.BINARY: 42 raise TypeError,'pickle column (%d) of bad type'%self._pickleCol
43
44 - def __iter__(self):
45 try: 46 self.cursor.execute(self.cmd) 47 except: 48 import traceback 49 traceback.print_exc() 50 print 'COMMAND:',self.cmd 51 raise 52 self._first=1 53 self._validate() 54 return self
55 - def next(self):
56 curs = self.cursor 57 if not curs or \ 58 curs.closed or \ 59 curs.conn is None or \ 60 curs.res is None or \ 61 (curs.res.resultType != sql.RESULT_DQL and curs.closed is None): 62 raise StopIteration 63 if not self._first: 64 res = curs.conn.conn.query('fetch 1 from "%s"'%self.cursor.name) 65 66 if res.ntuples == 0: 67 raise StopIteration 68 else: 69 if res.nfields < 2: 70 raise ValueError,'bad result: %s'%str(res) 71 t = [res.getvalue(0,x) for x in range(res.nfields)] 72 val = t[self._pickleCol] 73 else: 74 t = curs.fetchone() 75 val = str(t[self._pickleCol]) 76 self._first = 0 77 if self._depickle: 78 if not self._klass: 79 fp = cPickle.loads(val) 80 else: 81 fp = self._klass(val) 82 fields = list(t) 83 del fields[self._pickleCol] 84 fp._fieldsFromDb = fields 85 else: 86 fp = list(t) 87 return fp
88
89 - class _dataSeq(_lazyDataSeq):
90 - def __init__(self,cursor,cmd,pickleCol=1,depickle=1):
91 self.cursor=cursor 92 self.cmd = cmd 93 self.res = None 94 self.rowCount = -1 95 self.idx = 0 96 self._pickleCol=pickleCol 97 self._depickle = depickle
98 - def __iter__(self):
99 self.cursor.execute(self.cmd) 100 self._first = self.cursor.fetchone() 101 self._validate() 102 self.res = self.cursor.conn.conn.query('fetch all from "%s"'%self.cursor.name) 103 self.rowCount = self.res.ntuples+1 104 self.idx=0 105 if self.res.nfields < 2: 106 raise ValueError,'bad query result'%str(res) 107 108 return self
109 - def next(self):
110 if self.idx >= self.rowCount: 111 raise StopIteration 112 113 fp = self[self.idx] 114 self.idx += 1 115 116 return fp
117
118 - def __len__(self):
119 return self.rowCount
120 - def __getitem__(self,idx):
121 if self.res is None: 122 self.cursor.execute(self.cmd) 123 self._first = self.cursor.fetchone() 124 self._validate() 125 self.res = self.cursor.conn.conn.query('fetch all from "%s"'%self.cursor.name) 126 self.rowCount = self.res.ntuples+1 127 self.idx=0 128 if self.res.nfields < 2: 129 raise ValueError,'bad query result'%str(res) 130 131 if idx < 0: 132 idx = self.rowCount+idx 133 if idx<0 or (idx >= 0 and idx >= self.rowCount): 134 raise IndexError 135 if idx==0: 136 val = str(self._first[self._pickleCol]) 137 t = list(self._first) 138 else: 139 val = self.res.getvalue(self.idx-1,self._pickleCol) 140 t = [self.res.getvalue(self.idx-1,x) for x in range(self.res.nfields)] 141 if self._depickle: 142 try: 143 fp = cPickle.loads(val) 144 except: 145 import logging 146 del t[self._pickleCol] 147 logging.exception('Depickling failure in row: %s'%str(t)) 148 raise 149 del t[self._pickleCol] 150 fp._fieldsFromDb = t 151 else: 152 fp = t 153 return fp
154 else: 155 _dataSeq=None 156 157
158 -class DbPickleSupplyNode(SupplyNode):
159 """ Supplies pickled objects from a db result set: 160 161 Sample Usage: 162 >>> from Dbase.DbConnection import DbConnect 163 164 """
165 - def __init__(self,cursor,cmd,binaryCol,**kwargs):
166 SupplyNode.__init__(self,**kwargs) 167 self._dbResults = dbResults 168 self._supplier = DbMolSupplier.RandomAccessDbMolSupplier(self._dbResults,**kwargs)
169
170 - def reset(self):
171 SupplyNode.reset(self) 172 self._supplier.Reset()
173 - def next(self):
174 """ 175 176 """ 177 return self._supplier.next()
178
179 -def GetNode(dbName,tableName):
180 from Dbase.DbConnection import DbConnect 181 conn = DbConnect(dbName,tableName) 182 return DbMolSupplyNode(conn.GetData())
183 184 #------------------------------------ 185 # 186 # doctest boilerplate 187 #
188 -def _test():
189 import doctest,sys 190 return doctest.testmod(sys.modules["__main__"])
191 192 193 if __name__ == '__main__': 194 import sys 195 failed,tried = _test() 196 sys.exit(failed) 197