Update gitignore (sorry)

This commit is contained in:
olcxja 2026-05-10 14:02:17 +02:00
commit cca8b02fea
6604 changed files with 1219661 additions and 4 deletions

View file

@ -0,0 +1,397 @@
import { Subject, AnonymousSubject } from '../../Subject';
import { Subscriber } from '../../Subscriber';
import { Observable } from '../../Observable';
import { Subscription } from '../../Subscription';
import { Operator } from '../../Operator';
import { ReplaySubject } from '../../ReplaySubject';
import { Observer, NextObserver } from '../../types';
/**
* WebSocketSubjectConfig is a plain Object that allows us to make our
* webSocket configurable.
*
* <span class="informal">Provides flexibility to {@link webSocket}</span>
*
* It defines a set of properties to provide custom behavior in specific
* moments of the socket's lifecycle. When the connection opens we can
* use `openObserver`, when the connection is closed `closeObserver`, if we
* are interested in listening for data coming from server: `deserializer`,
* which allows us to customize the deserialization strategy of data before passing it
* to the socket client. By default, `deserializer` is going to apply `JSON.parse` to each message coming
* from the Server.
*
* ## Examples
*
* **deserializer**, the default for this property is `JSON.parse` but since there are just two options
* for incoming data, either be text or binary data. We can apply a custom deserialization strategy
* or just simply skip the default behaviour.
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* //Apply any transformation of your choice.
* deserializer: ({ data }) => data
* });
*
* wsSubject.subscribe(console.log);
*
* // Let's suppose we have this on the Server: ws.send('This is a msg from the server')
* //output
* //
* // This is a msg from the server
* ```
*
* **serializer** allows us to apply custom serialization strategy but for the outgoing messages.
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* // Apply any transformation of your choice.
* serializer: msg => JSON.stringify({ channel: 'webDevelopment', msg: msg })
* });
*
* wsSubject.subscribe(() => subject.next('msg to the server'));
*
* // Let's suppose we have this on the Server:
* // ws.on('message', msg => console.log);
* // ws.send('This is a msg from the server');
* // output at server side:
* //
* // {"channel":"webDevelopment","msg":"msg to the server"}
* ```
*
* **closeObserver** allows us to set a custom error when an error raises up.
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* closeObserver: {
* next() {
* const customError = { code: 6666, reason: 'Custom evil reason' }
* console.log(`code: ${ customError.code }, reason: ${ customError.reason }`);
* }
* }
* });
*
* // output
* // code: 6666, reason: Custom evil reason
* ```
*
* **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the
* webSocket or sending notification that the connection was successful, this is when
* openObserver is useful for.
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const wsSubject = webSocket({
* url: 'ws://localhost:8081',
* openObserver: {
* next: () => {
* console.log('Connection ok');
* }
* }
* });
*
* // output
* // Connection ok
* ```
*/
export interface WebSocketSubjectConfig<T> {
/** The url of the socket server to connect to */
url: string;
/** The protocol to use to connect */
protocol?: string | Array<string>;
/** @deprecated Will be removed in v8. Use {@link deserializer} instead. */
resultSelector?: (e: MessageEvent) => T;
/**
* A serializer used to create messages from passed values before the
* messages are sent to the server. Defaults to JSON.stringify.
*/
serializer?: (value: T) => WebSocketMessage;
/**
* A deserializer used for messages arriving on the socket from the
* server. Defaults to JSON.parse.
*/
deserializer?: (e: MessageEvent) => T;
/**
* An Observer that watches when open events occur on the underlying web socket.
*/
openObserver?: NextObserver<Event>;
/**
* An Observer that watches when close events occur on the underlying web socket
*/
closeObserver?: NextObserver<CloseEvent>;
/**
* An Observer that watches when a close is about to occur due to
* unsubscription.
*/
closingObserver?: NextObserver<void>;
/**
* A WebSocket constructor to use. This is useful for situations like using a
* WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket
* for testing purposes
*/
WebSocketCtor?: { new (url: string, protocols?: string | string[]): WebSocket };
/** Sets the `binaryType` property of the underlying WebSocket. */
binaryType?: 'blob' | 'arraybuffer';
}
const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
url: '',
deserializer: (e: MessageEvent) => JSON.parse(e.data),
serializer: (value: any) => JSON.stringify(value),
};
const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
export class WebSocketSubject<T> extends AnonymousSubject<T> {
// @ts-ignore: Property has no initializer and is not definitely assigned
private _config: WebSocketSubjectConfig<T>;
/** @internal */
// @ts-ignore: Property has no initializer and is not definitely assigned
_output: Subject<T>;
private _socket: WebSocket | null = null;
constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
super();
if (urlConfigOrSource instanceof Observable) {
this.destination = destination;
this.source = urlConfigOrSource as Observable<T>;
} else {
const config = (this._config = { ...DEFAULT_WEBSOCKET_CONFIG });
this._output = new Subject<T>();
if (typeof urlConfigOrSource === 'string') {
config.url = urlConfigOrSource;
} else {
for (const key in urlConfigOrSource) {
if (urlConfigOrSource.hasOwnProperty(key)) {
(config as any)[key] = (urlConfigOrSource as any)[key];
}
}
}
if (!config.WebSocketCtor && WebSocket) {
config.WebSocketCtor = WebSocket;
} else if (!config.WebSocketCtor) {
throw new Error('no WebSocket constructor can be found');
}
this.destination = new ReplaySubject();
}
}
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, this.destination as any);
sock.operator = operator;
sock.source = this;
return sock;
}
private _resetState() {
this._socket = null;
if (!this.source) {
this.destination = new ReplaySubject();
}
this._output = new Subject<T>();
}
/**
* Creates an {@link Observable}, that when subscribed to, sends a message,
* defined by the `subMsg` function, to the server over the socket to begin a
* subscription to data over that socket. Once data arrives, the
* `messageFilter` argument will be used to select the appropriate data for
* the resulting Observable. When finalization occurs, either due to
* unsubscription, completion, or error, a message defined by the `unsubMsg`
* argument will be sent to the server over the WebSocketSubject.
*
* @param subMsg A function to generate the subscription message to be sent to
* the server. This will still be processed by the serializer in the
* WebSocketSubject's config. (Which defaults to JSON serialization)
* @param unsubMsg A function to generate the unsubscription message to be
* sent to the server at finalization. This will still be processed by the
* serializer in the WebSocketSubject's config.
* @param messageFilter A predicate for selecting the appropriate messages
* from the server for the output stream.
*/
multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
const self = this;
return new Observable((observer: Observer<T>) => {
try {
self.next(subMsg());
} catch (err) {
observer.error(err);
}
const subscription = self.subscribe({
next: (x) => {
try {
if (messageFilter(x)) {
observer.next(x);
}
} catch (err) {
observer.error(err);
}
},
error: (err) => observer.error(err),
complete: () => observer.complete(),
});
return () => {
try {
self.next(unsubMsg());
} catch (err) {
observer.error(err);
}
subscription.unsubscribe();
};
});
}
private _connectSocket() {
const { WebSocketCtor, protocol, url, binaryType } = this._config;
const observer = this._output;
let socket: WebSocket | null = null;
try {
socket = protocol ? new WebSocketCtor!(url, protocol) : new WebSocketCtor!(url);
this._socket = socket;
if (binaryType) {
this._socket.binaryType = binaryType;
}
} catch (e) {
observer.error(e);
return;
}
const subscription = new Subscription(() => {
this._socket = null;
if (socket && socket.readyState === 1) {
socket.close();
}
});
socket.onopen = (evt: Event) => {
const { _socket } = this;
if (!_socket) {
socket!.close();
this._resetState();
return;
}
const { openObserver } = this._config;
if (openObserver) {
openObserver.next(evt);
}
const queue = this.destination;
this.destination = Subscriber.create<T>(
(x) => {
if (socket!.readyState === 1) {
try {
const { serializer } = this._config;
socket!.send(serializer!(x!));
} catch (e) {
this.destination!.error(e);
}
}
},
(err) => {
const { closingObserver } = this._config;
if (closingObserver) {
closingObserver.next(undefined);
}
if (err && err.code) {
socket!.close(err.code, err.reason);
} else {
observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
}
this._resetState();
},
() => {
const { closingObserver } = this._config;
if (closingObserver) {
closingObserver.next(undefined);
}
socket!.close();
this._resetState();
}
) as Subscriber<any>;
if (queue && queue instanceof ReplaySubject) {
subscription.add((queue as ReplaySubject<T>).subscribe(this.destination));
}
};
socket.onerror = (e: Event) => {
this._resetState();
observer.error(e);
};
socket.onclose = (e: CloseEvent) => {
if (socket === this._socket) {
this._resetState();
}
const { closeObserver } = this._config;
if (closeObserver) {
closeObserver.next(e);
}
if (e.wasClean) {
observer.complete();
} else {
observer.error(e);
}
};
socket.onmessage = (e: MessageEvent) => {
try {
const { deserializer } = this._config;
observer.next(deserializer!(e));
} catch (err) {
observer.error(err);
}
};
}
/** @internal */
protected _subscribe(subscriber: Subscriber<T>): Subscription {
const { source } = this;
if (source) {
return source.subscribe(subscriber);
}
if (!this._socket) {
this._connectSocket();
}
this._output.subscribe(subscriber);
subscriber.add(() => {
const { _socket } = this;
if (this._output.observers.length === 0) {
if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
_socket.close();
}
this._resetState();
}
});
return subscriber;
}
unsubscribe() {
const { _socket } = this;
if (_socket && (_socket.readyState === 1 || _socket.readyState === 0)) {
_socket.close();
}
this._resetState();
super.unsubscribe();
}
}

View file

@ -0,0 +1,132 @@
import { Observable } from '../../Observable';
import { TimestampProvider } from '../../types';
import { performanceTimestampProvider } from '../../scheduler/performanceTimestampProvider';
import { animationFrameProvider } from '../../scheduler/animationFrameProvider';
/**
* An observable of animation frames
*
* Emits the amount of time elapsed since subscription and the timestamp on each animation frame.
* Defaults to milliseconds provided to the requestAnimationFrame's callback. Does not end on its own.
*
* Every subscription will start a separate animation loop. Since animation frames are always scheduled
* by the browser to occur directly before a repaint, scheduling more than one animation frame synchronously
* should not be much different or have more overhead than looping over an array of events during
* a single animation frame. However, if for some reason the developer would like to ensure the
* execution of animation-related handlers are all executed during the same task by the engine,
* the `share` operator can be used.
*
* This is useful for setting up animations with RxJS.
*
* ## Examples
*
* Tweening a div to move it on the screen
*
* ```ts
* import { animationFrames, map, takeWhile, endWith } from 'rxjs';
*
* function tween(start: number, end: number, duration: number) {
* const diff = end - start;
* return animationFrames().pipe(
* // Figure out what percentage of time has passed
* map(({ elapsed }) => elapsed / duration),
* // Take the vector while less than 100%
* takeWhile(v => v < 1),
* // Finish with 100%
* endWith(1),
* // Calculate the distance traveled between start and end
* map(v => v * diff + start)
* );
* }
*
* // Setup a div for us to move around
* const div = document.createElement('div');
* document.body.appendChild(div);
* div.style.position = 'absolute';
* div.style.width = '40px';
* div.style.height = '40px';
* div.style.backgroundColor = 'lime';
* div.style.transform = 'translate3d(10px, 0, 0)';
*
* tween(10, 200, 4000).subscribe(x => {
* div.style.transform = `translate3d(${ x }px, 0, 0)`;
* });
* ```
*
* Providing a custom timestamp provider
*
* ```ts
* import { animationFrames, TimestampProvider } from 'rxjs';
*
* // A custom timestamp provider
* let now = 0;
* const customTSProvider: TimestampProvider = {
* now() { return now++; }
* };
*
* const source$ = animationFrames(customTSProvider);
*
* // Log increasing numbers 0...1...2... on every animation frame.
* source$.subscribe(({ elapsed }) => console.log(elapsed));
* ```
*
* @param timestampProvider An object with a `now` method that provides a numeric timestamp
*/
export function animationFrames(timestampProvider?: TimestampProvider) {
return timestampProvider ? animationFramesFactory(timestampProvider) : DEFAULT_ANIMATION_FRAMES;
}
/**
* Does the work of creating the observable for `animationFrames`.
* @param timestampProvider The timestamp provider to use to create the observable
*/
function animationFramesFactory(timestampProvider?: TimestampProvider) {
return new Observable<{ timestamp: number; elapsed: number }>((subscriber) => {
// If no timestamp provider is specified, use performance.now() - as it
// will return timestamps 'compatible' with those passed to the run
// callback and won't be affected by NTP adjustments, etc.
const provider = timestampProvider || performanceTimestampProvider;
// Capture the start time upon subscription, as the run callback can remain
// queued for a considerable period of time and the elapsed time should
// represent the time elapsed since subscription - not the time since the
// first rendered animation frame.
const start = provider.now();
let id = 0;
const run = () => {
if (!subscriber.closed) {
id = animationFrameProvider.requestAnimationFrame((timestamp: DOMHighResTimeStamp | number) => {
id = 0;
// Use the provider's timestamp to calculate the elapsed time. Note that
// this means - if the caller hasn't passed a provider - that
// performance.now() will be used instead of the timestamp that was
// passed to the run callback. The reason for this is that the timestamp
// passed to the callback can be earlier than the start time, as it
// represents the time at which the browser decided it would render any
// queued frames - and that time can be earlier the captured start time.
const now = provider.now();
subscriber.next({
timestamp: timestampProvider ? now : timestamp,
elapsed: now - start,
});
run();
});
}
};
run();
return () => {
if (id) {
animationFrameProvider.cancelAnimationFrame(id);
}
};
});
}
/**
* In the common case, where the timestamp provided by the rAF API is used,
* we use this shared observable to reduce overhead.
*/
const DEFAULT_ANIMATION_FRAMES = animationFramesFactory();

View file

@ -0,0 +1,180 @@
import { createOperatorSubscriber } from '../../operators/OperatorSubscriber';
import { Observable } from '../../Observable';
import { innerFrom } from '../../observable/innerFrom';
import { ObservableInput } from '../../types';
export function fromFetch<T>(
input: string | Request,
init: RequestInit & {
selector: (response: Response) => ObservableInput<T>;
}
): Observable<T>;
export function fromFetch(input: string | Request, init?: RequestInit): Observable<Response>;
/**
* Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
* make an HTTP request.
*
* **WARNING** Parts of the fetch API are still experimental. `AbortController` is
* required for this implementation to work and use cancellation appropriately.
*
* Will automatically set up an internal [AbortController](https://developer.mozilla.org/en-US/docs/Web/API/AbortController)
* in order to finalize the internal `fetch` when the subscription tears down.
*
* If a `signal` is provided via the `init` argument, it will behave like it usually does with
* `fetch`. If the provided `signal` aborts, the error that `fetch` normally rejects with
* in that scenario will be emitted as an error from the observable.
*
* ## Examples
*
* Basic use
*
* ```ts
* import { fromFetch } from 'rxjs/fetch';
* import { switchMap, of, catchError } from 'rxjs';
*
* const data$ = fromFetch('https://api.github.com/users?per_page=5').pipe(
* switchMap(response => {
* if (response.ok) {
* // OK return data
* return response.json();
* } else {
* // Server is returning a status requiring the client to try something else.
* return of({ error: true, message: `Error ${ response.status }` });
* }
* }),
* catchError(err => {
* // Network or other error, handle appropriately
* console.error(err);
* return of({ error: true, message: err.message })
* })
* );
*
* data$.subscribe({
* next: result => console.log(result),
* complete: () => console.log('done')
* });
* ```
*
* ### Use with Chunked Transfer Encoding
*
* With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1),
* the promise returned by `fetch` will resolve as soon as the response's headers are
* received.
*
* That means the `fromFetch` observable will emit a `Response` - and will
* then complete - before the body is received. When one of the methods on the
* `Response` - like `text()` or `json()` - is called, the returned promise will not
* resolve until the entire body has been received. Unsubscribing from any observable
* that uses the promise as an observable input will not abort the request.
*
* To facilitate aborting the retrieval of responses that use chunked transfer encoding,
* a `selector` can be specified via the `init` parameter:
*
* ```ts
* import { of } from 'rxjs';
* import { fromFetch } from 'rxjs/fetch';
*
* const data$ = fromFetch('https://api.github.com/users?per_page=5', {
* selector: response => response.json()
* });
*
* data$.subscribe({
* next: result => console.log(result),
* complete: () => console.log('done')
* });
* ```
*
* @param input The resource you would like to fetch. Can be a url or a request object.
* @param initWithSelector A configuration object for the fetch.
* [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters)
* @returns An Observable, that when subscribed to, performs an HTTP request using the native `fetch`
* function. The {@link Subscription} is tied to an `AbortController` for the fetch.
*/
export function fromFetch<T>(
input: string | Request,
initWithSelector: RequestInit & {
selector?: (response: Response) => ObservableInput<T>;
} = {}
): Observable<Response | T> {
const { selector, ...init } = initWithSelector;
return new Observable<Response | T>((subscriber) => {
// Our controller for aborting this fetch.
// Any externally provided AbortSignal will have to call
// abort on this controller when signaled, because the
// signal from this controller is what is being passed to `fetch`.
const controller = new AbortController();
const { signal } = controller;
// This flag exists to make sure we don't `abort()` the fetch upon tearing down
// this observable after emitting a Response. Aborting in such circumstances
// would also abort subsequent methods - like `json()` - that could be called
// on the Response. Consider: `fromFetch().pipe(take(1), mergeMap(res => res.json()))`
let abortable = true;
// If the user provided an init configuration object,
// let's process it and chain our abort signals, if necessary.
// If a signal is provided, just have it finalized. It's a cancellation token, basically.
const { signal: outerSignal } = init;
if (outerSignal) {
if (outerSignal.aborted) {
controller.abort();
} else {
// We got an AbortSignal from the arguments passed into `fromFetch`.
// We need to wire up our AbortController to abort when this signal aborts.
const outerSignalHandler = () => {
if (!signal.aborted) {
controller.abort();
}
};
outerSignal.addEventListener('abort', outerSignalHandler);
subscriber.add(() => outerSignal.removeEventListener('abort', outerSignalHandler));
}
}
// The initialization object passed to `fetch` as the second
// argument. This ferries in important information, including our
// AbortSignal. Create a new init, so we don't accidentally mutate the
// passed init, or reassign it. This is because the init passed in
// is shared between each subscription to the result.
const perSubscriberInit: RequestInit = { ...init, signal };
const handleError = (err: any) => {
abortable = false;
subscriber.error(err);
};
fetch(input, perSubscriberInit)
.then((response) => {
if (selector) {
// If we have a selector function, use it to project our response.
// Note that any error that comes from our selector will be
// sent to the promise `catch` below and handled.
innerFrom(selector(response)).subscribe(
createOperatorSubscriber(
subscriber,
// Values are passed through to the subscriber
undefined,
// The projected response is complete.
() => {
abortable = false;
subscriber.complete();
},
handleError
)
);
} else {
abortable = false;
subscriber.next(response);
subscriber.complete();
}
})
.catch(handleError);
return () => {
if (abortable) {
controller.abort();
}
};
});
}

View file

@ -0,0 +1,161 @@
import { WebSocketSubject, WebSocketSubjectConfig } from './WebSocketSubject';
/**
* Wrapper around the w3c-compatible WebSocket object provided by the browser.
*
* <span class="informal">{@link Subject} that communicates with a server via WebSocket</span>
*
* `webSocket` is a factory function that produces a `WebSocketSubject`,
* which can be used to make WebSocket connection with an arbitrary endpoint.
* `webSocket` accepts as an argument either a string with url of WebSocket endpoint, or an
* {@link WebSocketSubjectConfig} object for providing additional configuration, as
* well as Observers for tracking lifecycle of WebSocket connection.
*
* When `WebSocketSubject` is subscribed, it attempts to make a socket connection,
* unless there is one made already. This means that many subscribers will always listen
* on the same socket, thus saving resources. If however, two instances are made of `WebSocketSubject`,
* even if these two were provided with the same url, they will attempt to make separate
* connections. When consumer of a `WebSocketSubject` unsubscribes, socket connection is closed,
* only if there are no more subscribers still listening. If after some time a consumer starts
* subscribing again, connection is reestablished.
*
* Once connection is made, whenever a new message comes from the server, `WebSocketSubject` will emit that
* message as a value in the stream. By default, a message from the socket is parsed via `JSON.parse`. If you
* want to customize how deserialization is handled (if at all), you can provide custom `resultSelector`
* function in {@link WebSocketSubject}. When connection closes, stream will complete, provided it happened without
* any errors. If at any point (starting, maintaining or closing a connection) there is an error,
* stream will also error with whatever WebSocket API has thrown.
*
* By virtue of being a {@link Subject}, `WebSocketSubject` allows for receiving and sending messages from the server. In order
* to communicate with a connected endpoint, use `next`, `error` and `complete` methods. `next` sends a value to the server, so bear in mind
* that this value will not be serialized beforehand. Because of This, `JSON.stringify` will have to be called on a value by hand,
* before calling `next` with a result. Note also that if at the moment of nexting value
* there is no socket connection (for example no one is subscribing), those values will be buffered, and sent when connection
* is finally established. `complete` method closes socket connection. `error` does the same,
* as well as notifying the server that something went wrong via status code and string with details of what happened.
* Since status code is required in WebSocket API, `WebSocketSubject` does not allow, like regular `Subject`,
* arbitrary values being passed to the `error` method. It needs to be called with an object that has `code`
* property with status code number and optional `reason` property with string describing details
* of an error.
*
* Calling `next` does not affect subscribers of `WebSocketSubject` - they have no
* information that something was sent to the server (unless of course the server
* responds somehow to a message). On the other hand, since calling `complete` triggers
* an attempt to close socket connection. If that connection is closed without any errors, stream will
* complete, thus notifying all subscribers. And since calling `error` closes
* socket connection as well, just with a different status code for the server, if closing itself proceeds
* without errors, subscribed Observable will not error, as one might expect, but complete as usual. In both cases
* (calling `complete` or `error`), if process of closing socket connection results in some errors, *then* stream
* will error.
*
* **Multiplexing**
*
* `WebSocketSubject` has an additional operator, not found in other Subjects. It is called `multiplex` and it is
* used to simulate opening several socket connections, while in reality maintaining only one.
* For example, an application has both chat panel and real-time notifications about sport news. Since these are two distinct functions,
* it would make sense to have two separate connections for each. Perhaps there could even be two separate services with WebSocket
* endpoints, running on separate machines with only GUI combining them together. Having a socket connection
* for each functionality could become too resource expensive. It is a common pattern to have single
* WebSocket endpoint that acts as a gateway for the other services (in this case chat and sport news services).
* Even though there is a single connection in a client app, having the ability to manipulate streams as if it
* were two separate sockets is desirable. This eliminates manually registering and unregistering in a gateway for
* given service and filter out messages of interest. This is exactly what `multiplex` method is for.
*
* Method accepts three parameters. First two are functions returning subscription and unsubscription messages
* respectively. These are messages that will be sent to the server, whenever consumer of resulting Observable
* subscribes and unsubscribes. Server can use them to verify that some kind of messages should start or stop
* being forwarded to the client. In case of the above example application, after getting subscription message with proper identifier,
* gateway server can decide that it should connect to real sport news service and start forwarding messages from it.
* Note that both messages will be sent as returned by the functions, they are by default serialized using JSON.stringify, just
* as messages pushed via `next`. Also bear in mind that these messages will be sent on *every* subscription and
* unsubscription. This is potentially dangerous, because one consumer of an Observable may unsubscribe and the server
* might stop sending messages, since it got unsubscription message. This needs to be handled
* on the server or using {@link publish} on a Observable returned from 'multiplex'.
*
* Last argument to `multiplex` is a `messageFilter` function which should return a boolean. It is used to filter out messages
* sent by the server to only those that belong to simulated WebSocket stream. For example, server might mark these
* messages with some kind of string identifier on a message object and `messageFilter` would return `true`
* if there is such identifier on an object emitted by the socket. Messages which returns `false` in `messageFilter` are simply skipped,
* and are not passed down the stream.
*
* Return value of `multiplex` is an Observable with messages incoming from emulated socket connection. Note that this
* is not a `WebSocketSubject`, so calling `next` or `multiplex` again will fail. For pushing values to the
* server, use root `WebSocketSubject`.
*
* ## Examples
*
* Listening for messages from the server
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const subject = webSocket('ws://localhost:8081');
*
* subject.subscribe({
* next: msg => console.log('message received: ' + msg), // Called whenever there is a message from the server.
* error: err => console.log(err), // Called if at any point WebSocket API signals some kind of error.
* complete: () => console.log('complete') // Called when connection is closed (for whatever reason).
* });
* ```
*
* Pushing messages to the server
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const subject = webSocket('ws://localhost:8081');
*
* subject.subscribe();
* // Note that at least one consumer has to subscribe to the created subject - otherwise "nexted" values will be just buffered and not sent,
* // since no connection was established!
*
* subject.next({ message: 'some message' });
* // This will send a message to the server once a connection is made. Remember value is serialized with JSON.stringify by default!
*
* subject.complete(); // Closes the connection.
*
* subject.error({ code: 4000, reason: 'I think our app just broke!' });
* // Also closes the connection, but let's the server know that this closing is caused by some error.
* ```
*
* Multiplexing WebSocket
*
* ```ts
* import { webSocket } from 'rxjs/webSocket';
*
* const subject = webSocket('ws://localhost:8081');
*
* const observableA = subject.multiplex(
* () => ({ subscribe: 'A' }), // When server gets this message, it will start sending messages for 'A'...
* () => ({ unsubscribe: 'A' }), // ...and when gets this one, it will stop.
* message => message.type === 'A' // If the function returns `true` message is passed down the stream. Skipped if the function returns false.
* );
*
* const observableB = subject.multiplex( // And the same goes for 'B'.
* () => ({ subscribe: 'B' }),
* () => ({ unsubscribe: 'B' }),
* message => message.type === 'B'
* );
*
* const subA = observableA.subscribe(messageForA => console.log(messageForA));
* // At this moment WebSocket connection is established. Server gets '{"subscribe": "A"}' message and starts sending messages for 'A',
* // which we log here.
*
* const subB = observableB.subscribe(messageForB => console.log(messageForB));
* // Since we already have a connection, we just send '{"subscribe": "B"}' message to the server. It starts sending messages for 'B',
* // which we log here.
*
* subB.unsubscribe();
* // Message '{"unsubscribe": "B"}' is sent to the server, which stops sending 'B' messages.
*
* subA.unsubscribe();
* // Message '{"unsubscribe": "A"}' makes the server stop sending messages for 'A'. Since there is no more subscribers to root Subject,
* // socket connection closes.
* ```
*
* @param urlConfigOrSource The WebSocket endpoint as an url or an object with configuration and additional Observers.
* @return Subject which allows to both send and receive messages via WebSocket connection.
*/
export function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T> {
return new WebSocketSubject<T>(urlConfigOrSource);
}