import Rx from "rx";
import _ from "lodash";
import ssgCrypto from "ssg.crypto";

import ClientServerRPC from "./client.server.rpc.js";

/**
 * RPC endpoint that layers remote observables on top of ClientServerRPC.
 *
 * Two directions are handled:
 *  - Calling side: `_callRemoteObservable` sends a call message and tracks the
 *    local observer in `_observableId2Subject` (callId -> { observer, message }).
 *    Incoming "onNext" / "onCompleted" / "onError" messages are forwarded to
 *    that observer.
 *  - Serving side: a message whose name is registered in
 *    `_remoteObservableMethods` is executed, and the observable it returns is
 *    streamed back as "onNext" / "onCompleted" / "onError" messages. The live
 *    subscription is tracked in `_servingObservableId2Subject`
 *    (callId -> disposable) so a "disposeObservable" message can stop it.
 *
 * `_remoteMethods`, `_receivedCalls` and `_sendToClientAsync` are not defined
 * in this class; presumably they come from ClientServerRPC (not visible here).
 */
class ClientServerRPCObservables extends ClientServerRPC {
	/**
	 * @param {...*} params Forwarded unchanged to the ClientServerRPC constructor.
	 */
	constructor( ...params ) {
		super( ...params );

		// name -> function returning an observable; looked up in _processJsonMessage.
		this._remoteObservableMethods = Object.create( null );
		this._remoteMethods.onNext = this._remote_onNext;
		this._remoteMethods.onCompleted = this._remote_onCompleted;
		this._remoteMethods.onError = this._remote_onError;
		this._remoteMethods.disposeObservable = this._remote_disposeObservable;
		// callId -> { observer, message } for observables this side consumes.
		this._observableId2Subject = Object.create( null );
		// callId -> subscription for observables this side serves.
		this._servingObservableId2Subject = Object.create( null );
	}

	/**
	 * Handles an incoming message. Messages naming a registered observable
	 * method are served here; everything else is delegated to the base class.
	 *
	 * @param {{name: string, param: *, callId: string}} json Incoming message.
	 * @returns {*} The base-class result for non-observable messages, otherwise
	 *   `Rx.Observable.just()`.
	 */
	_processJsonMessage( json ) {
		const method = this._remoteObservableMethods[ json.name ];
		if ( !method ) {
			if ( json.name === "onNext" ) {
				//This method can be called multiple times with the same callId
				delete this._receivedCalls[ json.callId ];
			}
			return super._processJsonMessage( json );
		}
		const { callId } = json;
		//TODO: error handling (a synchronous throw from the method still propagates)
		const result = method.call( this, json.param, json );

		let finished = false;
		let subscription;
		subscription = result
			.map( item => ( {
				name: "onNext",
				param: { item },
				callId
			} ) )
			// The completion notice travels through the same send pipeline as
			// every other message, so it is always subscribed (and thus sent)
			// no matter how _sendToClientAsync is implemented.
			.concat( Rx.Observable.just( {
				name: "onCompleted",
				callId
			} ) )
			.catch( error => Rx.Observable.just( {
				name: "onError",
				param: { error },
				callId
			} ) )
			.flatMap( message => this._sendToClientAsync( message ) )
			.finally( () => {
				// Runs on completion, error or disposal: drop the bookkeeping
				// entry so the map does not grow forever. Only remove the entry
				// if it still belongs to this subscription.
				finished = true;
				if (
					subscription &&
					this._servingObservableId2Subject[ callId ] === subscription
				) {
					delete this._servingObservableId2Subject[ callId ];
				}
			} )
			.subscribe();

		// If the whole pipeline finished synchronously there is nothing left to
		// dispose, so do not register an already-dead subscription.
		if ( !finished ) {
			this._servingObservableId2Subject[ callId ] = subscription;
		}
		return Rx.Observable.just();
	}

	/**
	 * Remote handler: forwards one item to the local observer of `callId`.
	 * Does nothing after dispose() or when the callId is not (or no longer)
	 * tracked, e.g. an item that arrives after the call has terminated.
	 *
	 * @param {{item: *}} param Payload carrying the emitted item.
	 * @param {{callId: string}} json The message the item arrived in.
	 */
	_remote_onNext( { item }, { callId } ) {
		if ( !this._observableId2Subject ) {
			return;
		}
		const entry = this._observableId2Subject[ callId ];
		if ( !entry ) {
			return;
		}
		entry.observer.onNext( item );
	}

	/**
	 * Remote handler: completes the local observer of `callId` and forgets it.
	 * Does nothing after dispose() or for an untracked callId.
	 *
	 * @param {*} param Unused.
	 * @param {{callId: string}} json The message that signalled completion.
	 */
	_remote_onCompleted( param, { callId } ) {
		if ( !this._observableId2Subject ) {
			return;
		}
		const entry = this._observableId2Subject[ callId ];
		if ( !entry ) {
			return;
		}
		// Remove first: the observer callback may re-enter this object.
		delete this._observableId2Subject[ callId ];
		entry.observer.onCompleted();
	}

	/**
	 * Remote handler: fails the local observer of `callId` and forgets it, so
	 * that a terminated call is not re-sent by _retryPendingRemoteCalls.
	 * Does nothing after dispose() or for an untracked callId.
	 *
	 * @param {{error: *}} param Payload carrying the remote error.
	 * @param {{callId: string}} json The message that signalled the error.
	 */
	_remote_onError( { error }, { callId } ) {
		if ( !this._observableId2Subject ) {
			return;
		}
		const entry = this._observableId2Subject[ callId ];
		if ( !entry ) {
			return;
		}
		// Remove first: the observer callback may re-enter this object.
		delete this._observableId2Subject[ callId ];
		entry.observer.onError( error );
	}

	/**
	 * Remote handler: the consumer no longer wants the observable served under
	 * `callId`; dispose its subscription and forget it. Unknown ids are ignored.
	 *
	 * @param {*} param Unused.
	 * @param {{callId: string}} json The dispose request message.
	 */
	_remote_disposeObservable( param, { callId } ) {
		const serving = this._servingObservableId2Subject[ callId ];
		if ( !serving ) {
			return;
		}
		serving.dispose();
		delete this._servingObservableId2Subject[ callId ];
	}

	/**
	 * Re-sends, one after another, the original call message of every remote
	 * observable still tracked on this side, then lets the base class retry
	 * its own pending calls.
	 */
	_retryPendingRemoteCalls() {
		Rx.Observable.fromArray( _.values( this._observableId2Subject ) )
			.concatMap( ( { message } ) =>
				this._sendToClientAsync( message )
			)
			.subscribe();
		super._retryPendingRemoteCalls();
	}

	/**
	 * Calls an observable method on the remote side.
	 *
	 * A random call id is generated, the observer is registered under it and
	 * the call message is sent. Items delivered by the _remote_on* handlers are
	 * emitted by the returned observable. Because the two sources are joined
	 * with combineLatest, nothing is emitted before the send observable has
	 * produced a value.
	 *
	 * @param {string} name Remote method name.
	 * @param {*} param Parameter passed to the remote method.
	 * @returns {Rx.Observable} Observable of the remote items; empty if this
	 *   object has been disposed by the time the call id is available.
	 */
	_callRemoteObservable( name, param ) {
		return (
			Rx.Observable.fromPromise( ssgCrypto.createRandomBase64StringThen( 32 ) )
				.flatMap( callId => {
					const message = {
						name, param, callId
					};
					if ( !this._observableId2Subject ) {
						return Rx.Observable.empty();
					}
					return Rx.Observable.combineLatest(
						Rx.Observable.create( observer => {
							if ( this._observableId2Subject[ callId ] ) {
								throw new Error( "Double subscription on remote observables are not allowed" );
							}
							this._observableId2Subject[ callId ] = { observer, message };
							return () => {
								// Forget the call so a later retry does not
								// re-issue a call nobody is listening to.
								if ( this._observableId2Subject ) {
									delete this._observableId2Subject[ callId ];
								}
								this._disposeRemote( callId );
							};
						} ),
						this._sendToClientAsync( message ),
						resultItem => resultItem
					);
				} )
		);
	}

	/**
	 * Tells the remote side to stop serving the observable for `callId`.
	 *
	 * @param {string} callId Id of the remote observable call.
	 */
	_disposeRemote( callId ) {
		this._sendToClientAsync( {
			name: "disposeObservable",
			callId
		} ).subscribe();
	}

	/**
	 * Disposes every tracked local observer, drops the tracking map (which
	 * turns the _remote_on* handlers into no-ops) and disposes the base class.
	 */
	dispose( ) {
		for( let callId in this._observableId2Subject ) {
			const entry = this._observableId2Subject[ callId ];
			delete this._observableId2Subject[ callId ];
			entry.observer.dispose();
		}
		delete this._observableId2Subject;
		super.dispose();
	}
}

export default ClientServerRPCObservables;
