Name: Samantha Brown Job Role: Lead Region: AU Phone Number: 555-1357 Email Address: sbrown@example.com
配置好了 Agent 和 Task，然后就可以开始配置团队了（编写 crew.py）
from pathlib import Path
from crewai import Agent, Crew, Process, Task from crewai.project import CrewBase, agent, crew, task from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource import os from dotenv import load_dotenv
load_dotenv() # Load environment variables
@CrewBase
class DevelopersCrew:
    """Crew for retrieving developers information."""

    # NOTE(review): no @agent/@task methods are visible in this excerpt;
    # confirm they are defined, otherwise the crew has nothing to run.

    @crew
    def crew(self) -> Crew:
        """Provides Employee details.

        Returns:
            Crew: built from ``self.agents`` and ``self.tasks``, using the
            sequential process with verbose output enabled.
        """
        return Crew(
            agents=self.agents,  # Automatically collected by the @agent decorator
            tasks=self.tasks,  # Automatically collected by the @task decorator
            process=Process.sequential,
            verbose=True,
        )
然后编写 main.py，给出程序的输入内容
from pydantic import BaseModel from crewai.flow.flow import Flow, listen, start , router , or_
from src.human_resource_assistance_flow.crews.developers_crew.developers_crew import DevelopersCrew from src.human_resource_assistance_flow.crews.leads_crew.leads_crew import LeadsCrew
@start()
def get_employee_details(self):
    """Seed the flow state with a hard-coded employee code and job role.

    Returns:
        str: the literal ``"details_collected"``.
    """
    # In a real application, these details might come from user input or a database
    self.state.employeeCode = "L209"
    self.state.job_role = "Lead"
    print(f"employeeCode: {self.state.employeeCode}, job_role: {self.state.job_role}")
    return "details_collected"
@router(get_employee_details)
def route_based_on_relationship(self):
    """Route based on job role.

    Returns:
        str: ``"Lead"`` or ``"Developer"`` when ``self.state.job_role``
        equals that value, otherwise ``"NA"``.
    """
    if self.state.job_role == "Lead":
        return "Lead"
    elif self.state.job_role == "Developer":
        return "Developer"
    else:
        return "NA"
@listen("Lead")
def leads_information(self):
    """Retrieve the information about the lead.

    Kicks off ``LeadsCrew`` with the employee code from the flow state,
    stores the crew's raw result in ``self.state.employee_details`` and
    prints it.

    Returns:
        str: the literal ``"details_retrieved"``.
    """
    crew = LeadsCrew().crew()
    inputs = {
        "employeeCode": self.state.employeeCode
    }
    result = crew.kickoff(inputs=inputs)
    self.state.employee_details = result.raw
    print(f"Employee Information: {self.state.employee_details}")
    return "details_retrieved"
@listen("Developer")
def developers_information(self):
    """Retrieve the information about the developer.

    Kicks off ``DevelopersCrew`` with the employee code from the flow
    state, stores the crew's raw result in ``self.state.employee_details``
    and prints it.

    Returns:
        str: the literal ``"details_retrieved"``.
    """
    crew = DevelopersCrew().crew()
    inputs = {
        "employeeCode": self.state.employeeCode,
    }
    result = crew.kickoff(inputs=inputs)
    self.state.employee_details = result.raw
    print(f"Employee Information: {self.state.employee_details}")
    return "details_retrieved"
@listen(or_(leads_information, developers_information))
def save_details(self):
    """Write the retrieved employee details to ``employee_details.txt``.

    Writes ``self.state.employee_details`` to the file (overwriting any
    existing content) and prints a confirmation message.
    """
    # Explicit UTF-8 so non-ASCII text does not depend on the platform's
    # default encoding.
    with open("employee_details.txt", "w", encoding="utf-8") as file:
        file.write(self.state.employee_details)
    print("Employee details saved to employee_details.txt")