Initial commit with Advoware proxy

This commit is contained in:
root
2025-10-19 14:57:07 +00:00
commit 273aa8b549
45771 changed files with 5534555 additions and 0 deletions

View File

@@ -0,0 +1,107 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const stream_group_1 = require("../src/stream-group");
describe('StreamGroupSubscription', () => {
const joinMessage = {
streamName: 'test-stream',
groupId: 'test-group',
subscriptionId: 'sub-1',
};
function makeMessage(type, data, timestamp = Date.now()) {
return {
streamName: 'test-stream',
groupId: 'test-group',
timestamp,
event: { type, ...(type === 'event' ? { event: data } : { data }) },
};
}
it('notifies change listeners on state change', () => {
const sub = new stream_group_1.StreamGroupSubscription(joinMessage);
const listener = jest.fn();
sub.addChangeListener(listener);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
expect(listener).toHaveBeenCalledWith(syncData);
});
it('should be able to sort by a key', () => {
const sub = new stream_group_1.StreamGroupSubscription(joinMessage, 'name');
const listener = jest.fn();
sub.addChangeListener(listener);
const syncData = [
{ id: '1', name: 'B' },
{ id: '2', name: 'A' },
];
sub.listener(makeMessage('sync', syncData));
expect(listener).toHaveBeenCalledWith(syncData);
expect(sub.getState()).toEqual([
{ id: '2', name: 'A' },
{ id: '1', name: 'B' },
]);
});
it('should discard old events', () => {
const sub = new stream_group_1.StreamGroupSubscription(joinMessage);
const listener = jest.fn();
sub.addChangeListener(listener);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
const oldSyncData = [
{ id: '3', name: 'C' },
{ id: '4', name: 'D' },
];
sub.listener(makeMessage('sync', oldSyncData, Date.now() - 1000));
expect(sub.getState()).toEqual(syncData);
});
it('should add items', () => {
const sub = new stream_group_1.StreamGroupSubscription(joinMessage);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('create', { id: '3', name: 'C' }));
expect(sub.getState()).toEqual([...syncData, { id: '3', name: 'C' }]);
});
it('should update items', () => {
const sub = new stream_group_1.StreamGroupSubscription(joinMessage);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('update', { id: '1', name: 'A1' }));
expect(sub.getState()).toEqual([
{ id: '1', name: 'A1' },
{ id: '2', name: 'B' },
]);
});
it('should remove items', () => {
const sub = new stream_group_1.StreamGroupSubscription(joinMessage);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('delete', { id: '1' }));
expect(sub.getState()).toEqual([{ id: '2', name: 'B' }]);
});
it('should discard old on event on a particular item', () => {
const sub = new stream_group_1.StreamGroupSubscription(joinMessage);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('update', { id: '1', name: 'A1' }));
sub.listener(makeMessage('update', { id: '1', name: 'A2' }, Date.now() - 1000));
expect(sub.getState()).toEqual([
{ id: '1', name: 'A1' },
{ id: '2', name: 'B' },
]);
});
});

View File

@@ -0,0 +1,48 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const stream_item_1 = require("../src/stream-item");
describe('StreamItemSubscription', () => {
const joinMessage = {
streamName: 'test-stream',
groupId: 'test-group',
subscriptionId: 'sub-1',
};
function makeMessage(type, data, timestamp = Date.now()) {
return {
streamName: 'test-stream',
groupId: 'test-group',
timestamp,
event: { type, ...(type === 'event' ? { event: data } : { data }) },
};
}
it('notifies change listeners on state change', () => {
const sub = new stream_item_1.StreamItemSubscription(joinMessage);
const listener = jest.fn();
sub.addChangeListener(listener);
const syncData = { id: '1', name: 'A', value: 1 };
sub.listener(makeMessage('sync', syncData));
expect(listener).toHaveBeenCalledWith(syncData);
});
it('should discard old events', () => {
const sub = new stream_item_1.StreamItemSubscription(joinMessage);
const syncData = { id: '1', name: 'A', value: 1 };
const oldSyncData = { id: '1', name: 'B', value: 3 };
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('sync', oldSyncData, Date.now() - 1000));
expect(sub.getState()).toEqual(syncData);
});
it('should update item', () => {
const sub = new stream_item_1.StreamItemSubscription(joinMessage);
const syncData = { id: '1', name: 'A', value: 1 };
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('update', { id: '1', name: 'A1', value: 2 }, Date.now() + 1000));
expect(sub.getState()).toEqual({ id: '1', name: 'A1', value: 2 });
});
it('should remove item', () => {
const sub = new stream_item_1.StreamItemSubscription(joinMessage);
const syncData = { id: '1', name: 'A', value: 1 };
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('delete', { id: '1' }, Date.now() + 1000));
expect(sub.getState()).toEqual(null);
});
});

View File

@@ -0,0 +1 @@
export {};

View File

@@ -0,0 +1,83 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const stream_1 = require("../src/stream");
class MockSocket {
constructor() {
this.listeners = {};
}
addEventListener(event, cb) {
if (!this.listeners[event])
this.listeners[event] = [];
this.listeners[event].push(cb);
}
connect() { }
isOpen() {
return true;
}
onMessage(callback) {
this.addEventListener('message', callback);
}
onOpen(callback) {
this.addEventListener('open', callback);
}
onClose(callback) {
this.addEventListener('close', callback);
}
send(data) {
this.listeners['message']?.forEach((cb) => cb(data));
}
close() {
this.listeners = {};
}
}
function getSocket(stream) {
return stream.ws;
}
function makeGroupMessage(type, data, timestamp = Date.now()) {
return {
streamName: 'test-stream',
groupId: 'test-group',
timestamp,
event: { type, ...(type === 'event' ? { event: data } : { data }) },
};
}
function makeItemMessage(type, data, timestamp = Date.now()) {
return {
streamName: 'test-stream',
groupId: 'test-group',
id: '1',
timestamp,
event: { type, ...(type === 'event' ? { event: data } : { data }) },
};
}
describe('Stream', () => {
beforeEach(() => {
global.WebSocket = MockSocket;
});
afterEach(() => {
delete global.WebSocket;
});
it('should sync group events', () => {
const stream = new stream_1.Stream(() => new MockSocket());
const socket = getSocket(stream);
const sub = stream.subscribeGroup('test-stream', 'test-group');
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
socket.send(JSON.stringify(makeGroupMessage('sync', syncData)));
expect(sub.getState()).toEqual(syncData);
});
it('should not sync item events in group subscriptions', () => {
const stream = new stream_1.Stream(() => new MockSocket());
const socket = getSocket(stream);
const sub = stream.subscribeGroup('test-stream', 'test-group');
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
socket.send(JSON.stringify(makeGroupMessage('sync', syncData)));
socket.send(JSON.stringify(makeItemMessage('sync', syncData[0])));
expect(sub.getState()).toEqual(syncData);
});
});

View File

@@ -0,0 +1,7 @@
export { Stream } from './src/stream';
export { StreamItemSubscription } from './src/stream-item';
export { StreamGroupSubscription } from './src/stream-group';
export { StreamSubscription } from './src/stream-subscription';
export { SocketAdapter } from './src/socket-adapter';
export { SocketAdapterFactory } from './src/adapter-factory';
export * from './src/stream.types';

View File

@@ -0,0 +1,26 @@
"use strict";
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
var desc = Object.getOwnPropertyDescriptor(m, k);
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
desc = { enumerable: true, get: function() { return m[k]; } };
}
Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __exportStar = (this && this.__exportStar) || function(m, exports) {
for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p);
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.StreamSubscription = exports.StreamGroupSubscription = exports.StreamItemSubscription = exports.Stream = void 0;
var stream_1 = require("./src/stream");
Object.defineProperty(exports, "Stream", { enumerable: true, get: function () { return stream_1.Stream; } });
var stream_item_1 = require("./src/stream-item");
Object.defineProperty(exports, "StreamItemSubscription", { enumerable: true, get: function () { return stream_item_1.StreamItemSubscription; } });
var stream_group_1 = require("./src/stream-group");
Object.defineProperty(exports, "StreamGroupSubscription", { enumerable: true, get: function () { return stream_group_1.StreamGroupSubscription; } });
var stream_subscription_1 = require("./src/stream-subscription");
Object.defineProperty(exports, "StreamSubscription", { enumerable: true, get: function () { return stream_subscription_1.StreamSubscription; } });
__exportStar(require("./src/stream.types"), exports);

View File

@@ -0,0 +1,2 @@
import { SocketAdapter } from './socket-adapter';
export type SocketAdapterFactory = () => SocketAdapter;

View File

@@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

View File

@@ -0,0 +1,9 @@
export interface SocketAdapter {
connect(): void;
close(): void;
send(message: string): void;
isOpen(): boolean;
onMessage(callback: (message: string) => void): void;
onOpen(callback: () => void): void;
onClose(callback: () => void): void;
}

View File

@@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

View File

@@ -0,0 +1,13 @@
import { StreamSubscription } from './stream-subscription';
import { GroupEventMessage, JoinMessage } from './stream.types';
export declare class StreamGroupSubscription<TData extends {
id: string;
}> extends StreamSubscription<TData[], GroupEventMessage<TData>> {
private sortKey?;
private lastTimestamp;
private lastTimestampMap;
constructor(sub: JoinMessage, sortKey?: keyof TData | undefined);
private sort;
protected setState(state: TData[]): void;
listener(message: GroupEventMessage<TData>): void;
}

View File

@@ -0,0 +1,69 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.StreamGroupSubscription = void 0;
const stream_subscription_1 = require("./stream-subscription");
class StreamGroupSubscription extends stream_subscription_1.StreamSubscription {
constructor(sub, sortKey) {
super(sub, []);
this.sortKey = sortKey;
this.lastTimestamp = 0;
this.lastTimestampMap = new Map();
}
sort(state) {
const sortKey = this.sortKey;
if (sortKey) {
return state.sort((a, b) => {
const aValue = a[sortKey];
const bValue = b[sortKey];
if (aValue && bValue) {
return aValue.toString().localeCompare(bValue.toString());
}
return 0;
});
}
return state;
}
setState(state) {
super.setState(this.sort(state));
}
listener(message) {
if (message.event.type === 'sync') {
if (message.timestamp < this.lastTimestamp) {
return;
}
this.lastTimestampMap = new Map();
this.lastTimestamp = message.timestamp;
this.setState(message.event.data);
}
else if (message.event.type === 'create') {
const id = message.event.data.id;
const state = this.getState();
if (!state.find((item) => item.id === id)) {
this.setState([...state, message.event.data]);
}
}
else if (message.event.type === 'update') {
const messageData = message.event.data;
const messageDataId = messageData.id;
const state = this.getState();
const currentItemTimestamp = this.lastTimestampMap.get(messageDataId);
if (currentItemTimestamp && currentItemTimestamp >= message.timestamp) {
return;
}
this.lastTimestamp = message.timestamp;
this.lastTimestampMap.set(messageDataId, message.timestamp);
this.setState(state.map((item) => (item.id === messageDataId ? messageData : item)));
}
else if (message.event.type === 'delete') {
const messageDataId = message.event.data.id;
const state = this.getState();
this.lastTimestamp = message.timestamp;
this.lastTimestampMap.set(messageDataId, message.timestamp);
this.setState(state.filter((item) => item.id !== messageDataId));
}
else if (message.event.type === 'event') {
this.onEventReceived(message.event.event);
}
}
}
exports.StreamGroupSubscription = StreamGroupSubscription;

View File

@@ -0,0 +1,9 @@
import { StreamSubscription } from './stream-subscription';
import { ItemEventMessage, JoinMessage } from './stream.types';
export declare class StreamItemSubscription<TData extends {
id: string;
}> extends StreamSubscription<TData | null, ItemEventMessage<TData>> {
private lastEventTimestamp;
constructor(sub: JoinMessage);
listener(message: ItemEventMessage<TData>): void;
}

View File

@@ -0,0 +1,26 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.StreamItemSubscription = void 0;
const stream_subscription_1 = require("./stream-subscription");
class StreamItemSubscription extends stream_subscription_1.StreamSubscription {
constructor(sub) {
super(sub, null);
this.lastEventTimestamp = 0;
}
listener(message) {
if (message.timestamp <= this.lastEventTimestamp) {
return;
}
this.lastEventTimestamp = message.timestamp;
if (message.event.type === 'sync' || message.event.type === 'create' || message.event.type === 'update') {
this.setState(message.event.data);
}
else if (message.event.type === 'delete') {
this.setState(null);
}
else if (message.event.type === 'event') {
this.onEventReceived(message.event.event);
}
}
}
exports.StreamItemSubscription = StreamItemSubscription;

View File

@@ -0,0 +1,36 @@
import { CustomEvent, JoinMessage, Listener } from './stream.types';
type CustomEventListener = (event: any) => void;
export declare abstract class StreamSubscription<TData = unknown, TEventData = unknown> {
private customEventListeners;
private closeListeners;
private onChangeListeners;
private state;
readonly sub: JoinMessage;
constructor(sub: JoinMessage, state: TData);
abstract listener(message: TEventData): void;
protected onEventReceived(event: CustomEvent): void;
/**
* Add a custom event listener. This listener will be called whenever the custom event is received.
*/
onEvent(type: string, listener: CustomEventListener): void;
/**
* Remove a custom event listener.
*/
offEvent(type: string, listener: CustomEventListener): void;
onClose(listener: () => void): void;
close(): void;
/**
* Add a change listener. This listener will be called whenever the state of the group changes.
*/
addChangeListener(listener: Listener<TData>): void;
/**
* Remove a change listener.
*/
removeChangeListener(listener: Listener<TData>): void;
/**
* Get the current state of the group.
*/
getState(): TData;
protected setState(state: TData): void;
}
export {};

View File

@@ -0,0 +1,63 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.StreamSubscription = void 0;
class StreamSubscription {
constructor(sub, state) {
this.customEventListeners = new Map();
this.closeListeners = new Set();
this.onChangeListeners = new Set();
this.sub = sub;
this.state = state;
}
onEventReceived(event) {
const customEventListeners = this.customEventListeners.get(event.type);
if (customEventListeners) {
const eventData = event.data;
customEventListeners.forEach((listener) => listener(eventData));
}
}
/**
* Add a custom event listener. This listener will be called whenever the custom event is received.
*/
onEvent(type, listener) {
const listeners = this.customEventListeners.get(type) || [];
this.customEventListeners.set(type, [...listeners, listener]);
}
/**
* Remove a custom event listener.
*/
offEvent(type, listener) {
const listeners = this.customEventListeners.get(type) || [];
this.customEventListeners.set(type, listeners.filter((l) => l !== listener));
}
onClose(listener) {
this.closeListeners.add(listener);
}
close() {
this.closeListeners.forEach((listener) => listener());
this.closeListeners.clear();
}
/**
* Add a change listener. This listener will be called whenever the state of the group changes.
*/
addChangeListener(listener) {
this.onChangeListeners.add(listener);
}
/**
* Remove a change listener.
*/
removeChangeListener(listener) {
this.onChangeListeners.delete(listener);
}
/**
* Get the current state of the group.
*/
getState() {
return this.state;
}
setState(state) {
this.state = state;
this.onChangeListeners.forEach((listener) => listener(state));
}
}
exports.StreamSubscription = StreamSubscription;

View File

@@ -0,0 +1,38 @@
import { StreamGroupSubscription } from './stream-group';
import { StreamItemSubscription } from './stream-item';
import { SocketAdapter } from './socket-adapter';
import { SocketAdapterFactory } from './adapter-factory';
export declare class Stream {
private adapterFactory;
private ws;
private listeners;
constructor(adapterFactory: SocketAdapterFactory);
createSocket(): SocketAdapter;
/**
* Subscribe to an item in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
* @argument id - The id of the item to subscribe to.
*/
subscribeItem<TData extends {
id: string;
}>(streamName: string, groupId: string, id: string): StreamItemSubscription<TData>;
/**
* Subscribe to a group in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
*/
subscribeGroup<TData extends {
id: string;
}>(streamName: string, groupId: string, sortKey?: keyof TData): StreamGroupSubscription<TData>;
close(): void;
private onSocketClose;
private onSocketOpen;
messageListener(event: string): void;
private subscribe;
private join;
private leave;
private roomName;
}

View File

@@ -0,0 +1,102 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Stream = void 0;
const uuid_1 = require("uuid");
const stream_group_1 = require("./stream-group");
const stream_item_1 = require("./stream-item");
class Stream {
constructor(adapterFactory) {
this.adapterFactory = adapterFactory;
this.listeners = {};
this.ws = this.createSocket();
}
createSocket() {
this.ws = this.adapterFactory();
this.ws.onMessage((message) => this.messageListener(message));
this.ws.onOpen(() => this.onSocketOpen());
this.ws.onClose(() => this.onSocketClose());
return this.ws;
}
/**
* Subscribe to an item in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
* @argument id - The id of the item to subscribe to.
*/
subscribeItem(streamName, groupId, id) {
const subscriptionId = (0, uuid_1.v4)();
const sub = { streamName, groupId, id, subscriptionId };
const subscription = new stream_item_1.StreamItemSubscription(sub);
this.subscribe(subscription);
return subscription;
}
/**
* Subscribe to a group in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
*/
subscribeGroup(streamName, groupId, sortKey) {
const subscriptionId = (0, uuid_1.v4)();
const sub = { streamName, groupId, subscriptionId };
const subscription = new stream_group_1.StreamGroupSubscription(sub, sortKey);
this.subscribe(subscription);
return subscription;
}
close() {
this.listeners = {}; // clean up all listeners
this.ws.close();
}
onSocketClose() {
// retry to connect
setTimeout(() => this.createSocket(), 2000);
}
onSocketOpen() {
Object.values(this.listeners).forEach((listeners) => {
listeners.forEach((subscription) => this.join(subscription));
});
}
messageListener(event) {
const message = JSON.parse(event);
const room = this.roomName(message);
this.listeners[room]?.forEach((listener) => listener.listener(message));
// we need to discard sync to group subs when it's an item event
if (message.id && message.event.type !== 'sync') {
const groupRoom = this.roomName({
streamName: message.streamName,
groupId: message.groupId,
});
this.listeners[groupRoom]?.forEach((listener) => listener.listener(message));
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
subscribe(subscription) {
const room = this.roomName(subscription.sub);
if (!this.listeners[room]) {
this.listeners[room] = new Set();
}
this.listeners[room].add(subscription);
this.join(subscription);
subscription.onClose(() => {
this.listeners[room]?.delete(subscription);
this.leave(subscription);
});
}
join(subscription) {
if (this.ws.isOpen()) {
this.ws.send(JSON.stringify({ type: 'join', data: subscription.sub }));
}
}
leave(subscription) {
if (this.ws.isOpen()) {
this.ws.send(JSON.stringify({ type: 'leave', data: subscription.sub }));
}
}
roomName(message) {
return message.id
? `${message.streamName}:group:${message.groupId}:item:${message.id}`
: `${message.streamName}:group:${message.groupId}`;
}
}
exports.Stream = Stream;

View File

@@ -0,0 +1,56 @@
export type BaseMessage = {
streamName: string;
groupId: string;
id?: string;
timestamp: number;
};
export type JoinMessage = Omit<BaseMessage, 'timestamp'> & {
subscriptionId: string;
};
export type CustomEvent = {
type: string;
data: any;
};
export type StreamEvent<TData extends {
id: string;
}> = {
type: 'create';
data: TData;
} | {
type: 'update';
data: TData;
} | {
type: 'delete';
data: TData;
} | {
type: 'event';
event: CustomEvent;
};
export type ItemStreamEvent<TData extends {
id: string;
}> = StreamEvent<TData> | {
type: 'sync';
data: TData;
};
export type GroupStreamEvent<TData extends {
id: string;
}> = StreamEvent<TData> | {
type: 'sync';
data: TData[];
};
export type ItemEventMessage<TData extends {
id: string;
}> = BaseMessage & {
event: ItemStreamEvent<TData>;
};
export type GroupEventMessage<TData extends {
id: string;
}> = BaseMessage & {
event: GroupStreamEvent<TData>;
};
export type Message = {
type: 'join' | 'leave';
data: JoinMessage;
};
export type Listener<TData> = (state: TData | null) => void;
export type CustomEventListener<TData> = (event: TData) => void;

View File

@@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

View File

@@ -0,0 +1,105 @@
import { StreamGroupSubscription } from '../src/stream-group';
describe('StreamGroupSubscription', () => {
const joinMessage = {
streamName: 'test-stream',
groupId: 'test-group',
subscriptionId: 'sub-1',
};
function makeMessage(type, data, timestamp = Date.now()) {
return {
streamName: 'test-stream',
groupId: 'test-group',
timestamp,
event: { type, ...(type === 'event' ? { event: data } : { data }) },
};
}
it('notifies change listeners on state change', () => {
const sub = new StreamGroupSubscription(joinMessage);
const listener = jest.fn();
sub.addChangeListener(listener);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
expect(listener).toHaveBeenCalledWith(syncData);
});
it('should be able to sort by a key', () => {
const sub = new StreamGroupSubscription(joinMessage, 'name');
const listener = jest.fn();
sub.addChangeListener(listener);
const syncData = [
{ id: '1', name: 'B' },
{ id: '2', name: 'A' },
];
sub.listener(makeMessage('sync', syncData));
expect(listener).toHaveBeenCalledWith(syncData);
expect(sub.getState()).toEqual([
{ id: '2', name: 'A' },
{ id: '1', name: 'B' },
]);
});
it('should discard old events', () => {
const sub = new StreamGroupSubscription(joinMessage);
const listener = jest.fn();
sub.addChangeListener(listener);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
const oldSyncData = [
{ id: '3', name: 'C' },
{ id: '4', name: 'D' },
];
sub.listener(makeMessage('sync', oldSyncData, Date.now() - 1000));
expect(sub.getState()).toEqual(syncData);
});
it('should add items', () => {
const sub = new StreamGroupSubscription(joinMessage);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('create', { id: '3', name: 'C' }));
expect(sub.getState()).toEqual([...syncData, { id: '3', name: 'C' }]);
});
it('should update items', () => {
const sub = new StreamGroupSubscription(joinMessage);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('update', { id: '1', name: 'A1' }));
expect(sub.getState()).toEqual([
{ id: '1', name: 'A1' },
{ id: '2', name: 'B' },
]);
});
it('should remove items', () => {
const sub = new StreamGroupSubscription(joinMessage);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('delete', { id: '1' }));
expect(sub.getState()).toEqual([{ id: '2', name: 'B' }]);
});
it('should discard old on event on a particular item', () => {
const sub = new StreamGroupSubscription(joinMessage);
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('update', { id: '1', name: 'A1' }));
sub.listener(makeMessage('update', { id: '1', name: 'A2' }, Date.now() - 1000));
expect(sub.getState()).toEqual([
{ id: '1', name: 'A1' },
{ id: '2', name: 'B' },
]);
});
});

View File

@@ -0,0 +1,46 @@
import { StreamItemSubscription } from '../src/stream-item';
describe('StreamItemSubscription', () => {
const joinMessage = {
streamName: 'test-stream',
groupId: 'test-group',
subscriptionId: 'sub-1',
};
function makeMessage(type, data, timestamp = Date.now()) {
return {
streamName: 'test-stream',
groupId: 'test-group',
timestamp,
event: { type, ...(type === 'event' ? { event: data } : { data }) },
};
}
it('notifies change listeners on state change', () => {
const sub = new StreamItemSubscription(joinMessage);
const listener = jest.fn();
sub.addChangeListener(listener);
const syncData = { id: '1', name: 'A', value: 1 };
sub.listener(makeMessage('sync', syncData));
expect(listener).toHaveBeenCalledWith(syncData);
});
it('should discard old events', () => {
const sub = new StreamItemSubscription(joinMessage);
const syncData = { id: '1', name: 'A', value: 1 };
const oldSyncData = { id: '1', name: 'B', value: 3 };
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('sync', oldSyncData, Date.now() - 1000));
expect(sub.getState()).toEqual(syncData);
});
it('should update item', () => {
const sub = new StreamItemSubscription(joinMessage);
const syncData = { id: '1', name: 'A', value: 1 };
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('update', { id: '1', name: 'A1', value: 2 }, Date.now() + 1000));
expect(sub.getState()).toEqual({ id: '1', name: 'A1', value: 2 });
});
it('should remove item', () => {
const sub = new StreamItemSubscription(joinMessage);
const syncData = { id: '1', name: 'A', value: 1 };
sub.listener(makeMessage('sync', syncData));
sub.listener(makeMessage('delete', { id: '1' }, Date.now() + 1000));
expect(sub.getState()).toEqual(null);
});
});

View File

@@ -0,0 +1 @@
export {};

View File

@@ -0,0 +1,81 @@
import { Stream } from '../src/stream';
class MockSocket {
constructor() {
this.listeners = {};
}
addEventListener(event, cb) {
if (!this.listeners[event])
this.listeners[event] = [];
this.listeners[event].push(cb);
}
connect() { }
isOpen() {
return true;
}
onMessage(callback) {
this.addEventListener('message', callback);
}
onOpen(callback) {
this.addEventListener('open', callback);
}
onClose(callback) {
this.addEventListener('close', callback);
}
send(data) {
this.listeners['message']?.forEach((cb) => cb(data));
}
close() {
this.listeners = {};
}
}
function getSocket(stream) {
return stream.ws;
}
function makeGroupMessage(type, data, timestamp = Date.now()) {
return {
streamName: 'test-stream',
groupId: 'test-group',
timestamp,
event: { type, ...(type === 'event' ? { event: data } : { data }) },
};
}
function makeItemMessage(type, data, timestamp = Date.now()) {
return {
streamName: 'test-stream',
groupId: 'test-group',
id: '1',
timestamp,
event: { type, ...(type === 'event' ? { event: data } : { data }) },
};
}
describe('Stream', () => {
beforeEach(() => {
global.WebSocket = MockSocket;
});
afterEach(() => {
delete global.WebSocket;
});
it('should sync group events', () => {
const stream = new Stream(() => new MockSocket());
const socket = getSocket(stream);
const sub = stream.subscribeGroup('test-stream', 'test-group');
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
socket.send(JSON.stringify(makeGroupMessage('sync', syncData)));
expect(sub.getState()).toEqual(syncData);
});
it('should not sync item events in group subscriptions', () => {
const stream = new Stream(() => new MockSocket());
const socket = getSocket(stream);
const sub = stream.subscribeGroup('test-stream', 'test-group');
const syncData = [
{ id: '1', name: 'A' },
{ id: '2', name: 'B' },
];
socket.send(JSON.stringify(makeGroupMessage('sync', syncData)));
socket.send(JSON.stringify(makeItemMessage('sync', syncData[0])));
expect(sub.getState()).toEqual(syncData);
});
});

View File

@@ -0,0 +1,7 @@
export { Stream } from './src/stream';
export { StreamItemSubscription } from './src/stream-item';
export { StreamGroupSubscription } from './src/stream-group';
export { StreamSubscription } from './src/stream-subscription';
export { SocketAdapter } from './src/socket-adapter';
export { SocketAdapterFactory } from './src/adapter-factory';
export * from './src/stream.types';

View File

@@ -0,0 +1,5 @@
export { Stream } from './src/stream';
export { StreamItemSubscription } from './src/stream-item';
export { StreamGroupSubscription } from './src/stream-group';
export { StreamSubscription } from './src/stream-subscription';
export * from './src/stream.types';

View File

@@ -0,0 +1,2 @@
import { SocketAdapter } from './socket-adapter';
export type SocketAdapterFactory = () => SocketAdapter;

View File

@@ -0,0 +1 @@
export {};

View File

@@ -0,0 +1,9 @@
export interface SocketAdapter {
connect(): void;
close(): void;
send(message: string): void;
isOpen(): boolean;
onMessage(callback: (message: string) => void): void;
onOpen(callback: () => void): void;
onClose(callback: () => void): void;
}

View File

@@ -0,0 +1 @@
export {};

View File

@@ -0,0 +1,13 @@
import { StreamSubscription } from './stream-subscription';
import { GroupEventMessage, JoinMessage } from './stream.types';
export declare class StreamGroupSubscription<TData extends {
id: string;
}> extends StreamSubscription<TData[], GroupEventMessage<TData>> {
private sortKey?;
private lastTimestamp;
private lastTimestampMap;
constructor(sub: JoinMessage, sortKey?: keyof TData | undefined);
private sort;
protected setState(state: TData[]): void;
listener(message: GroupEventMessage<TData>): void;
}

View File

@@ -0,0 +1,65 @@
import { StreamSubscription } from './stream-subscription';
export class StreamGroupSubscription extends StreamSubscription {
constructor(sub, sortKey) {
super(sub, []);
this.sortKey = sortKey;
this.lastTimestamp = 0;
this.lastTimestampMap = new Map();
}
sort(state) {
const sortKey = this.sortKey;
if (sortKey) {
return state.sort((a, b) => {
const aValue = a[sortKey];
const bValue = b[sortKey];
if (aValue && bValue) {
return aValue.toString().localeCompare(bValue.toString());
}
return 0;
});
}
return state;
}
setState(state) {
super.setState(this.sort(state));
}
listener(message) {
if (message.event.type === 'sync') {
if (message.timestamp < this.lastTimestamp) {
return;
}
this.lastTimestampMap = new Map();
this.lastTimestamp = message.timestamp;
this.setState(message.event.data);
}
else if (message.event.type === 'create') {
const id = message.event.data.id;
const state = this.getState();
if (!state.find((item) => item.id === id)) {
this.setState([...state, message.event.data]);
}
}
else if (message.event.type === 'update') {
const messageData = message.event.data;
const messageDataId = messageData.id;
const state = this.getState();
const currentItemTimestamp = this.lastTimestampMap.get(messageDataId);
if (currentItemTimestamp && currentItemTimestamp >= message.timestamp) {
return;
}
this.lastTimestamp = message.timestamp;
this.lastTimestampMap.set(messageDataId, message.timestamp);
this.setState(state.map((item) => (item.id === messageDataId ? messageData : item)));
}
else if (message.event.type === 'delete') {
const messageDataId = message.event.data.id;
const state = this.getState();
this.lastTimestamp = message.timestamp;
this.lastTimestampMap.set(messageDataId, message.timestamp);
this.setState(state.filter((item) => item.id !== messageDataId));
}
else if (message.event.type === 'event') {
this.onEventReceived(message.event.event);
}
}
}

View File

@@ -0,0 +1,9 @@
import { StreamSubscription } from './stream-subscription';
import { ItemEventMessage, JoinMessage } from './stream.types';
export declare class StreamItemSubscription<TData extends {
id: string;
}> extends StreamSubscription<TData | null, ItemEventMessage<TData>> {
private lastEventTimestamp;
constructor(sub: JoinMessage);
listener(message: ItemEventMessage<TData>): void;
}

View File

@@ -0,0 +1,22 @@
import { StreamSubscription } from './stream-subscription';
export class StreamItemSubscription extends StreamSubscription {
constructor(sub) {
super(sub, null);
this.lastEventTimestamp = 0;
}
listener(message) {
if (message.timestamp <= this.lastEventTimestamp) {
return;
}
this.lastEventTimestamp = message.timestamp;
if (message.event.type === 'sync' || message.event.type === 'create' || message.event.type === 'update') {
this.setState(message.event.data);
}
else if (message.event.type === 'delete') {
this.setState(null);
}
else if (message.event.type === 'event') {
this.onEventReceived(message.event.event);
}
}
}

View File

@@ -0,0 +1,36 @@
import { CustomEvent, JoinMessage, Listener } from './stream.types';
type CustomEventListener = (event: any) => void;
export declare abstract class StreamSubscription<TData = unknown, TEventData = unknown> {
private customEventListeners;
private closeListeners;
private onChangeListeners;
private state;
readonly sub: JoinMessage;
constructor(sub: JoinMessage, state: TData);
abstract listener(message: TEventData): void;
protected onEventReceived(event: CustomEvent): void;
/**
* Add a custom event listener. This listener will be called whenever the custom event is received.
*/
onEvent(type: string, listener: CustomEventListener): void;
/**
* Remove a custom event listener.
*/
offEvent(type: string, listener: CustomEventListener): void;
onClose(listener: () => void): void;
close(): void;
/**
* Add a change listener. This listener will be called whenever the state of the group changes.
*/
addChangeListener(listener: Listener<TData>): void;
/**
* Remove a change listener.
*/
removeChangeListener(listener: Listener<TData>): void;
/**
* Get the current state of the group.
*/
getState(): TData;
protected setState(state: TData): void;
}
export {};

View File

@@ -0,0 +1,59 @@
export class StreamSubscription {
constructor(sub, state) {
this.customEventListeners = new Map();
this.closeListeners = new Set();
this.onChangeListeners = new Set();
this.sub = sub;
this.state = state;
}
onEventReceived(event) {
const customEventListeners = this.customEventListeners.get(event.type);
if (customEventListeners) {
const eventData = event.data;
customEventListeners.forEach((listener) => listener(eventData));
}
}
/**
* Add a custom event listener. This listener will be called whenever the custom event is received.
*/
onEvent(type, listener) {
const listeners = this.customEventListeners.get(type) || [];
this.customEventListeners.set(type, [...listeners, listener]);
}
/**
* Remove a custom event listener.
*/
offEvent(type, listener) {
const listeners = this.customEventListeners.get(type) || [];
this.customEventListeners.set(type, listeners.filter((l) => l !== listener));
}
onClose(listener) {
this.closeListeners.add(listener);
}
close() {
this.closeListeners.forEach((listener) => listener());
this.closeListeners.clear();
}
/**
* Add a change listener. This listener will be called whenever the state of the group changes.
*/
addChangeListener(listener) {
this.onChangeListeners.add(listener);
}
/**
* Remove a change listener.
*/
removeChangeListener(listener) {
this.onChangeListeners.delete(listener);
}
/**
* Get the current state of the group.
*/
getState() {
return this.state;
}
setState(state) {
this.state = state;
this.onChangeListeners.forEach((listener) => listener(state));
}
}

View File

@@ -0,0 +1,38 @@
import { StreamGroupSubscription } from './stream-group';
import { StreamItemSubscription } from './stream-item';
import { SocketAdapter } from './socket-adapter';
import { SocketAdapterFactory } from './adapter-factory';
export declare class Stream {
private adapterFactory;
private ws;
private listeners;
constructor(adapterFactory: SocketAdapterFactory);
createSocket(): SocketAdapter;
/**
* Subscribe to an item in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
* @argument id - The id of the item to subscribe to.
*/
subscribeItem<TData extends {
id: string;
}>(streamName: string, groupId: string, id: string): StreamItemSubscription<TData>;
/**
* Subscribe to a group in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
*/
subscribeGroup<TData extends {
id: string;
}>(streamName: string, groupId: string, sortKey?: keyof TData): StreamGroupSubscription<TData>;
close(): void;
private onSocketClose;
private onSocketOpen;
messageListener(event: string): void;
private subscribe;
private join;
private leave;
private roomName;
}

View File

@@ -0,0 +1,98 @@
import { v4 as uuidv4 } from 'uuid';
import { StreamGroupSubscription } from './stream-group';
import { StreamItemSubscription } from './stream-item';
export class Stream {
constructor(adapterFactory) {
this.adapterFactory = adapterFactory;
this.listeners = {};
this.ws = this.createSocket();
}
createSocket() {
this.ws = this.adapterFactory();
this.ws.onMessage((message) => this.messageListener(message));
this.ws.onOpen(() => this.onSocketOpen());
this.ws.onClose(() => this.onSocketClose());
return this.ws;
}
/**
* Subscribe to an item in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
* @argument id - The id of the item to subscribe to.
*/
subscribeItem(streamName, groupId, id) {
const subscriptionId = uuidv4();
const sub = { streamName, groupId, id, subscriptionId };
const subscription = new StreamItemSubscription(sub);
this.subscribe(subscription);
return subscription;
}
/**
* Subscribe to a group in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
*/
subscribeGroup(streamName, groupId, sortKey) {
const subscriptionId = uuidv4();
const sub = { streamName, groupId, subscriptionId };
const subscription = new StreamGroupSubscription(sub, sortKey);
this.subscribe(subscription);
return subscription;
}
close() {
this.listeners = {}; // clean up all listeners
this.ws.close();
}
onSocketClose() {
// retry to connect
setTimeout(() => this.createSocket(), 2000);
}
onSocketOpen() {
Object.values(this.listeners).forEach((listeners) => {
listeners.forEach((subscription) => this.join(subscription));
});
}
messageListener(event) {
const message = JSON.parse(event);
const room = this.roomName(message);
this.listeners[room]?.forEach((listener) => listener.listener(message));
// we need to discard sync to group subs when it's an item event
if (message.id && message.event.type !== 'sync') {
const groupRoom = this.roomName({
streamName: message.streamName,
groupId: message.groupId,
});
this.listeners[groupRoom]?.forEach((listener) => listener.listener(message));
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
subscribe(subscription) {
const room = this.roomName(subscription.sub);
if (!this.listeners[room]) {
this.listeners[room] = new Set();
}
this.listeners[room].add(subscription);
this.join(subscription);
subscription.onClose(() => {
this.listeners[room]?.delete(subscription);
this.leave(subscription);
});
}
join(subscription) {
if (this.ws.isOpen()) {
this.ws.send(JSON.stringify({ type: 'join', data: subscription.sub }));
}
}
leave(subscription) {
if (this.ws.isOpen()) {
this.ws.send(JSON.stringify({ type: 'leave', data: subscription.sub }));
}
}
roomName(message) {
return message.id
? `${message.streamName}:group:${message.groupId}:item:${message.id}`
: `${message.streamName}:group:${message.groupId}`;
}
}

View File

@@ -0,0 +1,56 @@
export type BaseMessage = {
streamName: string;
groupId: string;
id?: string;
timestamp: number;
};
export type JoinMessage = Omit<BaseMessage, 'timestamp'> & {
subscriptionId: string;
};
export type CustomEvent = {
type: string;
data: any;
};
export type StreamEvent<TData extends {
id: string;
}> = {
type: 'create';
data: TData;
} | {
type: 'update';
data: TData;
} | {
type: 'delete';
data: TData;
} | {
type: 'event';
event: CustomEvent;
};
export type ItemStreamEvent<TData extends {
id: string;
}> = StreamEvent<TData> | {
type: 'sync';
data: TData;
};
export type GroupStreamEvent<TData extends {
id: string;
}> = StreamEvent<TData> | {
type: 'sync';
data: TData[];
};
export type ItemEventMessage<TData extends {
id: string;
}> = BaseMessage & {
event: ItemStreamEvent<TData>;
};
export type GroupEventMessage<TData extends {
id: string;
}> = BaseMessage & {
event: GroupStreamEvent<TData>;
};
export type Message = {
type: 'join' | 'leave';
data: JoinMessage;
};
export type Listener<TData> = (state: TData | null) => void;
export type CustomEventListener<TData> = (event: TData) => void;

View File

@@ -0,0 +1 @@
export {};

View File

@@ -0,0 +1,7 @@
export { Stream } from './src/stream';
export { StreamItemSubscription } from './src/stream-item';
export { StreamGroupSubscription } from './src/stream-group';
export { StreamSubscription } from './src/stream-subscription';
export { SocketAdapter } from './src/socket-adapter';
export { SocketAdapterFactory } from './src/adapter-factory';
export * from './src/stream.types';

View File

@@ -0,0 +1,2 @@
import { SocketAdapter } from './socket-adapter';
export type SocketAdapterFactory = () => SocketAdapter;

View File

@@ -0,0 +1,9 @@
export interface SocketAdapter {
connect(): void;
close(): void;
send(message: string): void;
isOpen(): boolean;
onMessage(callback: (message: string) => void): void;
onOpen(callback: () => void): void;
onClose(callback: () => void): void;
}

View File

@@ -0,0 +1,13 @@
import { StreamSubscription } from './stream-subscription';
import { GroupEventMessage, JoinMessage } from './stream.types';
export declare class StreamGroupSubscription<TData extends {
id: string;
}> extends StreamSubscription<TData[], GroupEventMessage<TData>> {
private sortKey?;
private lastTimestamp;
private lastTimestampMap;
constructor(sub: JoinMessage, sortKey?: keyof TData | undefined);
private sort;
protected setState(state: TData[]): void;
listener(message: GroupEventMessage<TData>): void;
}

View File

@@ -0,0 +1,9 @@
import { StreamSubscription } from './stream-subscription';
import { ItemEventMessage, JoinMessage } from './stream.types';
export declare class StreamItemSubscription<TData extends {
id: string;
}> extends StreamSubscription<TData | null, ItemEventMessage<TData>> {
private lastEventTimestamp;
constructor(sub: JoinMessage);
listener(message: ItemEventMessage<TData>): void;
}

View File

@@ -0,0 +1,36 @@
import { CustomEvent, JoinMessage, Listener } from './stream.types';
type CustomEventListener = (event: any) => void;
export declare abstract class StreamSubscription<TData = unknown, TEventData = unknown> {
private customEventListeners;
private closeListeners;
private onChangeListeners;
private state;
readonly sub: JoinMessage;
constructor(sub: JoinMessage, state: TData);
abstract listener(message: TEventData): void;
protected onEventReceived(event: CustomEvent): void;
/**
* Add a custom event listener. This listener will be called whenever the custom event is received.
*/
onEvent(type: string, listener: CustomEventListener): void;
/**
* Remove a custom event listener.
*/
offEvent(type: string, listener: CustomEventListener): void;
onClose(listener: () => void): void;
close(): void;
/**
* Add a change listener. This listener will be called whenever the state of the group changes.
*/
addChangeListener(listener: Listener<TData>): void;
/**
* Remove a change listener.
*/
removeChangeListener(listener: Listener<TData>): void;
/**
* Get the current state of the group.
*/
getState(): TData;
protected setState(state: TData): void;
}
export {};

View File

@@ -0,0 +1,38 @@
import { StreamGroupSubscription } from './stream-group';
import { StreamItemSubscription } from './stream-item';
import { SocketAdapter } from './socket-adapter';
import { SocketAdapterFactory } from './adapter-factory';
export declare class Stream {
private adapterFactory;
private ws;
private listeners;
constructor(adapterFactory: SocketAdapterFactory);
createSocket(): SocketAdapter;
/**
* Subscribe to an item in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
* @argument id - The id of the item to subscribe to.
*/
subscribeItem<TData extends {
id: string;
}>(streamName: string, groupId: string, id: string): StreamItemSubscription<TData>;
/**
* Subscribe to a group in a stream.
*
* @argument streamName - The name of the stream to subscribe to.
* @argument groupId - The id of the group to subscribe to.
*/
subscribeGroup<TData extends {
id: string;
}>(streamName: string, groupId: string, sortKey?: keyof TData): StreamGroupSubscription<TData>;
close(): void;
private onSocketClose;
private onSocketOpen;
messageListener(event: string): void;
private subscribe;
private join;
private leave;
private roomName;
}

View File

@@ -0,0 +1,56 @@
export type BaseMessage = {
streamName: string;
groupId: string;
id?: string;
timestamp: number;
};
export type JoinMessage = Omit<BaseMessage, 'timestamp'> & {
subscriptionId: string;
};
export type CustomEvent = {
type: string;
data: any;
};
export type StreamEvent<TData extends {
id: string;
}> = {
type: 'create';
data: TData;
} | {
type: 'update';
data: TData;
} | {
type: 'delete';
data: TData;
} | {
type: 'event';
event: CustomEvent;
};
export type ItemStreamEvent<TData extends {
id: string;
}> = StreamEvent<TData> | {
type: 'sync';
data: TData;
};
export type GroupStreamEvent<TData extends {
id: string;
}> = StreamEvent<TData> | {
type: 'sync';
data: TData[];
};
export type ItemEventMessage<TData extends {
id: string;
}> = BaseMessage & {
event: ItemStreamEvent<TData>;
};
export type GroupEventMessage<TData extends {
id: string;
}> = BaseMessage & {
event: GroupStreamEvent<TData>;
};
export type Message = {
type: 'join' | 'leave';
data: JoinMessage;
};
export type Listener<TData> = (state: TData | null) => void;
export type CustomEventListener<TData> = (event: TData) => void;