http_gateway.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. from flask import Flask, jsonify, request
  2. import grpc
  3. import hr_pb2
  4. import hr_pb2_grpc
  5. app = Flask(__name__)
  6. # gRPC channel setup
  7. channel = grpc.insecure_channel('localhost:50051')
  8. iam_stub = hr_pb2_grpc.IAMServiceStub(channel)
  9. test_stub = hr_pb2_grpc.PersonalityTestServiceStub(channel)
  10. interview_stub = hr_pb2_grpc.InterviewServiceStub(channel)
  11. @app.route('/')
  12. def home():
  13. return '''
  14. <h1>HR gRPC Gateway</h1>
  15. <h2>Available Endpoints:</h2>
  16. <ul>
  17. <li><a href="/user/1">GET /user/&lt;id&gt;</a> - Get user info</li>
  18. <li>POST /login - Login (send JSON: {"username": "admin", "password": "1234"})</li>
  19. <li><a href="/test/1">GET /test/&lt;user_id&gt;</a> - Get personality test result</li>
  20. <li>POST /interview - Schedule interview (send JSON: {"user_id": 1, "date": "2025-10-25"})</li>
  21. <li><a href="/interviews/1">GET /interviews/&lt;user_id&gt;</a> - Get user interviews</li>
  22. </ul>
  23. '''
  24. @app.route('/user/<int:user_id>')
  25. def get_user(user_id):
  26. try:
  27. response = iam_stub.GetUser(hr_pb2.UserRequest(id=user_id))
  28. return jsonify({
  29. 'id': response.id,
  30. 'name': response.name,
  31. 'role': response.role
  32. })
  33. except grpc.RpcError as e:
  34. return jsonify({'error': str(e)}), 500
  35. @app.route('/login', methods=['POST'])
  36. def login():
  37. try:
  38. data = request.json
  39. response = iam_stub.Login(hr_pb2.LoginRequest(
  40. username=data['username'],
  41. password=data['password']
  42. ))
  43. return jsonify({
  44. 'success': response.success,
  45. 'message': response.message
  46. })
  47. except grpc.RpcError as e:
  48. return jsonify({'error': str(e)}), 500
  49. @app.route('/test/<int:user_id>')
  50. def get_test(user_id):
  51. try:
  52. response = test_stub.GetTestResult(hr_pb2.TestRequest(user_id=user_id))
  53. return jsonify({
  54. 'user_id': response.user_id,
  55. 'result': response.result
  56. })
  57. except grpc.RpcError as e:
  58. return jsonify({'error': str(e)}), 500
  59. @app.route('/interview', methods=['POST'])
  60. def schedule_interview():
  61. try:
  62. data = request.json
  63. response = interview_stub.ScheduleInterview(hr_pb2.InterviewRequest(
  64. user_id=data['user_id'],
  65. date=data['date']
  66. ))
  67. return jsonify({
  68. 'success': response.success,
  69. 'message': response.message
  70. })
  71. except grpc.RpcError as e:
  72. return jsonify({'error': str(e)}), 500
  73. @app.route('/interviews/<int:user_id>')
  74. def get_interviews(user_id):
  75. try:
  76. response = interview_stub.GetInterviews(hr_pb2.InterviewListRequest(user_id=user_id))
  77. return jsonify({
  78. 'user_id': user_id,
  79. 'interviews': list(response.interviews)
  80. })
  81. except grpc.RpcError as e:
  82. return jsonify({'error': str(e)}), 500
  83. if __name__ == '__main__':
  84. print("🌐 HTTP Gateway running on http://localhost:5000")
  85. app.run(debug=True, port=5000)