import Rx from "rx";
import _ from "lodash";
import ssgCrypto, {Key, KEY_KINDS} from "ssg.crypto";

import ClientServerJSONMeta from "./client.server.json.meta.js";

function key2StringAsync( key ) {
	let resSubj = new Rx.ReplaySubject();
	key.postponeManagedBuffer( mb => mb.useAsBuffer( b => {
		resSubj.onNext( b.toString( "base64" ) );
		resSubj.onCompleted();
	} ) );

	return resSubj;
}

class MetaConnection {
	constructor( apiUrlBase, connectionId, pid, seedKey, signer, creatorPid,
		creatorPublicKey, econfig, fullState ) {
		if ( !econfig ) {
			throw new Error( "econfig parameter required" );
		}

		if ( !( seedKey instanceof Key ) ) {
			throw new Error( "Invalid key type" );
		}

		if ( !fullState ) {
			fullState = {
				nextIndex: 0,
				ps: Object.create( null ),
				invites: Object.create( null ),
				private: Object.create( null ),
				meta: {}
			};
		}

		this._pid = pid;
		this._fullState = fullState;

		this._connection = new ClientServerJSONMeta(
			apiUrlBase,
			connectionId,
			pid,
			fullState.nextIndex,
			signer,
			econfig
		);
		this._indexSubj = (
			fullState.nextIndex
			? new Rx.BehaviorSubject( fullState.nextIndex - 1 )
			: new Rx.ReplaySubject( 1 )
		);
		this._exitSubj = new Rx.ReplaySubject( 1 );
		if ( fullState.ps[ this._pid ] && fullState.ps[ this._pid ].isExited ) {
			this._exitSubj.onNext();
			this._exitSubj.onCompleted();
		}
		this._metaSubj = new Rx.BehaviorSubject( fullState.meta );
		this._privateDataSubject = new Rx.BehaviorSubject( fullState.private );

		this._connection.name = "MetaConnection";
		this._econfig = econfig;
		this._connection.onJsonMessage( this._gotMessage.bind( this ) );

		ssgCrypto.createKeyFromBufferThen( new Buffer( 0 ), KEY_KINDS.INTERMEDIATE, econfig )
			.then( emptyKey =>
				ssgCrypto.createDerivedKeyFromKeysThen( seedKey, emptyKey, KEY_KINDS.SYMMETRIC_ENCRYPTION, econfig )
					.then( encryptionKey => {
						emptyKey.dispose();
						return encryptionKey;
					} )
			)
			.then( encryptionKey => {
				this._connection.setEncryptionKey( encryptionKey );
			} );
		this._creatorPid = creatorPid;
		this._creatorPublicKey = creatorPublicKey;
		this._creatorPublicKeyHandle = creatorPublicKey.addKeyUse( "MetaConnection creator pub" );

		this._gotMessageBodies = [];
		this._invitesSubject = new Rx.BehaviorSubject( fullState.invites );
		this._seedKey = seedKey;
		this._seedKeyHandle = seedKey.addKeyUse( "MetaConnection seedKey" );
		this._pid = pid;
		this._decryptSubject = new Rx.Subject();
		this._decryptSubject.concatMap( func => Rx.Observable.defer( func ) ).subscribe();
		this._participantsSubject = new Rx.BehaviorSubject( fullState.ps );
		this._errorsSubj = new Rx.ReplaySubject();
		this._addInitialParticipants( creatorPublicKey, creatorPid, fullState );

		this._latestSubj = new Rx.ReplaySubject();
		this._latestThen = new Promise( ( resolve, reject ) => {
			this._latestSubj
				.flatMap( maxIndex => {
					return (
						this._indexSubj
							.filter( index => index >= maxIndex )
							.first( { defaultValue: -1 } )
					);
				} ) // Wait until latest message on the moment of listen arrive
				.subscribe( index => {
					resolve();
				}, reject );
		} )
	}

	_onMaxIndex( maxIndex ) {
		if ( this.isGotMaxIndex ) {
			return;
		}
		this.isGotMaxIndex = true;
		this._latestSubj.onNext( maxIndex );
		this._latestSubj.onCompleted();
	}

	_addInitialParticipants( creatorPublicKey, creatorPid, fullState ) {
		ssgCrypto.createVerifierThen( creatorPublicKey, this._econfig )
			.then( verifier => {
				this._connection.addParticipant( creatorPid, verifier );
			} );
		for ( let pid in fullState.invites ) {
			let { tmpPublicKey, used } = fullState.invites[ pid ];
			// if ( used ) {
			// 	continue;
			// }
			ssgCrypto.createVerifierThen( tmpPublicKey, this._econfig )
				.then( verifier => {
					if ( this._isDisposed ) {
						return;
					}
					this._connection.addParticipant( pid, verifier )
				} );
		}

		for ( let pid in fullState.ps ) {
			if ( pid === creatorPid ) {
				continue;
			}
			let { publicKey } = fullState.ps[ pid ];
			ssgCrypto.createVerifierThen( publicKey, this._econfig )
				.then( verifier => {
					if ( this._isDisposed ) {
						return;
					}
					this._connection.addParticipant( pid, verifier )
				} );
		}
	}

	observeErrors( ) {
		return this._errorsSubj;
	}

	onConnected( func ) {
		this._connection.onConnected( func );
	}

	tryListen( cb ) {
		this._connection.tryListen( result => {
			if ( result && ( "maxIndex" in result ) ) {
				this._onMaxIndex( result.maxIndex );
			}
			cb( result );
		} );
	}

	tryStopListen( cb ) {
		this._connection.tryStopListen( cb );
	}

	observeInvites( ) {
		return this._invitesSubject.map( invites => _.reduce( invites, ( acc, invite, tmpPid ) => {
			if ( !invite.used ) {
				acc[ tmpPid ] = invite;
			}
			return acc;
		}, Object.create( null ) ) );
	}

	observeParticipants( startIndex ) {
		if ( startIndex === undefined ) {
			return this._participantsSubject;
		}
		return Rx.Observable.combineLatest(
			this._participantsSubject,
			this._indexSubj
				.filter( index => index >= startIndex )
				.first( { defaultValue: -1 } ),
			ps => ps
		);
	}

	observeParticipantsLatest( ) {
		return this._latestSubj.flatMap( () => this._participantsSubject );
	}

	waitParticipantsLatestThen() {
		return this._latestThen;
	}

	observeInvitesLatest( ) {
		return this._latestSubj.flatMap( () => this.observeInvites() );
	}

	getMaxIndexAsync( ) {
		return this._latestSubj;
	}

	observePrivateDataLatest( ) {
		return this._latestSubj.flatMap( () => this._privateDataSubject );
	}

	checkIndexIsLatest( transaction ) {
		this._connection.check(
			transaction,
			this._fullState.nextIndex
		);
	}

	sendCreateMessage( content, transaction ) {
		if ( content.publicKey ) {
			throw new Error( "Public key must not be in a content" );
		}

		if ( content.pid ) {
			throw new Error( "Pid key must not be in a content" );
		}

		if ( !( content.dhPubKey instanceof Key ) ) {
			throw new Error( "DH public key not found in a content" );
		}

		let publicKeyHandle = this._creatorPublicKey.addKeyUse( "send create meta message" );
		let message = {
			type: "create",
			content: _.clone( content )
		};
		message.content.pid = this._creatorPid;
		message.content.publicKey = this._creatorPublicKey;
		this._sendMessage( message, transaction, {
			"content.publicKey": KEY_KINDS.SIGNATURE_PUBLIC,
			"content.dhPubKey": KEY_KINDS.KEY_EXCHANGE_PUBLIC,
		} );
	}

	sendInviteMessage( content, transaction ) {
		if ( ( typeof content.nickname !== "string" ) && ( content.alias !== true ) ) {
			throw new Error( "nickname or alias requred" );
		}
		this._sendMessage( {
			type: "invite",
			content
		}, transaction, {
			"content.tmpPublicKey": KEY_KINDS.SIGNATURE_PUBLIC
		} );
	}

	sendPrivateDataMessage( content, transaction ) {
		this._sendMessage( {
			type: "privatedata",
			content
		}, transaction );
	}

	sendAcceptMessage( content, transaction ) {
		let invite = this._invitesSubject.getValue()[ this._pid ];
		if ( !invite || invite.used ) {
			return false;
		}
		this._sendMessage( {
			type: "accept",
			content
		}, transaction, {
			"content.publicKey": KEY_KINDS.SIGNATURE_PUBLIC,
			"content.dhPubKey": KEY_KINDS.KEY_EXCHANGE_PUBLIC
		} );
		return true;
	}

	sendRenameMessage( content, transaction ) {
		this._sendMessage( {
			type: "rename",
			content
		}, transaction );
	}

	sendUpdateMessage( content, transaction ) {
		this._sendMessage( {
			type: "update",
			content
		}, transaction );
	}

	sendRemoveMessage( content, transaction ) {
		this._sendMessage( {
			type: "remove",
			content
		}, transaction );
	}

	sendRemoveInviteMessage( content, transaction ) {
		this._sendMessage( {
			type: "removeinvite",
			content
		}, transaction );
	}

	_sendMessage( json, transaction, keyMap ) {
		this._connection.send( json, transaction, keyMap );
	}

	_gotMessage( from, json, index ) {
		return this._onJsonMessage( json, from, index );
	}

	_onJsonMessage( json, from, index ) {
		let interval = setInterval( () => {
			console.warn( "Meta process too long", json, from, index );
		}, 2000 );
		return (
			this._processMessageAsync( json, from, index )
				.concat( Rx.Observable.defer( () => {
					this._fullState.nextIndex = index + 1;
					if ( !this._fullStateChangeHook ) {
						return Rx.Observable.empty();
					}
					return (
						this._fullStateChangeHook( this._fullState )
							.flatMap( () => Rx.Observable.empty() )
					);
				} ) )
				.tapOnCompleted( () => {
					clearInterval( interval );
					this._connection.setProcessedMessage( index );
					this._indexSubj.onNext( index );
				} )
				.catch( error => {
					clearInterval( interval );
					console.error( error );
					this._errorsSubj.onNext( "Got invalid meta connection message" );
					return Rx.Observable.empty();
				} )
		);
	}

	_onNewPrivateData( pid, privateData, oldPid ) {
		let newPrivate = _.assign(
			{ [pid]: privateData },
			this._privateDataSubject.getValue()
		);
		delete newPrivate[ oldPid ];
		this._fullState.private = newPrivate;

		this._privateDataSubject.onNext( newPrivate );
	}

	_processCreateMessageAsync( content, from, index ) {
		if ( index ) {
			throw new Error( "Create meta message must be first" );
		}
		if ( !content.publicKey ) {
			throw new Error( "publicKey required in create message" );
		}

		if ( !content.dhPubKey ) {
			throw new Error( "dhPubKey required in create message" );
		}

		if ( this._isDisposed ) {
			content.publicKey.dispose();
			content.dhPubKey.dispose();
			return Rx.Observable.empty();
		}
		let ps = this._participantsSubject.getValue();

		ps[ content.pid ] = _.pick( content, [ "publicKey", "nickname", "dhPubKey" ] );
		this._fullState.ps = ps;

		this._participantsSubject.onNext( ps );
		return Rx.Observable.empty();
	}

	_processInviteMessageAsync( content, from, index ) {
		if ( this._invitesSubject.getValue()[ content.tmpPid ] ) {
			// return Rx.Observable.throw( new Error( "Duplicate invite tmpPid" ) ); //TODO: handle error
			console.error( "Duplicate invite tmpPid" );
			return Rx.Observable.just();
		}
		if ( this._isDisposed ) {
			content.tmpPublicKey.dispose();
			return Rx.Observable.empty();
		}
		if ( content.privateData ) {
			this._onNewPrivateData( content.tmpPid, content.privateData );
		}
		let invites = this._invitesSubject.getValue();
		invites[ content.tmpPid ] = {
			tmpPublicKey: content.tmpPublicKey,
			nickname: content.nickname,
			inviterPid: from,
			alias: content.alias,
			privateData: content.privateData
		};
		this._fullState.invites = invites;
		this._invitesSubject.onNext( invites );
		return Rx.Observable.fromPromise(
			ssgCrypto.createVerifierThen( content.tmpPublicKey, this._econfig )
				.then( verifier =>
					this._connection.addParticipant( content.tmpPid, verifier )
				)
		);
	}

	_processAcceptMessageAsync( content, from, index ) {
		let invites = this._invitesSubject.getValue();
		let invite = invites[ from ];
		if ( !invite ) {
			throw new Error( `No invite to accept from ${from}` );
		}

		if ( invite.used ) {
			console.warn( "Accept message for already used invite" );
			return Rx.Observable.just();
			// throw new Error( "Invite already used" );
		}

		if ( !content.publicKey ) {
			throw new Error( "publicKey required in accept message" );
		}

		if ( !content.dhPubKey ) {
			throw new Error( "dhPubKey required in accept message" );
		}

		if ( this._isDisposed ) {
			content.publicKey.dispose();
			content.dhPubKey.dispose();
			return Rx.Observable.empty();
		}
		let ps = this._participantsSubject.getValue();
		if ( ps[ content.pid ] ) {
			throw new Error( "Participant pid duplicate" );
		}
		if ( invite.privateData ) {
			this._onNewPrivateData( content.pid, invite.privateData, from );
		}

		invite.used = true;
		invite.newPid = content.pid;
		this._connection.removeParticipant( from );

		ps[ content.pid ] = _.pick( content, [ "publicKey", "dhPubKey" ] );
		if ( invite.alias ) {
			ps[ content.pid ].aliasTo = invite.inviterPid;
		} else {
			ps[ content.pid ].nickname = invite.nickname;
		}
		this._fullState.ps = ps;
		this._fullState.invites = invites;

		this._invitesSubject.onNext( invites );
		this._participantsSubject.onNext( ps );

		return Rx.Observable.fromPromise(
			ssgCrypto.createVerifierThen( content.publicKey, this._econfig )
				.then( verifier =>
					this._connection.addParticipant( content.pid, verifier )
				)
		);
	}

	_processRenameMessageAsync( content, from, index ) {
		let ps = this._participantsSubject.getValue();
		if ( !ps[ from ] ) {
			throw new Error( "No pid to rename" );
		}
		ps[ from ].nickname = content.nickname;
		this._fullState.ps = ps;
		this._participantsSubject.onNext( ps );
		return Rx.Observable.empty();
	}

	_processUpdateMessageAsync( content, from, index ) {
		let meta = _.assign( this._metaSubj.getValue(), content );
		this._fullState.meta = meta;
		this._metaSubj.onNext( meta );
		return Rx.Observable.empty();
	}

	_processRemoveMessageAsync( content, from, index ) {
		let ps = this._participantsSubject.getValue();
		if ( !ps[ content.pid ] ) {
			throw new Error( "No pid to remove" );
		}
		// Cannot remote participant as this will cause messages written by this
		// participant to be unable to be verified
		// delete ps[ content.pid ];
		// TODO: it must be possible to versionize participant list to check is
		// a particular participant presented at the multicast by the time it written
		// the message
		ps[ content.pid ].isExited = true;
		this._connection.removeParticipant( content.pid );
		this._fullState.ps = ps;
		this._participantsSubject.onNext( ps );
		if ( content.pid === this._pid ) {
			this._exitSubj.onNext();
			this._exitSubj.onCompleted();
		}
		return Rx.Observable.empty();
	}

	_processRemoveInviteMessageAsync( content, from, index ) {
		let invites = this._invitesSubject.getValue();
		if ( !invites[ content.tmpPid ] ) {
			throw new Error( "No tmpPid to remove" );
		}
		invites[ content.tmpPid ].used = true;
		this._connection.removeParticipant( content.tmpPid );
		this._fullState.invites = invites;
		this._invitesSubject.onNext( invites );
		return Rx.Observable.empty();
	}

	_processPrivateDataMessageAsync( content, from, index ) {
		this._onNewPrivateData( from, content.privateData );
		return Rx.Observable.empty();
	}

	_processMessageAsync( json, from, index ) {
		let content = json.content;
		if ( !json ) {
			this._errorsSubj.onNext( "Got meta connection failed message" );
			return Rx.Observable.empty();
		}
		console.log( "Meta message", json, this._connection._listenerId );
		switch( json.type ) {
			case "create":
				return this._processCreateMessageAsync( content, from, index );
			case "invite":
				return this._processInviteMessageAsync( content, from, index );
			case "accept":
				return this._processAcceptMessageAsync( content, from, index );
			case "rename":
				return this._processRenameMessageAsync( content, from, index );
			case "update":
				return this._processUpdateMessageAsync( content, from, index );
			case "remove":
				return this._processRemoveMessageAsync( content, from, index );
			case "removeinvite":
				return this._processRemoveInviteMessageAsync( content, from, index );
			case "privatedata":
				return this._processPrivateDataMessageAsync( content, from, index );
			default:
				debugger;
				console.error( `Unsupported message type: ${json.type}` );
				return Rx.Observable.throw( new Error( `Unsupported message type: ${json.type}` ) );
		}
	}

	hookFullStateChange( funcAsync ) {
		if ( this._fullStateChangeHook ) {
			throw new Error( "this._fullStateChangeHook already set" );
		}
		if ( typeof funcAsync !== "function" ) {
			throw new Error( "Function required" );
		}
		this._fullStateChangeHook = funcAsync;
	}

	get lastIndex( ) {
		return this._connection.lastIndex;
	}

	observeMeta( ) {
		return this._metaSubj;
	}

	observeMetaLatest( ) {
		return this._latestSubj.flatMap( () => this._metaSubj );
	}

	hasUsedInvite( tmpPid ) {
		let invites = this._invitesSubject.getValue();
		let invite = invites[ tmpPid ];
		return !!invite && !!invite.used;
	}

	observeExit( ) {
		return this._exitSubj;
	}

	observeIndexes( ) {
		return this._indexSubj;
	}

	getRootAliasPid( pid ) {
		let invites = this._invitesSubject.getValue();
		let invite;
		while ( invite = _.find( invites, { alias: true, newPid: pid, used: true } ) ) {
			pid = invite.inviterPid;
		}
		return pid;
	}

	getInvitePidByParticipantPid( pid ) {
		let invites = this._invitesSubject.getValue();
		return _.findKey( invites, { newPid: this.getRootAliasPid( pid ), used: true } );
	}

	waitForInvite( tmpPid ) {
		return this._invitesSubject.filter( invites => !!invites[ tmpPid ] ).first( { defaultValue: null } );
	}

	observeCachedMessages( ) {
		return this._connection.observeCachedMessages();
	}

	getCurrentParticipants( ) {
		return this._participantsSubject.getValue();
	}

	getFullState() {
		return this._fullState;
	}

	dispose( ) {
		if ( this._isDisposed ) {
			return;
		}
		this._isDisposed = true;
		this._creatorPublicKey.removeKeyUse( this._creatorPublicKeyHandle );
		this._seedKey.removeKeyUse( this._seedKeyHandle );

		this._connection.dispose();
		this._metaSubj.onCompleted();
		this._indexSubj.onCompleted();
		this._exitSubj.onCompleted();
		this._privateDataSubject.onCompleted();
		this._participantsSubject.onCompleted();
		this._invitesSubject.onCompleted();
	}
}

export default MetaConnection;
