
Source Code for Module biana.BianaDB.ConnectorDB

"""
    BIANA: Biologic Interactions and Network Analysis
    Copyright (C) 2009  Javier Garcia-Garcia, Emre Guney, Baldo Oliva

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

"""
 File       : ConnectorDB.py, derived from file PianaDB.py from the PIANA project
 Author     : Javier Garcia & Emre Guney, based on R. Aragues & D. Jaeggi script
 Creation   : 2003
 Contents   : class for establishing connections to BianaDatabase and handling inserts and selects
 Called from: BianaDBaccess.py
=======================================================================================================

This is a generalization of MySQL commands.

To see how this class is used, look at any method in BianaDBaccess.py
"""

import sets
import MySQLdb
import sys

DEBUG_BUFFER_INSERT_SINGLE = False  # Set True to control queries; will insert each query separately
DEBUG_PRINT_INSERT_QUERY = False    # Set True to control queries; will print insert_db_content queries
class DB(object):
    """
    Class for establishing connections to pianaDB and handling inserts and selects
    """
    def __init__(self, dbname=None, dbhost=None, dbuser=None, dbpassword=None, dbport=None, dbsocket=None, buffer=True, lock_tables=False):
        """
        "dbname" is the database name to which you want to connect (required)

        "dbhost" is the machine with the mysql server that holds the piana database (required)

        "dbuser" is the mysql user (not required in most systems)

        "dbpassword" is the mysql password (not required in most systems)

        "dbport" is the mysql port (not required in most systems)

        "buffer" determines if an insert buffer is going to be used. Default is True.
                 Alert! The DB object must be closed in order to empty the buffer!
                 The buffer is used for performance when parsing data

        "lock_tables" is used to lock the used tables
        """

        self.dbname = dbname
        self.dbhost = dbhost
        self.dbuser = dbuser
        self.dbpassword = dbpassword
        self.dbport = dbport
        self.dbsocket = dbsocket

        # init database connection (different connection parameters depending on user preferences...)
        # The original code enumerated every user/password/port/socket combination in nested branches;
        # building the keyword arguments incrementally is equivalent and shorter.
        # Note: the original socket test was "dbsocket is not None or dbsocket==''", which wrongly
        # selected the socket branch for an empty socket string; the intended check is a non-empty socket.
        connection_kwargs = {"host": dbhost}
        if dbuser is not None:
            connection_kwargs["user"] = dbuser
            if dbpassword is not None:
                connection_kwargs["passwd"] = dbpassword
        if dbport:
            connection_kwargs["port"] = dbport
        if dbsocket is not None and dbsocket != "":
            connection_kwargs["unix_socket"] = self.dbsocket
        self.db = MySQLdb.connect(**connection_kwargs)

        # Not necessary to do autocommit as the Engine selected for tables is MyISAM (MyISAM does not accept commit)
        # If this ever changes, it will be necessary to activate autocommit or to commit at each parser
        #self.db.autocommit(1)

        self.cursor = self.db.cursor()

        self.dbmaxpacket = self._get_max_packet()
        self.lock_frequency = 100   #20000
        self.current_lock_num = 0
        self.is_locked = False
        self.locked_tables = sets.Set()
        self.locked_tables_win = sets.Set()   # The same as self.locked_tables, but corrected for windows (table names are case insensitive and usually lowercased)
        self.lock_tables = lock_tables

        # Set dbmaxpacket
        #self.insert_db_content("SET SESSION max_allowed_packet=16777216")
        #self.insert_db_content("SET SESSION max_allowed_packet=4777216")
        #self.insert_db_content("SET SESSION max_allowed_packet=1000000")
        #self.insert_db_content("SET SESSION max_allowed_packet=1047552")

        if buffer is True and self.dbmaxpacket is not None:
            self.uses_buffer = True
            self.insert_buffer = Buffer(self.dbmaxpacket, self)
            self.autoincrement_values = {}
        else:
            self.uses_buffer = False
            self.insert_buffer = None

        if( dbname is not None ):
            self.select_db_content("use "+dbname)

        self.table_names = None

        return
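    # Usage sketch (illustrative only; host and database names are hypothetical).
    # The insert buffer is flushed by close() via _unlock_tables(), so a parser that relies
    # on buffered inserts must close the object before reading the inserted data back:
    #
    #   db = DB(dbname="bianaDB", dbhost="localhost", dbuser="root", buffer=True, lock_tables=True)
    #   ... insertions through BianaDBaccess ...
    #   db.close()   # empties the insert buffer and unlocks tables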
    # --
    # methods needed for using pickle with piana objects
    # --

    def __getstate__(self):

        odict = self.__dict__.copy()   # copy the dict since we are going to change it
        del odict['db']                # remove the connection to MySQL: attribute self.db cannot be pickled
        return odict

    def __setstate__(self, dict):

        self.__dict__ = dict             # recover previous dictionary
        dict.__class__.__init__(dict)    # recover the connection to MySQL by calling init

    def __getnewargs__(self):

        # returns the arguments that __init__ will get when it is called after the pickle.load
        return (self.dbname, self.dbhost, self.dbuser, self.dbpassword)

    def __str__(self):

        return "Connection to database %s on %s as %s" %(self.dbname, self.dbhost, self.dbuser)

    def close(self):
        """
        Closes the connection with the database
        """
        self._unlock_tables()
        self.cursor.close()
        self.db.close()
    def check_consistency_with_given_source_version(self, source_code_version):
        #print self._get_source_code_version_in_db(), source_code_version
        if self._get_source_code_version_in_db() is None:
            # newly created database
            self._set_source_code_version_in_db(source_code_version)
            return True
        elif self._get_source_code_version_in_db().lower() == source_code_version.lower():
            # check code version
            return True
        return False

    # --
    # handling inserts and selects through a generalized class
    # --

    def insert_db_content(self, sql_query, answer_mode=None):
        """
        Inserts values into a piana database (connection was established in self.db)

        Depending on the argument "answer_mode", different things are returned.

        This method is called by PianaDBaccess to then process the results and return them to the user

        "sql_query" is normally obtained through classes implemented in PianaInsertSQL.py, which have a method get_sqlquery
        that creates the sql query needed to retrieve the searched value.

        "answer_mode" can be one of the following:

           - None: nothing is returned
           - 'last_id': last id inserted is returned
           - 'num_updated': number of rows that were updated (used by UPDATE statements...)

           --> 'last_id' mode only works for tables that have an auto_increment ID!!! It will not work with primary keys that are
               not auto_increment. Currently, the following tables have auto_increment ids: protein (ID=proteinPiana) and interaction (ID=interactionPiana)

        "buffer" indicates if this sql_query has to be inserted or not. It can be:
           - None: sql_query will be treated as a sql query and it will be executed (if possible)
           - dictionary: the insert has been inserted into the dictionary buffer, so the sql_query will not be executed
             (sql_query can be None too; it will be ignored)
        """

        if sql_query is None:
            return None

        # Checks lock frequency
        #self._check_lock_frequency()   # ENTERS INTO A LOOP

        if isinstance(sql_query, list):
            for actual_query in sql_query:
                if DEBUG_PRINT_INSERT_QUERY:
                    sys.stderr.write(actual_query+"\n")
                try:
                    if( actual_query != "" ):
                        self.cursor.execute(actual_query)
                except Exception, inst:
                    sys.stderr.write("Attention: this query was not executed due to a mysql exception: <<%s>>\n" %(actual_query))
                    sys.stderr.write("           Error Reported: %s\n" %(inst))
                    answer_mode = None
                    raise ValueError(inst)
            return None

        if sql_query is not None:
            if DEBUG_PRINT_INSERT_QUERY:
                sys.stderr.write("%s\n" %sql_query)
            try:
                self.cursor.execute(sql_query)
            except Exception, inst:
                sys.stderr.write("Attention: this query was not executed due to a mysql exception: <<%s>>\n" %(sql_query))
                sys.stderr.write("           Error Reported: %s\n" %(inst))
                raise ValueError(inst)
        #else:
        #    raise ValueError("Trying to execute an empty insertion sqlquery")

        if answer_mode == "last_id":
            # in mode "last_id", do a select to the database to obtain the last autoincrement id inserted
            self.cursor.execute("""SELECT LAST_INSERT_ID()""")
            answer = self.cursor.fetchall()

            if answer:
                return answer[0][0]
            else:
                return None

        elif answer_mode == "num_updated":
            # in mode "num_updated", the number of rows that were updated will be in the answer of the sql query
            answer = self.cursor.fetchall()
            if not answer:
                return None
            else:
                return answer[0]

        else:
            return None
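    # Example sketch (table and column names are illustrative, not guaranteed by this module):
    #
    #   new_id = db.insert_db_content("INSERT INTO protein (sequence) VALUES (\"MSEQ\")",
    #                                 answer_mode="last_id")        # returns LAST_INSERT_ID()
    #   db.insert_db_content(["SET unique_checks=0",
    #                         "SET foreign_key_checks=0"])          # a list executes each query in turn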
    def select_db_content(self, sql_query=None, answer_mode="single", remove_duplicates="yes", number_of_selected_elems=1):
        """
        Returns content from a piana database (connection was established in self.db)

        "sql_query" is normally obtained through classes implemented in PianaSelectSQL.py, which have a method get_sqlquery
        that creates the sql query needed to retrieve the searched value

        "answer_mode" is used to let the user choose what kind of answer he is expecting from the sql query.
        answer_mode can take the following values (default is "single"):
           - "single": just one element (if nothing is found, returns None)
           - "list": a list of single elements (if nothing is found, returns empty list [])
           - "raw": the raw result from the sql query (a matrix)

        "remove_duplicates" is used to let the user decide if the returned list can contain duplicates or not
        (only used when answer_mode="list")
        (this is useful to remove similar entries from one query, for example identical uniprot accession numbers
        returned under different swissAccession source DBs)
           - "yes" will return a list where all elements are unique
           - "no" will return the complete list returned by the sql query

        "number_of_selected_elems" sets the number of elements being selected in the sql query.
           - If 1, only the string of the element is returned.
           - If >1, each group of elements is represented by a tuple (elem1, elem2, ...)
           - if you want to use a number_of_selected_elems higher than 3, you have to modify the code below
             to create keys of the number of elements you wish to select
        """

        if sql_query is not None:
            try:
                self.cursor.execute(sql_query)
                answer = self.cursor.fetchall()
            except Exception, inst:
                sys.stderr.write("Attention: this query was not executed due to a mysql exception: <<%s>>\n" %(sql_query))
                sys.stderr.write("           Error Reported: %s\n" %(inst))
                answer = None
                raise ValueError(inst)
        else:
            raise ValueError("Trying to execute an empty selection sqlquery")

        # when fetchall doesn't find anything, it returns an empty object.
        # if something was found, transform to the answer_mode requested by the user

        # TO DO!!!! Speed things up by creating a dictionary with those values that have already been inserted...
        # that will be much faster than checking on a list!!!

        if answer:
            # something was found: each answer_mode returns a different "something"
            if answer_mode == "single":
                if number_of_selected_elems == 1:
                    return answer[0][0]
                else:
                    return answer[0]

            elif answer_mode == "list":
                # Optimization? It is not necessary to sort to form the key (MySQL will always return the result in the same order...)
                # Furthermore, in many tables data is already sorted in some way (for example, in interactionTable proteinPianaA and proteinPianaB are sorted...)
                # Take this into account!
                if number_of_selected_elems == 1:
                    temporal_list = [element[0] for element in answer]
                    if remove_duplicates == "yes":
                        return list(sets.Set(temporal_list))
                    else:
                        return temporal_list
                else:
                    # if there is more than one selected element
                    temporal_list = list(answer)
                    if remove_duplicates == "yes":
                        return dict(map(None, temporal_list, [])).keys()
                    else:
                        return temporal_list

            elif answer_mode == "raw":
                return answer

            else:
                raise ValueError("answer_mode value is not correct")

        else:
            # nothing was found: each answer_mode returns a different "nothing"
            if answer_mode == "single":
                return None
            elif answer_mode == "list":
                return []
            elif answer_mode == "raw":
                return answer
            else:
                raise ValueError("answer_mode value is not correct")
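    # Example sketch (illustrative queries; the table name is hypothetical):
    #
    #   name  = db.select_db_content("SELECT name FROM externalDatabase WHERE externalDatabaseID=1")              # "single": one value or None
    #   names = db.select_db_content("SELECT name FROM externalDatabase", answer_mode="list")                     # deduplicated list
    #   rows  = db.select_db_content("SELECT externalDatabaseID, name FROM externalDatabase", answer_mode="raw")  # tuple of row tuples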
    def add_autoincrement_columns(self, table, attribute):

        if self.uses_buffer is False:
            raise ValueError("Cannot add an autoincrement column if buffer is not being used")

        if self.autoincrement_values.has_key((table, attribute)):
            raise ValueError("Trying to add twice the same autoincrement column")

        self.autoincrement_values[(table, attribute)] = None

    def get_next_autoincrement(self, table, attribute):

        self._check_locked_table(table)

        if self.is_locked is False:
            raise ValueError("Trying to get an autoincrement value without locking the table")

        if self.autoincrement_values[(table, attribute)] is None:
            self.autoincrement_values[(table, attribute)] = self._get_current_autoincrement(table, attribute)

        self.autoincrement_values[(table, attribute)] += 1

        return self.autoincrement_values[(table, attribute)]
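    # Sketch of the intended autoincrement flow (table and attribute names are illustrative;
    # the counter is seeded from the "last_<attribute>" column of the BianaDatabase table):
    #
    #   db.add_autoincrement_columns("externalEntity", "externalEntityID")   # register the counter (buffer mode only)
    #   db._lock_tables(["externalEntity", "BianaDatabase"])                 # counters require locked tables
    #   next_id = db.get_next_autoincrement("externalEntity", "externalEntityID")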
    def _get_last_stable_autoincrement(self, table, attribute):
        return self.select_db_content(self._get_select_sql_query(tables=["BianaDatabase"], columns=["last_"+attribute]))

    def _get_current_autoincrement(self, table, attribute):

        if self.is_locked is False:
            raise ValueError("Trying to get an autoincrement value without locking the table")

        if self.autoincrement_values[(table, attribute)] is None:
            #max_value = self._get_max_value(table, attribute)
            #if max_value is None:
            #    return 0
            #else:
            #    return max_value
            #max_value = self.select_db_content("SELECT %s FROM %s" % (attribute, table), answer_mode="single")
            max_value = self._get_last_stable_autoincrement( table = table, attribute = attribute )
            return max_value
        else:
            return self.autoincrement_values[(table, attribute)]

    def set_lock_tables(self, value):
        self.lock_tables = value

    def _check_locked_table(self, table):
        if self.lock_tables is True:
            if table.lower() not in self.locked_tables_win:
                self.locked_tables.add(table)
                self.locked_tables_win.add(table.lower())
                self._unlock_tables()
                self._lock_tables()

    def _get_source_code_version_in_db(self):
        """
        Fetches the value of the source_code_version field in the BianaDatabase table
        """
        table = "BianaDatabase"
        column = "source_code_version"
        self._check_locked_table(table)
        return self.select_db_content( "SELECT %s FROM %s" % (column, table), answer_mode="single" )

    def _set_source_code_version_in_db(self, source_code_version):
        """
        Sets the value of the source_code_version field in the BianaDatabase table
        """
        table = "BianaDatabase"
        column = "source_code_version"
        self._check_locked_table(table)
        return self.insert_db_content( "UPDATE %s SET %s=\"%s\"" % (table, column, source_code_version) )

    def _get_max_value(self, table, column):
        """
        OBSOLETE - the last_ fields in the BianaDatabase table are used now
        """
        self._check_locked_table(table)
        return self.select_db_content( "SELECT MAX(%s) FROM %s" %(column, table), answer_mode="single" )

    def _get_max_packet(self):
        """
        Returns the maximum packet size that the sql server accepts
        """
        query = "show variables like \"max_allowed_packet\""

        maxpacket_res = self.select_db_content(sql_query=query, answer_mode="raw", remove_duplicates="yes", number_of_selected_elems=1)

        if maxpacket_res:
            return maxpacket_res[0][1]
        else:
            sys.stderr.write("Unknown database maximum packet size")
            return None

    def check_database(self, database, ignore_primary_keys=False, verbose=False):
        """
        Method for checking the database

        It checks that all tables exist

        It does not drop any table

        For the moment, it does not check the default value of table fields
        """

        existing_tables_str = sets.Set(self._get_table_names())

        if( sys.platform.lower()=="win32" or sys.platform.lower()=="win64" ):
            existing_tables_str = sets.Set([ x.lower() for x in existing_tables_str ])

        checked_tables_str = sets.Set()

        database_tables = database.get_tables()

        for current_table_obj in database_tables:
            current_table_str = current_table_obj.get_table_name()

            if( sys.platform.lower()=="win32" or sys.platform.lower()=="win64" ):
                current_table_str = current_table_str.lower()

            checked_tables_str.add(current_table_str)
            if current_table_str not in existing_tables_str:
                # Table does not exist. Create it
                self.insert_db_content( sql_query = current_table_obj.create_mysql_query(ignore_primary_key=ignore_primary_keys) )
            else:
                # Table exists. Check column names & types
                existing_columns = self.select_db_content( sql_query = "DESC %s" % current_table_str,
                                                           answer_mode = "raw" )
                # Each row should contain:
                #   0.- field name
                #   1.- Type
                #   2.- Null
                #   3.- Key
                # differentiating between table column (info comes from the database) and table field (info comes from database.py)
                column_dictionary = {}
                for current_column in existing_columns:
                    column_name = current_column[0].lower()
                    ## !!mysql (at least 5.0.22) automatically converts int->int(10), smallint->smallint(5), float(4)->float, text(xxx)->text and bool->tinyint(1)
                    column_type = current_column[1].lower().replace("smallint(5)", "smallint").replace("int(10)", "int").replace("float(4)", "float").replace("integer", "int").replace("'","\"").strip()
                    if current_column[2].startswith("NO"):
                        column_null = False
                    elif current_column[2].startswith("YES"):
                        column_null = True
                    else:
                        print "Warning: NULL column not recognized: ", current_column[2]
                    column_extra = current_column[5].lower()
                    if column_extra != "":
                        column_type += " " + column_extra
                    column_dictionary[column_name] = (column_type, column_null)
                table_fields = current_table_obj.get_fields()
                for current_field in table_fields:
                    field_name = current_field.field_name.lower()
                    field_type = current_field.data_type.lower().replace("integer", "int").replace("float(4)", "float").replace("bool","tinyint(1)").replace("'","\"").replace("default 0","").strip()
                    field_null = current_field.null
                    ## check whether the field name exists
                    if column_dictionary.has_key(field_name):
                        ## check whether the field data is consistent
                        (column_type, column_null) = column_dictionary[field_name]
                        if field_type != column_type or field_null != column_null:
                            if field_null == column_null and column_type == "text":
                                continue
                            if field_null:
                                null_str = "NULL"
                            else:
                                null_str = "NOT NULL"
                            self._check_locked_table( current_table_str )
                            query_str = "ALTER TABLE %s MODIFY %s %s %s" % (current_table_str, current_field.field_name, field_type, null_str)
                            self.insert_db_content( sql_query = query_str )
                            if verbose:
                                sys.stderr.write("%s\n" %query_str)
                    else:
                        if field_null:
                            null_str = "NULL"
                        else:
                            null_str = "NOT NULL"
                        self._check_locked_table( current_table_str )
                        query_str = "ALTER TABLE %s ADD %s %s %s" % (current_table_str, current_field.field_name, field_type, null_str)
                        self.insert_db_content( sql_query = query_str )
                        if verbose:
                            sys.stderr.write("%s\n" %query_str)

        checked_tables_str = existing_tables_str - checked_tables_str
        if len(checked_tables_str)>0:
            for db_str in checked_tables_str:
                if not db_str.lower().startswith("userentityunification_protocol_") and not db_str.lower().startswith("key_attribute_"):
                    print "Warning: Following tables are not encoded in source: ", db_str
        return
    def GetTableNames(self):
        """
        Generates the SQL statement that gets all database table names
        """
        return """ SHOW TABLES """

    def _get_select_sql_query(self, tables, columns=None, fixed_conditions=None, join_conditions=None, group_conditions=None, distinct_columns=False):
        """
        Generates a general select sql statement

        "tables" is a list or tuple of tables where the value/s must be searched. If the elements of the list or tuple are tuples of length 2,
        the format taken will be the following: (table_name or table_object, alias to the table)

        "columns" is a list or tuple of the columns searched (columns must be preceded by the table where they are searched).
        If it is None, all values will be selected

        "fixed_conditions" is a list or tuple of tuples with the following format: (column, type, restriction_value)

        "join_conditions" is a list or tuple of tuples with the following format: (column, type, column) to restrict the selection to the join

        "type" can be "=", ">", "<", ...

        "group_conditions" is a list of columns by which the result must be grouped

        It returns the sql query.
        """

        if( fixed_conditions is None or not len(fixed_conditions) ):
            fixed_conditions_sql = ""
        else:
            fixed_conditions_sql = " AND ".join( [self._get_cond_str(x) for x in fixed_conditions] )

        if( join_conditions is None or not len(join_conditions) ):
            join_conditions_sql = ""
        else:
            join_conditions_sql = " AND ".join(["%s %s %s" %(x[0],x[1],x[2]) for x in join_conditions])
            if fixed_conditions_sql != "":
                join_conditions_sql = " AND %s" %(join_conditions_sql)

        if( join_conditions or fixed_conditions ):
            where_sql = " WHERE "
        else:
            where_sql = ""

        if columns is None:
            columns_sql = "*"
        else:
            columns_list = []
            for current_column in columns:
                if( isinstance(current_column, tuple) ):
                    columns_list.append("%s AS %s" %(current_column))
                else:
                    columns_list.append(current_column)
            columns_sql = ",".join(columns_list)

        # transform table objects to table name strings
        # Check that the tables are locked in case it is necessary
        tables_list = []
        for actual_table in tables:
            if( isinstance(actual_table, tuple) ):
                tables_list.append("%s AS %s " %(actual_table[0], actual_table[1]) )
                self._check_locked_table(str(actual_table[1]))
            else:
                tables_list.append("%s" %actual_table)
                self._check_locked_table(str(actual_table))

        if group_conditions is not None and len(group_conditions)>0:
            group_conditions_sql = "GROUP BY %s" %(",".join(group_conditions))
        else:
            group_conditions_sql = ""

        if distinct_columns:
            distinct_str = "DISTINCT"
        else:
            distinct_str = ""

        return """SELECT %s %s FROM %s %s %s %s %s""" %(distinct_str,
                                                        columns_sql,
                                                        ",".join(tables_list),
                                                        where_sql,
                                                        fixed_conditions_sql,
                                                        join_conditions_sql,
                                                        group_conditions_sql)
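    # Example sketch of the query-builder arguments (table and column names are illustrative):
    #
    #   db._get_select_sql_query(tables=[("externalEntity", "E"), ("externalDatabase", "D")],
    #                            columns=["E.externalEntityID"],
    #                            fixed_conditions=[("E.type", "=", "protein")],
    #                            join_conditions=[("E.externalDatabaseID", "=", "D.externalDatabaseID")])
    #   # roughly: SELECT E.externalEntityID FROM externalEntity AS E ,externalDatabase AS D
    #   #          WHERE E.type = "protein" AND E.externalDatabaseID = D.externalDatabaseID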
    def _get_delete_sql_query(self, table, fixed_conditions=None):
        """
        Generates a general delete sql statement

        "table" is a table name or table object

        "fixed_conditions" is a list or tuple of tuples with the following format: (column, type, restriction_value)

        "type" can be "=", ">", "<", ...

        It returns the sql query.
        """

        self._check_locked_table(table)

        if( fixed_conditions is None or not len(fixed_conditions) ):
            fixed_conditions_sql = ""
        else:
            fixed_conditions_sql = " AND ".join( [self._get_cond_str(x) for x in fixed_conditions] )

        if fixed_conditions:
            where_sql = " WHERE "
        else:
            where_sql = ""

        return """DELETE FROM %s %s %s""" %(table,
                                            where_sql,
                                            fixed_conditions_sql)

    def _get_union_queries(self, queries):
        """
        "queries" is the list of queries on which to make the union
        """
        return " UNION ".join(queries)

    def _get_cond_str(self, conditions):

        if len(conditions) == 4:
            return "%s %s %s" %(conditions[0], conditions[1], conditions[2])
        else:
            return "%s %s \"%s\"" %(conditions[0], conditions[1], conditions[2])

    ####
    # INSERT RELATED METHODS
    ####

    def _get_nested_insert_sql_query(self, table, columns, subquery):

        self._check_locked_table(table)

        return """INSERT INTO %s (%s) (%s)""" %(table, ",".join(columns), subquery)

    def _get_insert_sql_query(self, table, column_values, special_column_values=[], use_buffer=True, max_elements_in_buffer=None, on_duplicate_key="IGNORE"):
        """
        Generates a general insert sql statement

        "column_values" must be a tuple of tuples with the following format: (column, value)

        It returns the sql query. By default, if the buffer is active, the insert is handed to the buffer
        (unless the "use_buffer" attribute is set to False), so a list of queries (or None) can be returned
        instead of a single query.

        "max_elements_in_buffer" is only used when the buffer is used. It is not mandatory

        "on_duplicate_key" specifies the query to be used when duplicate keys exist
        """

        # transform table objects to table name
        table = "%s" %(table)

        self._check_locked_table(table)

        columns = []
        values = []
        for x in column_values:
            columns.append(x[0])
            if x[1] is None:
                raise ValueError("Trying to insert a None in table %s" %(table))
            if isinstance(x[1], unicode):
                # Transform to ascii
                value = x[1].encode('ascii','replace')
            else:
                value = str(x[1])
            values.append("\"%s\"" %value.replace('\\','\\\\').replace('"','\\"'))

        if len(special_column_values)>0:
            ncolumns, nvalues = zip(*special_column_values)
            columns.extend(ncolumns)
            values.extend(nvalues)
            del ncolumns
            del nvalues

        if on_duplicate_key is None or on_duplicate_key == "IGNORE":
            dupl_key = ""
        else:
            dupl_key = "ON DUPLICATE KEY UPDATE %s" %(on_duplicate_key)

        if on_duplicate_key == "IGNORE":
            ignore = "IGNORE"
        else:
            ignore = ""

        if self.insert_buffer is None or use_buffer==False:
            return """INSERT %s INTO %s (%s) VALUES (%s) %s""" %(ignore,
                                                                 table,
                                                                 ",".join(columns),
                                                                 ",".join(values),
                                                                 dupl_key)
        else:
            return self.insert_buffer.insert2buffer( key = self._get_buffer_key(table=table, columns=columns),
                                                     table = table,
                                                     columns = columns,
                                                     values = tuple(values),
                                                     max_elements_in_buffer = max_elements_in_buffer )
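    # Example sketch (column names are illustrative). Without the buffer this returns one INSERT
    # statement; with the buffer it may return None (values kept in memory) or the grouped
    # queries for this (table, columns) key when the buffer is full:
    #
    #   query = db._get_insert_sql_query("externalEntity",
    #                                    (("type", "protein"), ("externalDatabaseID", 1)),
    #                                    use_buffer=False)
    #   # -> INSERT IGNORE INTO externalEntity (type,externalDatabaseID) VALUES ("protein","1")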
    def _get_buffer_multiple_queries(self, key_buffer=None):
        """
        Returns all queries of the insert buffer and empties it

        If key_buffer is None, it generates all the queries needed to empty the buffer. Otherwise, only the specified key is emptied
        """

        return_queries = []

        if self.insert_buffer is None:
            raise ValueError("To execute the insert buffer into the database the buffer object cannot be \"None\". First it is necessary to create a Buffer object")
        else:
            if key_buffer is None:
                # Empties the whole buffer
                list_of_buffer_keys = self.insert_buffer.get_buffer().keys()
            else:
                # Empties only the requested key_buffer
                list_of_buffer_keys = [key_buffer]

            for actual_key in list_of_buffer_keys:

                bufferElement = self.insert_buffer.get_buffer()[actual_key]

                if bufferElement.num_elements>0:

                    if actual_key[0]=='I' and actual_key[1]=='U' and actual_key[2]=='_':
                        return_queries.append( self._get_multiple_insert_query( table = bufferElement.getTable(),
                                                                                columns = bufferElement.getColumns(),
                                                                                values = bufferElement.getValues(),
                                                                                externalDatabaseID = bufferElement.getSourceDBID() ) )
                    else:
                        return_queries.append( self._get_multiple_insert_query( table = bufferElement.getTable(),
                                                                                columns = bufferElement.getColumns(),
                                                                                values = bufferElement.getValues() ) )

                    # It should not be here...
                    bufferElement.restart_bufferElement()

        return return_queries

    def _get_multiple_insert_query(self, table, columns, values):
        """
        "columns" must be a tuple with the names of the columns

        "values" must be a Set of tuples of values to insert
        """

        column_separator = ", "
        values_separator = ", "

        values_string = [ "(%s)" %(values_separator.join(x)) for x in values ]

        # transform table objects to table name
        table = "%s" %(table)

        sqlquery = "INSERT IGNORE INTO %s (%s) VALUES %s" %(table,
                                                            column_separator.join(columns),
                                                            column_separator.join(values_string))

        return sqlquery
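    # Example sketch of the generated multi-row statement (values are already quoted strings,
    # as prepared by _get_insert_sql_query; the order of a Set is not guaranteed):
    #
    #   db._get_multiple_insert_query("externalEntity",
    #                                 ("type", "externalDatabaseID"),
    #                                 sets.Set([('"protein"', '"1"'), ('"DNA"', '"1"')]))
    #   # -> INSERT IGNORE INTO externalEntity (type, externalDatabaseID) VALUES ("protein", "1"), ("DNA", "1")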
    def _uses_buffer(self):
        """
        Returns False if the buffer is not being used, True otherwise
        """
        if self.insert_buffer is None:
            return False
        else:
            return True

    def _get_buffer_key(self, table, columns):
        """
        "columns" must be a list of the columns
        """
        return "%s%s" %(table, str(columns))

    def _lock_all_tables(self):

        self._unlock_tables()

        all_tables = self._get_table_names()

        self._lock_tables( table_list = all_tables )

        [ self.locked_tables.add(x) for x in all_tables ]
        [ self.locked_tables_win.add(x.lower()) for x in all_tables ]

    def _lock_tables(self, table_list=None):
        """
        Method used to lock tables. Insertions are faster if tables are previously locked.

        "table_list" is a list of table names
        """

        if self.is_locked:
            raise ValueError("Trying to lock tables when there are already locked tables")

        self.is_locked = True

        if table_list is None:
            table_list = list(self.locked_tables)
        else:
            [ self.locked_tables.add(x) for x in table_list ]
            [ self.locked_tables_win.add(x.lower()) for x in table_list ]

        if( len(table_list)>0 ):
            self.insert_db_content( sql_query = """ LOCK TABLES %s """ %( " WRITE, ".join(table_list)+" WRITE ") )

        # COMMENTED BECAUSE VERY SLOW WHEN ADDING A NEW DATABASE WITH INSERTIONS IN LOTS OF TABLES...
        # IT SHOULD BE SUBSTITUTED BY:
        #   locking all tables
        #   setting a variable telling that the database is being used for insertions and should not be used again for insertions
        #for current_autoincrement_value in self.autoincrement_values:
        #    if current_autoincrement_value[0] in self.locked_tables:
        #        self.autoincrement_values[current_autoincrement_value] = self._get_max_value(current_autoincrement_value[0], current_autoincrement_value[1])
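    # Typical lock/unlock pattern during parsing (a sketch; table names are illustrative):
    #
    #   db.set_lock_tables(True)
    #   db._lock_tables(["externalEntity", "externalEntityRelation"])
    #   ... buffered insertions ...
    #   db._unlock_tables()   # also flushes the insert buffer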
    def _unlock_tables(self):
        """
        Unlocks previously locked mysql tables (and flushes the insert buffer first, if one is used).

        Insertions are faster if tables are previously locked, so locking/unlocking is done in cycles.
        """

        if self._uses_buffer():
            self._empty_buffer()

        self.insert_db_content( sql_query = """UNLOCK TABLES """ )

        self.is_locked = False

    def _get_table_names(self):

        if self.table_names is None:
            self.table_names = self.select_db_content( self.GetTableNames(), answer_mode="list" )

        return self.table_names

    def _disable_indices(self, table_list=None):

        if table_list is None:
            table_list = self._get_table_names()
        else:
            if len(table_list)==0:
                table_list = self._get_table_names()

        if len(table_list)==0:
            raise ValueError("Error: there are no tables in the database.")

        [ self._check_locked_table(str(actual_table)) for actual_table in table_list ]

        self.insert_db_content( [ "ALTER TABLE %s DISABLE KEYS" %x for x in table_list ] )

        return

    def _enable_indices(self, table_list=None):

        if table_list is None:
            table_list = self._get_table_names()
        else:
            if len(table_list)==0:
                table_list = self._get_table_names()

        [ self._check_locked_table(str(actual_table)) for actual_table in table_list ]

        if len(table_list)==0:
            raise ValueError("Error: there are no tables in the database.")

        self.insert_db_content( [ "ALTER TABLE %s ENABLE KEYS" %x for x in table_list ] )

        return

    def _get_update_sql_query(self, table, update_column_values, fixed_conditions=None):
        """
        Generates a general update sql statement

        "update_column_values" must be a tuple of tuples with the format (column, new_value)

        "fixed_conditions" is a list or tuple of tuples with the format (column, type, restriction_value)
        """

        if fixed_conditions is None:
            fixed_conditions_sql = ""
        else:
            fixed_conditions_sql = " AND ".join( [self._get_cond_str(x) for x in fixed_conditions] )

        if fixed_conditions:
            where_sql = " WHERE "
        else:
            where_sql = ""

        update_values = ",".join( ["%s=\"%s\"" %(x[0],x[1]) for x in update_column_values ] )

        return """UPDATE %s SET %s %s %s""" %(table, update_values, where_sql, fixed_conditions_sql)
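    # Example sketch (column names are illustrative):
    #
    #   db._get_update_sql_query("externalDatabase",
    #                            update_column_values=(("description", "updated"),),
    #                            fixed_conditions=(("externalDatabaseID", "=", 1),))
    #   # -> UPDATE externalDatabase SET description="updated"  WHERE  externalDatabaseID = "1"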
    ####################################################################################
    #           GENERAL METHODS OF DATABASE CONTROL (LOCKS, CLOSE,...)                 #
    ####################################################################################

    def set_lock_frequency(self, frequency_value):
        """
        Method to change the lock/unlock frequency (used only in parsers, to speed up insertions and deletions)
        """
        self.lock_frequency = frequency_value

    def _check_lock_frequency(self):

        if self.current_lock_num == self.lock_frequency:
            # Unlocking tables
            print "Unlocking and locking tables"
            self._unlock_tables()
            self.current_lock_num = 0
            self._lock_tables(self.locked_tables)
        else:
            self.current_lock_num += 1

    def _get_drop_sql_query(self, table_list):

        for x in table_list:
            self._check_locked_table(str(x))

        # Remove dropped tables from locked tables
        for x in table_list:
            self.locked_tables.discard(x)
            self.locked_tables_win.discard(x.lower())

        return ["DROP TABLE %s" %x for x in table_list ]

    def _empty_buffer(self):
        """
        It can only be used if the insert buffer is being used
        """

        # First, obtain the list of all queries to insert
        queries = self._get_buffer_multiple_queries()

        for actual_query in queries:
            self.insert_db_content( actual_query, answer_mode=None )

    ## BUFFER RELATED METHODS ##


###############################################
##          BUFFER RELATED CLASSES           ##
###############################################

class Buffer(object):
    """
    Class used as a buffer for piana inserts
    """

    def __init__(self, max_size, parent):

        self.buffer = {}

        # Maximum size for each buffer... a safety margin of approx. 5% is kept (it works)
        #self.maxsize = int(max_size) - 50000
        self.maxsize = int(max_size) - int(int(max_size)*0.05)

        # Link it to the db_insert object which is using it
        self.db_insert = parent

        # Stores the number of bytes (characters) saved into the buffer
        #self.bytes = 0

    def get_buffer(self):
        """
        Returns the buffer dictionary
        """
        return self.buffer

    def get_buffer_element(self, key):

        if self.buffer.has_key(key):
            return self.buffer[key]
        else:
            raise ValueError("Buffer has no Buffer Element with key %s\n" %(key) )

    def insert2buffer(self, key, table, columns, values, max_elements_in_buffer=None):
        """
        Inserts a query into the insert buffer

        Alert! Only used for insert buffers!!!

        Returns None if the query could be added to the buffer, or the associated multiple query if it cannot be inserted (because the buffer is full)
        """

        # Check if a buffer element with key "key" exists
        if self.buffer.has_key(key):
            # Insert the values into the Buffer Element, or restart it if they do not fit
            if (self.buffer[key]).insert_values(values) is None:
                multiple_query = self.db_insert._get_buffer_multiple_queries(key_buffer=key)
                self.buffer[key].restart_bufferElement(values)
                return multiple_query
        else:
            # Create a new buffer element
            self.buffer[key] = BufferElement(self.maxsize, table, columns, values, max_elements_in_buffer)

        # Control to execute queries one by one
        if DEBUG_BUFFER_INSERT_SINGLE:
            self.buffer[key].restart_bufferElement(values)
            return self.db_insert._get_buffer_multiple_queries(key_buffer=key)

        return None
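    # Flow sketch (illustrative table/column names): DB._get_insert_sql_query() builds a key from
    # (table, columns) and calls insert2buffer(); while the BufferElement still has room, None is
    # returned and nothing is sent to MySQL. When the element would exceed its maximum size, the
    # grouped INSERT for that key is returned (a list of queries) and the element is restarted
    # with the values that did not fit:
    #
    #   buf = Buffer(max_size=16777216, parent=db)
    #   pending = buf.insert2buffer(key=db._get_buffer_key("externalEntity", ["type"]),
    #                               table="externalEntity", columns=["type"], values=('"protein"',))
    #   # pending is None while the element has room; otherwise it is a list of
    #   # "INSERT IGNORE INTO ..." statements ready for db.insert_db_content()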
class BufferElement(object):
    """
    Class used in the Buffer object to store a buffer element
    """

    def __init__(self, max_size, table, columns, values=None, max_elements_in_buffer=None):
        """
        Initializes a new buffer element

        "max_size" is the maximum size of the buffer element

        "table" is the table name in the database

        "columns" is a tuple with the names of the columns

        "values" is a tuple with the values to insert
        """
        if max_size > 2000000:
            max_size = 2000000
        self.max_size = max_size
        self.table = table
        self.columns = columns
        self.values = sets.Set()
        self.initial_size = len(table)+4

        self.size = self.initial_size

        self.max_elements_in_buffer = max_elements_in_buffer

        self.num_elements = 1   # stores the number of elements that the buffer contains (because sometimes it is necessary to put a limit)

        # Sum the length of each column plus 1 (the comma)
        try:
            for i in self.columns:
                self.size += len(i)+1
        except:
            raise

        # Insert the values into the buffer
        if values:
            self.insert_values(values=values)

        # Control:
        #self.max_elements_in_buffer = 1

    def getTable(self):

        return self.table

    def getColumns(self):

        return self.columns

    def getValues(self):

        return self.values

    def getSize(self):

        return self.size

    def insert_values(self, values):
        """
        Inserts a new tuple of values into the buffer element

        "values" must be a tuple of values to insert

        If the values cannot be inserted because the size would be exceeded, returns None
        """

        # First, calculate the size in bytes of these values:
        # sum the length of each value plus the comma, the 2 parentheses and the 2 quotes,
        # plus the comma between two inserts
        actual_size = 0

        for i in values:
            actual_size += len(str(i))+5+2

        if self.size+actual_size >= self.max_size:
            # values cannot be inserted: buffer size would be exceeded
            return None
        elif self.max_elements_in_buffer is not None and self.num_elements > self.max_elements_in_buffer:
            return None
        else:
            # insert the new tuple of values and increment the size
            self.values.add(values)
            self.size += actual_size

            self.num_elements += 1

            return 1

    def restart_bufferElement(self, values=None):
        """
        Restarts a Buffer Element to its initial values and inserts the new tuple of values
        """

        # Empty the set of tuples of values to insert and reset the size

        self.size = self.initial_size

        self.values.clear()

        self.num_elements = 0

        if values is not None:
            self.values.add(values)

            self.num_elements = 1

            # Calculate the size in bytes of these values:
            # sum the length of each value plus the comma, the 2 parentheses and the 2 quotes,
            # plus the comma between two inserts
            for i in values:
                self.size += len(str(i))+5+2


if __name__ == "__main__":

    print "Testing ConnectorDB"

    connector = DB( dbname="berit_v4", dbhost="localhost", dbuser="root", dbsocket="/home/jgarcia/local/mysql/var/mysql.sock" )