import Rx from "rx";
import _ from "lodash";

import ContactServiceCommon from "./contact.common.js";
import HistoryContainer from "../transport/history.container.js";
import configuration from "../../common/configuration.js";
import Transaction from "../transport/transaction.js";
import {ChatMessageText,ChatMessageBase,ChatMessageDelete,MESSAGE_TYPE} from "../models/chat.message.js";
import timeServiceLocator from "./locators/time.js";
import profileServiceLocator from "./locators/profile.js";

class ContactWithHistoryService extends ContactServiceCommon {
	constructor( contactType, sharedcontactsService ) {
		super( contactType );
		this._historyContainers = Object.create( null );
		this._messagesWithHistoryObservable = Object.create( null );
		// this._historySynchronizers = Object.create( null );
		this._sendingMessages = Object.create( null );
		this._autocleanTimeSubjs = Object.create( null );
		//TODO: smells?
		this._sharedcontactsService = sharedcontactsService;
		this._onMessageRead = this._onMessageRead.bind( this );
		this._timeService = timeServiceLocator();
	}

	initAsync( ...params ) {
		this._monitoringMessages = {};
		return super.initAsync( ...params );
	}

	_observeBaseAutocleanTime( contactId ) {
		let contact = this._findContactById( contactId );
		if ( !contact ) {
			return Rx.Observable.empty();
		}
		if ( contact.multidescriptionId === -1 ) {
			let profileService = profileServiceLocator();
			return (
				profileService.mutationObservable.startWith( null )
					.flatMap( () => profileService.getProfileAsync() )
					.map( profile => profile && profile.autoclean )
			);
		}
		return (
			this._sharedcontactsService.observeAutocleanTime( contact.multidescriptionId )
		);
	}

	/**
	 * Public entry point for the effective autoclean time of a contact;
	 * returns whatever _observeAutocleanTime() returns for the same id.
	 */
	observeAutocleanTime( contactId ) {
		return this._observeAutocleanTime( contactId );
	}

	_observeAutocleanTime( contactId ) {
		if ( !this._autocleanTimeSubjs[ contactId ] ) {
			this._autocleanTimeSubjs[ contactId ] = new Rx.ReplaySubject( 1 );
				Rx.Observable.combineLatest(
					this._observeBaseAutocleanTime( contactId ),
					this._getMulticastOrNullAsync( contactId, true )
						.filter( m => !!m )
						.flatMap( multicast => multicast.observeMetaLatest()
							.map( meta => meta[
								"autocleanTimeMax-" + multicast.getRootAliasPid( multicast.getPid() )
							] )
							.distinctUntilChanged()
						),
					( baseAutoclean, autocleanTimeMax ) => {
						if ( !autocleanTimeMax ) {
							return baseAutoclean;
						}
						if ( !baseAutoclean ) {
							return autocleanTimeMax;
						}
						return Math.min( baseAutoclean, autocleanTimeMax );
					}
				).distinctUntilChanged().subscribe( autocleanTime => {
					this._autocleanTimeSubjs[ contactId ].onNext( autocleanTime );
					this._updateHistoryTtlOnServer( contactId, autocleanTime );
				} );
		}
		return this._autocleanTimeSubjs[ contactId ];
	}

	_updateHistoryTtlOnServer( contactId, autocleanTime ) {
		if ( !this._historyContainers[ contactId ] ) {
			this._historyContainers[ contactId ] = new Rx.ReplaySubject();
		}
		let historyContainer;
		this._historyContainers[ contactId ]
			.flatMap( hc => hc.updateTtlAsync( autocleanTime || 0 ) )
			.subscribe();
	}

	filterOutOldMessagesAsync( contactId, messages ) {
		let contact = this._findContactById( contactId );
		if ( !contact ) {
			return Rx.Observable.empty();
		}
		return (
			this._observeAutocleanTime( contactId ).take( 1 )
				.flatMap( autoclean =>
					this._timeService.local2ServerTimeAsync(
						+new Date - ( autoclean || 10 * 365 * 24 * 3600 ) * 1000
					)
				)
				.map( skipOlderThan => {
					messages = _.pickBy( messages );
					if ( !_.find( messages, ( message, index ) =>
						message.ttlBaseTimestamp
						&& ( message.ttlBaseTimestamp < skipOlderThan )
					) ) {
						return messages;
					}
					let newMessages = Object.create( null );
					for( let index in messages ) {
						let value = messages[ index ];
						if ( !value.ttlBaseTimestamp || ( value.ttlBaseTimestamp >= skipOlderThan ) ) {
							newMessages[ index ] = value;
						}
					}
					//TODO: remove from historyContainer?
					return newMessages;
				} )
		);
	}

	getMessageMaxIndexOrNullAsync( contactId ) {
		//TODO: duplicate method?
		if ( this._findContactById( contactId ).status === "failed" ) {
			console.warn( "Trying to get max message index for failed contact" );
			return Rx.Observable.just( null );
		}
		return (
			this._getMulticastOrNullAsync( contactId )
				.flatMap( multicast => multicast
					? multicast.getMessageMaxIndexAsync()
					: Rx.Observable.just( null )
				)
		);
	}

	_pushMessageAsync( contactId, message ) {
		return (
			this.getDetailedContactAsync( contactId )
				.flatMap( contact => {
					if ( !contact.cachedMessages ) {
						contact.cachedMessages = [];
					}
					contact.cachedMessages[ message.index ] = message.toJson();
					contact.forceUpdateDetails();
					return Rx.Observable.combineLatest(
						this.storeContactAsync( contact ),
						this.putMessageToHistoryAsync( message, contact ),
						() => null
					);
				} )
				.flatMap( () => super._pushMessageAsync( contactId, message ) )
		);
	}

	putMessageToHistoryAsync( messageModel, contact ) {
		let { index } = messageModel;
		if ( index !== ( index | 0 ) ) {
			console.warn( "Message is not connected to message connection" );
			return Rx.Observable.just();
		}
		if ( messageModel.isInHistory ) {
			return Rx.Observable.just();
		}
		return this.updateMessageInHistoryAsync( messageModel, contact, true );
	}

	updateMessageInHistoryAsync( messageModel, contact, isNewMessage ) {
		if ( !this._historyContainers[ contact.id ] ) {
			this._historyContainers[ contact.id ] = new Rx.ReplaySubject();
		}
		messageModel.isInHistory = true;
		let historyContainer;
		return (
			this._historyContainers[ contact.id ]
			.tap( hc => { historyContainer = hc; } )
			.tap( () => {
				if ( !contact.cachedMessages || !contact.cachedMessages[ messageModel.index ] ) {
					return;
				}
				contact.cachedMessages[ messageModel.index ] = messageModel.toJson();
				contact.forceUpdateDetails();
				this.storeContactLazily( contact );
			} )
			.flatMap( () =>
				this._setMessageInHistoryAsync( historyContainer, messageModel, isNewMessage )
			)
			// .flatMap( () => {
			// 	//TODO: make single update for all new message data
			// 	if ( !isNewMessage ) {
			// 		return Rx.Observable.just();
			// 	}
			// 	return Rx.Observable.create( observer => {
			// 		historyContainer.getUKeyDataThen()
			// 			.then( ( { lastIndex, lastUKey } ) => {
			// 				this.updateAsync( contactId, {
			// 					mainIndex - need to be updated when p2pIndexes are updated: lastIndex + 1, historyLastUKey: lastUKey
			// 				}, "lazy" ).subscribe( observer );
			// 			} )
			// 	} );
			// } )
			// .tap( () => {
			// 	let subj = this._messagesSubjectPerContact[ contactId ];
			// 	if ( !subj ) {
			// 		return;
			// 	}
			// 	let messages = subj.getValue();
			// 	delete messages[ messageModel.index ];
			// 	subj.onNext( messages );
			// } )
		);
	}

	_setMessageInHistoryAsync( historyContainer, messageModel, isNewMessage ) {
		let { index } = messageModel;
		if ( index < historyContainer._fromIndex ) {
			debugger;
			return Rx.Observable.just();
		}
		if ( isNewMessage && ( historyContainer._lastIndex !== -1 ) && ( index <= historyContainer._lastIndex ) ) {
			console.warn( "Trying to add already exising message to history" );
			return Rx.Observable.just();
		}
		if ( !isNewMessage && index > historyContainer._lastIndex ) {
			console.warn( "Trying to update message before adding to history" );
			return Rx.Observable.just();
		}
		return (
			Transaction.runWithRetriesAsync( transaction => {
				let waitSubj = new Rx.Subject();
				transaction.waitFor( waitSubj, "Adding or updating history container" );
				if ( isNewMessage ) {
					historyContainer.addItemThen(
						index,
						messageModel.toJson(),
						transaction
					).then( () => {
						waitSubj.onCompleted();
					} );
				} else {
					historyContainer.updateItemThen(
						index,
						messageModel.toJson(),
						transaction
					).then( () => {
						waitSubj.onCompleted();
					} );
				}
				if ( messageModel.type === MESSAGE_TYPE.OUTGOING ) {
					historyContainer.startExpire(
						index,
						transaction
					);
				}
			}, "skipQueue" )
		);
	}

	_monitorMessage( contactId, message, onDelete ) {
		if ( !message.ttlBaseTimestamp || !this._findContactById( contactId ) ) {
			return;
		}
		let messageKey = contactId + "_" + message.index;
		if ( this._monitoringMessages[ messageKey ] ) {
			return;
		}
		this._monitoringMessages[ messageKey ] = true;

		let timeService = this._timeService;
		let subscription, timeoutHandler;

		let deleteFunc = () => {
			subscription.dispose();
			onDelete();
			onDelete = null;
		};

		subscription = (
			Rx.Observable.combineLatest(
				this._observeAutocleanTime( contactId ),
				timeService.monitorTimeResync(),//get serverTime-clientTime
				( autoclean, timeDiff ) => ( { autoclean, timeDiff } )
			)
				.subscribe( ( { autoclean, timeDiff } ) => {
					if ( timeoutHandler ) {
						clearTimeout( timeoutHandler );
						timeoutHandler = 0;
					}
					if ( !autoclean ) {
						timeoutHandler = 0;
						return;
					}
					let period = Math.min(
						message.ttlBaseTimestamp
						? Math.max( 0, message.ttlBaseTimestamp + autoclean * 1000 - ( +new Date ) - timeDiff )
						: autoclean * 1000
					, 2147483647 );
					timeoutHandler = setTimeout( deleteFunc, period );
				} )
		);
	}

	_removeMessageFromHistoryContainer( contactId, historyContainer, index ) {
		let historyFromIndex;
		this.getDetailedContactAsync( contactId )
			.flatMap( contact => {
				if ( !contact ) {
					return Rx.Observable.empty();
				}
				historyFromIndex = Math.max( index + 1, contact.historyFromIndex );
				return Rx.Observable.fromPromise( historyContainer.getDKeyDataThen( historyFromIndex ) )
					.map( historyFirstDKey => ( { historyFirstDKey, contact } ) )
				} )
			.flatMap( ( { historyFirstDKey, contact } ) => {
				if ( !contact || ( historyFromIndex === contact.historyFromIndex ) ) {
					return Rx.Observable.empty();
				}
				return this.updateAsync( contactId, {
					historyFromIndex, historyFirstDKey: historyFirstDKey || undefined
				}, "lazy" );
			} ).subscribe();

		Transaction.runWithRetriesAsync( transaction => {
			let waitSubj = new Rx.Subject();
			transaction.waitFor( waitSubj, "Deleting history item" );
			historyContainer.updateItemThen(
				index,
				null,
				transaction
			).then( () => {
				waitSubj.onCompleted();
			} );
		}, "skipQueue" ).subscribe();
	}

	_monitorHistoryMessage( contactId, historySubject, message ) {
		this._historyContainers[ contactId ].delay( 1000 ).subscribe( historyContainer => {
			this._monitorMessage( contactId, message, () => {
				let history = historySubject.getValue() || {};
				let { index } = message;
				if ( !history[ message.index ] ) {
					return;
				}

				delete history[ message.index ];
				historySubject.onNext( history );
				this._removeMessageFromHistoryContainer( contactId, historyContainer, index );

	// Removing current if duplicated
				let messageSubject = this._messagesSubjectPerContact[ contactId ];
				if ( !messageSubject ) {
					return;
				}
				let messages = messageSubject.getValue();
				if ( !~index || !messages[ index ] ) {
					return;
				}
				delete messages[ index ];
				messageSubject.onNext( messages );
			} );
		} );
	}

	/**
	 * Builds the per-contact stream that merges live messages with history.
	 *
	 * @param contact contact whose `id` and `cachedMessages` are read
	 * @returns an object with
	 *   - messagesObservable: ReplaySubject( 1 ) emitting the merged index -> message map,
	 *   - historySubject: BehaviorSubject holding the history index -> message map
	 *     (seeded from contact.cachedMessages, or null when there is no cache),
	 *   - subscriptions: array holding the internal subscription; callers may
	 *     push more, they are all disposed together (see the fourth source below).
	 */
	_createObservableDataWithHistory( contact ) {
		// Seed the history with the models rebuilt from the cached JSON.
		let initialMessages = null;
		if ( contact.cachedMessages ) {
			initialMessages = Object.create( null );
			for ( let index in contact.cachedMessages ) {
				if ( !contact.cachedMessages[ index ] ) {
					continue;
				}
				let message = ChatMessageBase.fromJson( contact.cachedMessages[ index ] );
				if ( !message ) {
					continue;
				}
				initialMessages[ index ] = message;
			}
		}
		let historySubject = new Rx.BehaviorSubject( initialMessages );
		let subscriptions = [];
		let messagesSubject = new Rx.ReplaySubject( 1 );
		if ( !this._historyContainers[ contact.id ] ) {
			this._historyContainers[ contact.id ] = new Rx.ReplaySubject();
		}
		let subscription = (
			Rx.Observable.combineLatest(
				// Source 1: live messages from the parent implementation.
				super.observeMessages( contact ),
					// Source 2: history messages. A null value is skipped; for
					// every other value each message is registered for TTL
					// monitoring (a side effect of the filter) and expired
					// messages are then dropped.
					historySubject
						.filter( messages => {
							if ( !messages ) {
								return false;
							}
							_.forEach(
								messages,
								message => {
									message && this._monitorHistoryMessage(
										contact.id,
										historySubject,
										message
									);
								}
							);
							return true;
						} )
						.flatMap( messages =>
							this.filterOutOldMessagesAsync( contact.id, messages )
						)
						,
				// Source 3: the contact's current historyFromIndex, or a falsy
				// value once the contact is no longer in the contact list.
				this.observeContactList()
					.map( contacts => _.find( contacts, { id: contact.id } ) )
					.flatMap( contact => contact
						? this.getDetailedContactAsync( contact.id )
						: Rx.Observable.just( null )
					)
					.map( contact => contact && contact.historyFromIndex )
					.distinctUntilChanged(),
				// Source 4: emits once and never completes; its only purpose is
				// the teardown, which disposes everything in `subscriptions`
				// and completes historySubject.
				Rx.Observable.create( observer => { //For monitoring subscription dispose
					observer.onNext();
					return () => {
						subscriptions.forEach( s => { s.dispose(); } );
						historySubject.onCompleted();
					};
				} ),
				// The value of source 4 is ignored. Only entries at or after
				// historyFromIndex survive; on an index clash the history
				// message replaces the live one because it is copied last.
				( currentMessages, historyMessages, historyFromIndex ) => {
					let messages = Object.create( null );
					for( let indexStr in currentMessages ) {
						if ( ( indexStr | 0 ) >= ( historyFromIndex | 0 ) ) {
							if ( currentMessages[ indexStr ] ) {
								messages[ indexStr ] = currentMessages[ indexStr ];
							}
						}
					}
					for( let indexStr in historyMessages ) {
						if ( ( indexStr | 0 ) >= ( historyFromIndex | 0 ) ) {
							if ( historyMessages[ indexStr ] ) {
								messages[ indexStr ] = historyMessages[ indexStr ];
							}
						}
					}
					return messages;
				}
			)
		)
		.subscribe( messages => {
			messagesSubject.onNext( messages );
			// Keep the ten highest-indexed messages as the contact's cache.
			let indexes2Store = _.sortBy( _.keys( messages ), index => index - 0 ).slice( -10 );
			// NOTE(review): this compares against the `contact` object captured
			// when the stream was created — confirm its cachedMessages is kept
			// current, otherwise the cache is rewritten on every emission.
			if ( _.isEqual( indexes2Store, _.keys( contact.cachedMessages ) ) ) {
				return;
			}
			let cachedMessages = Object.create( null );
			for ( let i = 0; i < indexes2Store.length; i++ ) {
				let index = indexes2Store[ i ];
				cachedMessages[ index ] = messages[ index ].toJson();
			}
			cachedMessages = this._applyEditedMessages( cachedMessages );
			this.updateAsync( contact.id, { cachedMessages }, "lazy" ).subscribe();
		} );
		subscriptions.push( subscription );

		return {
			messagesObservable: messagesSubject,
			historySubject,
			subscriptions
		};
	}

	observeMessages( contact ) {
		if ( !this._messagesWithHistoryObservable[ contact.id ] ) {
			this._messagesWithHistoryObservable[ contact.id ] =
				this._createObservableDataWithHistory( contact );
		}
		let { historySubject, messagesObservable } = this._messagesWithHistoryObservable[ contact.id ];
		return (
			messagesObservable
				.map( messages => this._applyEditedMessages( messages ) )
		);
	}

	_applyEditedMessages( messages ) {
		let editedMessages = null;
		let indexes = _.map( _.keys( messages ), sindex => sindex | 0 );
		indexes.sort( ( i1, i2 ) => i1 - i2 );

		for ( let i = 0; i < indexes.length; i++ ) {
			let message = messages[ indexes[ i ] ];
			let {replaceIndex} = message;
			if ( replaceIndex === undefined ) {
				continue;
			}
			editedMessages = editedMessages || _.clone( messages );
			if ( messages[ replaceIndex ] &&
				this._isMessageCanBeReplaced( message, messages[ replaceIndex ] ) ) {
					editedMessages[ replaceIndex ] = message;
			}
			delete editedMessages[ indexes[ i ] ];
		}
		return editedMessages || messages;
	}

	sendEditTextMessageAsync( contact, replaceIndex, text, id ) {
		return this._sendMessageAsync( contact, {
			type: "textEdit",
			text,
			id,
			replaceIndex
		} );
	}

	sendDeleteMessageAsync( contact, replaceIndex, id ) {
		return this._sendMessageAsync( contact, {
			type: "delete",
			id,
			replaceIndex
		} );
	}

	/**
	 * Decides whether `message` may replace `replaceMessage` when edits are
	 * applied. Currently every replacement is allowed; neither argument is
	 * inspected.
	 */
	_isMessageCanBeReplaced( message, replaceMessage ) {
		//TODO: security check
		return true;
	}

	clearHistory( ) {
		for( let i = 0; i < this._contacts.length; i++ ) {
			this.clearContactHistory( this._contacts[ i ].id );
		}
	}

	clearContactHistory( contactId ) {
		if ( !this._historyContainers[ contactId ] ) {
			this._historyContainers[ contactId ] = new Rx.ReplaySubject();
		}
		this.getDetailedContactAsync( contactId )
			.map( contact => ( { contact, historyFromIndex: Math.max(
				contact.mainIndex|0,
				contact.mainIndexInaccurate|0,
				contact.historyFromIndex|0
			) } ) )
			.flatMap( ( { contact, historyFromIndex } ) =>
				this._historyContainers[ contact.id ]
					.flatMap( historyContainer => Rx.Observable.fromPromise(
					historyContainer.getDKeyDataThen( historyFromIndex )
				) )
				.map( historyFirstDKey => ( { historyFirstDKey, contact, historyFromIndex } ) )
			).flatMap( ( { historyFirstDKey, contact, historyFromIndex } ) => contact
				? this.updateAsync(
					contactId,
					{ historyFromIndex, historyFirstDKey: historyFirstDKey || undefined },
					"lazy"
				)
				: Rx.Observable.empty()
			)
			.subscribe( () => {
				super.clearContactHistory( contactId );
			} );
	}

	requestMoreHistory( contactId ) {
		if ( !this._messagesWithHistoryObservable[ contactId ] ) {
			throw new Error( "Messages observable not found" );
		}
		if ( !this._historyContainers[ contactId ] ) {
			this._historyContainers[ contactId ] = new Rx.ReplaySubject();
		}
		let data = this._messagesWithHistoryObservable[ contactId ];
		if ( !data ) {
			throw new Error( "Messages observable not found" );
		}

		if ( data.isRequestingHistory || !~data.historyNextToIndex ) {
			return;
		}

		data.isRequestingHistory = true;
		data.subscriptions.push(
		this._historyContainers[ contactId ]
			.flatMap( hc => {
				let toIndexAsync = (
					"historyNextToIndex" in data
					? Rx.Observable.just( data.historyNextToIndex )
					: hc.getMaxIndexAsync()
				);

				return (
					toIndexAsync.flatMap( toIndex => {
						let fromIndex = Math.max( 0, toIndex - 10 );
						return (
							hc.getItemsAsync( fromIndex, toIndex )
								.flatMap( messages => Rx.Observable.create( observer => {
									for( let index in messages ) {
										observer.onNext( { index, value: messages[ index ] } );
									}
									observer.onCompleted();
								} ) )
								.tapOnCompleted( () => {
									data.isRequestingHistory = false;
									data.historyNextToIndex = fromIndex - 1;
								} )
								.concat(
									hc.observeChanges()
								)
								.flatMap( m => !m.value
									? Rx.Observable.empty()
									: this.filterOutOldMessagesAsync( contactId, { [ m.index ]: m.value } )
										.flatMap( messages => {
											if ( messages[ m.index ] ) {
												return Rx.Observable.just(
													{ index: m.index, value: messages[ m.index ] }
												);
											}
											if ( m.value ) {
												this._removeMessageFromHistoryContainer(
													contactId,
													hc,
													m.index | 0
												);
											}
											return Rx.Observable.empty();
										} )
								)
								.map( ( { index, value } ) => {
									let model = ChatMessageBase.fromJson( value );
									return { index, value: model };
								} )
						);
					} )
				);
			} )
			.subscribe( ( { index, value } ) => {
				if ( !value ) {
					return;
				}
				let iv = data.historySubject.getValue() || {};
				iv[ index ] = value;
				data.historySubject.onNext( iv );
			} )
		);
	}

	_onMulticastCreated( contact, multicast ) {
		if ( !this._historyContainers[ contact.id ] ) {
			this._historyContainers[ contact.id ] = new Rx.ReplaySubject();
		}
		let hcSubj = this._historyContainers[ contact.id ];
		let historyContainer = new HistoryContainer(
			configuration.getApiBase(),
			contact.historyContainerId,
			contact.historyFirstDKey,
			contact.seedKey,
			contact.historyLastUKey,
			contact.historyEncryptionSaltKey,
			contact.historyMacSaltKey,
			contact.historyFromIndex,
			contact.mainIndex - 1,
			configuration.getDefaultEncryptionConfig(),
			contact.historyUseUKey
		);
		hcSubj.onNext( historyContainer );
		hcSubj.onCompleted();

		multicast.onSelfMessage( index => {
			this._onSelfMessage( contact, multicast, index );
		} );
		super._onMulticastCreated( contact, multicast );
	}

	_onSelfMessage( contact, multicast, index ) {
		if ( !this._sendingMessages[ contact.id ] ) {
			return;
		}
		let sendingMessages = this._sendingMessages[ contact.id ].getValue();
		if ( !sendingMessages || !sendingMessages.length ) {
			return;
		}
		let json = sendingMessages.splice( 0, 1 )[ 0 ];
		let from = contact.pid.toString( "base64" );
		let msg = { json, from, index };
		this._sendingMessages[ contact.id ].onNext( sendingMessages );

		this._timeService.local2ServerTimeThen( +new Date )
			.then( timestamp => {
				json.timestamp = timestamp;
				this._enqueueProcessMessage(
					msg, contact, multicast, index, from, false, null, {}
				);
			} );
	}

	_sendMessageThen( contact, message ) {
		if ( !this._sendingMessages[ contact.id ] ) {
			this._sendingMessages[ contact.id ] = new Rx.BehaviorSubject( [] );
		}
		let sending = this._sendingMessages[ contact.id ].getValue();
		sending.push( message );
		this._sendingMessages[ contact.id ].onNext( sending );
		return super._sendMessageThen( contact, message );
	}

	observeSendingMessages( contactId ) {
		if ( !this._sendingMessages[ contactId ] ) {
			this._sendingMessages[ contactId ] = new Rx.BehaviorSubject( [] );
		}
		return this._sendingMessages[ contactId ];
	}

	getHistoryContainerAsync( contactId ) {
		if ( !this._historyContainers[ contactId ] ) {
			this._historyContainers[ contactId ] = new Rx.ReplaySubject( 1 );
		}
		return this._historyContainers[ contactId ];
	}

	setReadAll( contact ) {
		super.setReadAll( contact );
		let data = this._messagesWithHistoryObservable[ contact.id ];
		if ( !data ) {
			return;
		}
		if ( !this._historyContainers[ contact.id ] ) {
			this._historyContainers[ contact.id ] = new Rx.ReplaySubject();
		}

		Rx.Observable.combineLatest(
			this._historyContainers[ contact.id ],
			this._timeService.local2ServerTimeAsync( +new Date ),
			( historyContainer, serverTime ) => ( { historyContainer, serverTime } )
		).subscribe( ( { historyContainer, serverTime } ) => {
			if ( !this._findContactById( contact.id ) ) {
				return;//contact deleted
			}
			let messages = data.historySubject.getValue() || {};
			let indexStrs = _.keys( messages );
			let maxIndex = _.maxBy( indexStrs, indexStr => {
				let message = messages[ indexStr ];
				if ( !message.id || ( message.type !== MESSAGE_TYPE.INCOMING ) || message.isRead ) {
					return -1;
				}
				return indexStr | 0;
			} );

			_.forEach( messages, ( message, indexStr ) => {
				if ( message.id && ( message.type === MESSAGE_TYPE.INCOMING ) && !message.isRead ) {
					message.isRead = true;
					if ( ( maxIndex == indexStr ) && this.isSendReadMessage ) {
						this.sendMessageRead( contact.id, message );
					}
				}
				if ( message.ttlBaseTimestamp ) {
					return;
				}
				message.ttlBaseTimestamp = serverTime;
				this.updateMessageInHistoryAsync( message, contact ).subscribe();
				this._monitorHistoryMessage(
					contact.id,
					data.historySubject,
					message
				);
			} );
		} );
	}

	_processMessageJsonAndGetModel( msg, fromContact, multicast, index ) {
		let { json } = msg;
		switch( json.type ) {
			case "textEdit":
				return (
					multicast.isOutgoingParticipant( msg.from )
						? new ChatMessageText(
							MESSAGE_TYPE.OUTGOING,
							json.text,
							json.timestamp,
							msg.from,
							index,
							msg.p2pIndex,
							json.id,
							json.replaceIndex,
							json.replyTo
						)
						: new ChatMessageText(
							MESSAGE_TYPE.INCOMING,
							json.text,
							json.timestamp,
							msg.from,
							index,
							msg.p2pIndex,
							json.id,
							json.replaceIndex,
							json.replyTo
						)
				);
			case "delete":
				return (
					multicast.isOutgoingParticipant( msg.from )
						? new ChatMessageDelete(
							MESSAGE_TYPE.OUTGOING,
							json.timestamp,
							msg.from,
							index,
							msg.p2pIndex,
							json.id,
							json.replaceIndex
						)
						: new ChatMessageDelete(
							MESSAGE_TYPE.INCOMING,
							json.timestamp,
							msg.from,
							index,
							msg.p2pIndex,
							json.id,
							json.replaceIndex
						)
				);
			default:
				return super._processMessageJsonAndGetModel( msg, fromContact, multicast, index );
		}
	}

	_onMessageRead( multicast, pid, json, p2pIndex ) {
		let { id } = json;
		for( let sContactId in this._contactId2MulticastThen ) {
			let contactId = sContactId | 0;
			this._contactId2MulticastThen[ sContactId ].then( m => {
				if ( m !== multicast ) {
					return;
				}
				let contact = this._findContactById( contactId );
				if ( !contact ) {
					return;
				}
				this.observeMessages( contact ).take( 1 )
					.subscribe( messages => {
						let foundIndex = _.findKey( messages, { id } );
						if ( foundIndex === -1 ) {
							return;
						}
						for ( let indexStr in messages ) {
							let message = messages[ indexStr ];
							if ( ( ( indexStr | 0 ) <= foundIndex ) && !message.isRead ) {
								message.isRead = true;
								this.updateMessageInHistoryAsync( message, contact ).subscribe();
							}
						}
					} );
			} );
		}
	}

	_getP2pPrivateHandlers( ) {
		return {
			"read": this._onMessageRead
		};
	}
}

export default ContactWithHistoryService;
