Spaces:
Sleeping
Sleeping
import html
import os
import pathlib
import random
import smtplib
from datetime import datetime, timedelta
from email.message import EmailMessage
from typing import Union

import streamlit as st
from sqlalchemy import select
from sqlalchemy.orm import Session

from llm import parse, rerank
from models import Employee, Job, JobsApplied, User, ENGINE
# Page-wide CSS override for Streamlit buttons (border colour and drop shadow).
# The markup is injected verbatim, which is why unsafe_allow_html is required.
_BUTTON_CSS = """-
<style>
.stButton > button{
border: black;
box-shadow: 0 12px 16px 0 rgba(0,0,0,0.24), 0 17px 50px 0 rgba(0,0,0,0.19);
}
</style>"""
st.markdown(_BUTTON_CSS, unsafe_allow_html=True)
def clear_cache() -> None:
    """Remove every entry from ``st.session_state``.

    Wired to the "Log Out" button as its ``on_click`` callback, so the next
    script run starts with no logged-in employee and no UI flags.
    """
    # Iterate over a snapshot of the keys: deleting entries from a mapping
    # while iterating over that same mapping is fragile.
    for key in list(st.session_state.keys()):
        del st.session_state[key]
def login() -> Union[Employee, None]:
    """Render the login form and authenticate the employee.

    On success the employee is stored in ``st.session_state['employee']``,
    ``st.session_state['employee_logged']`` is set to ``True`` and the login
    form is cleared from the page.

    Returns:
        The matching ``Employee`` on success. ``None`` when the button was not
        pressed, a field was left empty, or no row matched the credentials.
    """
    login_container = st.empty()
    with login_container.container():
        employee_id = st.text_input(label=":email: User ID", placeholder="User ID").upper().strip()
        password = st.text_input(label=":lock: Password", placeholder="Password", type="password")

        if not st.button('Login', type="secondary"):
            return None

        if not (employee_id and password):
            st.error('Empty UserID/Password')
            return None

        # NOTE(review): the typed password is compared directly with the
        # stored column, which suggests passwords are kept unhashed - confirm
        # against the Employee model and migrate to hashed storage if so.
        stmt = (
            select(Employee)
            .where(Employee.employee_id == employee_id)
            .where(Employee.password == password)
        )
        with Session(ENGINE) as session:
            employee = session.scalars(stmt).one_or_none()

        if employee is None:
            st.error('Invalid UserID/password')
            return None

        st.session_state['employee_logged'] = True
        st.session_state['employee'] = employee
        login_container.empty()
        return employee
def add_to_db(**job) -> None:
    """Persist a new job posting and report success in the UI.

    The keyword arguments are the job's fields. This function fills in
    ``job_id`` (five random uppercase letters followed by a random integer
    between 100 and 1000), ``employee_id`` (the logged-in employee) and
    ``created_at`` (current local time, ``dd-mm-YYYY HH:MM:SS``), and
    flattens the two skill lists into comma-separated strings before
    building the ``Job`` row.
    """
    letters = ''.join(chr(random.randint(65, 90)) for _ in range(5))
    digits = str(random.randint(100, 1000))

    job['job_id'] = letters + digits
    job['employee_id'] = st.session_state["employee"].employee_id
    job['created_at'] = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
    for skills_field in ('primary_skills', 'secondary_skills'):
        job[skills_field] = ', '.join(job[skills_field])

    with Session(ENGINE) as session:
        session.add(Job(**job))
        session.commit()

    st.success(f"Successfully posted job {job['post_name']}!")
def add_job() -> bool:
    """Render the "new job" form and store the posting when it is submitted.

    Returns:
        ``True`` when the "Post" button was pressed, validation passed and
        the job was handed to ``add_to_db``; ``False`` otherwise (button not
        pressed, or a validation error was shown).
    """
    post_name = st.text_input(label='Post Name').strip().title()
    description = st.text_area(label='Job Description').strip()
    responsibilities = st.text_area(label="Responsibility").strip()
    vacancies = st.number_input(label="Number Vacancies", min_value=1)

    col1, col2 = st.columns(2)
    with col1:
        min_experience = st.number_input(label="Minimum Experience", min_value=0)
    with col2:
        max_experience = st.number_input(label="Maximum Experience", min_value=1)

    number_of_primary_skills = st.number_input(label="How many primary skills?", min_value=1)
    primary_inputs = [
        st.text_input(label=f'Primary Skill {i}', key=f'Primary Skill {i}').strip()
        for i in range(1, number_of_primary_skills + 1)
    ]
    number_of_secondary_skills = st.number_input(label="How many secondary skills?", min_value=0)
    secondary_inputs = [
        st.text_input(label=f'Secondary Skill {i}', key=f'Secondary Skill {i}').strip()
        for i in range(1, number_of_secondary_skills + 1)
    ]
    # Drop blank entries. The raw list always has at least one element, so it
    # is truthy even when every box is empty; without this filter the
    # "primary skills are required" check below could never fail.
    primary_skills = [skill for skill in primary_inputs if skill]
    secondary_skills = [skill for skill in secondary_inputs if skill]

    st.markdown("Expires at")
    tomorrow = datetime.now() + timedelta(days=1)
    col1, col2 = st.columns(2)
    with col1:
        expires_date = st.date_input(
            label="expries date",
            value=tomorrow,
            min_value=tomorrow,
            max_value=None,
            label_visibility="collapsed",
        )
    with col2:
        expires_time = st.time_input(label="Time Input", label_visibility="collapsed")
    expires_at = f'{expires_date.strftime("%d-%m-%Y")} {expires_time.strftime("%H:%M:%S")}'

    st.markdown(
        "<p style='color: red'>⚠️ Check before you submit once submitted you cannot make changes</p>",
        unsafe_allow_html=True,
    )

    if not st.button("Post"):
        return False

    if not (post_name and description and responsibilities and primary_skills):
        st.error("Post Name/Description/Responsibility/Primary SKills are required")
        return False

    if min_experience > max_experience:
        st.error("Minimum Experience must be less than maximum experience")
        return False

    add_to_db(
        post_name=post_name,
        description=description,
        min_experience=min_experience,
        max_experience=max_experience,
        primary_skills=primary_skills,
        secondary_skills=secondary_skills,
        responsibilities=responsibilities,
        expires_at=expires_at,
        vacancies=vacancies,
    )
    return True
def post_job() -> None:
    """Render the "Post Jobs" tab: a prompt, a "+" button and the job form.

    Pressing the "+" button sets ``st.session_state['add_job']``, which makes
    the form from ``add_job`` appear. Once a job has been posted the form is
    cleared and the flag is reset so the form stays hidden until "+" is
    pressed again.
    """
    # The original began with two bare expression statements reading the
    # session state. Streamlit renders bare expressions on the page, so they
    # displayed the Employee object and a boolean; they have been removed.
    if not st.session_state['employee_logged']:
        return

    col1, col2 = st.columns([0.001, 0.01])
    with col2:
        st.markdown(f'Would you like to include a job vacancy listing. {st.session_state["employee"].full_name}?')
    with col1:
        if st.button(':heavy_plus_sign:'):
            st.session_state['add_job'] = True

    if st.session_state['add_job']:
        add_job_container = st.empty()
        with add_job_container.container():
            is_job_posted = add_job()
        if is_job_posted:
            add_job_container.empty()
            # Without this reset the flag stays True and the form is drawn
            # again on the very next rerun.
            st.session_state['add_job'] = False
def get_jobs_posted() -> None:
    """List the logged-in employee's job postings as (empty) expanders.

    Each expander is titled with the job id, the post name and the creation
    timestamp; no body content is rendered inside it.
    """
    owner_id = st.session_state["employee"].employee_id
    query = select(Job).where(Job.employee_id == owner_id)
    with Session(ENGINE) as session:
        posted_jobs = session.scalars(query).all()
    print(posted_jobs)

    for posting in posted_jobs:
        title = f"{posting.job_id}: {posting.post_name} posted at: {posting.created_at}"
        # Creating the expander is enough; it intentionally has no content.
        st.expander(title)
def get_users_who_applied_for_jobs():
    """Render the applications view for one of the employee's jobs.

    Flow:
      1. Load the jobs posted by the logged-in employee and let the user pick
         a ``job_id``.
      2. List the applications for that job with their current rank, plus a
         "Top Resumes" count and a "Rank Resumes" button.
      3. Once ``st.session_state['rank_resumes']`` is set, replace the list
         with the result of ``rerank`` over the documents from ``get_docs``
         and offer a "Send Emails" button.
      4. When "Send Emails" is pressed, call ``add_job_aplication_details``
         exactly once and reset both flags.

    Returns:
        ``None`` when the employee has no jobs, ``False`` when the selected
        job has no applications, ``None`` otherwise.
    """
    employee_id = st.session_state["employee"].employee_id

    # Jobs posted by this employee
    with Session(ENGINE) as session:
        stmt = select(Job).where(Job.employee_id == employee_id)
        jobs = session.scalars(stmt).all()
    if not jobs:
        st.info("No jobs posted!")
        return

    job_id = st.selectbox("Filter by job_id", options=[job.job_id for job in jobs])

    with Session(ENGINE) as session:
        stmt = select(JobsApplied).where(JobsApplied.job_id == job_id)
        users_applied_for_jobs = session.scalars(stmt).all()
    if not users_applied_for_jobs:
        st.info('No users have applied for job')
        return False

    re_rank_resumes_container = st.empty()
    with re_rank_resumes_container.container():
        st.markdown("""<p style="font-family:Garamond;text-indent: 45vh;font-size:25px">Jobs Application Details</p>""", unsafe_allow_html=True)
        col1, col2 = st.columns([0.001, 0.01])
        with col1:
            # The round shown is taken from the first application of the job.
            round_number = users_applied_for_jobs[0].round_number
            st.markdown(f"""<p style="font-size:20px"><span style = "font-family:Garamond;">Round</span><span style="font-family:monospace;padding: 10px 20px">{html.escape(str(round_number))}</span></p>""", unsafe_allow_html=True)
        with col2:
            num_docs = st.number_input(label="Top Resumes", min_value=1, max_value=len(users_applied_for_jobs))
        for jobs_application in users_applied_for_jobs:
            # Applicant-supplied values go into raw HTML, so escape them.
            safe_email = html.escape(str(jobs_application.email_id))
            safe_rank = html.escape(str(jobs_application.rank))
            st.markdown(f"""<p style="border: 2px solid black;padding: 10px 20px; box-shadow: 10px 10px 5px #aaaaaa"><span style="font-family:Garamond;">User Email ID</span>: <span style="font-family:monospace">{safe_email}</span><br><span style="font-family:Garamond">Current Rank: <span style="font-family:monospace">{safe_rank}</span></p>""", unsafe_allow_html=True)
        col1, col2, col3 = st.columns(3)
        with col2:
            if st.button("Rank Resumes"):
                st.session_state['rank_resumes'] = True

    # Ranking Resumes
    if st.session_state['rank_resumes']:
        re_rank_resumes_container.empty()
        docs = get_docs(users_applied_for_jobs, job_id)
        re_ranked = rerank(job_id, docs, num_docs)
        # Each document starts with the applicant's email id followed by a
        # 50-dash separator (see get_docs); keep only the email part.
        candidates = [r.document['text'].split('-' * 50)[0].strip() for r in re_ranked.results]
        st.markdown("<p style='font-family:Garamond;text-indent: 45vh;font-size:25px'>Resumes re-ranked:</p>", unsafe_allow_html=True)
        for candidate in candidates:
            st.markdown(f"""<p style="border: 2px solid black;padding: 10px 20px; box-shadow: 10px 10px 5px #aaaaaa"><span style="font-family:Garamond;">User Email ID</span>: <span style="font-family:monospace">{html.escape(candidate)}</span></p>""", unsafe_allow_html=True)
        col1, col2, col3 = st.columns(3)
        with col2:
            if st.button("Send Emails"):
                st.session_state['send_resumes'] = True
        if st.session_state['send_resumes']:
            # One-shot: clear the flags BEFORE doing the work. Left set, every
            # later rerun (any widget interaction) would advance the round
            # and e-mail the candidates again.
            st.session_state['send_resumes'] = False
            st.session_state['rank_resumes'] = False
            add_job_aplication_details(job_id, round_number, candidates)
def add_job_aplication_details(job_id, round_number, candidates):
    """Promote the shortlisted applicants of a job and notify them by email.

    Applications for ``job_id`` whose ``email_id`` is not in ``candidates``
    are deleted. The remaining ones get ``round_number`` incremented by one
    and ``rank`` set to the new round number. Both changes are committed in
    a single transaction, after which ``sending_emails`` is called.

    Args:
        job_id: Identifier of the job whose applications are updated.
        round_number: Not used by this function; each application's own
            ``round_number`` column is incremented instead. Kept so existing
            callers keep working.
        candidates: Email ids of the applicants that move to the next round.
    """
    candidates = list(candidates)
    if not candidates:
        # An empty shortlist would match *every* application in the
        # "not in" query below and delete them all.
        st.warning('No candidates selected; no applications were changed.')
        return

    with Session(ENGINE) as session:
        rejected_stmt = (
            select(JobsApplied)
            .where(JobsApplied.job_id == job_id)
            .where(JobsApplied.email_id.not_in(candidates))
        )
        for application in session.scalars(rejected_stmt).all():
            session.delete(application)

        shortlisted_stmt = (
            select(JobsApplied)
            .where(JobsApplied.job_id == job_id)
            .where(JobsApplied.email_id.in_(candidates))
        )
        for application in session.scalars(shortlisted_stmt).all():
            application.round_number += 1
            application.rank = application.round_number

        # Single commit: the deletions and the promotions succeed or fail
        # together instead of being committed in two separate sessions.
        session.commit()

    sending_emails(job_id, candidates)
def sending_emails(job_id, candidates):
    """Email the selected candidates that they were shortlisted for ``job_id``.

    The message is sent from the logged-in employee's address through
    ``smtp.gmail.com`` over SSL (port 465). The SMTP password is read from
    the ``EMAIL`` environment variable. A success or error banner is shown
    in the Streamlit page.

    Args:
        job_id: Job identifier quoted in the subject line.
        candidates: Iterable of recipient email addresses.
    """
    recipients = [address for address in candidates if address]
    if not recipients:
        st.warning('No candidates to notify.')
        return

    smtp_password = os.environ.get('EMAIL')
    if not smtp_password:
        st.error('Cannot send emails: the EMAIL environment variable is not set.')
        return

    sender = st.session_state['employee'].email_id

    email_message = EmailMessage()
    email_message['From'] = sender
    # Candidates go in Bcc so they cannot see each other's addresses; the
    # visible "To" is the sender. smtplib's send_message() delivers to Bcc
    # recipients but does not transmit the Bcc header itself.
    email_message['To'] = sender
    email_message['Bcc'] = ', '.join(recipients)
    email_message['Subject'] = f'You are selected for Job ID: {job_id}.'
    email_message.set_content('Hello,\nYou are selected to attend the interview. Please login to your profile and attend.')

    try:
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp:
            smtp.login(sender, smtp_password)
            smtp.send_message(email_message)
    except (smtplib.SMTPException, OSError) as exc:
        # Show the failure in the page instead of crashing the script run.
        st.error(f'Could not send emails: {exc}')
        return

    st.success('⚠️Email Sent SUccessfully')
def get_docs(jobs_applications, job_id):
    """Build the documents to re-rank from the resumes stored for a job.

    Every file in ``resumes/<job_id>`` is parsed with ``parse``. The
    applicant's email id is taken from the file name (the part of the stem
    before the first ``-``), and each document is that email id, a separator
    of 50 dashes, then the parsed text.

    Args:
        jobs_applications: Current applications for the job. Only resumes
            whose email id matches one of their ``email_id`` values are
            included, so applicants already removed from the job are not
            ranked again. Pass ``None`` to include every resume found.
        job_id: Job identifier; names the sub-directory of ``resumes``.

    Returns:
        List of document strings, ordered by file path.

    Raises:
        FileNotFoundError: If ``resumes/<job_id>`` does not exist.
    """
    allowed_emails = None
    if jobs_applications is not None:
        allowed_emails = {application.email_id for application in jobs_applications}

    separator = '-' * 50
    resumes_dir = pathlib.Path(f'resumes/{job_id}')

    docs = []
    # sorted() makes the document order deterministic; iterdir() order is
    # arbitrary.
    for resume in sorted(resumes_dir.iterdir()):
        if not resume.is_file():
            continue
        # NOTE(review): an email id that itself contains '-' is truncated
        # here - confirm the naming scheme used when resumes are uploaded.
        email_id = resume.stem.split('-')[0]
        if allowed_emails is not None and email_id not in allowed_emails:
            continue
        docs.append(f"{email_id}{separator}{parse(resume)}")
    return docs
# Setting Session states
# Default value for every session-state key the page reads. A key is only
# initialised when missing, so values survive Streamlit reruns.
_SESSION_DEFAULTS = {
    'employee_logged': False,
    'employee': None,
    'add_job': False,
    'add_new_skill': False,
    'rank_resumes': False,
    'send_resumes': False,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

#Login
if not st.session_state['employee_logged']:
    employee = login()

# Checked separately (not as an else) so the portal is drawn in the same run
# in which login() succeeds.
if st.session_state['employee_logged']:
    col1, col2 = st.columns(2)
    with col2:
        st.button('Log Out', on_click=clear_cache)
    tab1, tab2, tab3 = st.tabs(["Post Jobs", "Jobs Posted", "Get Job Applied Details"])
    with tab1:
        post_job()
    with tab2:
        get_jobs_posted()
    with tab3:
        get_users_who_applied_for_jobs()