Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 181 additions & 11 deletions packages/comms/src/feishu/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
import { createServer, type IncomingMessage, type ServerResponse } from 'node:http';
import { createLogger, msgId, type Message } from '@markus/shared';
import type { CommAdapter, CommAdapterConfig, IncomingMessageHandler, SendOptions } from '../adapter.js';
import { FeishuClient } from './client.js';
import { FeishuClient, type ReceiveIdType } from './client.js';
import { createHmac, randomBytes, createCipheriv, createDecipheriv, scrypt } from 'node:crypto';
import { promisify } from 'node:util';

const log = createLogger('feishu-adapter');

/** Extended send options for Feishu adapter */
export interface FeishuSendOptions extends SendOptions {
/** Target ID type for sending messages — 'chat_id' (default), 'open_id', 'user_id', 'union_id' */
receiveIdType?: ReceiveIdType;
/** Send as card/interactive message */
asCard?: boolean;
/** Rich text content (post format) */
richText?: boolean;
/** Send as image */
asImage?: boolean;
}

export interface FeishuAdapterConfig extends CommAdapterConfig {
platform: 'feishu';
appId: string;
appSecret: string;
verificationToken?: string;
encryptKey?: string;
webhookPort?: number;
/** Enable WebSocket event subscription instead of webhook */
wsMode?: boolean;
domain?: string;
}

Expand Down Expand Up @@ -50,6 +64,8 @@ export class FeishuAdapter implements CommAdapter {
private config?: FeishuAdapterConfig;
private handlers: IncomingMessageHandler[] = [];
private server?: ReturnType<typeof createServer>;
private ws?: any;
private wsHeartbeatTimer?: ReturnType<typeof setInterval>;
private connected = false;
private processedEvents = new Set<string>();

Expand All @@ -63,17 +79,22 @@ export class FeishuAdapter implements CommAdapter {

await this.client.getTenantToken();

const port = this.config.webhookPort ?? 9000;
this.server = createServer((req, res) => this.handleWebhook(req, res));
this.server.listen(port, () => {
log.info(`Feishu webhook server listening on port ${port}`);
});
if (this.config.wsMode) {
await this.setupWsSubscription();
} else {
const port = this.config.webhookPort ?? 9000;
this.server = createServer((req, res) => this.handleWebhook(req, res));
this.server.listen(port, () => {
log.info(`Feishu webhook server listening on port ${port}`);
});
}

this.connected = true;
log.info('Feishu adapter connected');
log.info(`Feishu adapter connected (mode: ${this.config.wsMode ? 'websocket' : 'webhook'})`);
}

async disconnect(): Promise<void> {
this.teardownWsSubscription();
if (this.server) {
this.server.close();
this.server = undefined;
Expand All @@ -84,20 +105,63 @@ export class FeishuAdapter implements CommAdapter {

async sendMessage(channelId: string, content: string, options?: SendOptions): Promise<string> {
if (!this.client) throw new Error('Feishu adapter not connected');
const feishuOpts = options as FeishuSendOptions | undefined;
const idType = feishuOpts?.receiveIdType ?? 'chat_id';

if (feishuOpts?.asCard) {
return this.client.sendInteractiveMessage(channelId, JSON.parse(content), idType);
}
if (options?.richText) {
return this.client.sendInteractiveMessage(channelId, JSON.parse(content));
return this.client.sendInteractiveMessage(channelId, JSON.parse(content), idType);
}
return this.client.sendTextMessage(channelId, content);
if (feishuOpts?.asImage) {
return this.client.sendTextMessage(channelId, content, idType);
}
return this.client.sendTextMessage(channelId, content, idType);
}

async sendCard(channelId: string, card: Record<string, unknown>): Promise<string> {
if (!this.client) throw new Error('Feishu adapter not connected');
return this.client.sendInteractiveMessage(channelId, card);
}

async sendReply(channelId: string, replyToId: string, content: string): Promise<string> {
async sendReply(channelId: string, replyToId: string, content: string, options?: SendOptions): Promise<string> {
if (!this.client) throw new Error('Feishu adapter not connected');
const feishuOpts = options as FeishuSendOptions | undefined;
const msgType = feishuOpts?.asCard ? 'interactive' : feishuOpts?.richText ? 'post' : 'text';

if (msgType === 'interactive') {
return this.client.replyCard(replyToId, JSON.parse(content));
}
// For rich text (post) and plain text, content format differs
if (msgType === 'post') {
return this.client.replyMessage(replyToId, content, 'post');
}
return this.client.replyMessage(replyToId, JSON.stringify({ text: content }));
}

async updateMessage(channelId: string, messageId: string, content: string): Promise<void> {
if (!this.client) throw new Error('Feishu adapter not connected');

try {
await this.client.updateMessage(messageId, JSON.stringify({ text: content }));
log.info(`Feishu message updated in channel ${channelId}: ${messageId}`);
} catch (error) {
log.error(`Failed to update Feishu message ${messageId} in ${channelId}:`, { error });
throw error;
}
}

async deleteMessage(channelId: string, messageId: string): Promise<void> {
if (!this.client) throw new Error('Feishu adapter not connected');
return this.client.replyMessage(replyToId, content);

try {
await this.client.deleteMessage(messageId);
log.info(`Feishu message deleted from channel ${channelId}: ${messageId}`);
} catch (error) {
log.error(`Failed to delete Feishu message ${messageId} from ${channelId}:`, { error });
throw error;
}
}

onMessage(handler: IncomingMessageHandler): void {
Expand All @@ -108,6 +172,112 @@ export class FeishuAdapter implements CommAdapter {
return this.connected;
}

// ─── WebSocket Event Subscription ────────────────────────────────────────────

private async setupWsSubscription(): Promise<void> {
if (!this.client || !this.config) return;
const token = await this.client.getTenantToken();

// Step 1: Get WebSocket URL from Feishu API
const res = await fetch(`${this.config.domain ?? 'https://open.feishu.cn'}/open-apis/ws/v1/apps/${this.config.appId}/subscribe`, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({}),
});

const data = (await res.json()) as { code: number; data?: { url?: string } };
if (data.code !== 0) {
throw new Error(`Feishu WS subscribe failed: ${JSON.stringify(data)}`);
}

const wsUrl = data.data?.url;
if (!wsUrl) {
throw new Error('Feishu WS subscribe returned no URL');
}

// Step 2: Connect WebSocket
this.ws = new (globalThis as any).WebSocket(wsUrl);

this.ws.onopen = () => {
log.info('Feishu WebSocket connected');
};

this.ws.onmessage = (event: { data: Buffer }) => {
try {
const payload = JSON.parse(event.data.toString()) as FeishuEvent;
this.handleWsEvent(payload).catch((err) => {
log.error('Failed to handle WS event', { error: err.message });
});
} catch (err) {
log.error('Failed to parse WS message', { error: err instanceof Error ? err.message : String(err) });
}
};

this.ws.onclose = (event: { code: number; reason: Buffer }) => {
log.warn(`Feishu WebSocket closed: code=${event.code}, reason=${event.reason.toString()}`);
// Auto-reconnect after 5 seconds
setTimeout(() => {
if (this.connected) {
log.info('Feishu WebSocket reconnecting...');
this.setupWsSubscription().catch((err) => {
log.error('Feishu WS reconnect failed', { error: err.message });
});
}
}, 5000);
};

this.ws.onerror = () => {
log.error('Feishu WebSocket error occurred');
};

// Step 3: Heartbeat at 30s intervals (Feishu WS requirement)
this.wsHeartbeatTimer = setInterval(() => {
if (this.ws?.readyState === (globalThis as any).WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'heartbeat' }));
}
}, 30_000);
}

private teardownWsSubscription(): void {
if (this.wsHeartbeatTimer) {
clearInterval(this.wsHeartbeatTimer);
this.wsHeartbeatTimer = undefined;
}
if (this.ws) {
this.ws.close();
this.ws = undefined;
}
}

private async handleWsEvent(event: FeishuEvent): Promise<void> {
// Handle challenge/pong
if (event.type === 'pong') return;

// Deduplicate events
const eventId = event.header?.event_id;
if (eventId) {
if (this.processedEvents.has(eventId)) return;
this.processedEvents.add(eventId);
if (this.processedEvents.size > 1000) {
const arr = Array.from(this.processedEvents);
this.processedEvents = new Set(arr.slice(-500));
}
}

// Process message events
if (event.header?.event_type === 'im.message.receive_v1') {
await this.processMessageEvent(event);
}

// Card action callbacks
if ((event as Record<string, unknown>)['action']) {
await this.processCardAction(event as Record<string, unknown>);
}
}

/**
* Decrypt Feishu encrypted webhook payload using AES-256-CBC.
* The encryptKey is derived via scrypt with salt='key' (Feishu convention).
Expand Down
Loading
Loading