import grpc from concurrent import futures import time import threading import hr_pb2 import hr_pb2_grpc # ========== IAM Service ========== class IAMService(hr_pb2_grpc.IAMServiceServicer): def GetUser(self, request, context): print(f"[IAM] GetUser called with id={request.id}") return hr_pb2.UserResponse(id=request.id, name="Alice", role="Engineer") def Login(self, request, context): print(f"[IAM] Login attempt: {request.username}") if request.username == "admin" and request.password == "1234": return hr_pb2.LoginResponse(success=True, message="Login successful") else: return hr_pb2.LoginResponse(success=False, message="Invalid credentials") # ========== Personality Test Service ========== class PersonalityTestService(hr_pb2_grpc.PersonalityTestServiceServicer): def __init__(self, channel): self.iam_stub = hr_pb2_grpc.IAMServiceStub(channel) def GetTestResult(self, request, context): print(f"[PersonalityTest] GetTestResult for user {request.user_id}") # Call IAM service to verify user exists try: user = self.iam_stub.GetUser(hr_pb2.UserRequest(id=request.user_id)) print(f"[PersonalityTest] User verified: {user.name}") except grpc.RpcError as e: print(f"[PersonalityTest] Failed to verify user: {e}") return hr_pb2.TestResponse(user_id=request.user_id, result="INTJ - The Architect") # ========== Interview Service ========== class InterviewService(hr_pb2_grpc.InterviewServiceServicer): def __init__(self, channel): self.data = {} self.iam_stub = hr_pb2_grpc.IAMServiceStub(channel) self.workspace_stub = hr_pb2_grpc.WorkspaceServiceStub(channel) def ScheduleInterview(self, request, context): print(f"[Interview] Scheduling interview for user {request.user_id} on {request.date}") # Call IAM to get user info try: user = self.iam_stub.GetUser(hr_pb2.UserRequest(id=request.user_id)) print(f"[Interview] Scheduling for: {user.name} ({user.role})") except grpc.RpcError as e: print(f"[Interview] Could not get user info: {e}") # Call Workspace to get user profile try: profile = self.workspace_stub.GetProfile(hr_pb2.GetProfileRequest(user_id=request.user_id)) if profile.success: print(f"[Interview] Candidate skills: {profile.skills}") except grpc.RpcError as e: print(f"[Interview] Could not get profile: {e}") self.data.setdefault(request.user_id, []).append(request.date) return hr_pb2.InterviewResponse(success=True, message="Interview scheduled") def GetInterviews(self, request, context): print(f"[Interview] Getting interviews for user {request.user_id}") interviews = self.data.get(request.user_id, []) return hr_pb2.InterviewListResponse(interviews=interviews) # ========== Workspace Service ========== class WorkspaceService(hr_pb2_grpc.WorkspaceServiceServicer): def __init__(self, channel): self.profiles = {} self.job_descriptions = {} self.iam_stub = hr_pb2_grpc.IAMServiceStub(channel) self.test_stub = hr_pb2_grpc.PersonalityTestServiceStub(channel) def CreateProfile(self, request, context): print(f"[Workspace] Creating profile for user {request.user_id}") # Call IAM to verify user try: user = self.iam_stub.GetUser(hr_pb2.UserRequest(id=request.user_id)) print(f"[Workspace] Creating profile for verified user: {user.name}") except grpc.RpcError as e: print(f"[Workspace] Could not verify user: {e}") # Get personality test result try: test_result = self.test_stub.GetTestResult(hr_pb2.TestRequest(user_id=request.user_id)) print(f"[Workspace] User personality: {test_result.result}") except grpc.RpcError as e: print(f"[Workspace] Could not get personality test: {e}") self.profiles[request.user_id] = { 'name': request.name, 'skills': request.skills, 'experience': request.experience } return hr_pb2.ProfileResponse( user_id=request.user_id, name=request.name, skills=request.skills, experience=request.experience, success=True, message="Profile created successfully" ) def GetProfile(self, request, context): print(f"[Workspace] Getting profile for user {request.user_id}") profile = self.profiles.get(request.user_id) if profile: return hr_pb2.ProfileResponse( user_id=request.user_id, name=profile['name'], skills=profile['skills'], experience=profile['experience'], success=True, message="Profile retrieved" ) else: return hr_pb2.ProfileResponse( user_id=request.user_id, success=False, message="Profile not found" ) def CreateJobDescription(self, request, context): print(f"[Workspace] Creating job description {request.job_id}") self.job_descriptions[request.job_id] = { 'title': request.title, 'description': request.description, 'requirements': request.requirements } return hr_pb2.JobDescriptionResponse( job_id=request.job_id, title=request.title, description=request.description, requirements=request.requirements, success=True, message="Job description created successfully" ) def GetJobDescription(self, request, context): print(f"[Workspace] Getting job description {request.job_id}") job = self.job_descriptions.get(request.job_id) if job: return hr_pb2.JobDescriptionResponse( job_id=request.job_id, title=job['title'], description=job['description'], requirements=job['requirements'], success=True, message="Job description retrieved" ) else: return hr_pb2.JobDescriptionResponse( job_id=request.job_id, success=False, message="Job description not found" ) # ========== Server Function ========== def serve(): # Create internal channel for inter-service communication internal_channel = grpc.insecure_channel('localhost:50051') server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) hr_pb2_grpc.add_IAMServiceServicer_to_server(IAMService(), server) hr_pb2_grpc.add_PersonalityTestServiceServicer_to_server(PersonalityTestService(internal_channel), server) hr_pb2_grpc.add_InterviewServiceServicer_to_server(InterviewService(internal_channel), server) hr_pb2_grpc.add_WorkspaceServiceServicer_to_server(WorkspaceService(internal_channel), server) server.add_insecure_port('[::]:50051') server.start() print("🚀 gRPC server running on port 50051 with inter-service communication enabled") server.wait_for_termination() # ========== Client Function ========== def run_client(): time.sleep(2) # Wait for server to fully start with grpc.insecure_channel('localhost:50051') as channel: iam = hr_pb2_grpc.IAMServiceStub(channel) test = hr_pb2_grpc.PersonalityTestServiceStub(channel) interview = hr_pb2_grpc.InterviewServiceStub(channel) workspace = hr_pb2_grpc.WorkspaceServiceStub(channel) print("\n=== Testing IAM ===") user = iam.GetUser(hr_pb2.UserRequest(id=1)) print(f"✅ User: {user.name}, Role: {user.role}") login = iam.Login(hr_pb2.LoginRequest(username="admin", password="1234")) print(f"✅ Login: {login.message}") print("\n=== Testing Personality Test (calls IAM) ===") result = test.GetTestResult(hr_pb2.TestRequest(user_id=1)) print(f"✅ Personality result: {result.result}") print("\n=== Testing Workspace - Profile (calls IAM & PersonalityTest) ===") profile = workspace.CreateProfile(hr_pb2.ProfileRequest( user_id=1, name="Alice Johnson", skills="Python, gRPC, Microservices", experience="5 years" )) print(f"✅ Profile created: {profile.message}") print("\n=== Testing Interview Scheduling (calls IAM & Workspace) ===") interview.ScheduleInterview(hr_pb2.InterviewRequest(user_id=1, date="2025-10-25")) interview.ScheduleInterview(hr_pb2.InterviewRequest(user_id=1, date="2025-10-30")) interviews = interview.GetInterviews(hr_pb2.InterviewListRequest(user_id=1)) print(f"✅ Interview dates: {list(interviews.interviews)}") print("\n=== Testing Workspace - Job Description ===") job = workspace.CreateJobDescription(hr_pb2.JobDescriptionRequest( job_id=101, title="Senior Backend Engineer", description="Build scalable microservices", requirements="5+ years Python, gRPC experience" )) print(f"✅ Job description created: {job.message}") # ========== Run Both ========== if __name__ == "__main__": t = threading.Thread(target=serve, daemon=True) t.start() run_client()