import _ from "lodash";
import ssgCrypto from "ssg.crypto";
import Queue from "promise-queue";

import BinarySource, {CHUNK_SIZE} from "./binary.source.js";

class BinaryStream {
	constructor( getNextBytesCb, size, dispose ) {
		if ( typeof getNextBytesCb !== "function" ) {
			throw new Error( "getNextBytesCb required" );
		}
		if ( ( typeof size !== "number" ) || isNaN( size ) ) {
			throw new Error( "size number required" );
		}
		if ( dispose && ( typeof dispose !== "function" ) ) {
			throw new Error( "dispose must be a function" );
		}
		this._dispose = dispose;
		this._size = size;
		this._position = 0;
		this._getNextBytesCb = getNextBytesCb;
	}

	get position( ) {
		return this._position;
	}

	get size( ) {
		return this._size;
	}

	dispose( ) {
		this._dispose && this._dispose();
	}

	getAllBytesCb( cb ) {
		this.getNextBytesCb( this.size - this.position, cb );
	}

	getNextBytesCb( count, cb ) {
		if ( ( typeof count !== "number" ) || isNaN( count ) ) {
			throw new Error( "count expected to be number" );
		}
		if ( this._position + count > this._size ) {
			throw new Error( "Not enough data to read in binary stream" );
		}
		if ( this._isInProgress ) {
			throw new Error( "Another call in progress" );
		}
		if ( count > 32768 ) {
			console.warn( `Requesting too much data: ${count}` );
		}
		this._isInProgress = true;
		if ( this._pushedBack && ( this._pushedBack.length >= count ) ) {
			this._isInProgress = false;
			this._position += count;
			let buffer = this._pushedBack.slice( 0, count );
			this._pushedBack = this._pushedBack.slice( count );
			cb( buffer );
			return;
		}
		let requestCount;
		if ( this._pushedBack ) {
			requestCount = count - this._pushedBack.length;
		} else {
			requestCount = count;
		}

		let timeout = setTimeout( () => {
			console.error("getNextBytesCb binary stream timeout: " + this + cb );
			debugger;
		}, 5000 );
		// console.log( "Binary stream getNextBytes start: " + timeout );
		this._getNextBytesCb(
			requestCount,
			buffer => {
				// console.log( "Binary stream getNextBytes end: " + timeout );
				clearTimeout( timeout );
				if ( buffer.length !== requestCount ) {
					throw new Error( `Expected ${count} bytes but got ${buffer.length}` );
				}
				this._isInProgress = false;
				this._position += count;
				if ( this._pushedBack ) {
					buffer = Buffer.concat( [ this._pushedBack, buffer ] )
					this._pushedBack = null;
				}
				cb( buffer );
			}
		);
	}

	pushBack( buffer ) {
		if ( this._isInProgress ) {
			throw new Error( "getNextBytes in progress" );
		}
		if ( this._position < buffer.length ) {
			throw new Error( "Push back too much data" );
		}
		this._position -= buffer.length;
		if ( !this._pushedBack ) {
			this._pushedBack = Buffer.concat( [ buffer ] );
			return;
		}
		this._pushedBack = Buffer.concat( [ this._pushedBack, buffer ] );
	}

	skipCb( byteCount, cb ) {
		if ( ( byteCount < 0 ) || ( byteCount % 1 ) || ( typeof byteCount !== "number" ) ) {
			throw new Error( `Invalid byte count: ${byteCount}` );
		}
		this._skipRecursive( byteCount, cb );
	}

	_skipRecursive( byteCount, cb ) {
		if ( !byteCount ) {
			cb();
			return;
		}
		this.getNextBytesCb( Math.min( byteCount, CHUNK_SIZE ), buffer => {
			setTimeout( () => {
				this._skipRecursive( byteCount - buffer.length, cb ); //Stack overflow prevention
			}, 0 );
		} );
	}

	static fromBinarySource( binarySource, monitorCb ) {
		if ( !( binarySource instanceof BinarySource ) ) {
			throw new Error( "BinarySource required" );
		}
		let newStream = new BinaryStream(
			function( count, cb ) {
				if ( monitorCb ) {
					binarySource.getBufferCb( this._position, this._position + count, buffer => {
						monitorCb.call( newStream, buffer );
						cb( buffer );
					} );
					return;
				}
				binarySource.getBufferCb( this._position, this._position + count, cb );
			},
			binarySource.size,
			() => binarySource.dispose()
		);
		return newStream;
	}

	static fromBuffer( buffer ) {
		return BinaryStream.fromBinarySource( BinarySource.fromBuffer( buffer ) );
	}

	static pseudoRandom( size ) {
		return new BinaryStream(
			( count, cb ) => {
				ssgCrypto.createRandomBufferThen( count ).then( cb );
			},
			size
		);
	}

	static fromChunked( getNextChunksCb, chunkSize, chunkCount, dispose ) {
		if ( ( typeof chunkSize !== "number" ) || isNaN( chunkSize ) ) {
			throw new Error( "chunkSize number required" );
		}
		if ( ( typeof chunkCount !== "number" ) || isNaN( chunkCount ) ) {
			throw new Error( "chunkCount number required" );
		}

		let prevData = new Buffer( 0 );

		return new BinaryStream(
			( count, cb ) => {
				if ( count <= prevData.length ) {
					let buffer = prevData.slice( 0, count );
					prevData = prevData.slice( count );
					cb( buffer );
					return;
				}
				let resultBuffer = new Buffer( count );
				prevData.copy( resultBuffer );
				getNextChunksCb(
					( ( count - prevData.length + chunkSize - 1 ) / chunkSize ) | 0,
					buffer => {
						buffer.copy( resultBuffer, prevData.length );
						prevData = buffer.slice( count - prevData.length );
						cb( resultBuffer );
					}
				);
			},
			chunkSize * chunkCount,
			dispose
		);
	}

	static createMonitoredStream( baseStream, monitorCb ) {
		let newStream = new BinaryStream(
			( byteCount, cb ) => {
				baseStream.getNextBytesCb( byteCount, buffer => {
					monitorCb.call( newStream, buffer );
					cb( buffer );
				} );
			},
			baseStream.size,
			() => {
				baseStream.dispose();
			}
		);
		return newStream;
	}

	static createConcat( ...streams ) {
		let currentStreamIndex = 0;
		let resultingStream;
		let getDataRecursive = ( byteCount, cb ) => {
			let stream = streams[ currentStreamIndex ];
			let rs = resultingStream;
			stream.getNextBytesCb(
				Math.min( byteCount, stream.size - stream.position ),
				buffer => {
					if ( stream.position === stream.size ) {
						currentStreamIndex++;
					}
					if ( byteCount === buffer.length ) {
						cb( buffer );
						return;
					}
					getDataRecursive(
						byteCount - buffer.length,
						newBuffer => {
							cb( Buffer.concat( [ buffer, newBuffer ] ) );
						}
					);
				}
			);
		};

		return resultingStream = new BinaryStream(
			getDataRecursive,
			_.reduce( streams, ( acc, stream ) => acc + stream.size, 0 ),
			() => {
				_.forEach( streams, stream => stream.dispose() );
			}
		);
	}

	static fromFile( file, onError, onDispose ) {
		return BinaryStream.fromBinarySource(
			BinarySource.fromFile( file, onError, onDispose )
		);
	}

	static createForwardPreRead( baseSource, readChunkSize, bufferMaxSize, concurency, monitorCb ) {
		let buffersSubj = new Rx.BehaviorSubject( [] );
		let readQueue = new Queue();
		let readCount = 0;
		let position = 0;
		buffersSubj.subscribe( bufs => {
			if ( readCount >= concurency ) {
				return;
			}
			let bufsLength = _.sumBy( bufs, "length" );
			let readLength = Math.min(
				bufferMaxSize - bufsLength,
				baseSource.size - position,
				readChunkSize
			);
			if ( readLength <= 0 ) {
				return;
			}
			readCount++;
			let prevPosition = position;
			let newPosition = position += readLength;
			let data = { length: readLength };
			let buffers = buffersSubj.getValue();
			buffers.push( data );
			buffersSubj.onNext( buffers );
			baseSource.getBufferCb( prevPosition, newPosition, buffer => {
				data.content = buffer;
				readCount--;
				if ( monitorCb ) {
						monitorCb.call( { size: baseSource.size, position: newPosition }, buffer );
				}
				buffersSubj.onNext( buffersSubj.getValue() );
			} );
		} );

		return new BinaryStream( ( count, cb ) => {
			if ( bufferMaxSize < count ) {
				bufferMaxSize = count;
				buffersSubj.onNext( buffersSubj.getValue() );
			}
			readQueue.add( () => new Promise( ( resolve, reject ) => {
				buffersSubj
					.filter( buffers => _.sumBy(
						_.takeWhile( buffers, "content" ),
						"length"
					) >= count )
					.take( 1 )
					.subscribe( buffers => {
						let buf2Get = [];
						let totalGotSize = 0;
						while( totalGotSize < count ) {
							totalGotSize += buffers[ 0 ].length;
							buf2Get.push( buffers[ 0 ].content );
							buffers.splice( 0, 1 );
						}
						let concated = Buffer.concat( buf2Get );
						let result = concated.slice( 0, count );
						let left = concated.slice( count );
						if ( left.length > 0 ) {
							buffers.unshift( {
								length: left.length,
								content: left
							} );
						}
						buffersSubj.onNext( buffers );
						cb( result );
						resolve();
					} );
			} ) );
		}, baseSource.size, () => {
			baseSource.dispose();
		} );
	}
}

export default BinaryStream;
