Tuesday, 18 August 2015

A piped stream implementation

Java IO has had the PipedInputStream and PipedOutputStream classes since JDK 1.0, to be used as communication channels between threads in the same JVM. You write to the OutputStream from one thread and read from the InputStream in another thread, thus creating a 'pipe' of communication.

Creating a pipe using Java IO is done via the PipedOutputStream and PipedInputStream classes. A PipedInputStream should be connected to a PipedOutputStream. The data written to the PipedOutputStream by one thread can be read from the connected PipedInputStream by another thread. 

Under the hood, PipedInputStream has synchronized receive(int) and receive(byte[], int, int) methods, which the connected PipedOutputStream calls to hand over data in a producer-consumer fashion. As in any producer-consumer scenario, a wait-notify mechanism is in place.

We try to improve on this by using a ring buffer in the form of an ArrayBlockingQueue, thus leveraging the java.util.concurrent improvements.


public class NPipedOutputStream extends OutputStream implements InterruptibleChannel {
    private BlockingQueue<Byte> circularBuffer;
    private int bufferSize = Constants.READ_CHUNK_SIZE_BYTES;
    private long position = 0;
    private boolean connected;
   
    public long getPosition() {
        return position;
    }
    /**
     *
     */
    public NPipedOutputStream()
    {
       
    }
    /**
     *
     * @param sink
     * @param size
     * @throws IOException
     */
    public NPipedOutputStream(NPipedInputStream sink, int size) throws IOException
    {
        bufferSize = size;
        connect(sink);
    }
    public NPipedOutputStream(NPipedInputStream sink) throws IOException {
        connect(sink);
    }
    /**
     * Connect to sink
     * @param sink
     * @throws IOException
     */
    public void connect(NPipedInputStream sink) throws IOException {
        if (sink == null) {
            throw new NullPointerException();
        } else if (sink.connected) {
            throw new IOException("SynchronousInputStream provided is already connected to a source ");
        }
        if(connected)
            throw new IOException("SynchronousOutputStream already connected to a sink SynchronousInputStream");
       
        circularBuffer = new ArrayBlockingQueue<>(bufferSize);
        sink.circularBuffer = this.circularBuffer;
        sink.setConnected(true);
        connected = true;
    }
    @Override
    public void write(int b) throws IOException {
        writeByte((byte) b);
    }
    private void writeByte(byte b) throws IOException
    {
        try {
            writeToChannel(b);
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }
    public boolean isBlocked()
    {
        while(!b1.compareAndSet(false, true));
        try {
            return writer != null && writer.isAlive();
        } finally {
            b1.compareAndSet(true, false);
        }
       
    }
    private Thread writer;
    /**
     * blocking call
     * @throws InterruptedException
     */
    private void writeToChannel(byte b) throws InterruptedException {
        try {
           
            writer = Thread.currentThread();
            circularBuffer.put(b);
            position++;
           
        } finally {
            while(!b1.compareAndSet(false, true));
            writer = null;
            b1.compareAndSet(true, false);
        }
       
    }
    public int getBufferSize() {
        return bufferSize;
    }
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }
    private final AtomicBoolean b1 = new AtomicBoolean();
    public void close() throws IOException {
        while(!b1.compareAndSet(false, true));
        if(writer != null)
        {
            writer.interrupt();
        }
        b1.compareAndSet(true, false);
        closed = true;       
    }
    private boolean closed = false;
    @Override
    public boolean isOpen() {
        return !closed;
    }
}


/**
 * Reading end of an in-JVM pipe backed by a {@link BlockingQueue} of bytes.
 *
 * <p>{@link #read()} blocks while the queue is empty. {@link #close()}
 * interrupts a thread that is currently blocked inside a read so that it fails
 * with {@link InterruptedIOException} instead of hanging.
 *
 * <p>This class has no end-of-stream signalling: {@link #read()} never returns
 * {@code -1}; it either returns a byte, blocks, or throws.
 *
 * <p>Only one reader thread is tracked at a time (a single {@code reader}
 * field), so the interrupt-on-close behaviour is only reliable with a single
 * reading thread.
 */
public class NPipedInputStream extends InputStream implements InterruptibleChannel {
    /** Queue shared with the writing end; assigned by the constructor or by the connecting output stream. */
    BlockingQueue<Byte> circularBuffer;
    boolean connected = false;
    /** Tiny spin guard protecting {@link #reader} and the transition of {@link #closed}. */
    private final AtomicBoolean b1 = new AtomicBoolean();
    /** Thread currently inside a (possibly blocking) read, or {@code null}. */
    private Thread reader;
    /** Volatile because {@link #isOpen()} reads it without taking the guard. */
    private volatile boolean closed = false;

    void setConnected(boolean connected) {
        this.connected = connected;
    }

    /**
     * Creates an unconnected stream; it becomes usable once an output stream connects to it.
     */
    public NPipedInputStream()
    {

    }

    /**
     * Creates a stream reading from the given queue. The stream is not marked
     * connected by this constructor.
     */
    NPipedInputStream(BlockingQueue<Byte> commChannel)
    {
        this.circularBuffer = commChannel;
    }

    /**
     * Reads the next byte from the pipe, blocking while the queue is empty.
     *
     * @return the byte as an unsigned value in the range {@code 0..255}
     * @throws InterruptedIOException if the reading thread is interrupted (for
     *         example by {@link #close()}) while waiting for data
     * @throws IOException if the stream is not connected or has been closed
     */
    @Override
    public int read() throws IOException {
        if (!connected) {
            throw new IOException("SynchronousInputStream not connected to any source");
        }
        // Mask to 0..255: returning the raw byte would sign-extend, turning
        // 0x80-0xFF into negative ints and 0xFF into the -1 end-of-stream marker.
        return readByte() & 0xFF;
    }

    /** Translates an interrupt during the blocking take into an {@link InterruptedIOException}. */
    private byte readByte() throws IOException
    {
        try {
            return readFromChannel();
        } catch (InterruptedException e) {
            InterruptedIOException wrapped = new InterruptedIOException();
            wrapped.initCause(e); // keep the original stack trace for diagnosis
            throw wrapped;
        }
    }

    /**
     * @return {@code true} if a thread is currently registered as being inside a
     *         read and that thread is alive
     */
    public boolean isBlocked()
    {
        lockGuard();
        try {
            return reader != null && reader.isAlive();
        } finally {
            unlockGuard();
        }
    }

    /**
     * Blocking call: registers the current thread as the reader, takes one byte
     * from the queue, then unregisters the thread.
     *
     * @throws InterruptedException if interrupted while waiting for data
     * @throws IOException if the stream has already been closed
     */
    private byte readFromChannel() throws InterruptedException, IOException
    {
        // Check 'closed' and register under the same guard close() uses, so that
        // either close() sees this thread and interrupts it, or this thread sees
        // 'closed' and never blocks.
        lockGuard();
        try {
            if (closed) {
                throw new IOException("SynchronousInputStream is closed");
            }
            reader = Thread.currentThread();
        } finally {
            unlockGuard();
        }

        try {
            return circularBuffer.take();
        } finally {
            lockGuard();
            reader = null;
            unlockGuard();
        }
    }

    /**
     * Marks the stream closed and interrupts the thread currently blocked in a
     * read, if any.
     */
    @Override
    public void close() throws IOException {
        lockGuard();
        try {
            closed = true;
            if (reader != null) {
                reader.interrupt();
            }
        } finally {
            // Always release, even if interrupt() throws, so later callers cannot spin forever.
            unlockGuard();
        }
    }

    @Override
    public boolean isOpen() {
        return !closed;
    }

    /** Spins until the guard is acquired; critical sections are a few field accesses only. */
    private void lockGuard() {
        while (!b1.compareAndSet(false, true)) {
            // busy-wait
        }
    }

    /** Releases the guard taken by {@link #lockGuard()}. */
    private void unlockGuard() {
        b1.set(false);
    }
}