Latest version: 2.0.4 (from 04-10-2018)
** UPDATE 04-10-2018 **
- Fixed issue with prioritized queue not working
- Fixed issue with commit freezing the server and added so commit is now happening on a different thread.
** UPDATE 15-02-2017 **
- Now uses GameThread instead of Repeat and Delay
Delaying the queue is still possible, though, via:
Syntax: Select all
TSQL.wait(5) # Seconds
If your server requires a remote connection to a database, stacked up queries can cause noticeable lag in your game server (freezing, players twitching) since MySQL doesn't 'really' queue up the queries.
I've made a library to fix this problem: it queues up the queries and executes them on a dispatched GameThread. It's inspired by SourceMod's threaded MySQL support: https://wiki.alliedmods.net/SQL_(SourceMod_Scripting)
Remember that fetching data now requires a callback, since the queries are truly dispatched to another thread.
The library:
See bottom for download instructions..
threaded_mysql/__init__.py
Syntax: Select all
"""
Created By Velocity-plus
Github: https://github.com/Velocity-plus/
"""
from listeners.tick import GameThread
from core import echo_console
from traceback import format_exc as traceback_format_exc
from logging import error as logging_error
from sys import exc_info
from queue import Queue
from time import time as timestamp, sleep
import pymysql.cursors
class ThreadedMySQL:
    """
    Queues SQL queries and runs them one at a time on a background GameThread.

    Queries are added with execute(), fetchone() or fetchall(); the worker
    thread executes them on the shared cursor and hands the outcome to the
    optional callback. A connection must be set up with connect() or
    connect_use() and the worker started with handlequeue_start() before
    anything is executed.
    """

    # Seconds the idle worker blocks on the regular queue before it re-checks
    # the prioritized queue and the stop flag.
    _POLL_INTERVAL = 0.05

    def __init__(self):
        # Is the thread running?
        self.thread_status = False

        # Regular Queue
        self._r_queue = Queue()

        # Prioritized Queue
        self._p_queue = Queue()

        self.connection_method = 0

        # Issue with print for WINDOWS user set the variable to 'echo_console'
        self.error_handling = 'print'

        # Show print messages?
        self._debug = True

        # Delay (in seconds) applied by the worker before each queued query.
        # Kept under a private name: an instance attribute called ``wait``
        # would shadow the wait() method and make it uncallable.
        self._wait = 0

    def wait(self, delay):
        """
        If you for some reason want to delay the queue.

        The worker sleeps this long before every query it picks up, until the
        delay is reset with wait(0).

        :param delay: The delay in seconds
        :return:
        """
        self._wait = delay

    def _log(self, message):
        """
        Writes a message with print() or echo_console(), depending on
        self.error_handling.

        :param message: The text to output
        :return:
        """
        if self.error_handling == 'print':
            print(message)
        else:
            echo_console(message)

    def _enqueue(self, query_type, query, args, callback, data_pack, prioritize, get_info):
        """
        Builds the task list for a query and puts it on the matching queue.

        :param query_type: 0 = execute only, 1 = fetchone, 2 = fetchall
        :return:
        """
        if get_info:
            # 'time' holds the enqueue timestamp until complete_task() turns
            # it into the elapsed time
            get_info = {'query': query, 'time': timestamp(), 'prioritized': prioritize}

        target = self._p_queue if prioritize else self._r_queue
        target.put([query, args, callback, data_pack, get_info, query_type])

    def execute(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
        """
        This function cannot pass fetched data to the callback!

        :param query: The SQL query that you want to execute
        :param args: If the SQL query have any args
        :param callback: The callback for the query
        :param data_pack: If you want to pass special information to the callback
        :param prioritize: If you have large queues, prioritizing the query can make it skip the queue
         before the rest of the queue is finished
        :param get_info: If you want information about the query passed to the callback
         (such as timestamp, query and prioritized)
        :return:
        """
        self._enqueue(0, query, args, callback, data_pack, prioritize, get_info)

    def fetchone(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
        """
        This function both execute and fetch data, no need to execute before using this!

        :param query: The SQL query that you want to execute
        :param args: If the SQL query have any args
        :param callback: The callback for the query
        :param data_pack: If you want to pass special information to the callback
        :param prioritize: If you have large queues, prioritizing the query can make it skip the queue
         before the rest of the queue is finished
        :param get_info: If you want information about the query passed to the callback
         (such as timestamp, query and prioritized)
        :return:
        """
        self._enqueue(1, query, args, callback, data_pack, prioritize, get_info)

    def fetchall(self, query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
        """
        This function both execute and fetch data, no need to execute before using this!

        :param query: The SQL query that you want to execute
        :param args: If the SQL query have any args
        :param callback: The callback for the query
        :param data_pack: If you want to pass special information to the callback
        :param prioritize: If you have large queues, prioritizing the query can make it skip the queue
         before the rest of the queue is finished
        :param get_info: If you want information about the query passed to the callback
         (such as timestamp, query and prioritized)
        :return:
        """
        self._enqueue(2, query, args, callback, data_pack, prioritize, get_info)

    def _report_error(self, query, error):
        """
        Outputs a failed query and its exception, then logs the traceback.
        Must be called from inside the ``except`` block handling ``error``.

        :param query: The SQL query that failed
        :param error: The exception that was raised
        :return:
        """
        format_error = '-' * 64 + '\nExceptions probable cause (SQL Query: {0})\n{1}\nActual Error:\n{2}'.format(
            query, type(error), error)
        self._log(format_error)
        self._log('-' * 64)
        logging_error(traceback_format_exc())

    def complete_task(self, worker, prio=None):
        """
        Executes one queued task and calls its callback.

        The callback receives, in this order and only when present: the
        fetched data (fetchone/fetchall only), data_pack (only if truthy) and
        get_info (only if requested). Exceptions raised by the query or the
        callback are reported, not propagated.

        :param worker: The task list [query, args, callback, data_pack, get_info, query_type]
        :param prio: True if the task came from the prioritized queue
        :return:
        """
        query, args, callback, data_pack, get_info, query_type = worker[:6]

        try:
            if get_info:
                # Taken before the query runs: the elapsed time since the
                # query was put on the queue
                get_info['time'] = timestamp() - get_info['time']

            if args:
                self.cursor.execute(query, args)
            else:
                self.cursor.execute(query)

            if query_type == 0:
                callback_args = []
            elif query_type == 1:
                callback_args = [self.cursor.fetchone()]
            elif query_type == 2:
                callback_args = [self.cursor.fetchall()]
            else:
                # Unknown query type: the query ran, but no callback is made
                callback_args = None

            if callback and callback_args is not None:
                if data_pack:
                    callback_args.append(data_pack)
                if get_info:
                    callback_args.append(get_info)
                callback(*callback_args)

        except Exception as sql_error:
            self._report_error(query, sql_error)

        finally:
            # Mark the task as done even when it failed, so Queue.join()
            # (used by close()) cannot hang on a failed query
            finished_queue = self._p_queue if prio else self._r_queue
            try:
                finished_queue.task_done()
            except ValueError:
                # complete_task() was called directly with a task that never
                # went through the queue; there is nothing to mark as done
                pass

    def _threader(self):
        """
        Worker loop: runs queued tasks until thread_status becomes False.
        Prioritized tasks are always picked before regular ones.

        :return:
        """
        from queue import Empty

        # The delay is applied once per executed task, not once per idle poll
        delay_pending = True

        while self.thread_status:
            if delay_pending and self._wait:
                sleep(self._wait)
            delay_pending = False

            try:
                worker = self._p_queue.get_nowait()
                prio = True
            except Empty:
                try:
                    # Block only briefly: an unbounded get() here would keep a
                    # later prioritized query (and handlequeue_stop) waiting
                    # until the next regular query arrives
                    worker = self._r_queue.get(timeout=self._POLL_INTERVAL)
                    prio = False
                except Empty:
                    continue

            self.complete_task(worker, prio=prio)
            delay_pending = True

    def _start_thread(self):
        """
        Creates and starts the daemon GameThread that runs _threader().

        :return:
        """
        self.t = GameThread(target=self._threader)
        self.t.daemon = True
        self.t.start()

    def handlequeue_start(self):
        """
        This handles the queue, should be stopped on unload.
        Every call starts a new worker thread, so call it only once.

        :return:
        """
        # Starts the queue
        self.thread_status = True  # This must be true before the thread can loop
        self._start_thread()

    def handlequeue_stop(self):
        """
        This stops the queue for being processed, while a connection still might be open
        no queries can be executed.

        :return:
        """
        self.thread_status = False

    def queue_size(self):
        """
        :return: Returns the size of the queue
        """
        return self._r_queue.qsize() + self._p_queue.qsize()

    def connect(self, host, user, password, db, charset, cursorclass=pymysql.cursors.DictCursor):
        """
        Checkout PyMYSQL documentation for complete walkthrough

        Opens the connection and creates the cursor used by the queue. A
        failure is reported and logged instead of raised; connection_method
        is set to 1 only on success.
        """
        try:
            self.connection = pymysql.connect(host=host,
                                              user=user,
                                              password=password,
                                              db=db,
                                              charset=charset,
                                              cursorclass=cursorclass)
            self.cursor = self.connection.cursor()
            if self._debug:
                self._log('threaded_mysql: [SUCCES] connection was succesfully established.')

            self.connection_method = 1
        except Exception:
            if self._debug:
                self._log('threaded_mysql: [ERROR] Not possible to make a connection.')
            # Keep the reason for the failure instead of discarding it
            logging_error(traceback_format_exc())

    def connect_use(self, connection):
        """
        If you created your connection elsewhere in your code, you can pass it to Threaded MySQL.
        A failure is reported and logged instead of raised; connection_method
        is set to 2 only on success.

        :param connection: Your connection socket
        :return:
        """
        try:
            self.connection = connection
            self.cursor = self.connection.cursor()
            if self._debug:
                self._log('threaded_mysql: [SUCCES] Cursor created succesfully for your connection.')
            self.connection_method = 2
        except Exception:
            if self._debug:
                self._log('threaded_mysql: [ERROR] Not possible to create cursor.')
            # Keep the reason for the failure instead of discarding it
            logging_error(traceback_format_exc())

    def commit(self):
        """
        Regular pymysql commit.
        Runs immediately on the calling thread; it is not put on the queue, so
        queries that are still queued have not been executed when it runs.

        :return:
        """
        self.connection.commit()

    def close(self, finish_queue_before_close=False):
        """
        Closes the mysql connection

        :param finish_queue_before_close: Finishes the queue before it terminates the connection
         (only waits while the queue handler is running, since nothing would empty the queue otherwise)
        :return:
        """
        if finish_queue_before_close and self.thread_status:
            # join() blocks without spinning and returns only after the last
            # dequeued task has actually finished
            self._p_queue.join()
            self._r_queue.join()

        self.connection.close()
Documentation
The library works as an extension of PyMYSQL, so as any other MySQL script, a connection must be established to a database, but before we can do that, let us initialize the class for threaded MySQL.
Syntax: Select all
from threaded_mysql import ThreadedMySQL
TSQL = ThreadedMySQL()
After we have initialized our class, we can connect to our MySQL database, you can use Threaded MySQL to connect to your database.
Syntax: Select all
# Available parameters (host, user, password, db, charset, cursorclass)
# charset is required; db is the database name, not the character set
TSQL.connect(host='localhost', user='root', password='123', db='my_database', charset='utf8')
If you don't want to connect with Threaded MySQL you can make your connection elsewhere and pass it to Threaded MySQL as shown below with PyMYSQL:
Syntax: Select all
import pymysql.cursors
connection = pymysql.connect(host="localhost",
user="root",
password="123",
db="test",
charset="utf8",
cursorclass=pymysql.cursors.DictCursor)
TSQL.connect_use(connection)
Now that our connection has been made, we need to start the gamethread that handles the queue of queries, as seen below.
Syntax: Select all
TSQL.handlequeue_start()
Finally, now we can make use of it. The functions available are listed below
Syntax: Select all
# Different types of queries
TSQL.execute(query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
TSQL.fetchone(query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
TSQL.fetchall(query, args=None, callback=None, data_pack=None, prioritize=False, get_info=False):
# Returns the size of the queue
TSQL.queue_size()
# If you want to delay the queue for a specific amount of time (1 being 1 second)
TSQL.wait(delay)
# Commits the current transaction on the connection
TSQL.commit()
# Closes the mysql connection
TSQL.close()
It's important to note that when using the fetchone or fetchall it will execute the query BEFORE fetching it, so no need to use TSQL.execute when you want to fetch something.
If you want to grab the data from fetchone or fetchall a callback is necessary. To demonstrate this look at the code examples below:
Code examples (Updated)
Syntax: Select all
from messages import SayText2
from events import Event
from threaded_mysql import ThreadedMySQL
# Initializes the class
TSQL = ThreadedMySQL()
# Connects to a mysql database
TSQL.connect('localhost', 'root', '123', 'my_database', 'utf8')
# Starts the queuehandler (should only be called once)
TSQL.handlequeue_start()
# The callback from !fetchone
def sql_callback(data):
    """
    Sends the name of the fetched row to chat.

    :param data: The row passed by TSQL.fetchone, or None when the query matched no rows
    :return:
    """
    # fetchone hands over None for an empty result; indexing it would raise
    if data is None:
        return
    SayText2(data['name']).send()
# The callback from !fetchall
def sql_callback_2(data, data_pack):
    """
    Echoes the text carried in data_pack, then sends the name of every fetched row.

    :param data: The rows passed by TSQL.fetchall
    :param data_pack: The extra information given to TSQL.fetchall (needs a 'text' key)
    :return:
    """
    SayText2("You wrote {}".format(data_pack['text'])).send()
    for row in data:
        SayText2('Name: {}'.format(row['name'])).send()
# The callback from !info
def sql_callback_3(get_info):
    """
    Sends a chat summary of the query information.
    get_info includes 'query', 'time', 'prioritized'
    """
    summary = 'Query: {0}\nTime: {1} seconds\nPrioritized: {2}'.format(
        get_info['query'],
        get_info['time'],
        get_info['prioritized'],
    )
    SayText2(summary).send()
@Event('player_say')
def on_player_say(game_event):
    """Queues a demo query depending on the chat command the player typed."""
    # The chat message decides which demo runs
    command = game_event['text']

    if command == '!fetchone':
        # Fetches a single row and hands it to sql_callback
        TSQL.fetchone('SELECT name FROM my_database', callback=sql_callback)

    elif command == '!fetchall':
        # Fetches every row; the typed text travels along in the data pack
        TSQL.fetchall('SELECT name FROM my_database', callback=sql_callback_2, data_pack={'text': command})

    elif command == '!info':
        # Inserts a row and asks for the query information in the callback
        TSQL.execute("INSERT INTO my_database (name) VALUES('John')", callback=sql_callback_3, get_info=True)
Output !fetchall
=> You wrote !fetchall
=> Name: <name >
=> Name: <name >
=> (...)
Output !fetchone
=> John
Output !info
=> Query: INSERT INTO my_database (name) VALUES('John')
=> Time: 0.014952421188354492 seconds
=> Prioritized: False
Stress test
Set the variable use_threaded_mysql to 1 or 0 to see the difference.
Syntax: Select all
from messages import SayText2
from threaded_mysql import ThreadedMySQL
import pymysql.cursors
# ON = No lag | OFF = Server freeze
use_threaded_mysql = 1
connection = pymysql.connect(host="localhost",
user="root",
password="123",
db="my_database",
charset="utf8",
cursorclass=pymysql.cursors.DictCursor)
cursor = connection.cursor()
def load():
    """Runs the same SELECT 1000 times, either queued through Threaded MySQL or directly."""
    if use_threaded_mysql:
        # Queue the queries on the connection that was already created
        tsql = ThreadedMySQL()
        tsql.connect_use(connection)
        # Starts the queuehandler
        tsql.handlequeue_start()
        for _ in range(1000):
            tsql.fetchone('SELECT name FROM my_database', callback=test)
        return

    # Direct path: every query runs on the calling thread
    for _ in range(1000):
        cursor.execute('SELECT name FROM my_database')
        row = cursor.fetchone()
        # Prints it out (not necessary tho)
        SayText2('Name: {}'.format(row['name'])).send()
def test(data):
    """Callback of the stress test: sends the fetched row's name to chat."""
    message = 'Name: {}'.format(data['name'])
    SayText2(message).send()
Download
Latest release and notes are on my github
https://github.com/Velocity-plus/threaded_mysql
You can even create a tick listener and spam queries without any lag at all
Enjoy :)