from types import SimpleNamespace

import polars as pl

from data import data_df
from convert import verify_and_return_presult


def _rate(numerator, denominator, label):
    """Return ``numerator.sum() / denominator.sum()`` aliased to *label*."""
    return (numerator.sum() / denominator.sum()).alias(label)


# Column handles shared by the expressions below.  They are combined with
# ``&`` / ``~`` and summed, so they are presumably boolean flag columns --
# TODO confirm against the loader that builds ``data_df``.
_swung = pl.col('swing')
_in_zone = pl.col('zone')
_missed = pl.col('whiff')
_thrown = pl.col('pitch')

# A row counts as a tracked pitch when both coordinates are present and the
# recorded speed is positive.
valid_pitch = (
    pl.col('x').is_not_null()
    & pl.col('y').is_not_null()
    & (pl.col('ballSpeed') > 0)
)

# Swing rates: overall, on in-zone rows, and on out-of-zone rows.
swing = _rate(_swung, _thrown, 'Swing%')
z_swing = _rate(_swung & _in_zone, _in_zone, 'Z-Swing%')
chase = _rate(_swung & ~_in_zone, ~_in_zone, 'Chase%')

# Contact rates: swings without a whiff, divided by the matching swing count.
contact = _rate(_swung & ~_missed, _swung, 'Contact%')
z_con = _rate(_in_zone & _swung & ~_missed, _in_zone & _swung, 'Z-Contact%')
o_con = _rate(~_in_zone & _swung & ~_missed, ~_in_zone & _swung, 'O-Contact%')

# Whiffs per swing, whiffs per pitch, and the 'csw' flag per pitch.
whiff = _rate(_missed, _swung, 'Whiff%')
swstr = _rate(_missed, _thrown, 'SwStr%')
csw = _rate(pl.col('csw'), _thrown, 'CSW%')

# Rows whose result is a ball or a walk.
is_ball = pl.col('presult').is_in(verify_and_return_presult(['Ball', 'Walk']))
# Pitches that are not balls; requiring the 'pitch' flag keeps out rows that
# are not pitches at all (catcher interference, etc.).
is_non_ball = _thrown & ~is_ball
# Ball / strike shares of all rows flagged as a pitch.
ball = (is_ball.sum() / pl.col('pitch').sum()).alias('Ball%')
strike = (is_non_ball.sum() / pl.col('pitch').sum()).alias('Strike%')

is_two_str = pl.col('before_s') == 2  # named this way in case I use two_str for 2-Str%
first_count = (pl.col('before_s') == 0) & (pl.col('before_b') == 0)

# Non-ball share of pitches thrown in a 0-0 count.
f_strike = ((is_non_ball & first_count).sum() / first_count.sum()).alias('F-Str%')
# Share of two-strike pitches whose result text contains 'strikeout'.
par = (
    (is_two_str & pl.col('presult').str.contains('strikeout')).sum()
    / is_two_str.sum()
).alias('PAR%')
# Share of rows thrown with more balls than strikes, fewer than two strikes
# and at least two balls.  NOTE: the denominator is the row count (pl.len()),
# not the 'pitch' flag sum used by the other rates.
behind = (
    (
        (pl.col('before_b') > pl.col('before_s'))
        & (pl.col('before_s') < 2)
        & (pl.col('before_b') > 1)
    ).sum()
    / pl.len()
).alias('Behind%')
zone = (pl.col('zone').sum() / pl.col('pitch').sum()).alias('Zone%')

# Horizontal location split; the sign convention flips with pitcher handedness.
glove = (
    pl.when(pl.col('pitLR') == 'r').then(pl.col('x') < 0).otherwise(pl.col('x') > 0)
).mean().alias('Glove%')
arm = (
    pl.when(pl.col('pitLR') == 'r').then(pl.col('x') >= 0).otherwise(pl.col('x') <= 0)
).mean().alias('Arm%')

# Vertical location split at y == 125, and a central box ("MM").
high = (pl.col('y') > 125).mean().alias('High%')
low = (pl.col('y') <= 125).mean().alias('Low%')
mm = (
    pl.col('x').is_between(-20, 20) & pl.col('y').is_between(100, 100 + 50)
).mean().alias('MM%')

# On-base percentage: times on base over AB + BB + HBP + SF.  Relies on an
# 'AB' column that compute_player_stats adds before aggregating.
obp = (
    pl.col('presult').is_in(verify_and_return_presult([
        'Single', 'Double', 'Triple', 'Home run', 'Walk',
        'Inside-the-park home run', 'Hit by pitch',
    ])).sum()
    / (
        pl.col('AB').first()
        + pl.col('presult').is_in(
            verify_and_return_presult(['Walk', 'Hit by pitch', 'Sacrifice fly'])
        ).sum()
    )
).round(3).alias('OBP')

# Counting stats derived from the plate-appearance result.
h = pl.col('presult').is_in(verify_and_return_presult(
    ['Single', 'Double', 'Triple', 'Home run', 'Inside-the-park home run']
)).sum().alias('H')
bb = pl.col('presult').is_in(verify_and_return_presult(['Walk'])).sum().alias('BB')
hbp = pl.col('presult').is_in(verify_and_return_presult(['Hit by pitch'])).sum().alias('HBP')
sf = pl.col('presult').is_in(verify_and_return_presult(['Sacrifice fly'])).sum().alias('SF')

# Batted-ball type codes produced by the 'batType' value counts.
_BAT_TYPES = ('G', 'F', 'B', 'P', 'L')

# Stats that receive a '<stat>_pctl' column in each report.
_PITCH_PCTL_STATS = (
    'Avg KPH', 'Max KPH', 'Avg MPH', 'Max MPH',
    'Swing%', 'Z-Swing%', 'Chase%', 'Contact%', 'Z-Contact%', 'O-Contact%',
    'SwStr%', 'Whiff%', 'CSW%', 'Strike%', 'Ball%', 'F-Str%', 'PAR%',
    'GB%', 'FB%', 'LD%', 'OFFB%', 'IFFB%', 'AIR%', 'Zone%', 'Behind%',
)
_PLAYER_PCTL_STATS = (
    'FB Velo', 'K%', 'BB%',
    'Swing%', 'Z-Swing%', 'Chase%', 'Contact%', 'Z-Contact%', 'O-Contact%',
    'SwStr%', 'Whiff%', 'CSW%', 'Strike%', 'Ball%', 'F-Str%', 'PAR%',
    'GB%', 'FB%', 'LD%', 'OFFB%', 'IFFB%', 'AIR%', 'Zone%', 'Behind%', 'OBP',
)


def _pivot_batted_ball_rates(frame):
    """Turn the per-group ``batType`` value counts into batted-ball rate columns.

    *frame* must carry a ``batType`` column holding, per row, the list of
    ``{batType, proportion}`` structs produced by
    ``value_counts(normalize=True)``.  The list is exploded and pivoted so each
    type code becomes a column, then GB%/FB%/LD%/IFFB%/OFFB%/AIR% are derived
    and the raw code columns are removed.

    Type codes that never occur in *frame* are treated as 0 instead of raising,
    and the ``'null'`` column (created by groups with no batted balls) is
    dropped only when it exists.
    """
    wide = (
        frame
        .explode('batType')
        .unnest('batType')
        .pivot(on='batType', values='proportion')
        # NOTE: this fills nulls in *every* column, not only the pivoted ones.
        .fill_null(0)
    )
    absent = [code for code in _BAT_TYPES if code not in wide.columns]
    if absent:
        wide = wide.with_columns(pl.lit(0.0).alias(code) for code in absent)
    wide = wide.with_columns(
        (pl.col('G') + pl.col('B')).alias('GB%'),
        (pl.col('F') + pl.col('P')).alias('FB%'),
        pl.col('L').alias('LD%'),
        pl.col('P').alias('IFFB%'),
        pl.col('F').alias('OFFB%'),
        (pl.col('F') + pl.col('P') + pl.col('L')).alias('AIR%'),
    )
    return wide.drop([col for col in (*_BAT_TYPES, 'null') if col in wide.columns])


def _percentile_columns(stats, is_descending):
    """Build one ``<stat>_pctl`` expression per name in *stats*.

    Each value is the rank of the stat among rows where ``qualified`` is true,
    divided by the number of such rows; unqualified rows are masked to null
    before ranking.  *is_descending* maps a stat name to the ``descending``
    flag passed to ``rank``.
    """
    return [
        (
            pl.when(pl.col('qualified')).then(pl.col(stat)).rank(descending=is_descending(stat))
            / pl.when(pl.col('qualified')).then(pl.col(stat)).count()
        ).alias(f'{stat}_pctl')
        for stat in stats
    ]


def _pitch_stat_descending(stat):
    """Return the ``rank(descending=...)`` flag used for a per-pitch stat."""
    return (
        stat in ('FB%', 'LD%', 'OFFB%', 'AIR%', 'Ball%', 'Behind%')
        or 'Contact%' in stat
    )


def filter_data_by_date_and_game_kind(data, start_date=None, end_date=None, game_kind=None):
    """Restrict *data* to an inclusive date range and/or one game kind.

    Each filter is applied only when its argument is not ``None``:
    ``date >= start_date``, ``date <= end_date`` and
    ``coarse_game_kind == game_kind``.
    """
    if start_date is not None:
        data = data.filter(pl.col('date') >= start_date)
    if end_date is not None:
        data = data.filter(pl.col('date') <= end_date)
    if game_kind is not None:
        data = data.filter(pl.col('coarse_game_kind') == game_kind)
    return data


def compute_team_games(data):
    """Attach each row's home-team and visitor-team total game counts.

    Unique ``gameId`` values are counted per home team and per visitor team,
    the two counts are summed per team name, and the totals are joined back so
    that ``home_games`` / ``visitor_games`` hold the *total* games of the
    row's home team / visitor team.  The joins use the default join type, so
    rows whose team name finds no match in the totals table are not kept.
    """
    data = data.with_columns(
        pl.col('gameId').unique().len().over('HomeTeamNameES').alias('home_games'),
        pl.col('gameId').unique().len().over('VisitorTeamNameES').alias('visitor_games'),
    )
    as_home = (
        data
        .group_by('HomeTeamNameES')
        .first()
        [['HomeTeamNameES', 'home_games']]
        .rename({'HomeTeamNameES': 'team'})
    )
    as_visitor = (
        data
        .group_by('VisitorTeamNameES')
        .first()
        [['VisitorTeamNameES', 'visitor_games']]
        .rename({'VisitorTeamNameES': 'team'})
    )
    game_data = (
        as_home
        .join(as_visitor, on='team', how='full')
        # A team seen only at home (or only away) has a null count on the
        # other side of the full join.
        .fill_null(0)
        .with_columns(
            (pl.col('home_games') + pl.col('visitor_games')).alias('games'),
            # The full join leaves the right-hand key in 'team_right'; use it
            # when the left-hand key is missing.
            pl.when(pl.col('team').is_null())
            .then(pl.col('team_right'))
            .otherwise(pl.col('team'))
            .alias('team'),
        )
    )
    totals = game_data[['team', 'games']]
    return (
        data
        .drop('home_games', 'visitor_games')
        .join(
            totals.rename({'games': 'home_games'}),
            left_on='HomeTeamNameES', right_on='team',
        )
        .join(
            totals.rename({'games': 'visitor_games'}),
            left_on='VisitorTeamNameES', right_on='team',
        )
    )


def compute_pitch_stats(data, player_type, pitch_class_type, min_pitches=1, pitcher_lr='both', batter_lr='both', group_by_team=False):
    """Aggregate per-player, per-pitch-type statistics.

    Args:
        data: pitch-level frame.
        player_type: ``'pitcher'`` or ``'batter'``; selects the id/name/team
            columns used for grouping.
        pitch_class_type: ``'specific'`` groups by ``ballKind_code``,
            ``'general'`` by ``general_ballKind_code`` (renamed to
            ``ballKind_code`` / ``ballKind`` in the result).
        min_pitches: minimum ``count`` for a row to be ``qualified``.
        pitcher_lr, batter_lr: ``'both'``, ``'l'`` or ``'r'`` handedness filters.
        group_by_team: also group by the player's team column.

    Returns:
        One row per (player[, team], pitch type) with counts, velocity, usage,
        plate-discipline rates, batted-ball rates and ``*_pctl`` columns,
        sorted by player id and descending count.

    Raises:
        AssertionError: if an option is outside its allowed set.
    """
    assert pitcher_lr in ('both', 'l', 'r')
    assert batter_lr in ('both', 'l', 'r')
    assert player_type in ('pitcher', 'batter')
    assert pitch_class_type in ('general', 'specific')

    pitching = player_type == 'pitcher'
    if pitcher_lr != 'both':
        data = data.filter(pl.col('pitLR') == pitcher_lr)
    if batter_lr != 'both':
        data = data.filter(pl.col('batLR') == batter_lr)

    id_cols = ['pitId' if pitching else 'batId']
    team_col = 'pitcher_team_name_short' if pitching else 'batter_team_name_short'
    if group_by_team:
        id_cols.append(team_col)
    name_col = 'pitcher_name' if pitching else 'batter_name'
    specific = pitch_class_type == 'specific'
    pitch_col = 'ballKind_code' if specific else 'general_ballKind_code'
    pitch_name_col = 'ballKind' if specific else 'general_ballKind'

    grouped = (
        data
        .with_columns((pl.col('ballSpeed') / 1.609).round(1).alias('mph'))
        .group_by(*id_cols, pitch_col)
        .agg(
            pl.first(name_col),
            pl.col('pitLR').first().str.to_uppercase().alias('Throws'),
            *([pl.first('general_ballKind')] if specific else []),
            pl.first(pitch_name_col),
            pl.len().alias('count'),
            # Averages use tracked pitches only; maxima use every row.
            pl.when(valid_pitch).then('ballSpeed').mean().alias('Avg KPH'),
            pl.col('ballSpeed').max().alias('Max KPH'),
            pl.when(valid_pitch).then('mph').mean().round(1).alias('Avg MPH'),
            pl.col('mph').max().alias('Max MPH'),
            pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True),
            swing, z_swing, chase, contact, z_con, o_con, whiff, swstr, csw,
            strike, ball, f_strike, par, zone, glove, arm, high, low, mm, behind,
        )
        .with_columns(
            # Window over the player id that was actually grouped on: 'pitId'
            # is not a column of this frame when player_type == 'batter'.
            (pl.col('count') / pl.sum('count').over(id_cols[0])).alias('usage'),
            (pl.col('count') >= min_pitches).alias('qualified'),
        )
    )

    renames = {} if specific else {pitch_col: 'ballKind_code', pitch_name_col: 'ballKind'}
    pitch_stats = (
        _pivot_batted_ball_rates(grouped)
        .with_columns(_percentile_columns(_PITCH_PCTL_STATS, _pitch_stat_descending))
        .rename(renames)
        .sort(id_cols[0], 'count', descending=[False, True])
    )
    return pitch_stats


def compute_player_stats(data, player_type, qual='qualified', pitcher_lr='both', batter_lr='both', group_by_team=False):
    """Aggregate per-player (or per-team) statistics.

    Args:
        data: pitch-level frame.
        player_type: ``'pitcher'``, ``'batter'``, ``'team pitching'`` or
            ``'team batting'``.
        qual: ``'qualified'`` to require IP >= 1 x team games (pitching) or
            PA >= 3.1 x team games (batting); any other value is used directly
            as the minimum IP / PA.
        pitcher_lr, batter_lr: ``'both'``, ``'l'`` or ``'r'`` handedness filters.
        group_by_team: also group individual players by their team column.

    Returns:
        One row per player (or team) with IP, PA, rates, counting stats, OBP,
        batted-ball rates and ``*_pctl`` columns, sorted by IP (pitching) or
        PA (batting), descending.

    Raises:
        AssertionError: if an option is outside its allowed set.
    """
    assert pitcher_lr in ('both', 'l', 'r')
    assert batter_lr in ('both', 'l', 'r')
    assert player_type in ('pitcher', 'batter', 'team pitching', 'team batting')

    # pitching or batting, player or team
    pitching = player_type in ('pitcher', 'team pitching')
    team = player_type in ('team pitching', 'team batting')

    # handedness filters
    if pitcher_lr != 'both':
        data = data.filter(pl.col('pitLR') == pitcher_lr)
    if batter_lr != 'both':
        data = data.filter(pl.col('batLR') == batter_lr)

    if pitching:
        over_col = 'pitcher_team_name_short' if team else 'pitId'
    else:
        over_col = 'batter_team_name_short' if team else 'batId'

    # Which games column belongs to the player's own team.  The pitching
    # branch treats the home team as the one pitching in half-innings whose
    # code ends with '1'; the batting side is then the visitors, so the
    # columns swap for batters.
    # NOTE(review): confirm the half_inning encoding against the data source.
    if pitching:
        games_when_1, games_otherwise = 'home_games', 'visitor_games'
    else:
        games_when_1, games_otherwise = 'visitor_games', 'home_games'

    data = (
        compute_team_games(data)
        .with_columns(
            # Strings passed to then/otherwise are column names.
            pl.when(pl.col('half_inning').str.ends_with('1'))
            .then(games_when_1)
            .otherwise(games_otherwise)
            .first()
            .over(over_col)
            .alias('games'),
            # Innings: change in the out count across each row, summed, / 3.
            (
                pl.col('bso').struct.field('o').cast(pl.Int32)
                - pl.col('beforeBso').struct.field('o').cast(pl.Int32)
            ).sum().mul(1 / 3).over(over_col).alias('IP'),
            pl.col('pa_code').unique().len().over(over_col).alias('PA'),
            pl.col('presult').is_in(verify_and_return_presult([
                'Single', 'Double', 'Triple', 'Home run', 'Inside-the-park home run',
                'Groundout', 'Flyout', 'Lineout', 'Groundout (Double play)',
                'Foul fly', 'Foul line (?)',
                'Error', 'Sacrifice hit error', 'Sacrifice fly error', "Fielder's choice",
                'Bunt strikeout', 'Swinging strikeout', 'Looking strikeout',
            ])).sum().over(over_col).alias('AB'),
        )
    )

    # qualifiers
    qualified_factor = 1 if pitching else 3.1
    qual_col = 'IP' if pitching else 'PA'
    if qual == 'qualified':
        threshold = qualified_factor * pl.col('games')
    else:
        threshold = qual
    data = data.with_columns((pl.col(qual_col) >= threshold).alias('qualified'))

    # percentile ascending/descending
    if pitching:
        def stat_descending_pctl(stat):
            return (
                stat in ('BB%', 'Ball%', 'FB%', 'LD%', 'OFFB%', 'AIR%',
                         'Z-Swing%', 'Behind%', 'OBP')
                or 'Contact%' in stat
            )
    else:
        def stat_descending_pctl(stat):
            return not (
                stat in ('BB%', 'Ball%', 'FB%', 'LD%', 'OFFB%', 'AIR%',
                         'Swing%', 'Z-Swing%', 'Behind%', 'OBP')
                or 'Contact%' in stat
            )

    # col names
    if player_type == 'pitcher':
        id_cols, name_col = ['pitId'], 'pitcher_name'
    elif player_type == 'batter':
        id_cols, name_col = ['batId'], 'batter_name'
    else:
        id_cols, name_col = [], None
    team_col = 'pitcher_team_name_short' if pitching else 'batter_team_name_short'
    if group_by_team or team:
        id_cols.append(team_col)
    handedness_col = 'pitLR' if pitching else 'batLR'
    new_handedness_col = 'Throws' if pitching else 'Bats'

    # A handedness column is meaningless for a team unless the relevant side
    # was filtered to a single hand.
    own_side_lr = pitcher_lr if pitching else batter_lr
    show_handedness = not (team and own_side_lr == 'both')

    grouped = (
        data
        .with_columns(
            # Mean tracked speed per (player/team, general pitch kind), kept
            # only on 4S/FC/SI rows and divided by 1.609.
            pl.when(pl.col('general_ballKind_code').is_in(['4S', 'FC', 'SI']))
            .then(
                pl.when(valid_pitch).then('ballSpeed').mean()
                .over(over_col, 'general_ballKind_code')
            )
            .mul(1 / 1.609)
            .round(1)
            .alias('FB Velo')
        )
        .group_by(id_cols)
        .agg(
            *([pl.col(name_col).first()] if not team else []),
            *([] if group_by_team or team else [pl.col(team_col).last()]),
            *(
                [pl.col(handedness_col).first().str.to_uppercase().alias(new_handedness_col)]
                if show_handedness else []
            ),
            pl.col('IP').first(),
            pl.col('PA').first(),
            pl.col('FB Velo').max(),
            (
                pl.when(pl.col('presult').str.contains('strikeout')).then(1).otherwise(0).sum()
                / pl.col('pa_code').unique().len()
            ).alias('K%'),
            (
                pl.when(pl.col('presult') == 'Walk').then(1).otherwise(0).sum()
                / pl.col('pa_code').unique().len()
            ).alias('BB%'),
            pl.col('aux_bresult').struct.field('batType').drop_nulls().value_counts(normalize=True),
            swing, z_swing, chase, contact, z_con, o_con, whiff, swstr, csw,
            strike, ball, f_strike, par, zone, glove, arm, high, low, mm, behind,
            pl.col('AB').first(),
            h, bb, hbp, sf, obp,
            pl.first('qualified'),
        )
    )

    player_stats = (
        _pivot_batted_ball_rates(grouped)
        .with_columns(_percentile_columns(_PLAYER_PCTL_STATS, stat_descending_pctl))
        .sort(qual_col, descending=True)
    )
    return player_stats


def get_pitcher_stats(id, lr='both', game_kind=None, start_date=None, end_date=None, min_ip=1, min_pitches=1, pitch_class_type='specific'):
    """Collect the report frames for one pitcher.

    Args:
        id: the ``pitId`` to report on (the name shadows the builtin but is
            part of the public signature).
        lr: batter-handedness filter, ``'both'``, ``'l'`` or ``'r'``.
        game_kind, start_date, end_date: forwarded to
            ``filter_data_by_date_and_game_kind``.
        min_ip: minimum IP for the pitcher-level ``qualified`` flag.
        min_pitches: minimum count for the pitch-level ``qualified`` flag.
        pitch_class_type: ``'specific'`` or ``'general'`` pitch grouping.

    Returns:
        ``SimpleNamespace`` with ``pitcher_stats``, ``pitch_stats`` and
        ``pitch_shapes`` (one row per tracked pitch with speed and location).
        Percentiles are computed over all players before filtering to *id*.
    """
    source_data = filter_data_by_date_and_game_kind(
        data_df, start_date=start_date, end_date=end_date, game_kind=game_kind
    )

    pitch_stats = compute_pitch_stats(
        source_data,
        player_type='pitcher',
        pitch_class_type=pitch_class_type,
        min_pitches=min_pitches,
        batter_lr=lr,
        group_by_team=False,
    ).filter(pl.col('pitId') == id)

    shape_source = source_data if lr == 'both' else source_data.filter(pl.col('batLR') == lr)
    pitch_shapes = (
        shape_source
        .filter((pl.col('pitId') == id) & valid_pitch)
        .select('pitId', 'general_ballKind_code', 'ballKind_code', 'ballSpeed', 'x', 'y')
        .with_columns((pl.col('ballSpeed') / 1.609).alias('ballSpeed_mph'))
    )

    pitcher_stats = compute_player_stats(
        source_data,
        player_type='pitcher',
        qual=min_ip,
        batter_lr=lr,
        group_by_team=False,
    ).filter(pl.col('pitId') == id)

    return SimpleNamespace(pitcher_stats=pitcher_stats, pitch_stats=pitch_stats, pitch_shapes=pitch_shapes)