summaryrefslogtreecommitdiff
path: root/src/sync-gridfs-stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-gridfs-stream.c')
-rw-r--r--src/sync-gridfs-stream.c507
1 files changed, 507 insertions, 0 deletions
diff --git a/src/sync-gridfs-stream.c b/src/sync-gridfs-stream.c
new file mode 100644
index 0000000..c9b11ed
--- /dev/null
+++ b/src/sync-gridfs-stream.c
@@ -0,0 +1,507 @@
+/* sync-gridfs-stream.c - libmongo-client GridFS streaming implementation
+ * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** @file src/sync-gridfs-stream.c
+ * MongoDB GridFS Streaming API implementation.
+ */
+
+#include "sync-gridfs-stream.h"
+#include "libmongo-private.h"
+
+#include <unistd.h>
+#include <errno.h>
+
+mongo_sync_gridfs_stream *
+mongo_sync_gridfs_stream_find (mongo_sync_gridfs *gfs,
+ const bson *query)
+{
+ mongo_sync_gridfs_stream *stream;
+ bson *meta = NULL;
+ bson_cursor *c;
+ mongo_packet *p;
+ const guint8 *oid;
+
+ if (!gfs)
+ {
+ errno = ENOTCONN;
+ return NULL;
+ }
+ if (!query)
+ {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ p = mongo_sync_cmd_query (gfs->conn, gfs->ns.files, 0, 0, 1, query, NULL);
+ if (!p)
+ return NULL;
+
+ stream = g_new0 (mongo_sync_gridfs_stream, 1);
+ stream->gfs = gfs;
+ stream->file.type = LMC_GRIDFS_FILE_STREAM_READER;
+
+ mongo_wire_reply_packet_get_nth_document (p, 1, &meta);
+ bson_finish (meta);
+ mongo_wire_packet_free (p);
+
+ c = bson_find (meta, "_id");
+ if (!bson_cursor_get_oid (c, &oid))
+ {
+ bson_cursor_free (c);
+ bson_free (meta);
+ g_free (stream);
+
+ errno = EPROTO;
+ return NULL;
+ }
+ stream->file.id = g_malloc (12);
+ memcpy (stream->file.id, oid, 12);
+
+ bson_cursor_find (c, "length");
+ bson_cursor_get_int64 (c, &stream->file.length);
+ if (stream->file.length == 0)
+ {
+ gint32 i = 0;
+
+ bson_cursor_get_int32 (c, &i);
+ stream->file.length = i;
+ }
+
+ bson_cursor_find (c, "chunkSize");
+ bson_cursor_get_int32 (c, &stream->file.chunk_size);
+
+ bson_cursor_free (c);
+ bson_free (meta);
+
+ if (stream->file.length == 0 ||
+ stream->file.chunk_size == 0)
+ {
+ g_free (stream->file.id);
+ g_free (stream);
+
+ errno = EPROTO;
+ return NULL;
+ }
+
+ return stream;
+}
+
+mongo_sync_gridfs_stream *
+mongo_sync_gridfs_stream_new (mongo_sync_gridfs *gfs,
+ const bson *metadata)
+{
+ mongo_sync_gridfs_stream *stream;
+ bson_cursor *c;
+
+ if (!gfs)
+ {
+ errno = ENOTCONN;
+ return NULL;
+ }
+
+ stream = g_new0 (mongo_sync_gridfs_stream, 1);
+ stream->file.type = LMC_GRIDFS_FILE_STREAM_WRITER;
+ stream->gfs = gfs;
+
+ stream->file.chunk_size = gfs->chunk_size;
+
+ stream->writer.metadata = bson_new_from_data (bson_data (metadata),
+ bson_size (metadata) - 1);
+
+ c = bson_find (metadata, "_id");
+ if (!c)
+ {
+ stream->file.id = mongo_util_oid_new
+ (mongo_connection_get_requestid ((mongo_connection *)gfs->conn));
+ if (!stream->file.id)
+ {
+ bson_free (stream->writer.metadata);
+ g_free (stream);
+
+ errno = EFAULT;
+ return NULL;
+ }
+ bson_append_oid (stream->writer.metadata, "_id", stream->file.id);
+ }
+ else
+ {
+ const guint8 *oid;
+
+ if (!bson_cursor_get_oid (c, &oid))
+ {
+ bson_cursor_free (c);
+ bson_free (stream->writer.metadata);
+ g_free (stream);
+
+ errno = EPROTO;
+ return NULL;
+ }
+
+ stream->file.id = g_malloc (12);
+ memcpy (stream->file.id, oid, 12);
+ }
+ bson_cursor_free (c);
+ bson_finish (stream->writer.metadata);
+
+ stream->writer.buffer = g_malloc (stream->file.chunk_size);
+ stream->writer.checksum = g_checksum_new (G_CHECKSUM_MD5);
+
+ return stream;
+}
+
+static inline gboolean
+_stream_seek_chunk (mongo_sync_gridfs_stream *stream,
+ gint64 chunk)
+{
+ bson *b;
+ mongo_packet *p;
+ bson_cursor *c;
+ bson_binary_subtype subt = BSON_BINARY_SUBTYPE_USER_DEFINED;
+ gboolean r;
+
+ b = bson_new_sized (32);
+ bson_append_oid (b, "files_id", stream->file.id);
+ bson_append_int64 (b, "n", chunk);
+ bson_finish (b);
+
+ p = mongo_sync_cmd_query (stream->gfs->conn,
+ stream->gfs->ns.chunks, 0,
+ 0, 1, b, NULL);
+ bson_free (b);
+
+ bson_free (stream->reader.bson);
+ stream->reader.bson = NULL;
+ stream->reader.chunk.data = NULL;
+
+ mongo_wire_reply_packet_get_nth_document (p, 1, &stream->reader.bson);
+ mongo_wire_packet_free (p);
+ bson_finish (stream->reader.bson);
+
+ c = bson_find (stream->reader.bson, "data");
+ r = bson_cursor_get_binary (c, &subt, &stream->reader.chunk.data,
+ &stream->reader.chunk.size);
+ if (!r || (subt != BSON_BINARY_SUBTYPE_GENERIC &&
+ subt != BSON_BINARY_SUBTYPE_BINARY))
+ {
+ bson_cursor_free (c);
+ bson_free (stream->reader.bson);
+ stream->reader.bson = NULL;
+ stream->reader.chunk.data = NULL;
+
+ errno = EPROTO;
+ return FALSE;
+ }
+ bson_cursor_free (c);
+
+ if (subt == BSON_BINARY_SUBTYPE_BINARY)
+ {
+ stream->reader.chunk.start_offset = 4;
+ stream->reader.chunk.size -= 4;
+ }
+ stream->reader.chunk.offset = 0;
+
+ return TRUE;
+}
+
+gint64
+mongo_sync_gridfs_stream_read (mongo_sync_gridfs_stream *stream,
+ guint8 *buffer,
+ gint64 size)
+{
+ gint64 pos = 0;
+
+ if (!stream)
+ {
+ errno = ENOENT;
+ return -1;
+ }
+ if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER)
+ {
+ errno = EOPNOTSUPP;
+ return -1;
+ }
+ if (!buffer || size <= 0)
+ {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!stream->reader.chunk.data)
+ {
+ if (!_stream_seek_chunk (stream, 0))
+ return -1;
+ }
+
+ while (pos < size && stream->file.offset +
+ stream->reader.chunk.start_offset < stream->file.length)
+ {
+ gint32 csize = stream->reader.chunk.size - stream->reader.chunk.offset;
+
+ if (size - pos < csize)
+ csize = size - pos;
+
+ memcpy (buffer + pos,
+ stream->reader.chunk.data +
+ stream->reader.chunk.start_offset +
+ stream->reader.chunk.offset, csize);
+
+ stream->reader.chunk.offset += csize;
+ stream->file.offset += csize;
+ pos += csize;
+
+ if (stream->reader.chunk.offset + stream->reader.chunk.start_offset >=
+ stream->reader.chunk.size &&
+ stream->file.offset + stream->reader.chunk.start_offset <
+ stream->file.length)
+ {
+ stream->file.current_chunk++;
+ if (!_stream_seek_chunk (stream, stream->file.current_chunk))
+ return -1;
+ }
+ }
+
+ return pos;
+}
+
+static gboolean
+_stream_chunk_write (mongo_sync_gridfs *gfs,
+ const guint8 *oid, gint64 n,
+ const guint8 *buffer, gint32 size)
+{
+ bson *chunk;
+
+ chunk = bson_new_sized (size + 128);
+ bson_append_oid (chunk, "files_id", oid);
+ bson_append_int64 (chunk, "n", n);
+ bson_append_binary (chunk, "data", BSON_BINARY_SUBTYPE_GENERIC,
+ buffer, size);
+ bson_finish (chunk);
+
+ if (!mongo_sync_cmd_insert (gfs->conn, gfs->ns.chunks, chunk, NULL))
+ {
+ int e = errno;
+
+ bson_free (chunk);
+ errno = e;
+ return FALSE;
+ }
+ bson_free (chunk);
+
+ return TRUE;
+}
+
+gboolean
+mongo_sync_gridfs_stream_write (mongo_sync_gridfs_stream *stream,
+ const guint8 *buffer,
+ gint64 size)
+{
+ gint64 pos = 0;
+
+ if (!stream)
+ {
+ errno = ENOENT;
+ return FALSE;
+ }
+ if (stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER)
+ {
+ errno = EOPNOTSUPP;
+ return FALSE;
+ }
+ if (!buffer || size <= 0)
+ {
+ errno = EINVAL;
+ return FALSE;
+ }
+
+ while (pos < size)
+ {
+ gint32 csize = stream->file.chunk_size - stream->writer.buffer_offset;
+
+ if (size - pos < csize)
+ csize = size - pos;
+
+ memcpy (stream->writer.buffer + stream->writer.buffer_offset,
+ buffer + pos, csize);
+ stream->writer.buffer_offset += csize;
+ stream->file.offset += csize;
+ stream->file.length += csize;
+ pos += csize;
+
+ if (stream->writer.buffer_offset == stream->file.chunk_size)
+ {
+ if (!_stream_chunk_write (stream->gfs,
+ stream->file.id,
+ stream->file.current_chunk,
+ stream->writer.buffer,
+ stream->file.chunk_size))
+ return FALSE;
+ g_checksum_update (stream->writer.checksum, stream->writer.buffer,
+ stream->file.chunk_size);
+
+ stream->writer.buffer_offset = 0;
+ stream->file.current_chunk++;
+ }
+ }
+
+ return TRUE;
+}
+
+gboolean
+mongo_sync_gridfs_stream_seek (mongo_sync_gridfs_stream *stream,
+ gint64 pos,
+ gint whence)
+{
+ gint64 real_pos = 0;
+ gint64 chunk;
+ gint32 offs;
+
+ if (!stream)
+ {
+ errno = ENOENT;
+ return FALSE;
+ }
+ if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER)
+ {
+ errno = EOPNOTSUPP;
+ return FALSE;
+ }
+
+ switch (whence)
+ {
+ case SEEK_SET:
+ if (pos == stream->file.offset)
+ return TRUE;
+ if (pos < 0 || pos > stream->file.length)
+ {
+ errno = ERANGE;
+ return FALSE;
+ }
+ real_pos = pos;
+ break;
+ case SEEK_CUR:
+ if (pos + stream->file.offset < 0 ||
+ pos + stream->file.offset > stream->file.length)
+ {
+ errno = ERANGE;
+ return FALSE;
+ }
+ if (pos == 0)
+ return TRUE;
+ real_pos = pos + stream->file.offset;
+ break;
+ case SEEK_END:
+ if (pos > 0 || pos + stream->file.length < 0)
+ {
+ errno = ERANGE;
+ return FALSE;
+ }
+ real_pos = pos + stream->file.length;
+ break;
+ default:
+ errno = EINVAL;
+ return FALSE;
+ }
+
+ chunk = real_pos / stream->file.chunk_size;
+ offs = real_pos % stream->file.chunk_size;
+
+ if (!_stream_seek_chunk (stream, chunk))
+ return FALSE;
+
+ stream->reader.chunk.offset = offs;
+ stream->file.current_chunk = chunk;
+ stream->file.offset = real_pos;
+
+ return TRUE;
+}
+
+gboolean
+mongo_sync_gridfs_stream_close (mongo_sync_gridfs_stream *stream)
+{
+ if (!stream)
+ {
+ errno = ENOENT;
+ return FALSE;
+ }
+
+ if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER &&
+ stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER)
+ {
+ errno = EINVAL;
+ return FALSE;
+ }
+
+ if (stream->file.type == LMC_GRIDFS_FILE_STREAM_WRITER)
+ {
+ bson *meta;
+ gint64 upload_date;
+ GTimeVal tv;
+ gboolean closed = FALSE;
+
+ if (stream->writer.buffer_offset > 0)
+ {
+ closed = _stream_chunk_write (stream->gfs,
+ stream->file.id,
+ stream->file.current_chunk,
+ stream->writer.buffer,
+ stream->writer.buffer_offset);
+
+ if (closed)
+ g_checksum_update (stream->writer.checksum,
+ stream->writer.buffer,
+ stream->writer.buffer_offset);
+ }
+
+ if (closed)
+ {
+ g_get_current_time (&tv);
+ upload_date = (((gint64) tv.tv_sec) * 1000) +
+ (gint64)(tv.tv_usec / 1000);
+
+ /* _id is guaranteed by _stream_new() */
+ meta = bson_new_from_data (bson_data (stream->writer.metadata),
+ bson_size (stream->writer.metadata) - 1);
+ bson_append_int64 (meta, "length", stream->file.length);
+ bson_append_int32 (meta, "chunkSize", stream->file.chunk_size);
+ bson_append_utc_datetime (meta, "uploadDate", upload_date);
+ if (stream->file.length)
+ bson_append_string (meta, "md5",
+ g_checksum_get_string (stream->writer.checksum), -1);
+ bson_finish (meta);
+
+ if (!mongo_sync_cmd_insert (stream->gfs->conn,
+ stream->gfs->ns.files, meta, NULL))
+ {
+ int e = errno;
+
+ bson_free (meta);
+ errno = e;
+ return FALSE;
+ }
+ bson_free (meta);
+ }
+
+ bson_free (stream->writer.metadata);
+ g_checksum_free (stream->writer.checksum);
+ g_free (stream->writer.buffer);
+ }
+ else
+ bson_free (stream->reader.bson);
+
+ g_free (stream->file.id);
+ g_free (stream);
+ return TRUE;
+}