当前位置: 首页 > news >正文

大连专业手机自适应网站制作常用的搜索引擎

大连专业手机自适应网站制作,常用的搜索引擎,中企动力淄博分公司,做产品批发的网站目录 完整代码代码解释1. 消息的数据类:2. 创建代理人(MyAgent):3. 创建和运行代理人的运行时环境:4. 根据发送者路由消息的代理(RoutedBySenderAgent):5. 创建和运行带路由的代理&a…

目录

    • 完整代码
    • 代码解释
      • 1. 消息的数据类:
      • 2. 创建代理人(MyAgent):
      • 3. 创建和运行代理人的运行时环境:
      • 4. 根据发送者路由消息的代理(RoutedBySenderAgent):
      • 5. 创建和运行带路由的代理:
      • 6. 内外代理的通信(InnerAgent 和 OuterAgent)
      • 7. 运行内外代理
      • 8. 消息广播
        • 1. ReceivingAgent:接收消息的代理
        • 2. BroadcastingAgent:发布消息的代理
        • 3. 注册代理并处理消息
        • 4. 默认主题与订阅
        • 5. 注册代理并启动消息发布
    • 类似例子

完整代码

from dataclasses import dataclass@dataclass
class TextMessage:content: strsource: str@dataclass
class ImageMessage:url: strsource: str
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handlerclass MyAgent(RoutedAgent):@message_handlerasync def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you said {message.content}!")@message_handlerasync def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you sent me {message.url}!")
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
AgentType(type='my_agent')
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()
Hello, User, you said Hello, World!!
Hello, User, you sent me https://example.com/image.jpg!
class RoutedBySenderAgent(RoutedAgent):@message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignoreasync def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")@message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignoreasync def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")@message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignoreasync def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you sent me {message.url}!")
runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
Hello from user 1 handler, user1-test, you said Hello, World!!
Hello from user 2 handler, user2-test, you said Hello, World!!
Hello, user2-test, you sent me https://example.com/image.jpg!
from dataclasses import dataclassfrom autogen_core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler@dataclass
class Message:content: strclass InnerAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> Message:return Message(content=f"Hello from inner, {message.content}")class OuterAgent(RoutedAgent):def __init__(self, description: str, inner_agent_type: str):super().__init__(description)self.inner_agent_id = AgentId(inner_agent_type, self.id.key)@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:print(f"Received message: {message.content}")# Send a direct message to the inner agent and receves a response.response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)print(f"Received inner response: {response.content}")
runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
Received message: Hello, World!
Received inner response: Hello from inner, Hello from outer, Hello, World!
from autogen_core import RoutedAgent, message_handler, type_subscription@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:print(f"Received a message: {message.content}")
from autogen_core import TopicIdclass BroadcastingAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:await self.publish_message(Message("Publishing a message from broadcasting agent!"),topic_id=TopicId(type="default", source=self.id.key),)
from autogen_core import TypeSubscriptionruntime = SingleThreadedAgentRuntime()# Option 1: with type_subscription decorator
# The type_subscription class decorator automatically adds a TypeSubscription to
# the runtime when the agent is registered.
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))# Option 2: with TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))# Start the runtime and publish a message.
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
)
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!
from autogen_core import DefaultTopicId, default_subscription@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:# Publish a message to all agents in the same namespace.await self.publish_message(Message("Publishing a message from broadcasting agent!"),topic_id=DefaultTopicId(),)
runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
)
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Received a message: Hello, World! From the runtime!
Received a message: Publishing a message from broadcasting agent!

代码解释

1. 消息的数据类:

from dataclasses import dataclass@dataclass
class TextMessage:content: strsource: str@dataclass
class ImageMessage:url: strsource: str

这段代码定义了两个数据类:TextMessage 和 ImageMessage。它们分别用于表示文本消息和图片消息。

  • TextMessage 包含两个字段:content(消息内容)和 source(发送者)。
  • ImageMessage 包含 url(图片的链接地址)和 source(发送者)。

2. 创建代理人(MyAgent):

from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handlerclass MyAgent(RoutedAgent):@message_handlerasync def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you said {message.content}!")@message_handlerasync def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you sent me {message.url}!")

MyAgent 继承自 RoutedAgent 类,并定义了两个消息处理器:

  • on_text_message:处理 TextMessage 消息,打印出发送者和内容。
  • on_image_message:处理 ImageMessage 消息,打印出发送者和图片的链接地址。

3. 创建和运行代理人的运行时环境:

runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="User"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="User"), agent_id)
await runtime.stop_when_idle()

这段代码启动了一个单线程的运行时环境 SingleThreadedAgentRuntime

  • 注册了 MyAgent 代理,代理名称为 my_agent
  • 使用 runtime.send_message 向代理发送了一条文本消息和一条图片消息。
  • 最后,运行时环境会在空闲时停止。

4. 根据发送者路由消息的代理(RoutedBySenderAgent):

class RoutedBySenderAgent(RoutedAgent):@message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignoreasync def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello from user 1 handler, {message.source}, you said {message.content}!")@message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignoreasync def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:print(f"Hello from user 2 handler, {message.source}, you said {message.content}!")@message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignoreasync def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:print(f"Hello, {message.source}, you sent me {message.url}!")

RoutedBySenderAgent 继承自 RoutedAgent,并通过 message_handler 装饰器根据消息的发送者 (source) 来路由消息。

  • 如果消息来自以 user1 开头的发送者,它会调用 on_user1_message 处理文本消息。
  • 如果消息来自以 user2 开头的发送者,它会调用 on_user2_message 处理文本消息。
  • 如果是来自 user2 的图片消息,则调用 on_image_message 进行处理。

5. 创建和运行带路由的代理:

runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()

在这个部分,运行时环境启动并注册了 RoutedBySenderAgent 代理。
发送了不同来源的文本消息和图片消息。
根据消息的 source 字段,消息会被路由到对应的处理函数。
最终,代理会打印不同的响应信息,取决于消息的发送者。

6. 内外代理的通信(InnerAgent 和 OuterAgent)

@dataclass
class Message:content: strclass InnerAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> Message:return Message(content=f"Hello from inner, {message.content}")class OuterAgent(RoutedAgent):def __init__(self, description: str, inner_agent_type: str):super().__init__(description)self.inner_agent_id = AgentId(inner_agent_type, self.id.key)@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:print(f"Received message: {message.content}")response = await self.send_message(Message(f"Hello from outer, {message.content}"), self.inner_agent_id)print(f"Received inner response: {response.content}")

InnerAgent 是一个简单的代理,它收到消息后,返回一个修改过的消息。

OuterAgent 也作为代理存在,并包含对 InnerAgent 的引用。它会发送消息给 InnerAgent,并等待回应。

7. 运行内外代理

runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()

这个部分启动了 InnerAgentOuterAgent 代理。

OuterAgent 会发送一条消息给 InnerAgent,并接收到来自 InnerAgent 的响应。

最终,输出显示了 OuterAgent 接收到的消息以及 InnerAgent 的响应。

8. 消息广播

这一部分涉及了通过 topic 和 subscription 实现的消息广播和接收机制。它展示了如何通过 topic 来发布消息以及如何订阅消息。

1. ReceivingAgent:接收消息的代理
from autogen_core import RoutedAgent, message_handler, type_subscription@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:print(f"Received a message: {message.content}")

ReceivingAgent 继承自 RoutedAgent,并使用了 @type_subscription(topic_type="default") 装饰器。这个装饰器将代理与某个类型的 topic(本例中是 default)关联。

当收到来自 default 主题的消息时,代理会调用 on_my_message 方法,并打印出消息内容。

2. BroadcastingAgent:发布消息的代理
from autogen_core import TopicIdclass BroadcastingAgent(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:await self.publish_message(Message("Publishing a message from broadcasting agent!"),topic_id=TopicId(type="default", source=self.id.key),)

BroadcastingAgent 也继承自 RoutedAgent,它定义了一个消息处理方法,当收到消息时,它会发布一条新的消息,消息内容为 “Publishing a message from broadcasting agent!”。

这条消息会发布到 default 主题,且 topic_id 包含代理的 source(即 self.id.key)作为源。

3. 注册代理并处理消息
from autogen_core import TypeSubscriptionruntime = SingleThreadedAgentRuntime()# Option 1: with type_subscription decorator
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))# Option 2: with TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))# Start the runtime and publish a message.
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=TopicId(type="default", source="default")
)
await runtime.stop_when_idle()

runtime 创建了一个单线程的运行时环境。

选项 1:通过 @type_subscription 装饰器,ReceivingAgent 被注册到运行时,它会订阅 default 主题。

选项 2BroadcastingAgent 注册后,通过 runtime.add_subscription() 将其与 default 主题的消息订阅关联起来。

runtime.start() 启动运行时并发布了一条消息 “Hello, World! From the runtime!”,这条消息会被所有订阅 default 主题的代理接收。

由于 ReceivingAgent 订阅了 default 主题,它会接收到来自 runtime.publish_message 的消息,并输出 “Received a message: Hello, World! From the runtime!”。

之后,BroadcastingAgent 发布了一条消息 “Publishing a message from broadcasting agent!”,并发送到 default 主题。

4. 默认主题与订阅
from autogen_core import DefaultTopicId, default_subscription@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):@message_handlerasync def on_my_message(self, message: Message, ctx: MessageContext) -> None:# Publish a message to all agents in the same namespace.await self.publish_message(Message("Publishing a message from broadcasting agent!"),topic_id=DefaultTopicId(),)

BroadcastingAgentDefaultTopic 通过 @default_subscription 装饰器,订阅了一个特殊的默认主题(DefaultTopicId())。

当它收到消息时,它会将 “Publishing a message from broadcasting agent!” 消息发布到同一命名空间的所有代理,所有订阅该主题的代理都会收到这条消息。

5. 注册代理并启动消息发布
runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent")
)
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("Hello, World! From the runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
  • 注册了 BroadcastingAgentDefaultTopicReceivingAgent 两个代理。

  • 在运行时启动后,runtime.publish_message 发布了一条消息 “Hello, World! From the runtime!” 到默认主题 DefaultTopicId(),这个消息会被 ReceivingAgent 接收并打印。

  • 随后,BroadcastingAgentDefaultTopic 会发布一条消息 “Publishing a message from broadcasting agent!”,并将其广播到同一个命名空间中的所有代理,ReceivingAgent 收到这条消息并打印。

类似例子

from autogen_core import RoutedAgent, message_handler, type_subscription, TopicId# Agent that will receive status updates
@type_subscription(topic_type="status")
class StatusReceiverAgent(RoutedAgent):@message_handlerasync def on_status_update(self, message: Message, ctx: MessageContext) -> None:print(f"Status received: {message.content}")# Agent that publishes status updates
@type_subscription(topic_type="command")
class StatusPublisherAgent(RoutedAgent):@message_handlerasync def on_command_message(self, message: Message, ctx: MessageContext) -> None:# Publish a status update to the "status" topicawait self.publish_message(Message("System is up and running!"),topic_id=TopicId(type="status", source=self.id.key),)# Agent runtime and agent registration
runtime = SingleThreadedAgentRuntime()# Register the status receiver and publisher agents
await StatusReceiverAgent.register(runtime, "status_receiver", lambda: StatusReceiverAgent("Status Receiver"))
await StatusPublisherAgent.register(runtime, "status_publisher", lambda: StatusPublisherAgent("Status Publisher"))# Start the runtime and simulate sending a command to the publisher
runtime.start()# The publisher publishes a status update when it receives a message (e.g., "send status update")
await runtime.publish_message(Message("send status update"), topic_id=TopicId(type="command", source="status_publisher"))# The receiver will print the status update message
await runtime.stop_when_idle()

输出

Status received: System is up and running!

参考链接:
https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/framework/message-and-communication.html

http://www.mmbaike.com/news/29375.html

相关文章:

  • 微信公众号做留言网站建网站需要什么条件
  • 杭州市做外贸网站的公司seo网站排名优化价格
  • 做网页设计师需要学什么seo专业课程
  • 做flash网站遇到函数广告竞价
  • 网站布局有哪些推广平台排行榜
  • 网站制作的分割线怎么做制作一个小型网站
  • 网站备案查询验证码错误深圳设计公司
  • 云速成美站做网站好吗网络安全有名的培训学校
  • 建设网站公司谁家好上海网站关键词排名优化报价
  • wordpress添加主题设置选项深圳网站关键词优化公司
  • web做网站含源代码google优化排名
  • 公司网站建设总结泰安百度推广代理商
  • asp sqlserver做网站2020 惠州seo服务
  • 政府网站 cms百度指数的作用
  • 第三方商城网站建设seo简介
  • 电商b2c网站icp备案免费下载百度软件
  • 企业网站搭建费用3d建模培训班一般多少钱
  • 网站的推广有哪些方式济南网站建设公司
  • 外汇网站开发推广普通话的宣传语
  • 湖北省住建厅网站官网百度收录需要多久
  • 做网站的排名网站seo具体怎么做?
  • 国内外网站开发技术有哪些舟山seo
  • 番禺网站建设平台一篇好的营销软文
  • 南宁网站建设电话百度的推广方式有哪些
  • 用cms建设网站课程宅门营销型网站建设应该考虑哪些因素
  • zencart 官方网站软文写作服务
  • 建设网站的步百度经验官网登录
  • 国内最最早做虚拟货币的网站关键词优化包年推广
  • 城市建设鹤岗市网站海口关键词优化报价
  • 国内三大电商平台分析报告宁波优化网站哪家好