Koishi 源码解析:生命周期与执行顺序 & 中间件与观察者机制

Koishi 源码解析:中间件与观察者机制

在 Koishi 的架构中,中间件 (Middleware) 与观察者 (Observer) 是负责消息生命周期流转与数据状态管理的两大核心模块。本文将基于 @packages/core/src 中的源码,深入解析这两者的执行原理与实现。

一、 中间件 (Middleware) 与事件执行机制

1. 什么是中间件?

在 Koishi 中,中间件本质上是一个基于洋葱模型(Onion Model)的异步函数:

export type Middleware<S extends Session = Session> = (session: S, next: Next) => Awaitable<void | Fragment>

中间件构成了机器人处理 message(消息)的核心拦截链,每个收到消息的事件都会进入此调用链,通过调用 next() 将执行权传递给下一个中间件。

2. 中间件的优先级

中间件通过 ctx.middleware(callback, prepend) 注册。

  • 普通注册ctx.middleware(fn) 会被添加到当前生命周期 Hooks 的队尾。
  • 前置注册ctx.middleware(fn, true) 会以 prepend: true 的模式插入到队首。
    这种机制确保了一些关键的内置处理可以优先于用户的业务逻辑运行。

3. 生命周期与执行顺序(中间件、事件与指令 Action)

当平台适配器(Adapter)触发 message 事件后,会进入 Processor 类的 _handleMessage 方法,构建一个中间件调用链(queue)。在此流程中,中间件、事件触发与指令(Command)的 Action 发生着严密的协作:

第一阶段:数据装载(前置核心中间件 attach
packages/core/src/middleware.ts 中,Koishi 以最高优先级 (prepend: true) 注册了一个名为 attach 的系统级中间件。

  1. 触发 before-attach 事件:此时指令系统 (Commander) 会拦截该事件并根据前缀解析出 session.argv;同时快捷方式与 Matcher 也会在此处进行检测,若命中则准备 session.response
  2. 触发数据绑定事件:先后触发 before-attach-channelbefore-attach-user 事件。各个模块(包含即将触发的指令)可以在此提交需要的字段依赖 (fields)。
  3. 按需抓取数据:通过 session.observeChannel()session.observeUser() 抓取所需字段的数据库数据,并将其转化为观察者对象附加到 Session。
  4. 触发 attach-channel / attach-user / attach 事件:所有数据均装载完毕。若前面命中过 Matcher 缓存了内容,则在此直接短路返回,不执行后续逻辑。

第二阶段:指令系统执行(Commander 级中间件)
packages/core/src/command/index.ts 在系统启动时注册了指令控制中间件:

  1. 指令匹配:检查 session.argv 是否能匹配到有效的 Command 实例。如果未匹配,则调用 next() 将控制权交给其他用户编写的普通中间件。
  2. 触发指令拦截器:如果匹配成功,调用 session.execute,进而调用 command.execute(argv, next)。首先执行指令的 checkers(包含参数校验、权限校验等),触发 command/before-execute 生命周期事件。
  3. 执行指令 Action(公开 API 与内部调度)
    对插件开发者来说,cmd.action(action, prepend?) 的执行函数签名是 action(argv, ...args),不接收 next 参数;prepend 用于把该 action 插入到当前指令 action 列表前部。
    在内部实现中,command.execute() 会构造执行队列,并将内部调度函数挂载到 argv.next。因此,源码层存在链式调度能力,但公开 API 仍以 action(argv, ...args) 为主。

第三阶段:结果收尾与数据落库
当完整的洋葱链执行结束(包含捕获可能的异常):

  1. 如果有返回结果,通过 session.send() 发送消息到对应群聊。
  2. finally 块中执行清理:
    await session.user?.$update()
    await session.channel?.$update()
    
    这里调用的便是 Observer 对象的方法,从而将在中间件及 Action 流程中产生的任何数据变动自动保存回数据库。
  3. 最终抛出 middleware 结束事件。

4. 当接收到消息时:完整处理周期时序图

sequenceDiagram
    autonumber
    participant Adapter as 平台适配器/机器人
    participant Bus as Context 事件总线
    participant PluginEvent as 插件事件监听(ctx.on)
    participant Processor as Processor._handleMessage(内置 message 监听)
    participant MW as 中间件队列
    participant Attach as attach(前置中间件)
    participant Event as 生命周期事件总线
    participant Commander as Commander 中间件
    participant Command as command.execute
    participant DB as Database/Observer
    participant Send as session.send

    Adapter->>Bus: emit('message', session)
    Note right of Bus: 【插件 API 执行点】ctx.on('message', handler)
    Bus->>PluginEvent: 调用插件 message 监听器
    PluginEvent->>Send: 可直接 session.send(...)
    Bus->>Processor: 调用 Processor._handleMessage(session)
    Processor->>MW: 构建 queue(filterHooks + bind session)
    Processor->>MW: next()

    MW->>Attach: 进入首个中间件(prepend=true)
    Attach->>Event: emit('before-attach')
    Note right of Event: 这里可能解析前缀/argv\nMatcher 也可能命中并设置 session.response
    Attach->>Event: emit('before-attach-channel', fields)
    Attach->>DB: session.observeChannel(fields)
    DB-->>Attach: 返回 channel 观察者对象
    Attach->>Event: serial('attach-channel')
    Attach->>Event: emit('before-attach-user', fields)
    Attach->>DB: session.observeUser(fields)
    DB-->>Attach: 返回 user 观察者对象
    Attach->>Event: serial('attach-user')
    Attach->>Event: emit('attach')

    alt attach 阶段已有 session.response
        Attach-->>Processor: 直接返回 response() (短路)
    else 继续执行后续中间件
        Note over MW: 【插件 API 执行点】ctx.middleware(callback, true)\n前置中间件比普通中间件更早进入队列
        Attach->>MW: next()
        Note over MW: 【插件 API 执行点】ctx.middleware(callback)\n普通/异步中间件在这里按注册顺序执行
        Note over MW: 【插件 API 执行点】dispose = ctx.middleware(...)\ndispose() 后将从队列移除
        Note over MW: 【插件 API 执行点】临时中间件:next(callback)\n会向当前会话队列动态追加一个回调/返回值
        MW->>Commander: 指令系统中间件
        alt resolveCommand(session.argv) 成功
            Commander->>Command: session.execute() -> command.execute(argv, next)
            Command->>Event: serial('command/before-execute')
            Command->>Command: 依次执行 checkers
            Command->>Command: 执行 actions 队列(按 prepend/注册顺序)
            Note right of Command: 【插件 API 执行点】ctx.command(...).action(...)
            Note right of Command: 公开签名: (argv, ...args)\n内部存在 argv.next 调度
            Command-->>Commander: 返回 Fragment/void
        else 未识别指令
            Commander->>MW: next()
        end
        MW-->>Processor: 队列结束
    end

    Processor->>Send: 若 result 存在则 session.send(result)
    Processor->>DB: finally: session.user?.$update()
    Processor->>DB: finally: session.channel?.$update()
    Processor->>DB: finally: session.guild?.$update()
    Processor->>Event: emit('middleware')

5. 第三方插件常用 API 在处理周期中的落点

下面把常见插件写法和“它到底在时序图哪一段执行”逐一对齐。

5.1 指令 API:ctx.command(...).action(...)

ctx.command('echo <message>')
  .action((_, message) => message)

执行落点:Commander 中间件识别出指令后,进入 session.execute(),随后触发 command.execute(),最后执行 action 队列。
公开 API 语义如下:

  • cmd.action(action, prepend?)
  • action: (argv, ...args) => Awaitable<string | void>
  • prepend: boolean 决定是否前置插入 action 顺序

也就是说,插件作者在 .action() 回调里拿到的是 argv 和参数,不是 (session, next) 这种中间件签名。

5.1.1 多次调用 .action(...) 会发生什么?

这一行为可以分成“注册顺序”和“执行行为”两个层面。

(1) 公开 API 暴露链路(从 ctxcmd.action

  1. Context 构造阶段通过 this.mixin('$commander', ['command'])command 方法混入到 ctx
  2. 内部实现会把 $commander 服务实例挂载到上下文(源码中可见 provide 用法,这是框架内部实现细节)。
  3. ctx.command(...) 实际调用的是 Commander.command(...),返回 Command 实例。
  4. Command 实例的 action(callback, prepend = false) 会把回调记录到内部数组 _actions,并返回 this,因此可以链式调用。

(2) 多个 action 的注册顺序

  • .action(fn)push_actions 末尾
  • .action(fn, true)unshift_actions 开头

prepend 只影响“该 action 在列表中的先后顺序”。

(3) 多个 action 的执行顺序与停止条件

command.execute() 会把 _actions 映射为一个内部队列 queue,再把 fallback 压到末尾,然后从 argv.next() 启动执行。
由于公开 action 回调签名不包含 next 参数,在默认写法下:

  • 第一个被执行的 action 如果返回了字符串/片段:立即作为结果返回。
  • 第一个 action 如果返回 void 且不主动调用 argv.next:执行同样在这里结束(最终返回空字符串)。
  • 后续 action 不会“自动”继续执行。

结论:多次 .action() 并不等于中间件那种天然逐层透传。默认情况下,通常只会执行到“当前 action 决定终止”的位置。
因此阅读源码时可以将其理解为:公开层是 action(argv, ...args),实现层才包含 argv.next 调度能力。

5.1.2 插件开发中如何实现“多 action 透传”

如果确实需要在一个指令上串联多个 action,可以使用 argv.next 进行显式透传。推荐把它视为“高级用法”:可行,但应谨慎使用并做好版本升级回归测试。

基础写法(当前 action 主动放行)

ctx.command('demo <text>')
  .action(async (argv, text) => {
    // 前置逻辑
    if (!text.trim()) return '请输入内容。'
    // 显式透传给后续 action / fallback
    return argv.next?.()
  })
  .action((argv, text) => {
    // 后续 action
    return `处理完成:${text}`
  })

在透传前后包裹逻辑(近似“前后置”效果)

ctx.command('timed <text>')
  .action(async (argv, text) => {
    const start = Date.now()
    const result = await argv.next?.()
    const cost = Date.now() - start
    argv.session?.app.logger('timed').info(`执行耗时 ${cost}ms`)
    return result
  }, true) // prepend=true,优先进入该 action
  .action((_, text) => `echo: ${text}`)

使用建议

  • 需要“逐层放行”时,使用 return argv.next?.() 显式透传。
  • 不调用 argv.next 且直接 return 非空结果,会在当前 action 终止执行链。
  • 返回 void 但也不调用 argv.next,通常会导致后续 action 不再执行(最终结果为空)。
  • 当 action 逻辑较复杂时,优先考虑改用 ctx.middleware() 承担流程控制,再让 action 专注业务产出。

5.1.3 cmd.action(action, prepend?)prepend=true 的含义

prepend=true 表示将当前 action 插入到该指令 action 列表的头部(unshift),因此它会比已注册的普通 action 更早执行。

ctx.command('order')
  .action(() => 'B')          // 先注册
  .action(() => 'A', true)    // 前置插入

在上述示例中,执行顺序是 A -> B(前提是 A 内部选择透传,例如调用 argv.next)。
因此,prepend 控制的是“进入顺序”,不是“是否自动继续执行后续 action”。

5.2 事件 API:ctx.on('message', ...)

ctx.on('message', (session) => {
  if (session.content === '天王盖地虎') {
    session.send('宝塔镇河妖')
  }
})

执行落点:Adapter -> Context.emit('message') 之后立刻进入事件总线分发阶段
它与 Processor 同属 message 监听器体系,因此在宏观上位于“中间件链启动之前的事件派发入口”。

5.3 普通中间件:ctx.middleware(callback) + dispose()

declare const callback: import('koishi').Middleware
// ---cut---
const dispose = ctx.middleware(callback)
dispose() // 取消中间件

执行落点:在 attach 之后、Commander 前后(取决于注册先后)进入中间件队列。
dispose() 会注销该 hook,使后续消息不再进入该中间件。

5.4 异步中间件:await + next()

ctx.middleware(async (session, next) => {
  // 获取数据库中的用户信息
  // 这里只是示例,事实上 Koishi 会自动获取数据库中的信息并存放在 session.user 中
  const user = await session.getUser(session.userId)
  if (user.authority === 0) {
    return '抱歉,你没有权限访问机器人。'
  } else {
    return next()
  }
})

执行落点:与普通中间件一致,只是具备异步等待能力。
若返回字符串/消息片段会短路后续链路;若 return next() 则继续向后传递。

5.5 前置中间件:ctx.middleware(callback, true)

let times = 0 // 复读次数
let message = '' // 当前消息

ctx.middleware((session, next) => {
  if (session.content === message) {
    times += 1
    if (times === 3) return message
  } else {
    times = 0
    message = session.content
    return next()
  }
}, true /* true 表示这是前置中间件 */)

执行落点:会通过 prepend 插入靠前位置,优先于后注册的普通中间件。
注意:Koishi 核心自身也有一个系统前置中间件 attach,业务前置中间件并不一定早于它(取决于注册时机与作用域)。

5.6 临时中间件:next(callback) 动态追加

let times = 0 // 复读次数
let message = '' // 当前消息

ctx.middleware((session, next) => {
  if (session.content === message) {
    times += 1
    if (times === 3) return next(message)
  } else {
    times = 0
    message = session.content
    return next()
  }
}, true)

执行落点:运行在中间件内部;调用 next(callback) 时,会向当前会话执行队列动态 push 一段临时回调(或直接返回值),属于同一轮消息处理周期内的“局部插桩”。

6. 补充:泳道流程图(按职责分区)

flowchart LR
    A[收到平台消息] --> B[Context.emit 'message']

    subgraph Lane1[泳道1: 事件层]
      B --> C1[插件事件监听 ctx.on(message)]
      B --> C2[Processor._handleMessage]
    end

    subgraph Lane2[泳道2: 会话中间件层]
      C2 --> D1[构建 middleware queue]
      D1 --> D2[attach 前置中间件]
      D2 --> D3[before-attach / attach-channel / attach-user / attach 事件]
      D2 --> D4[observeChannel / observeUser]
      D2 --> D5{session.response?}
      D5 -- 是 --> D6[短路返回 response]
      D5 -- 否 --> D7[继续后续中间件]
      D7 --> D8[插件前置中间件 ctx.middleware(..., true)]
      D8 --> D9[插件普通/异步中间件 ctx.middleware(...)]
      D9 --> D10[可 dispose 注销]
      D9 --> D11[可 next(callback) 注入临时回调]
    end

    subgraph Lane3[泳道3: 指令层]
      D11 --> E1[Commander.resolveCommand]
      E1 --> E2{是否识别到指令}
      E2 -- 是 --> E3[session.execute]
      E3 --> E4[command.checkers + command/before-execute]
      E4 --> E5[action 队列: ctx.command(...).action(..., prepend?)]
      E5 --> E5a[公开参数: argv + args]
      E5a --> E5b[内部调度: argv.next + fallback]
      E2 -- 否 --> E6[回到中间件 next]
    end

    subgraph Lane4[泳道4: 输出与持久化]
      D6 --> F1[session.send]
      E5 --> F1
      E6 --> F2[若有 result 则 send]
      F1 --> F3[finally: user/channel/guild.$update]
      F2 --> F3
      F3 --> F4[emit middleware 结束事件]
    end

二、 观察者 (Observer) 对象

在处理上下文和数据库关联时,Koishi 使用了观察者模式,通过代理(Proxy)来追踪对象变更,实现零感知的按需数据库更新,而不是在每次消息处理结束时都去进行昂贵的全量更新。

1. 观察者的核心结构与实现原理

观察者的底层实现主要位于 @koishijs/utils/src/observe.ts 中:

  • 深层 Proxy 劫持observe() 会返回对象的 Proxy 包装。当读取某个嵌套的普通对象时,它会按需去返回该内层对象的 Proxy。
  • $diff 记录差分:观察者对象会被混入一个隐藏的 $diff 属性,它是一个 Partial<T>。所有的 setdeleteProperty 劫持方法在被触发时,如果不属于未经修改的操作,都会将被更改的键同步记录到 $diff 内部。
  • $update 批量应用:在触发更新时,观察者会调用 $update()。它首先提取所有的 $diff 对象,并将原始 $diff 清空,随后将变动集传递给初始传入的 update 回调(通常封装了数据库 set 操作)。
  • $merge 方法:如果在生命周期中需要主动拉取并覆盖新数据,可以使用 $merge 方法无缝合并。

2. 在 Koishi 源码中的实际使用

session.ts 中,获取绑定用户数据调用的是:

const cache = observe(data, async (diff) => {
    if (data['$detached']) return
    await this.app.database.setUser(this.platform, userId, diff as any)
}, `user ${this.uid}`)

在任何指令动作 (Action) 或中间件中,开发者只需:

// 直接赋值
session.user.authority = 2;

authority 的修改立即会被 Proxy 捕获并记录至 session.user.$diff 属性中。在消息处理流程结束后,生命周期的 finally 块执行 await session.user?.$update(),从而执行回调,将 diff 内容安全、精准地保存到数据库,无需手动写 database.set()

3. 如何实现自定义的观察者对象?

你可以引入 @koishijs/utils 提供的 observe API 来构建自己的业务观察者:

import { observe } from '@koishijs/utils'

// 1. 准备你的目标对象
const myConfig = {
  theme: 'dark',
  points: 100,
  settings: {
    notifications: true
  }
};

// 2. 将其转化为观察者对象
// 参数 1:需要监听的对象
// 参数 2:触发 $update() 时的回调逻辑,接收参数就是被改动过的键值对集合 (diff)
const observedConfig = observe(myConfig, async (diff) => {
  console.log('检测到数据变动:', diff);
  // 在此处执行你自定义的数据持久化逻辑,如写入本地文件,或同步 API
  // await fs.writeFile('config.json', JSON.stringify({ ...myConfig, ...diff }))
});

// 3. 像普通对象一样使用它
observedConfig.points += 50;
observedConfig.theme = 'light';

// (此时并不会立即触发 console.log)

// 4. 在合适的时机(例如流程末尾),调用 $update 批量应用变更
observedConfig.$update();
// 输出: 检测到数据变动: { points: 150, theme: 'light' }

这种模式在需要进行复杂状态处理或高频操作修改(最后统一合并落库)时,具有极大的性能提升与开发体验优势。

4. 进阶:通过注入 Context 实现“类似 user/channel”的观察者落库

如果希望达到 session.user / session.channel 那种体验,推荐做法是:

  1. 把“创建观察者对象”的能力封装成 Context 服务(通过 ctx.set() 挂载服务实例)。
  2. WeakMap<Session, Set<Observed<...>>> 记录“本轮消息处理创建过的观察者”。
  3. 监听 middleware 结束事件,在每轮消息收尾统一执行 $update()

下面给出一个可落地的模式(以 profile 为例):

import { Context, Session } from 'koishi'
import { observe, Observed } from '@koishijs/utils'

interface Profile {
  id: string
  score: number
  updatedAt: Date
  $detached?: boolean
}

declare module 'koishi' {
  interface Context {
    $profileStore: ProfileStore
    observeProfile(session: Session, id?: string): Promise<Observed<Profile, Promise<void>>>
  }
}

class ProfileStore {
  private pending = new WeakMap<Session, Set<Observed<Profile, Promise<void>>>>()

  constructor(private ctx: Context) {}

  async observeProfile(session: Session, id = session.userId) {
    // 下面的数据库读写以 profile 表为例,请替换为你的实际模型/字段
    const data = await this.ctx.database.get('profile', { id }, ['id', 'score', 'updatedAt'])
      .then(rows => rows[0] || { id, score: 0, updatedAt: new Date(), $detached: true } as Profile)

    const cache = observe(data, async (diff) => {
      if (data.$detached) return
      await this.ctx.database.set('profile', { id }, diff)
    }, `profile ${id}`)

    const set = this.pending.get(session) || new Set<Observed<Profile, Promise<void>>>()
    set.add(cache)
    this.pending.set(session, set)
    return cache
  }

  async flush(session: Session) {
    const set = this.pending.get(session)
    if (!set) return
    this.pending.delete(session)
    for (const obj of set) {
      await obj.$update()
    }
  }
}

export function apply(ctx: Context) {
  // 1) 注册服务实例(插件侧推荐使用 set)
  ctx.set('$profileStore', new ProfileStore(ctx))
  // 2) 暴露 Context API:ctx.observeProfile(...)
  ctx.mixin('$profileStore', ['observeProfile'])
  // 3) 在每轮消息处理结束时统一落库(与核心流程收尾时机一致)
  ctx.on('middleware', async (session) => {
    await ctx.$profileStore.flush(session)
  })
}

在插件业务代码中即可这样使用:

ctx.command('profile.add <n:number>')
  .action(async ({ session }, n) => {
    const profile = await ctx.observeProfile(session)
    profile.score += n
    profile.updatedAt = new Date()
    return `当前积分:${profile.score}`
  })

这个模式的关键价值是:业务侧只做“普通赋值”,而持久化在消息周期收尾统一处理,体验和核心 user/channel 观察者模型保持一致。

4.1 mixin 的作用与是否必须使用

mixin 的作用是把“服务对象的方法”代理到 Context 上,便于调用。

  • 使用 ctx.mixin('$profileStore', ['observeProfile']) 后,可以直接写 ctx.observeProfile(session)
  • 不使用 mixin 也完全可行,此时写法是 ctx.$profileStore.observeProfile(session)

可将其理解为:

// 语义上近似于(示意)
ctx.observeProfile = (...args) => ctx.$profileStore.observeProfile(...args)

因此,mixin 负责“API 形态(易用性)”,不负责“服务注册(生命周期)”。

4.2 关于 provide 的说明

在 Koishi 内部源码里仍能看到 provide 的使用,但已标记弃用的接口,本文示例采用 ctx.set() 作为推荐写法。


参考资料

6 个赞

这么nb,羡慕

2 个赞