import Rx from "rx";
import _ from "lodash";
import Queue from "promise-queue";
import ssgCrypto, {Key, KEY_KINDS, Config} from "ssg.crypto";

import configuration from "../../common/configuration.js";

import Transaction from "./transaction.js";
import MetaConnection from "./meta.connection.js";
import MetaConnectionWithRights from "./meta.connection.with.rights.js";
import P2PConnection from "./p2p.connection.js";
import ClientServerJSONMEK from "./client.server.json.mek.js";
import {callUntilSuccessAsync, callUntilSuccessThen} from "../../common/utils.js";
import ClientServerSequential from "./client.server.sequential.js";

/**
 * Builds the id of the meta connection that belongs to a shared id.
 *
 * When configuration enables hashed connection ids, a single 0x00 byte and
 * `sharedId` are passed to `ssgCrypto.hash` and the result is base64 encoded;
 * otherwise the id is "meta_" followed by the base64 form of `sharedId`.
 *
 * @param {Buffer} sharedId Shared identifier of the multicast.
 * @returns {string} Connection id.
 */
export function getMetaConnectionIdBySharedId( sharedId ) {
	if ( configuration.getIsHashConnectionIds() ) {
		// Buffer.alloc replaces the deprecated `new Buffer( size )` constructor.
		return ssgCrypto.hash( Buffer.alloc( 1, 0 ), sharedId ).toString( "base64" );
	}
	return "meta_" + sharedId.toString( "base64" );
}

/**
 * Builds the id of the message connection that belongs to a shared id.
 *
 * When configuration enables hashed connection ids, a single 0x01 byte and
 * `sharedId` are passed to `ssgCrypto.hash` and the result is base64 encoded;
 * otherwise the id is "message_" followed by the base64 form of `sharedId`.
 *
 * @param {Buffer} sharedId Shared identifier of the multicast.
 * @returns {string} Connection id.
 */
export function getMessageConnectionIdBySharedId( sharedId ) {
	if ( configuration.getIsHashConnectionIds() ) {
		// Buffer.alloc replaces the deprecated `new Buffer( size )` constructor.
		return ssgCrypto.hash( Buffer.alloc( 1, 1 ), sharedId ).toString( "base64" );
	}
	return "message_" + sharedId.toString( "base64" );
}

/**
 * Builds the id of a one-directional p2p connection between two participants.
 *
 * The pids are deliberately NOT sorted: in plain mode `( a, b )` and `( b, a )`
 * give different ids, and callers in this file request both orders to get
 * separate ids for the two directions.
 *
 * @param {string} pid1 Base64 encoded pid placed first.
 * @param {string} pid2 Base64 encoded pid placed second.
 * @returns {string} Base64 of `ssgCrypto.hash( pid1 bytes, pid2 bytes )` when hashed
 *   ids are enabled in configuration, otherwise "p2p_<pid1>_<pid2>".
 */
export function getP2PConnectionId( pid1, pid2 ) {
	if ( !configuration.getIsHashConnectionIds() ) {
		return "p2p_" + pid1 + "_" + pid2;
	}
	// Buffer.from replaces the deprecated `new Buffer( string, encoding )` constructor.
	return ssgCrypto.hash(
		Buffer.from( pid1, "base64" ),
		Buffer.from( pid2, "base64" )
	).toString( "base64" );
}

class Multicast {
	constructor( json, privateMessagesHandlers, useMetaWithRights ) {
		if ( !json.privateKey ) {
			throw new Error( "privateKey required" );
		}

		if ( !json.pid ) {
			throw new Error( "pid required" );
		}

		if ( !json.creatorPublicKey ) {
			throw new Error( "creatorPublicKey required" );
		}

		if ( !json.creatorPid ) {
			throw new Error( "creatorPid required" );
		}

		if ( !json.seedKey ) {
			throw new Error( "seedKey required" );
		}

		if ( !json.dhPrivKey ) {
			throw new Error( "dhPrivKey required" );
		}

		if ( !json.sharedId ) {
			throw new Error( "sharedId required" );
		}

		if( !json.apiUrlBase ) {
			throw new Error( "apiUrlBase required" );
		}

		this._privateMessagesHandlers = _.clone( privateMessagesHandlers );
		this._privateMessagesHandlers[ "message" ] = ( p2p, from, data, index ) => {
			if ( !this._onAsyncMessage ) {
				this._incomingMesageQueue.push( [ Rx.Observable.just( data ), null, from, null ] );
				return;
			}
			this._onAsyncMessage( Rx.Observable.just( data ), null, from, null )
		};


		this._privateKeyHandle = json.privateKey.addKeyUse( "multicast" );
		this._seedHandle = json.seedKey.addKeyUse( "multicast" );
		this._creatorPublicKeyHandle = json.creatorPublicKey.addKeyUse( "multicast" );

		if ( Buffer.isBuffer( json.pid ) ) {
			json.pid = json.pid.toString( "base64" );
		}

		if ( Buffer.isBuffer( json.creatorPid ) ) {
			json.creatorPid = json.creatorPid.toString( "base64" );
		}

		this._econfig = json.econfig;
		if ( !( this._econfig instanceof Config ) ) {
			this._econfig = new Config( this._econfig );
		}

		let signer = ssgCrypto.createSigner( json.privateKey, this._econfig );

		this._metaConnectionId = getMetaConnectionIdBySharedId( json.sharedId );
		this._messageConnectionId = getMessageConnectionIdBySharedId( json.sharedId );

		this._data = json;
		if ( !this._data.perParticipant ) {
			this._data.perParticipant = [];
		}
		this._addedParticipants = [];

		this._signer = signer;
		this._p2pConnections = Object.create( null );
		this._p2pConnectionsWaitings = Object.create( null );
		this._errorsSubj = new Rx.ReplaySubject();
		this._transactionsInProgressSubj = new Rx.BehaviorSubject( 0 );
		this._messageQueue = new Queue( 1 );

		this._setUpMetaConnection( json, useMetaWithRights );
		this._setUpMessageConnection( json );
	}

	_setUpMetaConnection( json, useMetaWithRights ) {
		let Connection = useMetaWithRights ? MetaConnectionWithRights : MetaConnection;
		this._metaConnection = new Connection(
			json.apiUrlBase,
			this._metaConnectionId,
			json.pid,
			json.seedKey,
			this._signer,
			json.creatorPid,
			json.creatorPublicKey,
			this._econfig,
			json.metaState
		);

		this._metaConnection.observeParticipantsLatest().subscribe( this._participantsChanged.bind( this ) );
		this._metaConnection.observeErrors().subscribe( error => {
			this._errorsSubj.onNext( error );
		} );
	}

	hookMetaFullStateChange( funcAsync ) {
		this._metaConnection.hookFullStateChange( funcAsync );
		this._metaConnection.onConnected( () => {
			Multicast._tryListenUntilSuccessAsync( this._metaConnection ).subscribe();
		} );
	}

	_setUpMessageConnection( json ) {
		//TODO: make different multicast types
		let Connection = ClientServerJSONMEK;
		let {messageIndex} = json;

		this._messageProcessedSubj = new Rx.BehaviorSubject( {} );

		let zeroKeyThen = ssgCrypto.createKeyFromBufferThen(
			new Buffer( 0 ),
			KEY_KINDS.INTERMEDIATE,
			this._econfig
		);
		let encryptionKeyThen = zeroKeyThen.then( zeroKey =>
			ssgCrypto.createDerivedKeyFromKeysThen(
				json.seedKey,
				zeroKey,
				KEY_KINDS.SYMMETRIC_ENCRYPTION,
				this._econfig
			)
		);
		this._messageConnection = new Connection(
			json.apiUrlBase,
			this._messageConnectionId,
			json.pid,
			messageIndex,
			this._signer,
			this._econfig,
			encryptionKeyThen
		);

		//TODO: remove
		this._messageConnection._multicast = this;

		this._receivedKeyDataUpdatePerMessage = Object.create( null );
		this._messageConnection.name = "MEK";
		this._messageConnection.onSendPrivate( ( transaction, privateData ) => {
			for ( let pid in this._p2pConnections ) {
				this._p2pConnections[ pid ].subscribe( c => {
					c.sendPrivate( "message", privateData, transaction );
				} );
			}
		} );

		this._messageConnection.onEncrypt( ( hash, key, transaction ) => {
			for ( let pid in this._p2pConnections ) {
				let waitSubj = new Rx.ReplaySubject();
				let keyHandle = key.addKeyUse( "mek add to transaction" );
				transaction.waitFor( waitSubj, "adding MEK keys to transaction" );
				this._p2pConnections[ pid ].subscribe( c => {
					let mekData = { hash, key };
					c.sendMEK( mekData, transaction );
					key.removeKeyUse( keyHandle );
				}, error => {
					//TODO: handle
					console.error( error );
					waitSubj.onCompleted();
				}, () => {
					waitSubj.onCompleted();
				} );
			}
		} );

		this._incomingMesageQueue = [];
		this._onAsyncMessage = null;

		this._messageConnection.onAsyncMessage( ( asyncMsg, index, from, hashStr ) => {
			if ( !asyncMsg ) {
				this._errorsSubj.onNext( "Got failed message" );
				return;
			}
			if ( !this._receivedKeyDataUpdatePerMessage[ hashStr ] ) {
				this._receivedKeyDataUpdatePerMessage[ hashStr ] = new Rx.ReplaySubject( 1 );
			}
			let asyncMsgData = Rx.Observable.combineLatest(
				asyncMsg, this._receivedKeyDataUpdatePerMessage[ hashStr ],
				( asyncMsg, keyUpdatesPerPid ) => ( { keyUpdatesPerPid, ...asyncMsg } )
			);
			if ( !this._onAsyncMessage ) {
				this._incomingMesageQueue.push( [ asyncMsgData, index, from, hashStr ] );
				return;
			}
			this._onAsyncMessage( asyncMsgData, index, from, hashStr )
		} );
		this._messageConnection.onConnected( () => {
			Multicast._tryListenUntilSuccessAsync( this._messageConnection ).subscribe();
		} );
	}

	runTransactionAsync( func, skipQueue, name ) {
		return Rx.Observable.fromPromise( this.runTransactionThen( func, skipQueue, name ) );
	}

	runTransactionThen( func, skipQueue, name ) {
		let count = this._transactionsInProgressSubj.getValue() + 1;
		this._transactionsInProgressSubj.onNext( count );
		return (
			Transaction.runWithRetriesThen( func, skipQueue, name )
				.then( res => {
					let count = this._transactionsInProgressSubj.getValue() - 1;
					this._transactionsInProgressSubj.onNext( count );
					return res;
				} )
		);
	}

	queryLatestMekKeyAsync( ) {
		return (
			this.runTransactionAsync( ( transaction, prevResult ) => {
				let waitSubj = new Rx.Subject();
				transaction.waitFor( waitSubj, "latest key" );
				this._metaConnection.observeParticipantsLatest()
					.first( { defaultValue: {} } )
					.map( ps => _.keys( ps ) )
					.flatMap( pids => Rx.Observable.fromArray( pids )
						.flatMap( pid => this.waitForParticipantAsync( pid )
							.flatMap( () => this._p2pConnections[ pid ] )
						)
					)
					.tap( connection => {
						this._sendPrivateMessage(
							"querylatestmek",
							{},
							connection,
							transaction
						);
					} )
					.toArray()
					.subscribe( () => waitSubj.onCompleted() )
			} )
		);
	}

	/**
	 * @returns {Rx.ReplaySubject} The subject this instance pushes error notifications to.
	 */
	observeErrors( ) {
		return this._errorsSubj;
	}

	/**
	 * Delegates to getLatestKeyAsync() of the message connection.
	 */
	getLatestKeyAsync( ) {
		return this._messageConnection.getLatestKeyAsync();
	}

	/**
	 * @returns The pid from the construction data (the constructor converts a
	 *   Buffer pid to a base64 string).
	 */
	getPid( ) {
		return this._data.pid;
	}

	/**
	 * Registers `func` through onSelfMessage() of the message connection.
	 */
	onSelfMessage( func ) {
		this._messageConnection.onSelfMessage( func );
	}

	onAsyncMessage( func ) {
		if ( typeof func !== "function" ) {
			throw new Error( "onAsyncMessage must be a function" );
		}
		if ( this._onAsyncMessage ) {
			throw new Error( "onAsyncMessage already set" );
		}
		this._onAsyncMessage = func;
		this._incomingMesageQueue.forEach( ( [ msg, index, from, hashStr ] ) =>
			func( msg, index, from, hashStr )
		);
		delete this._incomingMesageQueue;
	}

	onNewParticipant( func ) {
		if ( this._onNewParticipant ) {
			throw new Error( "_onNewParticipant already set" );
		}
		this._onNewParticipant = func;

		for ( let i = 0; i < this._addedParticipants.length; i++ ) {
			let pData = this._addedParticipants[ i ];
			this._onNewParticipant( pData ).subscribe();
		}
	}

	static _tryListenUntilSuccessAsync( connection ) {
		return callUntilSuccessAsync( connection.tryListen.bind( connection ) );
	}

	_listenAsync( ) {
		return Rx.Observable.combineLatest(
			Multicast._tryListenUntilSuccessAsync( this._messageConnection ),
			Multicast._tryListenUntilSuccessAsync( this._metaConnection ),
			( res1, res2 ) => this
		);
	}

	_getP2pConnectionAsync( pid ) {
		if ( this._p2pConnections[ pid ] ) {
			return this._p2pConnections[ pid ];
		}
		if ( !this._p2pConnectionsWaitings[ pid ] ) {
			this._p2pConnectionsWaitings[ pid ] = new Rx.ReplaySubject( 1 );
		}
		return this._p2pConnectionsWaitings[ pid ];
	}

	_initNewP2p( pid, p ) {
		if ( this._p2pConnections[ pid ] ) {
			throw new Error( "p2p connection alredy initialized" );
		}
		let pData = _.find( this._data.perParticipant, { pid } );
		if ( !pData ) {
			pData = {
				pid,
				dhPrivKey: this._data.dhPrivKey,
				dhPubKey: p.dhPubKey
			};
			this._data.perParticipant.push( pData );
		}
		let subj = this._p2pConnections[ pid ] = this._p2pConnectionsWaitings[ pid ] || new Rx.ReplaySubject();
		let timeout = setInterval( () => {
			console.warn( "_p2pConnections[pid] timeout", pid );
		}, 10000 );

		subj.subscribeOnCompleted( () => {
			clearInterval( timeout );
		} );
		( () => {
			if ( !this._p2pConnections[ pid ] || this._isDisposed ) {
				subj.onCompleted();
				return Promise.reject();
			}

			let wait = this._onNewParticipant && this._onNewParticipant( pData );
			if ( !wait ) {
				console.warn( "Add participant ignored", this._onNewParticipant );
			}
			if ( !this._onNewParticipant ) {
				this._addedParticipants.push( pData );
			}
			if ( wait ) {
				return new Promise( resolve => {
					wait.subscribeOnCompleted( () => {
						resolve( true );
					} );
				} );
			}
			return Promise.resolve();
		} )()
		.then( res => {
			let sConnectionId = getP2PConnectionId( pid, this._data.pid );
			let rConnectionId = getP2PConnectionId( this._data.pid, pid );
			let permKeys = {
				seedKey: this._data.seedKey,
				dhPrivKey: this._data.dhPrivKey,
				dhPubKey: p.dhPubKey
			};
			let p2p;
			try {
				p2p = new P2PConnection(
					this._data.apiUrlBase,
					sConnectionId,
					rConnectionId,
					pData,
					permKeys,
					this._econfig,
					pid, this._data.pid
				);
			} catch( error ) {
				this._errorsSubj.onNext( "P2p connection create error: " + error );
				return;
			}
			p2p.observeErrors().subscribe( error => {
				this._errorsSubj.onNext( "P2p error: " + error );
			} );

			p2p.onMEKMessage( ( message, index, keyData ) => {
				if ( typeof index !== "number" ) {
					throw new Error( "p2p index required" );
				}
				let hashStr = message.hash.toString( "base64" );
				this._messageConnection.addKey( pid, message.hash, message.key );
				for ( let name in keyData ) {
					if ( keyData[ name ] instanceof Key ) {
						keyData[ name + "Handle" ] = keyData[ name ].addKeyUse( "Pre store" );
					}
				}
				if ( !this._receivedKeyDataUpdatePerMessage[ hashStr ] ) {
					this._receivedKeyDataUpdatePerMessage[ hashStr ] = Rx.Observable.just( { [ pid ]: keyData } )
				} else {
					this._receivedKeyDataUpdatePerMessage[ hashStr ].onNext( { [ pid ]: keyData } );
					this._receivedKeyDataUpdatePerMessage[ hashStr ].onCompleted();
				}
				return (
					this._messageProcessedSubj
						.filter( hashes => !!hashes[ hashStr ] ).take( 1 )
				);
			} );

			p2p.onKeyDataUpdate( keyData => {
				return (
					this._onP2pKeyDataUpdate
					? this._onP2pKeyDataUpdate( pid, keyData )
					: Promise.resolve()
				);
			} );

			_.forEach(
				this._privateMessagesHandlers,
				( handler, type ) =>
					p2p.onPrivateMessage(
						type,
						handler.bind( this, this, pid )
					)
			);

			p2p.listenAsync().subscribe();
			p2p.onConnected( () => {
				//Reconnect
				p2p.listenAsync().subscribe();
			} );
			subj.onNext( p2p );
			subj.onCompleted();
		} );
	}

	setMessageProcessed( hashStr ) {
		let obj = this._messageProcessedSubj.getValue();
		obj[ hashStr ] = true;
		this._messageProcessedSubj.onNext( obj );
	}

	_participantsChanged( participants ) {
		for( let pid in participants ) {
			let p = participants[ pid ];
			if ( this._p2pConnections[ pid ] ) {
				continue;
			}

			//In case of exited participant it must stay for ability to verify it's messages
			ssgCrypto.createVerifierThen( p.publicKey, this._econfig )
				.then( verifier => {
					if ( !this._messageConnection.hasParticipant( pid ) ) {
						this._messageConnection.addParticipant( pid, verifier );
					}
				} );
			if ( p.isExited || ( pid === this.getPid() ) ) {
				continue;
			}
			this._initNewP2p( pid, p );
		}

		for( let pid in this._p2pConnections ) {
			let p = participants[ pid ];
			if ( p && !p.isExited ) {
				continue;
			}
			//It is possible that some sending is in progress
			this._p2pConnections[ pid ].subscribe( p2p => { p2p.dispose() } );
			delete this._p2pConnections[ pid ];
		}
	}

	static createNewAsync(
		apiUrlBase,
		nickname,
		sharedId,
		pid,
		seedKey,
		dhPrivKey,
		signPrivKey,
		econfig,
		privateMessagesHandlers,
		useMetaWithRights
	) {
		return (
			Rx.Observable.fromPromise( Promise.all( [
				ssgCrypto.createSignaturePublicKeyThen( signPrivKey, econfig ),
				ssgCrypto.createKeyExchangePublicKeyThen( dhPrivKey, econfig )
			] ).then( ( [ verifyKey, dhPubKey ] ) => ( {
					m: new Multicast( {
						apiUrlBase,
						privateKey: signPrivKey,
						pid,
						creatorPublicKey: verifyKey,
						creatorPid: pid,
						dhPrivKey: dhPrivKey,
						seedKey,
						sharedId,
						econfig
					}, privateMessagesHandlers, useMetaWithRights ),
					dhPubKey
				} )
			) )
				.flatMap( p => Rx.Observable.combineLatest(
					p.m._listenAsync(),
					Transaction.runWithRetriesAsync( transaction => {
						p.m._metaConnection.sendCreateMessage( {
							dhPubKey: p.dhPubKey,
							nickname
						}, transaction );
					} ),
					() => p.m
				) )
		);
	}

	static joinAsync( nickname, invite, privateMessagesHandlers, useMetaWithRights ) {
		return (
			Multicast.tryJoinAsync( nickname, invite, privateMessagesHandlers, useMetaWithRights )
				.flatMap( m => {
					if ( !m ) {
						return Rx.Observable.throw( new Error( "Invite already used" ) );
					}
					return Rx.Observable.just( m );
				} )
			);
	}

	static createJoinDataAsync( nickname, invite, privateMessagesHandlers, useMetaWithRights ) {
		let tmpMessageConnection = new ClientServerSequential(
			invite.apiUrlBase,
			getMessageConnectionIdBySharedId( invite.sharedId )
		);
		return Rx.Observable.fromPromise( Promise.all( [
			callUntilSuccessThen( cb => tmpMessageConnection.tryGetMaxIndex( cb ) ),
			ssgCrypto.createSignaturePublicKeyThen( invite.signKey, invite.econfig ),
			ssgCrypto.createKeyExchangePublicKeyThen( invite.dhPrivKey, invite.econfig )
		] ).then( ( [ { maxIndex }, publicKey, dhPubKey ] ) => ( {
				joinMetaMessage: { publicKey, dhPubKey, nickname, pid: invite.pid },
				m: new Multicast( {
					apiUrlBase: invite.apiUrlBase,
					privateKey: invite.signKey,
					publicKey,
					pid: invite.pid,
					creatorPublicKey: invite.creatorPublicKey,
					creatorPid: invite.creatorPid,
					dhPrivKey: invite.dhPrivKey,
					dhPubKey,
					seedKey: invite.seedKey,
					sharedId: invite.sharedId,
					messageIndex: maxIndex + 1,
					econfig: invite.econfig,
					metaState: invite.metaState
				}, privateMessagesHandlers, useMetaWithRights )
			} )
		) );
	}

	/*
		Invite fields, as assembled by the inviting side:
		tmpPrivateKey,
		pid,
		nickname,
		seedKey: this._data.seedKey,
		creatorPid: this._data.creatorPid,
		creatorPublicKey: this._data.creatorPublicKey,
		verifiedBeforeIndex: 0, //TODO: current index
		verifiedHash: null, //TODO: current hash
		sharedId: this._data.sharedId,
		apiUrlBase: this._data.apiUrlBase
	*/
	/**
	 * Tries to join a multicast using an invite.
	 *
	 * A temporary meta connection is opened under the invite's temporary pid and
	 * key. Once the participants are known, the invite is checked; if it is
	 * still unused, the real Multicast is created, starts listening, and the
	 * accept message is sent through the temporary connection.
	 *
	 * @param {string} nickname Nickname to join with.
	 * @param {Object} invite Invite data; apiUrlBase, creatorPid, tmpPid and a
	 *   string pid are validated here.
	 * @param {Object} [privateMessagesHandlers] Passed to the Multicast constructor.
	 * @param {boolean} [useMetaWithRights] Selects the meta connection class.
	 * @returns {Rx.Observable} Emits the joined Multicast, or null when the
	 *   invite turned out to be used already.
	 * @throws {Error} Synchronously, when a validated invite field is missing.
	 */
	static tryJoinAsync( nickname, invite, privateMessagesHandlers, useMetaWithRights ) {
		if ( !invite.apiUrlBase ) {
			throw new Error( "invite.apiUrlBase required" );
		}
		if ( !invite.creatorPid ) {
			throw new Error( "invite.creatorPid required" );
		}
		if ( !invite.tmpPid ) {
			throw new Error( "invite.tmpPid required" );
		}
		if ( typeof invite.pid !== "string" ) {
			throw new Error( "Pid expected to be a string" );
		}
		let { econfig } = invite;
		let Connection = useMetaWithRights ? MetaConnectionWithRights : MetaConnection;
		// Temporary connection signed with the invite's temporary key; it gets a
		// deep copy of the meta state so the invite object is not shared with it.
		let tmpMeta = new Connection(
				invite.apiUrlBase,
				getMetaConnectionIdBySharedId( invite.sharedId ),
				invite.tmpPid,
				invite.seedKey,
				ssgCrypto.createSigner( invite.tmpPrivateKey, econfig ),
				invite.creatorPid,
				invite.creatorPublicKey,
				econfig,
				_.cloneDeep( invite.metaState )
			);

		// Listen again whenever the temporary connection reports "connected".
		tmpMeta.onConnected( () => {
			Multicast._tryListenUntilSuccessAsync( tmpMeta ).subscribe();
		} );
		return (
			callUntilSuccessAsync( tmpMeta.tryListen.bind( tmpMeta ) )
				.flatMap( () => tmpMeta.observeParticipantsLatest().take( 1 ) )
				.flatMap( () => {
					// First check, against the temporary connection's state.
					if ( tmpMeta.hasUsedInvite( invite.tmpPid ) ) {
						return Rx.Observable.just( null );
					}
					return (
						Multicast.createJoinDataAsync( nickname, invite,
							privateMessagesHandlers, useMetaWithRights
						)
						.flatMap( o => o.m._listenAsync().map( res => o ) )
						.flatMap( ( { joinMetaMessage, m } ) => {
							return Transaction.runWithRetriesAsync( transaction => {
								// Second check, inside the transaction and against the new multicast.
								if ( m.hasUsedInvite( invite.tmpPid ) ) {
									//TODO: specific error
									m.dispose();
									tmpMeta.dispose();
									return;
								}
								tmpMeta.sendAcceptMessage( joinMetaMessage, transaction );
							// res.empty is mapped to null, i.e. "invite already used".
							} ).map( res => res.empty ? null : m );
						} )
					);
				} )
		);
	}

	static fromJSONAsync( json, privateMessagesHandlers, useMetaWithRights ) {
		let m = new Multicast( json, privateMessagesHandlers, useMetaWithRights );
		m._metaConnection.getMaxIndexAsync()
			.subscribe( index => {
				if ( !~index ) {
					m._errorsSubj.onNext( "No meta messages" );
				}
			} );
		return m._listenAsync();
	}

	onP2pKeyDataUpdate( func ) {
		if ( this._onP2pKeyDataUpdate ) {
			throw new Error( "this._onP2pKeyDataUpdate already set" );
		}
		this._onP2pKeyDataUpdate = func;
	}

	/**
	 * Creates an invite unconditionally, with no private data and no invite action.
	 *
	 * @returns {Rx.Observable} Result of conditionalyCreateInviteAsync.
	 */
	createInviteAsync( nickname, alias = false, rights = undefined ) {
		const noCondition = null;
		const noPrivateData = undefined;
		return this.conditionalyCreateInviteAsync(
			nickname, alias, noCondition, noPrivateData, rights
		);
	}

	/**
	 * Creates an invite and publishes the invite meta message, optionally only
	 * if `condition` holds at transaction time.
	 *
	 * @param {string} nickname Nickname stored in the invite.
	 * @param {*} alias Passed through in the invite meta message.
	 * @param {Function} [condition] Called inside the transaction. May return a
	 *   plain value or something with subscribe(); the invite is sent only for
	 *   a truthy result.
	 * @param {*} [privateData] Passed through in the invite meta message.
	 * @param {*} [rights] Passed through in the invite meta message.
	 * @param {Function} [inviteAction] Called as inviteAction( inviteJSON,
	 *   transaction ) right after the invite message is queued.
	 * @returns {Rx.Observable} Emits the invite JSON, or null when the
	 *   transaction result is empty (condition not met).
	 * @throws {Error} Synchronously, if disposed or nickname is not a string.
	 */
	conditionalyCreateInviteAsync( nickname, alias, condition, privateData, rights, inviteAction ) {
		if ( this._isDisposed ) {
			throw new Error( "Disposed multicast call" );
		}
		//TODO: make one queue for receive and send message, so no simultaneous sending and receiveing
		//This will ensure absence of duplicate invites
		if ( typeof nickname !== "string" ) {
			throw new Error( "nickname string required" );
		}
		// A fresh temporary signature key pair and a random temporary pid per invite.
		return Rx.Observable.fromPromise( Promise.all( [
			ssgCrypto.createRandomSignatureKeyPairThen( this._econfig ),
			ssgCrypto.createRandomBufferThen( configuration.getIdsLength() )
				.then( buf => buf.toString( "base64" ) )
		] ).then( ( [ { publicKey: tmpPublicKey, privateKey: tmpPrivateKey }, tmpPid ] ) => ( {
				// Returned to the caller: carries the temporary private key.
				inviteJSON: {
					tmpPrivateKey,
					tmpPid,
					nickname,
					seedKey: this._data.seedKey,
					creatorPid: this._data.creatorPid,
					creatorPublicKey: this._data.creatorPublicKey,
					sharedId: this._data.sharedId,
					apiUrlBase: this._data.apiUrlBase,
					metaState: this._metaConnection.getFullState(),
					econfig: this._econfig.toJson()
				},
				// Sent over the meta connection: carries only the temporary public key.
				inviteMetaMessage: {
					nickname,
					tmpPublicKey,
					tmpPid,
					privateData,
					alias,
					rights
				}
			} )
		) )
		.flatMap( ( { inviteJSON, inviteMetaMessage } ) => {
			return this.runTransactionAsync( ( transaction, prevResult ) => {
				if ( condition ) {
					let res = condition();
					if ( res.subscribe ) {
						// Observable-like condition: keep the transaction waiting until it emits.
						let waitSubj = new Rx.Subject();
						transaction.waitFor( waitSubj, "Condition to calculate" );
						res.subscribe( resBool => {
							if ( resBool ) {
								this._metaConnection.sendInviteMessage( inviteMetaMessage, transaction );
								inviteAction && inviteAction( inviteJSON, transaction );
							}
							waitSubj.onCompleted();
						} );
						return;
					}
					if ( !res ) {
						return;
					}
				}
				this._metaConnection.sendInviteMessage( inviteMetaMessage, transaction );
				inviteAction && inviteAction( inviteJSON, transaction );
			} )
			.map( res => {
				// The temporary public key is disposed in either case.
				inviteMetaMessage.tmpPublicKey.dispose();
				if ( res.empty ) {
					//Condition not met
					inviteJSON.tmpPrivateKey.dispose();
					return null;
				}
				return inviteJSON;
			} );
		} );
	}

	changeRightsAsync( changes ) {
		if ( this._isDisposed ) {
			throw new Error( "Disposed multicast call" );
		}
		return this.runTransactionAsync( transaction => {
			this._metaConnection.sendChangeRights( {changes}, transaction )
		} );
	}

	removeAsync( pid ) {
		if ( this._isDisposed ) {
			throw new Error( "Disposed multicast call" );
		}
		let removeMessage = { pid: pid.toString( "base64" ) };
		return this.runTransactionAsync( transaction => {
			let p = this._metaConnection.getCurrentParticipants()[ pid ];
			if ( !p || p.isExited ) {
				return;
			}
			this._metaConnection.sendRemoveMessage( removeMessage, transaction );
		} );
	}

	removeInviteAsync( tmpPid ) {
		if ( this._isDisposed ) {
			throw new Error( "Disposed multicast call" );
		}
		let removeMessage = { tmpPid: tmpPid.toString( "base64" ) };
		return this.runTransactionAsync( transaction => {
			let waitSubj = new Rx.Subject();
			transaction.waitFor( waitSubj, "Invite list" );
				this._metaConnection.observeInvites()
					.first( { defaultValue: {} } )
					.subscribe( invites => {
						if ( invites[ tmpPid ] ) {
							this._metaConnection.sendRemoveInviteMessage( removeMessage, transaction );
						}
						waitSubj.onCompleted();
					} );
			} );
	}

	renameAsync( nickname ) {
		if ( this._isDisposed ) {
			throw new Error( "Disposed multicast call" );
		}
		let renameMessage = { nickname };
		return this.runTransactionAsync( transaction => {
			this._metaConnection.sendRenameMessage( renameMessage, transaction );
		} );
	}


	sendUpdateMessageAsync( update ) {
		if ( this._isDisposed ) {
			throw new Error( "Disposed multicast call" );
		}
		return this.runTransactionAsync( transaction => {
			this._metaConnection.sendUpdateMessage( update, transaction );
		} );
	}

	sendLatestMessageAsync( json, ifIndex ) {
		return Rx.Observable.fromPromise( this.sendLatestMessageThen( json, ifIndex ) );
	}

	sendLatestMessageThen( json ) {
		let index;
		return this._messageQueue.add( () =>
			this.waitParticipantsLatestThen()
			 	.then( () => this.runTransactionThen( transaction => {
					index = this._messageConnection.getSendIndex();
					this._metaConnection.checkIndexIsLatest( transaction );
					this._messageConnection.send( json, transaction, index );
				}, true, "Send latest message " + json.text ) )
				.then( resp => ( { json, from: this.getPid(), resp, index } ) )
		);
	}

	sendMessageAsync( json, ifIndex ) {
		return Rx.Observable.fromPromise( this.sendMessageThen( json, ifIndex ) );
	}

	sendMessageThen( json, ifIndex ) {
		// return this.sendMessagePrivatelyThen( json, ifIndex );
		let startAt = +new Date;
		if ( this._isDisposed ) {
			throw new Error( "Disposed multicast call" );
		}
		console.log( "sending message", new Date);

		return this._messageQueue.add( () =>
			// this.waitParticipantsLatestThen()
			// 	.then( () => {
			// 		console.log( "waitParticipantsLatestThen took", (+new Date - startAt ) + " ms");
			// 	} )
			 	/*.then( () => */this.runTransactionThen( transaction => {
					this._metaConnection.checkIndexIsLatest( transaction );
					this._messageConnection.send( json, transaction, ifIndex );
				}, true, "Send message " + json.text ) //)
				.then( resp => {
					console.log( "sentMessage in ms", +new Date - startAt);
					return ( { json, from: this.getPid(), resp } );
				} )
		);
	}

	sendMessagePrivatelyThen( json, ifIndex ) {
		if ( this._isDisposed ) {
			throw new Error( "Disposed multicast call" );
		}

		return this._messageQueue.add( () =>
			this.waitParticipantsLatestThen()
			 	.then( () => this.runTransactionThen( transaction => {
					this._metaConnection.checkIndexIsLatest( transaction );
					this._messageConnection.sendPrivately( json, transaction, ifIndex );
				}, true ) )
				.then( resp => ( { json, from: this.getPid(), resp } ) )
		);
	}

	sendPrivateMessageAsync( type, data, pid, transaction = null ) {
		return Rx.Observable.fromPromise( this.sendPrivateMessageThen( type, data, pid, transaction ) );
	}

	sendPrivateMessageThen( type, data, pid, transaction = null ) {
		//if transaction is set, waiting is assumed
		if ( this._isDisposed ) {
			throw new Error( "Disposed multicast call" );
		}
		return this._messageQueue.add( () =>
			this.waitParticipantsLatestThen()
				.then( () => new Promise( ( resolve, reject ) => {
					let cAsync = this._getP2pConnectionAsync( pid );
					if ( !cAsync ) {
						debugger;
						return Rx.Observable.throw( new Error( "Participant deleted" ) );
					}
					return cAsync.subscribe( connection => transaction
						? resolve( this._sendPrivateMessage( type, data, connection, transaction ) )
						: this.runTransactionThen( transaction => {
							this._sendPrivateMessage( type, data, connection, transaction );
						} ).then( resolve, reject )
					);
				} ) )
		);
	}

	/**
	 * Sends a private message over an already resolved p2p connection.
	 * Returns nothing.
	 */
	_sendPrivateMessage( type, data, connection, transaction ) {
		connection.sendPrivate( type, data, transaction );
	}

	/**
	 * Delegates to observeExit() of the meta connection. `func` is not used.
	 */
	observeExit( func ) {
		return this._metaConnection.observeExit();
	}

	waitForParticipantAsync( pid ) {
		return (
			this._metaConnection.observeParticipantsLatest().flatMap( ps =>
				ps[ pid ] ? Rx.Observable.just( this ) : Rx.Observable.empty()
			)
			.first( { defaultValue: null } )
		);
	}

	/** Shared id from the construction data. */
	get sharedId( ) {
		return this._data.sharedId;
	}

	/** dhPrivKey from the construction data. */
	get dhPrivKey( ) {
		return this._data.dhPrivKey;
	}

	/** privateKey from the construction data (the key the signer was created from). */
	get privateKey( ) {
		return this._data.privateKey;
	}

	/** apiUrlBase from the construction data. */
	get apiUrlBase( ) {
		return this._data.apiUrlBase;
	}

	/** seedKey from the construction data. */
	get seedKey( ) {
		return this._data.seedKey;
	}

	/** The Config instance built (or taken over) in the constructor. */
	get econfig( ) {
		return this._econfig;
	}

	/** Delegates to observeParticipants() of the meta connection. */
	observeParticipants( ) {
		return this._metaConnection.observeParticipants();
	}

	/** Delegates to waitParticipantsLatestThen() of the meta connection. */
	waitParticipantsLatestThen() {
		return this._metaConnection.waitParticipantsLatestThen();
	}

	/** Delegates to observeParticipantsLatest() of the meta connection. */
	observeParticipantsLatest( ) {
		return this._metaConnection.observeParticipantsLatest();
	}

	/** Delegates to observeMeta() of the meta connection. */
	observeMeta( ) {
		return this._metaConnection.observeMeta();
	}

	/** Delegates to observeMetaLatest() of the meta connection. */
	observeMetaLatest( ) {
		return this._metaConnection.observeMetaLatest();
	}

	waitForMetaIndexThen( index ) {
		return new Promise( ( resolve, reject ) => {
			this.waitForMetaIndexAsync( index ).subscribe( resolve, reject );
		} );
	}

	waitForMetaIndexAsync( index ) {
		return (
			this._metaConnection.observeIndexes( )
				.filter( currentIndex => currentIndex >= index )
				.first( { defaultValue: null } )
		);
	}

	/** Delegates to observeIndexes() of the meta connection. */
	observeMetaIndex( ) {
		return this._metaConnection.observeIndexes();
	}

	/**
	 * Maps each pid to the p2p connection id computed with that pid first and
	 * the own pid second.
	 */
	getP2PConnectionIds( pids ) {
		return _.map( pids, pid => getP2PConnectionId( pid, this._data.pid ) );
	}

	/** Delegates to observeInvites() of the meta connection. */
	observeInvites( ) {
		return this._metaConnection.observeInvites();
	}

	/** Delegates to observeInvitesLatest() of the meta connection. */
	observeInvitesLatest( ) {
		return this._metaConnection.observeInvitesLatest();
	}

	isOutgoingParticipant( pid ) {
		return (
			this._metaConnection.getRootAliasPid( this.getPid() )
				=== this._metaConnection.getRootAliasPid( pid )
		);
	}

	/** Delegates to observePrivateDataLatest() of the meta connection. */
	observePrivateDataLatest( ) {
		return this._metaConnection.observePrivateDataLatest();
	}

	/**
	 * Sends a private-data meta message ( { privateData } ) inside a transaction.
	 * Unlike most senders in this class, there is no disposed check here.
	 */
	publishPrivateDataAsync( privateData ) {
		return this.runTransactionAsync( transaction => {
			this._metaConnection.sendPrivateDataMessage( { privateData }, transaction );
		} );
	}

	/** Delegates to getRootAliasPid( pid ) of the meta connection. */
	getRootAliasPid( pid ) {
		return this._metaConnection.getRootAliasPid( pid );
	}

	/** Delegates to observeRights() of the meta connection. */
	observeRights( ) {
		return this._metaConnection.observeRights();
	}

	dispose( ) {
		this.disposeAsync().subscribe();
	}

	disposeAsync( ) {
		if ( this._isDisposed ) {
			return Rx.Observable.just();
		}
		this._isDisposed = true;

		return (
			this._transactionsInProgressSubj
				.filter( count => count === 0 )
				.first()
				.tap( () => { this._dispose() } )
		);
	}

	_dispose( ) {
		this._metaConnection.dispose();
		this._messageConnection.dispose();

		for( let pid in this._p2pConnections ) {
			this._p2pConnections[ pid ].subscribe( c => c.dispose() );
		}
		this._data.privateKey.removeKeyUse( this._privateKeyHandle );
		this._data.seedKey.removeKeyUse( this._seedHandle );
		this._data.creatorPublicKey.removeKeyUse( this._creatorPublicKeyHandle );
		this._messageProcessedSubj.onCompleted();
	}

	/**
	 * @returns true once disposeAsync() was called; undefined before that (the
	 *   flag is only ever assigned in disposeAsync()).
	 */
	isDisposed( ) {
		return this._isDisposed;
	}

	/** Delegates to getMaxIndexAsync() of the message connection. */
	getMessageMaxIndexAsync( ) {
		return this._messageConnection.getMaxIndexAsync();
	}

	/** Delegates to hasUsedInvite( pid ) of the meta connection. */
	hasUsedInvite( pid ) {
		return this._metaConnection.hasUsedInvite( pid );
	}

	/** Delegates to getInvitePidByParticipantPid( pid ) of the meta connection. */
	getInvitePidByParticipantPid( pid ) {
		return this._metaConnection.getInvitePidByParticipantPid( pid );
	}

	/** Static alias of the module-level getMetaConnectionIdBySharedId(). */
	static getMetaConnectionIdBySharedId( sharedId ) {
		return getMetaConnectionIdBySharedId( sharedId );
	}

	/** Static alias of the module-level getMessageConnectionIdBySharedId(). */
	static getMessageConnectionIdBySharedId( sharedId ) {
		return getMessageConnectionIdBySharedId( sharedId );
	}
}

export default Multicast;
