lunes, 14 de julio de 2008

Buffers inteligentes con swap a disco, o "a prueba de tontos"

Entramos en modo paja mental, siguiendo lo que introduje en la anterior entrada "Buffers: por defecto y por exceso".

De forma recurrente, vienen a mi dos necesidades / problemas típicos, que pueden resolverse de forma elegante a la vez. Dos por uno. Mierda para cada uno.

A saber:

  1. Dado un OutputStream sobre el que mi código va escribiendo quiero ser capaz de obtener de forma elegante el InputStream. Sin mucha sintaxis, sin crear programáticamente ficheros temporales o hacer cosas raras con arrays de bytes. Ni con extraños modelos de pipelining. Pensando en un uso single-thread. ¿Por qué? Porque yo lo valgo. Y porque Murphy dice que si tienes un OutputStream invocarás a un método que necesite como parámetro un InputStream. Y viceversa.
  2. Y mucho más importante: ¿cuántas veces hemos dicho lo malos malísimos que son los arrays de bytes? ¿es que no somos conscientes de que potencialmetne son un cúmulo de bichos? ¿que se volverán contra nosotros en cualquier momento del futuro, cuando estamos pensando ya en otras cosas?

Es inimaginable cómo se abusa en general del uso de array de bytes. Y no me refiero sólo a los típicos gazapos de un programador que empieza. Me estoy refiriendo a diseños internos de productos de reconocido prestigio tanto a nivel nacional o internacional (y no señalaré a nadie).

En alguna otra ocasión hemos hablado de que sería maravilloso que la gestión de memoria en Java soportara algún tipo de nuevo modelo de memoria, separada de los heaps y permgens y todo eso, sobre el que pudiera hacerse swap a disco sin problema. Sé que ya es suficientemente compleja (eden, survivor space, permanent) pero sería genial que la propia JVM detecte el uso de arrays / buffers grandes, o que fuera configurable, y que permitiera hacer swapping a disco de todo o parte de él independientemente de en qué zona de memoria resida.

Pero mientras no sea así, también podemos hacerlo nosotros. La encapsulación al poder. Dejo aquí un código fuente que hice en 2003, allá por los tiempos del JDK 1.3 y 1.4 (creo :-), que básicamene encapsula eso: funciona como un ByteArrayOutputStream de esos que gustan tanto a los programadores, pero en cuanto pasa de un cierto tamaño máximo, swapea a disco. Y además permite recoger un InputStream para hacer pipelines dentro de un solo thread. Por cierto, sin regiones de exclusión mutua porque está pensado para ser usado en un solo thread.

Es decir, un mecanismo "automático" para balancear alto rendimiento y alta escalabilidad: rendimiento en el tratamiento de streams cuando estos son de un tamaño aceptable (todo en memoria), y escalabilidad porque limita el abuso de la memoria como recurso en caso de que algún elemento se nos vaya de las manos.

¿Usos? Más de los que pueden parecer en un principio, por ejemplo intercambio de información entre procesos/colaboraciones desacopladas, para tratar optimizadamente peticiones SOAP con grandes ficheros como parámetros, generación y tratamiento de imágenes o PDFs o Excels en servidor, uso como elemento temporal "optimizado" por ejemplo en un GZIPFilter (si el tamaño comprimido es pequeño, se usa buffer en memoria, si es grande se usa buffer en disco para no consumir excesiva), etc.

Ahí va el código fuente:


package org.serverperformance.io;

import java.io.File;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.FileOutputStream;

/**
* Equivale a un ByteArrayOutputStream con las siguientes características especiales:
* - El buffer sólo consume 32 KBytes de heap (configurable, en caso de requerir más,
* utiliza el disco per nunca consume más heap que el marcado.
* - Métodos getInputStream() y writeTo(OutputStream) para facilitar las
* conversiones (típico al invocar métodos de JDBC que requieren InputStreams, mientras
* que la información se va escribiendo en un OutputStream
* - Se añade un método write(InputStream) que escribe en este stream de salida todo el
* contenido del stream de entrada.
* - A diferencia que ByteArrayOutputStream, el método close() también libera recursos en memoria.
*
* El código fuente está basado en java.io.BufferedOutputStream, con los añadidos
* necesarios y quitando las regiones de exclusión mutua.
*
* @author "Server Performance" http://serverperformance.blogspot.com
* @author (Basado en) http://java.sun.com
* @version 1.1, 30/06/03
*/

public class IntelligentBufferedOutputStream extends OutputStream {

private final static int DEFAULT_BUFFER_SIZE = 32*1024;
private final static String TEMP_FILE_PREFIX = "temp_buff_serverperformance_";
private final static String TEMP_FILE_SUFFIX = ".tmp";

/**
* The output stream to the underlying temp file.
*/

protected OutputStream out;
protected InputStream in;

/**
* The underlying temporal file.
*/

protected File tempFile;

protected boolean usedTempFile = false;

/**
* The internal buffer where data is stored.
*/

protected byte buf[];

/**
* The number of valid bytes in the buffer. This value is always
* in the range 0 through buf.length; elements
* buf[0] through buf[count-1] contain valid
* byte data.
*/

protected int count;

protected boolean deleteTempFileOnClose = true;

/**
* Creates a new buffered output stream to write data to the
* specified underlying output stream with a default 32-kbyte
* buffer size.
*
* @author "Server Performance" http://serverperformance.blogspot.com
*/

public IntelligentBufferedOutputStream() throws IOException {
this(DEFAULT_BUFFER_SIZE, true);
}

/**
* Creates a new buffered output stream to write data to the
* specified underlying output stream with a default buffer size.
*
* @author "Server Performance" http://serverperformance.blogspot.com
*/

public IntelligentBufferedOutputStream(boolean deleteTempFileOnClose) throws IOException {
this(DEFAULT_BUFFER_SIZE, deleteTempFileOnClose);
}

/**
* Creates a new buffered output stream to write data to the
* specified underlying output stream with the specified buffer
* size.
*
* @param size the buffer size.
* @exception IllegalArgumentException if size <= 0. * @author "Server Performance" http://serverperformance.blogspot.com
*/

public IntelligentBufferedOutputStream(int size, boolean deleteTempFileOnClose) throws IOException {
this.buf = new byte[size];
this.deleteTempFileOnClose = deleteTempFileOnClose;
}

/** Obtiene un InputStream, bien un BufferedInputStream del fichero temporal, bien
* un ByteArrayInputStream del buffer en memoria (según el caso).
*
* @author "Server Performance" http://serverperformance.blogspot.com
*/

public InputStream getInputStream() throws IOException {
if (in==null) {
if (usedTempFile) {
flushBufferToFile();
in = new BufferedInputStream(new FileInputStream(tempFile),buf.length);
}
else {
in = new ByteArrayInputStream(buf,0,count);
}
}
return in;
}

/** Copia el contenido del buffer (en memoria o en disco) a otro OutputStream.
*
* @author "Server Performance" http://serverperformance.blogspot.com
*/

public void writeTo(OutputStream target) throws IOException {
// El tamaño del buffer temporal... pues el mismo que el original
byte[] tempBuf = new byte[buf.length];
getInputStream();
int readedLen;
while ((readedLen = in.read(tempBuf)) > 0) {
target.write(tempBuf, 0, readedLen);
}
}

/** Flush the internal buffer
*
* @author "Server Performance" http://serverperformance.blogspot.com
*/

private void flushBufferToFile() throws IOException {
// Si no estaba creado, crea el fichero temporal
if (!usedTempFile) {
tempFile = File.createTempFile(TEMP_FILE_PREFIX,TEMP_FILE_SUFFIX,null);
try {
tempFile.deleteOnExit();
}
catch (Throwable ignored) {}
out = new FileOutputStream(tempFile);
usedTempFile = true;
}
if (count > 0) {
// Flush to disk!
out.write(buf, 0, count);
out.flush();;
count = 0;
}
}

/**
* Consumes from de specified InputStream and writes into this buffered output stream.
* ¡OJO! no cierra el inputstream, debe cerrarlo el invocante...
*
* @param in the origin/producer of the data.
* @exception IOException if an I/O error occurs.
*
* @author "Server Performance" http://serverperformance.blogspot.com
* @author (Basado en) http://java.sun.com
*/

public /*synchronized*/ void write(InputStream in) throws IOException {
byte[] bufLectura = new byte[buf.length];
int len;
while ((len = in.read(bufLectura)) > 0) {
write(bufLectura, 0, len);
}
}

/**
* Writes the specified byte to this buffered output stream.
*
* @param b the byte to be written.
* @exception IOException if an I/O error occurs.
*
* @author "Server Performance" http://serverperformance.blogspot.com
* @author (Basado en) http://java.sun.com
*/

public /*synchronized*/ void write(int b) throws IOException {
if (count >= buf.length) {
flushBufferToFile();
}
buf[count++] = (byte)b;
}

/**
* Writes len bytes from the specified byte array
* starting at offset off to this buffered output stream.
*
* Ordinarily this method stores bytes from the given array into this
* stream's buffer, flushing the buffer to the underlying output stream as
* needed. If the requested length is at least as large as this stream's
* buffer, however, then this method will flush the buffer and write the
* bytes directly to the underlying output stream. Thus redundant
* BufferedTempFileOutputStreams will not copy data unnecessarily.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @exception IOException if an I/O error occurs.
*
* @author "Server Performance" http://serverperformance.blogspot.com
* @author (Basado en) http://java.sun.com
*/

public /*synchronized*/ void write(byte b[], int off, int len) throws IOException {
if (len >= buf.length) {
/* If the request length exceeds the size of the output buffer,
flush the output buffer and then write the data directly.
In this way buffered streams will cascade harmlessly. */
flushBufferToFile();
out.write(b, off, len);
return;
}
if (len > buf.length - count) {
flushBufferToFile();
}
System.arraycopy(b, off, buf, count, len);
count += len;
}

/**
* Flushes this buffered output stream. This forces any buffered
* output bytes to be written out to the underlying output stream.
*
* @exception IOException if an I/O error occurs.
* @see java.io.OutputStream#out
*
* @author "Server Performance" http://serverperformance.blogspot.com
* @author (Basado en) http://java.sun.com
*/

public /*synchronized*/ void flush() throws IOException {
if (usedTempFile) {
flushBufferToFile();
out.flush();
}
}

/**
* Writes b.length bytes to this output stream.
*
* The write method of OutputStream
* calls its write method of three arguments with the
* arguments b, 0, and
* b.length.
*
* Note that this method does not call the one-argument
* write method of its underlying stream with the single
* argument b.
*
* @param b the data to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.OutputStream#write(byte[], int, int)
*
* @author "Server Performance" http://serverperformance.blogspot.com
* @author (Basado en) http://java.sun.com
*/

public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}

/**
* Closes this output stream and releases any system resources
* associated with the stream, including the memory buffer.
*
Also, delete the temporal file if so marked.
*
* @exception IOException if an I/O error occurs.
* @see java.io.OutputStream#flush()
* @see java.io.OutputStream#out
*
* @author "Server Performance" http://serverperformance.blogspot.com
* @author (Basado en) http://java.sun.com
*/

public void close() throws IOException {
// Libera recursos del fichero temporal
if (usedTempFile) {
flushBufferToFile();
try {
//if (out!=null) no puede ser nulo después de flushBufferToFile()
out.close();
out = null;
}
catch (Throwable ignored) {}
try {
if (in!=null)
in.close();
in = null;
}
catch (Throwable ignored) {}
if (deleteTempFileOnClose) {
try {
// Borra el fichero temporal.
// Aunque está marcado para ser borrado al cerrar la JVM,
// por seguridad y optimización de recursos, lo hago ya

tempFile.delete();
}
catch (Throwable ignored) {}
}
if (deleteTempFileOnClose) {
try {
// Borra el fichero temporal.
// Aunque está marcado para ser borrado al cerrar la JVM,
// por seguridad y optimización de recursos, lo hago ya

tempFile.delete();
}
catch (Throwable ignored) {}
}
}
// Libera recursos en memoria
buf = null;
}

/** Returns the total buffer size

* @author "Server Performance" http://serverperformance.blogspot.com
* @author (Basado en) http://java.sun.com
*/

public int size() throws IOException {
if (usedTempFile) {
flushBufferToFile();
return (int)tempFile.length();
}
else {
return count;
}
}

/**
* @author "Server Performance" http://serverperformance.blogspot.com
*/

public void closeAndDelete() throws IOException {
boolean oldFlag = deleteTempFileOnClose;
deleteTempFileOnClose = true;
close();
}

/**
* @author "Server Performance" http://serverperformance.blogspot.com
*/

public void finalize() {
// Por si acaso...
(en las clases de java.io también lo hace...)

try {
flush();
close();
}
catch (Throwable ignored) {}
}

}

En cualquier caso, como muchas otras encapsulaciones, es algo que (al menos a mi) me parece una gran idea pero que en una Organización suele terminar no utilizándose por desconocimiento o por complejidad, o porque es un conocimiento que se pierde, o porque estaba mal documentado, o porque los procedimientos internos de comunicación eran más bien pobres hace unos años.

Es decir, es un código ideal para "núcleos" y componentes muy concretos, y quizás poco más.

En mi anterior vida, desde 2003 hasta hace unos meses, creo que este código se usado sólo en tres situaciones y me he quitado la espinita de encima porque en mi actual vida un invento de encapsulamiento muy similar a este nos ha salvado la vida en un caso muy concreto de invocaciones a módulos desacoplados donde podían darse situaciones de lo más diversa.

:-)

No hay comentarios: