Expand|Select|Wrap|Line Numbers
- self.staticText3 = wx.StaticText(id=wxID_DBCONNECTDIALOGSTATICTEXT3,
- label='ODBC Data Source Name', name='staticText3', parent=self, pos=wx.Point(240,
- 40), size=wx.Size(143, 16), style=0)
Expand|Select|Wrap|Line Numbers
- ##from MySQLdb import *
- from mx.ODBC.Windows import *
- from time import time
class DBServer:
    """Wrap one database connection and its cursor.

    Progress and error text is reported by calling ``master.write(text)``;
    the master can be any object that provides that method.

    ``Connect``, ``DatabaseError``, ``OperationalError`` and
    ``ProgrammingError`` come from the module-level star import of the
    database driver.
    """

    def __init__(self, master):
        """Remember the master; no connection is opened until Login()."""
        self.master = master
        # Defined up front so Execute(), GetDbConnection() and close() are
        # safe to call before Login() has run (or after it has failed).
        self.dbconnect = None
        self.dbcursor = None

    def Login(self, servername, username, password):
        """Attempt to create a database login.

        On success, store the connection and a cursor on the instance,
        execute ``SET autocommit=1`` and return the open connection.
        On failure, report it to the master and return None.
        """
        try:
            self.dbconnect = Connect(servername, user=username,
                                     password=password, clear_auto_commit=0)
        except (DatabaseError, OperationalError):
            self.dbconnect = None
            self.dbcursor = None
            self.master.write('Couldn\'t log on to the server `%s` as `%s`\n'
                              % (servername, username))
            return None
        self.master.write("%s has been logged onto %s\n"
                          % (username, servername))
        self.dbcursor = self.dbconnect.cursor()
        self.Execute('SET autocommit=1')
        return self.dbconnect

    def DBError(self, query, message):
        """Print the error and report the query plus error to the master.

        ``message`` is the caught exception (or a plain sequence).  Its
        fields are read as: [0] shown as text, [1] shown as a number,
        [-2] text whose part after the last ']' is shown, [-1] shown as a
        number.
        NOTE(review): presumably (sqlstate, native error, '[driver]text',
        line number) as raised by the ODBC driver -- confirm against the
        driver documentation.
        """
        print(message)
        # Exceptions are not indexable on Python 3, so read the fields from
        # ``args``; a plain tuple/list passed by a caller is used directly.
        details = getattr(message, 'args', message)
        try:
            report = '%s\nOCDB Error %s #%d: %s #%d\n' % (
                query, details[0], details[1],
                details[-2].split(']')[-1], details[-1])
        except (IndexError, KeyError, TypeError, AttributeError):
            # Unexpected error shape: still report something instead of
            # raising from inside the error handler.
            report = '%s\n%s\n' % (query, message)
        self.master.write(report)

    def IsOpen(self, connection):
        """Return True when ``connection.closed`` is false."""
        return not connection.closed

    def Execute(self, query):
        """Run ``query`` on the stored cursor.

        Reports the query, the cursor's row count and the elapsed time to
        the master.  Returns the cursor when the query ran; returns None
        when a database error was caught (after reporting it through
        DBError) or when there is no cursor.
        """
        cursor = self.dbcursor
        if cursor:
            try:
                started = time()
                cursor.execute(query)
                nRows = cursor.rowcount
                self.master.write("%s " % query)
                self.master.write("%d rows affected: %.2f sec.\n"
                                  % (nRows, time() - started))
            except (DatabaseError, OperationalError) as message:
                self.DBError(query, message)
                return None
        return cursor

    def DBExists(self, database):
        """Return True if ``database`` is listed by "show databases".

        Backquotes are stripped and the name is lower-cased before the
        comparison.  Returns False when the query could not be run.
        """
        cursor = self.Execute("show databases")
        if not cursor:
            return False
        return (database.strip('`').lower(),) in cursor.fetchall()

    def TableExists(self, table):
        """Return True if ``table`` is listed by "show tables".

        Backquotes are stripped and the name is lower-cased before the
        comparison.  Returns False when the query could not be run.
        """
        cursor = self.Execute("show tables")
        if not cursor:
            return False
        return (table.strip('`').lower(),) in cursor.fetchall()

    def SetMaster(self, master):
        """Allow the master to be reset."""
        self.master = master

    def GetMaster(self):
        """Return the object currently receiving write() reports."""
        return self.master

    def GetDbConnection(self):
        """Return the stored connection, or None when not logged in."""
        return self.dbconnect

    def close(self):
        """Close the stored connection and tell the master what happened."""
        if self.dbconnect is None:
            # Never logged in, or the login failed: nothing to close.
            self.master.write("No connection to close.\n")
            return
        try:
            self.dbconnect.close()
            self.master.write("Closed connection.\n")
        except ProgrammingError:
            self.master.write("Already closed!\n")
class DBClient:
    """Minimal master for a DBServer.

    Subclass this class and override the write() method,
    or provide these minimal services in your own class.
    """

    def __init__(self, database):
        """Create the DBServer that reports back to this client."""
        # Kept so subclasses can see which database was requested.
        self.database = database
        # DBServer.__init__ accepts only the master; passing ``database``
        # as a second argument raised TypeError.
        self.dbServer = DBServer(self)

    def write(self, message):
        """Receive a report from the DBServer; the default prints it."""
        print(message)
- ## Probably don't want to give default values to parameters in utility routines...
- ## Do some tests some day to look at side effects...
def MySQLDelete(table, argdict=None, **kwargs):
    """Build an SQL DELETE command from the arguments.

    Return a single string which can be 'execute'd.

    argdict and kwargs are two ways to supply 'colName': value pairs for
    the WHERE clause; the pairs are combined with AND.  The caller's
    argdict is not modified.

    Values are rendered as follows: ``None`` becomes ``col IS NULL``,
    ``str`` values are quoted with repr(), anything else uses str().
    Without conditions the result ends with a trailing space (the empty
    WHERE slot).

    NOTE: values are interpolated into the statement text.  repr() is not
    SQL escaping, so do not pass untrusted input; use the driver's
    parameterised execute for that.
    """
    conditions = dict(argdict or {})
    conditions.update(kwargs)

    tests = []
    for column, value in conditions.items():
        if value is None:
            # "col=None" is not valid SQL and "col=NULL" never matches.
            tests.append(column + ' IS NULL')
        elif isinstance(value, str):
            tests.append(column + '=' + repr(value))
        else:
            tests.append(column + '=' + str(value))

    where = ('WHERE %s' % ' AND '.join(tests)) if tests else ''
    return ' '.join(['DELETE FROM', table, where])
def MySQLInsert(table, argdict=None, **kwargs):
    """Build an SQL INSERT command from the arguments.

    Return a single string which can be 'execute'd.

    argdict is a dictionary of 'column_name': value items.
    **kwargs is the same but passed in as column_name=value.
    The caller's argdict is not modified.

    Values are rendered as follows: ``None`` becomes ``NULL``, ``str``
    values are quoted with repr(), anything else uses str().

    NOTE: values are interpolated into the statement text.  repr() is not
    SQL escaping, so do not pass untrusted input; use the driver's
    parameterised execute for that.
    """
    row = dict(argdict or {})  # don't modify caller dictionary!
    row.update(kwargs)

    columns = []
    literals = []
    for column, value in row.items():
        columns.append(column)
        if value is None:
            # str(None) would emit the Python name "None", not SQL NULL.
            literals.append('NULL')
        elif isinstance(value, str):
            literals.append(repr(value))
        else:
            literals.append(str(value))

    # wrap comma separated names and values in parens
    column_list = '(%s)' % ', '.join(columns)
    value_list = '(%s)' % ', '.join(literals)
    return ' '.join(['INSERT', table, column_list, 'VALUES', value_list])
def MySQLUpdate(table, valuedict, argdict=None, **kwargs):
    """Build an SQL UPDATE command from the arguments.

    Return a single string which can be 'execute'd.

    valuedict is a dictionary of column_name: value pairs to SET.
    argdict and kwargs are two ways to supply 'colName': value pairs for
    the WHERE clause; the pairs are combined with AND.  Neither
    dictionary passed by the caller is modified.

    Values are rendered as follows: ``str`` values are quoted with
    repr(), anything else uses str().  ``None`` becomes ``col=NULL`` in
    SET and ``col IS NULL`` in WHERE.  Without conditions the result ends
    with a trailing space (the empty WHERE slot).

    NOTE: values are interpolated into the statement text.  repr() is not
    SQL escaping, so do not pass untrusted input; use the driver's
    parameterised execute for that.
    """
    assignments = []
    for column, value in valuedict.items():
        if value is None:
            literal = 'NULL'
        elif isinstance(value, str):
            literal = repr(value)
        else:
            literal = str(value)
        assignments.append(column + '=' + literal)
    set_clause = 'SET %s' % ', '.join(assignments)

    conditions = dict(argdict or {})
    conditions.update(kwargs)
    tests = []
    for column, value in conditions.items():
        if value is None:
            # "col=NULL" never matches a row; NULL needs IS NULL.
            tests.append(column + ' IS NULL')
        elif isinstance(value, str):
            tests.append(column + '=' + repr(value))
        else:
            tests.append(column + '=' + str(value))
    where = ('WHERE %s' % ' AND '.join(tests)) if tests else ''

    return ' '.join(['UPDATE', table, set_clause, where])
def MySQLSelect(table, arglist=None, argdict=None, **kwargs):
    """Build an SQL SELECT command from the arguments.

    Return a single string which can be 'execute'd.

    arglist is a list of strings that are column names to get; when it is
    empty or omitted, ``*`` is selected.
    argdict and kwargs are two ways to supply 'colName': value pairs for
    the WHERE clause; the pairs are combined with AND.  The caller's
    argdict is not modified.

    Values are rendered as follows: ``None`` becomes ``col IS NULL``,
    ``str`` values are quoted with repr(), anything else uses str().
    Without conditions the result ends with a trailing space (the empty
    WHERE slot).

    NOTE: values are interpolated into the statement text.  repr() is not
    SQL escaping, so do not pass untrusted input; use the driver's
    parameterised execute for that.
    """
    columns = ', '.join(arglist or [])

    conditions = dict(argdict or {})
    conditions.update(kwargs)
    tests = []
    for column, value in conditions.items():
        if value is None:
            # "col=None" is not valid SQL and "col=NULL" never matches.
            tests.append(column + ' IS NULL')
        elif isinstance(value, str):
            tests.append(column + '=' + repr(value))
        else:
            tests.append(column + '=' + str(value))
    where = ('WHERE %s' % ' AND '.join(tests)) if tests else ''

    return ' '.join(['SELECT', (columns or '*'), 'FROM', table, where])
def Py2SQL_Join(tables, mapping, *arglist, **kwargs):
    """Build an SQL SELECT command over several tables.

    Return a single string which can be 'execute'd.

    tables is a list of strings that are table names to select from.
    arglist holds the column names, passed as extra positional arguments.
    mapping is a list of (table_index, column_index) tuples; each one
    selects ``tables[table_index].arglist[column_index]``.  An empty
    mapping selects ``*``.
    kwargs are colName=value pairs for the WHERE clause, combined with
    AND.  Keys must be unique across tables.

    Values are rendered as follows: ``None`` becomes ``col IS NULL``,
    ``str`` values are quoted with repr(), anything else uses str().
    Without conditions the result ends with a trailing space (the empty
    WHERE slot).

    NOTE: values are interpolated into the statement text.  repr() is not
    SQL escaping, so do not pass untrusted input.
    """
    columns = ', '.join('%s.%s' % (tables[pair[0]], arglist[pair[1]])
                        for pair in mapping)
    sources = ', '.join(tables)

    conditions = dict(kwargs)
    # NOTE(review): known-questionable behaviour kept from the original.
    # One condition is taken with popitem() (the last keyword on
    # insertion-ordered dicts, an arbitrary one on older Pythons) and
    # re-added qualified with tables[1]; every other condition stays
    # unqualified.  This needs at least two tables and emits no join
    # condition between them.  Redesign once the intended mapping of
    # conditions to tables is decided.
    if conditions:
        name, value = conditions.popitem()
        if name:
            conditions['%s.%s' % (tables[1], name)] = value

    tests = []
    for column, value in conditions.items():
        if value is None:
            # "col=None" is not valid SQL and "col=NULL" never matches.
            tests.append(column + ' IS NULL')
        elif isinstance(value, str):
            tests.append(column + '=' + repr(value))
        else:
            tests.append(column + '=' + str(value))
    where = ('WHERE %s' % ' AND '.join(tests)) if tests else ''

    return ' '.join(['SELECT', (columns or '*'), 'FROM', sources, where])
# Running the module as a script currently does nothing.  The commented
# lines below are a manual smoke test kept for reference.
if __name__ == "__main__":
    pass
##    import sys
##    db = DBServer(sys.stdout)
##
##    con = db.Login('genesis', 'joe', 'password')
##    print con
##    print db.IsOpen(con)