# Time
import time
# Color
from colors import GREEN
# Commands
from commands.say import SayCommand
from commands.client import ClientCommand
# Contexlib
from contextlib import contextmanager
# Cvar
from cvars import ConVar
# Engine
from engines.gamerules import find_game_rules
from engines.server import global_vars
# Entity
from entities.entity import BaseEntity
# Event
from events import Event
# Filter
from filters.players import PlayerIter
# Listeners
from listeners import ListenerManager
from listeners import ListenerManagerDecorator
from listeners import OnClientActive, OnPlayerRunCommand, OnLevelInit
from listeners.tick import Repeat
# Messages
from messages import HudMsg, SayText2
# Path
from paths import PLUGIN_DATA_PATH, GAME_PATH
# Player
from players.constants import PlayerButtons, HitGroup
from players.entity import Player
from players.helpers import index_from_userid, userid_from_index
# Que
from queue import Empty, Queue
# Thread
from threading import Thread
# SQL Alchemy
from sqlalchemy import Column, Integer, String, Index, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
#############################
# Globals
#############################
STATS_DATA_PATH = PLUGIN_DATA_PATH / 'stats'
if not STATS_DATA_PATH.exists():
STATS_DATA_PATH.makedirs()
CORE_DB_PATH = STATS_DATA_PATH / 'players.db'
CORE_DB_REL_PATH = CORE_DB_PATH.relpath(GAME_PATH.parent)
player_loaded = {}
output = Queue()
statsplayers = {}
player_session = {}
stats_screen = {}
stats_active = {}
stats_rank = {}
mp_timelimit = ConVar('mp_timelimit')
stats_button = PlayerButtons.SCORE
npc_list = [
'npc_zombie',
'npc_manhack',
'npc_headcrab',
'npc_antilion',
'npc_antilionguard',
'npc_clawscanner',
'npc_combinedropship',
'npc_combinegunship',
'npc_crow',
'combine_mine',
'npc_headcrab_black',
'npc_headcrab_fast',
'npc_helicopter',
'npc_hunter',
'npc_ichthyosaur',
'npc_ministriper',
'npc_missildefense',
'npc_mortarsynth',
'npc_pigeon',
'npc_poisonzombie',
'npc_rollermine',
'npc_sniper',
'npc_stalker',
'npc_strider',
'npc_turret_ceiling',
'npc_turret_floor',
'npc_turret_ground',
'npc_vortigaunt',
'npc_zombie_torso',
'npc_zombine'
]
@contextmanager
def session_scope():
session = Session()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()
SESSION_HUD_MESSAGE = HudMsg(message='', x=0.01, y=-0.60, color1=GREEN, color2=GREEN, effect=0, fade_in=0.05, fade_out=0.1, hold_time=0.5, fx_time=1.0, channel=0)
GLOBAL_HUD_MESSAGE = HudMsg(message='', x=0.85, y=-0.60, color1=GREEN, color2=GREEN, effect=0, fade_in=0.05, fade_out=0.1, hold_time=0.5, fx_time=1.0, channel=1)
TIME_HUD_MESSAGE = HudMsg(message='', x=-1, y=0.07, color1=GREEN, color2=GREEN, effect=0, fade_in=0.05, ade_out=0.1, hold_time=0.5, fx_time=1.0, channel=2)
TIME_LEFT_HUD_MESSAGE = HudMsg(message='', x=-1, y=0.90, color1=GREEN, color2=GREEN, effect=0, fade_in=0.05, fade_out=0.1, hold_time=0.5, fx_time=1.0, channel=3)
EMPTY_HUD_MESSAGE = HudMsg(message='', x=0.01, y=-0.88, color1=GREEN, color2=GREEN, effect=0, fade_in=0.05, fade_out=0.5, hold_time=0.5, fx_time=1.0, channel=1)
#############################
# Timeleft
#############################
def get_map_remaining_time():
timelimit = mp_timelimit.get_int()
if timelimit < 1:
return -1
start = find_game_rules().get_property_float('cs_gamerules_data.m_flGameStartTime')
timeleft = (start + timelimit * 60) - global_vars.current_time
if timeleft < 0.0:
return 0
return timeleft
# =============================================================================
# >> DATABASE
# =============================================================================
Base = declarative_base()
engine = create_engine(f'sqlite:///{CORE_DB_REL_PATH}')
class Players(Base):
__tablename__ = 'Players'
UserID = Column(Integer,nullable=False,primary_key=True)
steamid = Column(String(30),nullable=False)
name = Column(String(30),nullable=False)
kills = Column(Integer,default=0)
deaths = Column(Integer,default=0)
headshots = Column(Integer,default=0)
suicides = Column(Integer,default=0)
killstreak = Column(Integer,default=0)
distance = Column(Float,default=0.0)
npc_kills = Column(Integer,default=0)
Index('playersIndex', steamid)
if not engine.dialect.has_table(engine, 'Players'):
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
# =============================================================================
# >> LOAD
# =============================================================================
def load():
for player in PlayerIter():
userid = player.userid
statsplayers[userid] = StatsPlayer(userid)
init_player_session(userid)
_load_ranks()
show_stats_repeat.start(0.1)
repeat.start(0.1)
def _load_ranks():
with session_scope() as session:
query = session.query(Players).all()
if query != None:
for (user) in query:
steamid = user.steamid
stats_rank[steamid] = {}
stats_rank[steamid]['name'] = user.name
stats_rank[steamid]['kills'] = user.kills
stats_rank[steamid]['deaths'] = user.deaths
stats_rank[steamid]['points'] = user.kills-user.deaths
# =============================================================================
# >> HELPERS
# =============================================================================
@Repeat
def repeat():
try:
callback = output.get_nowait()
except Empty:
pass
else:
callback()
@Repeat
def show_stats_repeat():
for player in PlayerIter('human'):
userid = player.userid
index = player.index
player_kills = player_session[userid]["kills"]
player_deaths = player_session[userid]["deaths"]
player_suicides = player_session[userid]["suicides"]
player_headshots = player_session[userid]["headshots"]
player_highest_killstreaks = player_session[userid]["highest_killstreak"]
player_npc_kills = player_session[userid]["npc_kills"]
stats_player_kills = statsplayers[userid].kills
stats_player_deaths = statsplayers[userid].deaths
if stats_screen[userid] == True:
session_message = f'Session Stats:\n\nPoints: {player_kills-player_deaths}\nKills: {player_kills}\nDeaths: {player_deaths}\nSuicides: {player_suicides}\nHeadshots: {player_headshots}\nKDR: {calc_session_kdr(player_kills, player_deaths)}\nKillstreak: {player_highest_killstreaks}\nNPC Kills: {player_npc_kills}'
SESSION_HUD_MESSAGE.message = session_message
SESSION_HUD_MESSAGE.send(index)
global_message = f'Global Stats:\n\nPoints: {stats_player_kills - stats_player_deaths}\nKills: {stats_player_kills}\nDeaths: {stats_player_deaths}\nSuicides: {statsplayers[userid].suicides}\nHeadshots: {statsplayers[userid].headshots}\nKDR: {statsplayers[userid].calc_kdr(stats_player_kills, stats_player_deaths)}\nKillstreak: {statsplayers[userid].killstreak}\nNPC Kills: {statsplayers[userid].npc_kills}'
GLOBAL_HUD_MESSAGE.message = global_message
GLOBAL_HUD_MESSAGE.send(index)
time_message = time.strftime('%a, %m.%y. %H:%M:%S',time.localtime(time.time()))
TIME_HUD_MESSAGE.message = time_message
TIME_HUD_MESSAGE.send(index)
timeleft = get_map_remaining_time()
minutes, seconds = divmod(timeleft, 60)
timeleft_message = "Timeleft: %.0f minutes and %.0f seconds remaining." % (minutes,seconds)
TIME_LEFT_HUD_MESSAGE.message = timeleft_message
TIME_LEFT_HUD_MESSAGE.send(index)
elif stats_active[userid] == True:
for i in range(4):
EMPTY_HUD_MESSAGE.channel = i
EMPTY_HUD_MESSAGE.send(index)
stats_active[userid] = False
def exists(userid):
try:
index_from_userid(userid)
except ValueError:
return False
return True
def init_player_session(userid):
if userid not in player_session:
player_session[userid] = {}
player_session[userid]["kills"] = 0
player_session[userid]["deaths"] = 0
player_session[userid]["suicides"] = 0
player_session[userid]["killstreak"] = 0
player_session[userid]["highest_killstreak"] = 0
player_session[userid]["headshots"] = 0
player_session[userid]["npc_kills"] = 0
def calc_session_kdr(kills,deaths):
if kills == 0:
kills = 1
if deaths == 0:
deaths = 1
return ("%.2f" % (kills/deaths))
def get_player_rank_position(index):
player = Player(index)
rank_list = stats_rank.values()
rank_list = sorted(stats_rank, key=lambda x: stats_rank[x]['points'],reverse=True)
i = 0
rank = 0
for x in rank_list:
i+=1
if player.uniqueid == x:
rank = i
break
return rank
# =============================================================================
# >> PLAYER CLASS
# =============================================================================
class StatsPlayer(object):
def __init__(self,userid):
self.userid = int(userid)
self.player_entity = Player.from_userid(self.userid)
self.index = self.player_entity.index
self.steamid = self.player_entity.uniqueid
self.name = self.remove_warnings(self.player_entity.name)
#Dict to check for load status
player_loaded[self.userid] = False
stats_screen[self.userid] = False
stats_active[self.userid] = False
#Player data
self.UserID = -1
self.points = 0
self.kills = 0
self.deaths = 0
self.headshots = 0
self.suicides = 0
self.kdr = 0.0
self.killstreak = 0
self.distance = 0.0
self.npc_kills = 0
Thread(target=self._load_from_database).start()
def _load_from_database(self):
with session_scope() as session:
player = session.query(Players).filter(Players.steamid==self.steamid).one_or_none()
if player is None:
player = Players(steamid=self.steamid,name=self.name)
session.add(player)
session.commit()
self.UserID = player.UserID
self.kills = player.kills
self.deaths = player.deaths
self.headshots = player.headshots
self.suicides = player.suicides
self.killstreak = player.killstreak
self.distance = player.distance
self.points = self.kills-self.deaths
self.kdr = self.calc_kdr(self.kills,self.deaths)
self.npc_kills = player.npc_kills
_load_ranks()
output.put(self._on_finish)
def _on_finish(self):
if exists(self.userid):
OnPlayerLoaded.manager.notify(self)
def save(self):
if exists(self.userid):
Thread(target=self._save_player_to_database).start()
def _save_player_to_database(self):
with session_scope() as session:
player = session.query(Players).filter(Players.UserID==self.UserID).one_or_none()
player.steamid = self.steamid
player.name = self.name
player.kills = self.kills
player.deaths = self.deaths
player.headshots = self.headshots
player.suicides = self.suicides
player.killstreak = self.killstreak
player.distance = self.distance
player.npc_kills = self.npc_kills
session.commit()
output.put(self._on_player_saved)
def _on_player_saved(self):
if exists(self.userid):
OnPlayerSaved.manager.notify(self)
def remove_warnings(self, value):
return str(value).replace("'", "").replace('"', '')
def calc_kdr(self,kills,deaths):
if kills == 0: kills = 1
if deaths == 0: deaths = 1
return ("%.2f" % (kills/deaths))
for player in PlayerIter('all'):
userid = player.userid
statsplayers[userid] = StatsPlayer(userid)
init_player_session(userid)
# =============================================================================
# >> LISTENERS
# =============================================================================
class OnPlayerSaved(ListenerManagerDecorator):
manager = ListenerManager()
class OnPlayerLoaded(ListenerManagerDecorator):
manager = ListenerManager()
@OnPlayerLoaded
def on_loaded(statsplayer):
player_loaded[statsplayer.userid] = True
@OnPlayerRunCommand
def _on_player_run_command(player, usercmd):
if player.is_bot():
return
userid = player.userid
if usercmd.buttons & stats_button:
stats_screen[userid] = True
stats_active[userid] = True
else:
stats_screen[userid] = False
@OnLevelInit
def level_init(map_name=None):
for player in PlayerIter('all'):
statsplayers[player.userid].save()
@OnClientActive
def on_client_active(index):
userid = userid_from_index(index)
statsplayers[userid] = StatsPlayer(userid)
init_player_session(userid)
# =============================================================================
# >> EVENTS
# =============================================================================
@Event('player_disconnect')
def player_disconnect(ev):
userid = ev.get_int('userid')
player_entity = Player(index_from_userid(userid))
statsplayers[userid].save()
if userid in statsplayers:
statsplayers[userid].name = statsplayers[userid].remove_warnings(player_entity.name)
statsplayers[userid].save()
@Event('player_activate')
def player_activated(args):
userid = args.get_int('userid')
player = Player.from_userid(userid)
rank_list = stats_rank.values()
rank_list = sorted(stats_rank, key=lambda x: stats_rank[x]['points'],reverse=True)
SayText2(f'\x04[Stats] {player.name} \x01has joined the server with \x04{get_player_rank_position(player.index)} \x01of \x04{len(rank_list)} \x01rank').send()
@Event('player_hurt')
def player_hurt(ev):
victim = Player.from_userid(ev['userid'])
attacker = ev.get_int('attacker')
if attacker > 0:
if victim.hitgroup == HitGroup.HEAD:
player_session[attacker]["headshots"] += 1
statsplayers[attacker].headshots += 1
@Event('player_death')
def player_death(ev):
victim_userid = ev['userid']
attacker_userid = ev['attacker']
victim = Player.from_userid(victim_userid)
try:
attacker = Player.from_userid(attacker_userid)
except ValueError:
return
if victim_userid == attacker_userid:
statsplayers[victim_userid].suicides += 1
player_session[victim_userid]["suicides"] += 1
statsplayers[victim_userid].deaths += 1
player_session[victim_userid]["deaths"] += 1
player_session[victim_userid]["killstreak"] = 0
stats_rank[victim.uniqueid]['points'] = statsplayers[victim_userid].kills-statsplayers[victim_userid].deaths
else:
statsplayers[attacker_userid].kills += 1
player_session[attacker_userid]["kills"] += 1
player_session[attacker_userid]["killstreak"] += 1
if player_session[attacker_userid]["killstreak"] > statsplayers[attacker_userid].killstreak:
statsplayers[attacker_userid].killstreak = player_session[attacker_userid]["killstreak"]
if player_session[attacker_userid]["killstreak"] > player_session[attacker_userid]["highest_killstreak"]:
player_session[attacker_userid]["highest_killstreak"] = player_session[attacker_userid]["killstreak"]
statsplayers[victim_userid].deaths += 1
player_session[victim_userid]["deaths"] += 1
if player_session[victim_userid]["killstreak"] > player_session[victim_userid]["highest_killstreak"]:
player_session[victim_userid]["highest_killstreak"] = player_session[victim_userid]["killstreak"]
player_session[victim_userid]["killstreak"] = 0
stats_rank[attacker.uniqueid]['points'] = statsplayers[attacker_userid].kills-statsplayers[attacker_userid].deaths
stats_rank[victim.uniqueid]['points'] = statsplayers[victim_userid].kills-statsplayers[victim_userid].deaths
@Event('entity_killed')
def npc_killed(event):
classname = BaseEntity(event['entindex_killed']).classname
if classname in npc_list:
player = Player(event['entindex_attacker'])
player_session[player.userid]["npc_kills"] += 1
statsplayers[player.userid].npc_kills += 1
# =============================================================================
# >> CLIENTCOMMANDS
# =============================================================================
@ClientCommand('rank')
@SayCommand('rank')
def rank_command(command, index, team_only=False):
player = Player(index)
rank_list = stats_rank.values()
rank_list = sorted(stats_rank, key=lambda x: stats_rank[x]['points'],reverse=True)
SayText2(f"\x04[Stats]\x03 Your rank is \x04{get_player_rank_position(index)} \x03of \x04{len(rank_list)} \x03with \x04{stats_rank[player.uniqueid]['points']} \x03points.").send(index)