diff options
author | Jörg Frings-Fürst <debian@jff-webhosting.net> | 2014-12-02 10:06:21 +0100 |
---|---|---|
committer | Jörg Frings-Fürst <debian@jff-webhosting.net> | 2014-12-02 10:06:21 +0100 |
commit | fd841e416881cc0392e61ec312c1870f3a0004bd (patch) | |
tree | 8357ba56e79d614ba57f722e7878b853591dc339 /src |
Initial import of libmongo-client version 0.1.8-2
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 51 | ||||
-rw-r--r-- | src/bson.c | 1251 | ||||
-rw-r--r-- | src/bson.h | 856 | ||||
-rw-r--r-- | src/compat.c | 108 | ||||
-rw-r--r-- | src/compat.h | 50 | ||||
-rw-r--r-- | src/libmongo-client.pc.in | 12 | ||||
-rw-r--r-- | src/libmongo-client.ver | 163 | ||||
-rw-r--r-- | src/libmongo-macros.h | 51 | ||||
-rw-r--r-- | src/libmongo-private.h | 276 | ||||
-rw-r--r-- | src/mongo-client.c | 331 | ||||
-rw-r--r-- | src/mongo-client.h | 116 | ||||
-rw-r--r-- | src/mongo-sync-cursor.c | 118 | ||||
-rw-r--r-- | src/mongo-sync-cursor.h | 103 | ||||
-rw-r--r-- | src/mongo-sync-pool.c | 269 | ||||
-rw-r--r-- | src/mongo-sync-pool.h | 133 | ||||
-rw-r--r-- | src/mongo-sync.c | 2155 | ||||
-rw-r--r-- | src/mongo-sync.h | 640 | ||||
-rw-r--r-- | src/mongo-utils.c | 197 | ||||
-rw-r--r-- | src/mongo-utils.h | 121 | ||||
-rw-r--r-- | src/mongo-wire.c | 645 | ||||
-rw-r--r-- | src/mongo-wire.h | 433 | ||||
-rw-r--r-- | src/mongo.h | 49 | ||||
-rw-r--r-- | src/sync-gridfs-chunk.c | 329 | ||||
-rw-r--r-- | src/sync-gridfs-chunk.h | 134 | ||||
-rw-r--r-- | src/sync-gridfs-stream.c | 507 | ||||
-rw-r--r-- | src/sync-gridfs-stream.h | 141 | ||||
-rw-r--r-- | src/sync-gridfs.c | 345 | ||||
-rw-r--r-- | src/sync-gridfs.h | 193 |
28 files changed, 9777 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am new file mode 100644 index 0000000..243cb84 --- /dev/null +++ b/src/Makefile.am @@ -0,0 +1,51 @@ +LMC_CURRENT = 5 +LMC_REVISION = 0 +LMC_AGE = 5 + +lib_LTLIBRARIES = libmongo-client.la +libmongo_client_la_LIBADD = @GLIB_LIBS@ @OPENSSL_LIBS@ +libmongo_client_la_CFLAGS = @GLIB_CFLAGS@ @OPENSSL_CFLAGS@ +libmongo_client_la_LDFLAGS = -version-info ${LMC_CURRENT}:${LMC_REVISION}:${LMC_AGE} + +libmongo_client_la_SOURCES = \ + compat.c compat.h \ + bson.c bson.h \ + mongo-wire.c mongo-wire.h \ + mongo-client.c mongo-client.h \ + mongo-utils.c mongo-utils.h \ + mongo-sync.c mongo-sync.h \ + mongo-sync-cursor.c mongo-sync-cursor.h \ + mongo-sync-pool.c mongo-sync-pool.h \ + sync-gridfs.c sync-gridfs.h \ + sync-gridfs-chunk.c sync-gridfs-chunk.h \ + sync-gridfs-stream.c sync-gridfs-stream.h \ + mongo.h \ + libmongo-private.h libmongo-macros.h + +libmongo_client_includedir = $(includedir)/mongo-client +libmongo_client_include_HEADERS = \ + bson.h mongo-wire.h mongo-client.h mongo-utils.h \ + mongo-sync.h mongo-sync-cursor.h mongo-sync-pool.h \ + sync-gridfs.h sync-gridfs-chunk.h sync-gridfs-stream.h \ + mongo.h + +if HAVE_VERSIONING +libmongo_client_la_LDFLAGS += \ + -Wl,--version-script,$(top_srcdir)/src/libmongo-client.ver +libmongo_client_la_DEPENDENCIES = ${top_srcdir}/src/libmongo-client.ver +endif + +pkgconfigdir = $(libdir)/pkgconfig +pkgconfig_DATA = libmongo-client.pc + +CLEANFILES = *.gcda *.gcno *.gcov +CLEANDIRS = coverage + +coverage: + $(AM_V_GEN) + $(AM_V_at) SOURCES="$(SOURCES)" builddir="$(builddir)" srcdir="$(srcdir)" top_srcdir="$(top_srcdir)" $(top_srcdir)/tests/coverage.sh + +clean-local: + -test -z "$(CLEANDIRS)" || rm -rf "$(CLEANDIRS)" + +.PHONY: coverage diff --git a/src/bson.c b/src/bson.c new file mode 100644 index 0000000..845f6de --- /dev/null +++ b/src/bson.c @@ -0,0 +1,1251 @@ +/* bson.c - libmongo-client's BSON implementation + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/bson.c + * Implementation of the BSON API. + */ + +#include <glib.h> +#include <errno.h> +#include <string.h> +#include <stdarg.h> + +#include "bson.h" +#include "libmongo-macros.h" +#include "libmongo-private.h" + +/** @internal BSON cursor structure. + */ +struct _bson_cursor +{ + const bson *obj; /**< The BSON object this is a cursor for. */ + const gchar *key; /**< Pointer within the BSON object to the + current key. */ + size_t pos; /**< Position within the BSON object, pointing at the + element type. */ + size_t value_pos; /**< The start of the value within the BSON + object, pointing right after the end of the + key. */ +}; + +/** @internal Append a byte to a BSON stream. + * + * @param b is the BSON stream to append to. + * @param byte is the byte to append. + */ +static inline void +_bson_append_byte (bson *b, const guint8 byte) +{ + b->data = g_byte_array_append (b->data, &byte, sizeof (byte)); +} + +/** @internal Append a 32-bit integer to a BSON stream. + * + * @param b is the BSON stream to append to. + * @param i is the integer to append. + */ +static inline void +_bson_append_int32 (bson *b, const gint32 i) +{ + b->data = g_byte_array_append (b->data, (const guint8 *)&i, sizeof (gint32)); +} + +/** @internal Append a 64-bit integer to a BSON stream. + * + * @param b is the BSON stream to append to. + * @param i is the integer to append. + */ +static inline void +_bson_append_int64 (bson *b, const gint64 i) +{ + b->data = g_byte_array_append (b->data, (const guint8 *)&i, sizeof (gint64)); +} + +/** @internal Append an element header to a BSON stream. + * + * The element header is a single byte, signaling the type of the + * element, followed by a NULL-terminated C string: the key (element) + * name. + * + * @param b is the BSON object to append to. + * @param type is the element type to append. + * @param name is the key name. + * + * @returns TRUE on success, FALSE otherwise. + */ +static inline gboolean +_bson_append_element_header (bson *b, bson_type type, const gchar *name) +{ + if (!name || !b) + return FALSE; + + if (b->finished) + return FALSE; + + _bson_append_byte (b, (guint8) type); + b->data = g_byte_array_append (b->data, (const guint8 *)name, + strlen (name) + 1); + + return TRUE; +} + +/** @internal Append a string-like element to a BSON object. + * + * There are a few string-like elements in the BSON spec that differ + * only in type, not in structure. This convenience function is used + * to append them with the appropriate type. + * + * @param b is the BSON object to append to. + * @param type is the string-like type to append. + * @param name is the key name. + * @param val is the value to append. + * @param length is the length of the value. + * + * @note Passing @a -1 as length will use the full length of @a + * val. + * + * @returns TRUE on success, FALSE otherwise. + */ +static gboolean +_bson_append_string_element (bson *b, bson_type type, const gchar *name, + const gchar *val, gint32 length) +{ + size_t len; + + if (!val || !length || length < -1) + return FALSE; + + len = (length != -1) ? (size_t)length + 1: strlen (val) + 1; + + if (!_bson_append_element_header (b, type, name)) + return FALSE; + + _bson_append_int32 (b, GINT32_TO_LE (len)); + + b->data = g_byte_array_append (b->data, (const guint8 *)val, len - 1); + _bson_append_byte (b, 0); + + return TRUE; +} + +/** @internal Append a document-like element to a BSON object. + * + * Arrays and documents are both similar, and differ very little: + * different type, and arrays have restrictions on key names (which + * are not enforced by this library). + * + * This convenience function can append both types. + * + * @param b is the BSON object to append to. + * @param type is the document-like type to append. + * @param name is the key name. + * @param doc is the document-like object to append. + * + * @note The @a doc must be a finished BSON object. + * + * @returns TRUE on success, FALSE otherwise. + */ +static gboolean +_bson_append_document_element (bson *b, bson_type type, const gchar *name, + const bson *doc) +{ + if (bson_size (doc) < 0) + return FALSE; + + if (!_bson_append_element_header (b, type, name)) + return FALSE; + + b->data = g_byte_array_append (b->data, bson_data (doc), bson_size (doc)); + return TRUE; +} + +/** @internal Append a 64-bit integer to a BSON object. + * + * @param b is the BSON object to append to. + * @param type is the int64-like type to append. + * @param name is the key name. + * @param i is the 64-bit value to append. + * + * @returns TRUE on success, FALSE otherwise. + */ +static inline gboolean +_bson_append_int64_element (bson *b, bson_type type, const gchar *name, + gint64 i) +{ + if (!_bson_append_element_header (b, type, name)) + return FALSE; + + _bson_append_int64 (b, GINT64_TO_LE (i)); + return TRUE; +} + +/******************** + * Public interface * + ********************/ + +const gchar * +bson_type_as_string (bson_type type) +{ + switch (type) + { + case BSON_TYPE_NONE: + return "BSON_TYPE_NONE"; + case BSON_TYPE_DOUBLE: + return "BSON_TYPE_DOUBLE"; + case BSON_TYPE_STRING: + return "BSON_TYPE_STRING"; + case BSON_TYPE_DOCUMENT: + return "BSON_TYPE_DOCUMENT"; + case BSON_TYPE_ARRAY: + return "BSON_TYPE_ARRAY"; + case BSON_TYPE_BINARY: + return "BSON_TYPE_BINARY"; + case BSON_TYPE_UNDEFINED: + return "BSON_TYPE_UNDEFINED"; + case BSON_TYPE_OID: + return "BSON_TYPE_OID"; + case BSON_TYPE_BOOLEAN: + return "BSON_TYPE_BOOLEAN"; + case BSON_TYPE_UTC_DATETIME: + return "BSON_TYPE_UTC_DATETIME"; + case BSON_TYPE_NULL: + return "BSON_TYPE_NULL"; + case BSON_TYPE_REGEXP: + return "BSON_TYPE_REGEXP"; + case BSON_TYPE_DBPOINTER: + return "BSON_TYPE_DBPOINTER"; + case BSON_TYPE_JS_CODE: + return "BSON_TYPE_JS_CODE"; + case BSON_TYPE_SYMBOL: + return "BSON_TYPE_SYMBOL"; + case BSON_TYPE_JS_CODE_W_SCOPE: + return "BSON_TYPE_JS_CODE_W_SCOPE"; + case BSON_TYPE_INT32: + return "BSON_TYPE_INT32"; + case BSON_TYPE_TIMESTAMP: + return "BSON_TYPE_TIMESTAMP"; + case BSON_TYPE_INT64: + return "BSON_TYPE_INT64"; + case BSON_TYPE_MIN: + return "BSON_TYPE_MIN"; + case BSON_TYPE_MAX: + return "BSON_TYPE_MAX"; + default: + return NULL; + } +} + +bson * +bson_new (void) +{ + return bson_new_sized (0); +} + +bson * +bson_new_sized (gint32 size) +{ + bson *b = g_new0 (bson, 1); + + b->data = g_byte_array_sized_new (size + sizeof (gint32) + sizeof (guint8)); + _bson_append_int32 (b, 0); + + return b; +} + +bson * +bson_new_from_data (const guint8 *data, gint32 size) +{ + bson *b; + + if (!data || size <= 0) + return NULL; + + b = g_new0 (bson, 1); + b->data = g_byte_array_sized_new (size + sizeof (guint8)); + b->data = g_byte_array_append (b->data, data, size); + + return b; +} + +/** @internal Add a single element of any type to a BSON object. + * + * Used internally by bson_build() and bson_build_full(), this + * function adds a single element of any supported type to the target + * BSON object. + * + * @param b is the target BSON object. + * @param type is the element type to add. + * @param name is the key name. + * @param free_after signals whether to free the values after adding + * them. + * @param ap is the list of remaining parameters. + * + * @returns TRUE in @a single_result on success, FALSE otherwise. + */ +#define _bson_build_add_single(b,type,name,free_after,ap) \ + { \ + single_result = TRUE; \ + switch (type) \ + { \ + case BSON_TYPE_NONE: \ + case BSON_TYPE_UNDEFINED: \ + case BSON_TYPE_DBPOINTER: \ + single_result = FALSE; \ + break; \ + case BSON_TYPE_MIN: \ + case BSON_TYPE_MAX: \ + default: \ + single_result = FALSE; \ + break; \ + case BSON_TYPE_DOUBLE: \ + { \ + gdouble d = (gdouble)va_arg (ap, gdouble); \ + bson_append_double (b, name, d); \ + break; \ + } \ + case BSON_TYPE_STRING: \ + { \ + gchar *s = (gchar *)va_arg (ap, gpointer); \ + gint32 l = (gint32)va_arg (ap, gint32); \ + bson_append_string (b, name, s, l); \ + if (free_after) \ + g_free (s); \ + break; \ + } \ + case BSON_TYPE_DOCUMENT: \ + { \ + bson *d = (bson *)va_arg (ap, gpointer); \ + if (free_after && bson_size (d) < 0) \ + bson_finish (d); \ + bson_append_document (b, name, d); \ + if (free_after) \ + bson_free (d); \ + break; \ + } \ + case BSON_TYPE_ARRAY: \ + { \ + bson *d = (bson *)va_arg (ap, gpointer); \ + if (free_after && bson_size (d) < 0) \ + bson_finish (d); \ + bson_append_array (b, name, d); \ + if (free_after) \ + bson_free (d); \ + break; \ + } \ + case BSON_TYPE_BINARY: \ + { \ + bson_binary_subtype s = \ + (bson_binary_subtype)va_arg (ap, guint); \ + guint8 *d = (guint8 *)va_arg (ap, gpointer); \ + gint32 l = (gint32)va_arg (ap, gint32); \ + bson_append_binary (b, name, s, d, l); \ + if (free_after) \ + g_free (d); \ + break; \ + } \ + case BSON_TYPE_OID: \ + { \ + guint8 *oid = (guint8 *)va_arg (ap, gpointer); \ + bson_append_oid (b, name, oid); \ + if (free_after) \ + g_free (oid); \ + break; \ + } \ + case BSON_TYPE_BOOLEAN: \ + { \ + gboolean v = (gboolean)va_arg (ap, guint); \ + bson_append_boolean (b, name, v); \ + break; \ + } \ + case BSON_TYPE_UTC_DATETIME: \ + { \ + gint64 ts = (gint64)va_arg (ap, gint64); \ + bson_append_utc_datetime (b, name, ts); \ + break; \ + } \ + case BSON_TYPE_NULL: \ + { \ + bson_append_null (b, name); \ + break; \ + } \ + case BSON_TYPE_REGEXP: \ + { \ + gchar *r = (gchar *)va_arg (ap, gpointer); \ + gchar *o = (gchar *)va_arg (ap, gpointer); \ + bson_append_regex (b, name, r, o); \ + if (free_after) \ + { \ + g_free (r); \ + g_free (o); \ + } \ + break; \ + } \ + case BSON_TYPE_JS_CODE: \ + { \ + gchar *s = (gchar *)va_arg (ap, gpointer); \ + gint32 l = (gint32)va_arg (ap, gint32); \ + bson_append_javascript (b, name, s, l); \ + if (free_after) \ + g_free (s); \ + break; \ + } \ + case BSON_TYPE_SYMBOL: \ + { \ + gchar *s = (gchar *)va_arg (ap, gpointer); \ + gint32 l = (gint32)va_arg (ap, gint32); \ + bson_append_symbol (b, name, s, l); \ + if (free_after) \ + g_free (s); \ + break; \ + } \ + case BSON_TYPE_JS_CODE_W_SCOPE: \ + { \ + gchar *s = (gchar *)va_arg (ap, gpointer); \ + gint32 l = (gint32)va_arg (ap, gint32); \ + bson *scope = (bson *)va_arg (ap, gpointer); \ + if (free_after && bson_size (scope) < 0) \ + bson_finish (scope); \ + bson_append_javascript_w_scope (b, name, s, l, scope); \ + if (free_after) \ + bson_free (scope); \ + break; \ + } \ + case BSON_TYPE_INT32: \ + { \ + gint32 l = (gint32)va_arg (ap, gint32); \ + bson_append_int32 (b, name, l); \ + break; \ + } \ + case BSON_TYPE_TIMESTAMP: \ + { \ + gint64 ts = (gint64)va_arg (ap, gint64); \ + bson_append_timestamp (b, name, ts); \ + break; \ + } \ + case BSON_TYPE_INT64: \ + { \ + gint64 l = (gint64)va_arg (ap, gint64); \ + bson_append_int64 (b, name, l); \ + break; \ + } \ + } \ + } + +bson * +bson_build (bson_type type, const gchar *name, ...) +{ + va_list ap; + bson_type t; + const gchar *n; + bson *b; + gboolean single_result; + + b = bson_new (); + va_start (ap, name); + _bson_build_add_single (b, type, name, FALSE, ap); + + if (!single_result) + { + bson_free (b); + va_end (ap); + return NULL; + } + + while ((t = (bson_type)va_arg (ap, gint))) + { + n = (const gchar *)va_arg (ap, gpointer); + _bson_build_add_single (b, t, n, FALSE, ap); + if (!single_result) + { + bson_free (b); + va_end (ap); + return NULL; + } + } + va_end (ap); + + return b; +} + +bson * +bson_build_full (bson_type type, const gchar *name, gboolean free_after, ...) +{ + va_list ap; + bson_type t; + const gchar *n; + gboolean f; + bson *b; + gboolean single_result; + + b = bson_new (); + va_start (ap, free_after); + _bson_build_add_single (b, type, name, free_after, ap); + if (!single_result) + { + bson_free (b); + va_end (ap); + return NULL; + } + + while ((t = (bson_type)va_arg (ap, gint))) + { + n = (const gchar *)va_arg (ap, gpointer); + f = (gboolean)va_arg (ap, gint); + _bson_build_add_single (b, t, n, f, ap); + if (!single_result) + { + bson_free (b); + va_end (ap); + return NULL; + } + } + va_end (ap); + + return b; +} + +gboolean +bson_finish (bson *b) +{ + gint32 *i; + + if (!b) + return FALSE; + + if (b->finished) + return TRUE; + + _bson_append_byte (b, 0); + + i = (gint32 *) (&b->data->data[0]); + *i = GINT32_TO_LE ((gint32) (b->data->len)); + + b->finished = TRUE; + + return TRUE; +} + +gint32 +bson_size (const bson *b) +{ + if (!b) + return -1; + + if (b->finished) + return b->data->len; + else + return -1; +} + +const guint8 * +bson_data (const bson *b) +{ + if (!b) + return NULL; + + if (b->finished) + return b->data->data; + else + return NULL; +} + +gboolean +bson_reset (bson *b) +{ + if (!b) + return FALSE; + + b->finished = FALSE; + g_byte_array_set_size (b->data, 0); + _bson_append_int32 (b, 0); + + return TRUE; +} + +void +bson_free (bson *b) +{ + if (!b) + return; + + if (b->data) + g_byte_array_free (b->data, TRUE); + g_free (b); +} + +gboolean +bson_validate_key (const gchar *key, gboolean forbid_dots, + gboolean no_dollar) +{ + if (!key) + { + errno = EINVAL; + return FALSE; + } + errno = 0; + + if (no_dollar && key[0] == '$') + return FALSE; + + if (forbid_dots && strchr (key, '.') != NULL) + return FALSE; + + return TRUE; +} + +/* + * Append elements + */ + +gboolean +bson_append_double (bson *b, const gchar *name, gdouble val) +{ + gdouble d = GDOUBLE_TO_LE (val); + + if (!_bson_append_element_header (b, BSON_TYPE_DOUBLE, name)) + return FALSE; + + b->data = g_byte_array_append (b->data, (const guint8 *)&d, sizeof (val)); + return TRUE; +} + +gboolean +bson_append_string (bson *b, const gchar *name, const gchar *val, + gint32 length) +{ + return _bson_append_string_element (b, BSON_TYPE_STRING, name, val, length); +} + +gboolean +bson_append_document (bson *b, const gchar *name, const bson *doc) +{ + return _bson_append_document_element (b, BSON_TYPE_DOCUMENT, name, doc); +} + +gboolean +bson_append_array (bson *b, const gchar *name, const bson *array) +{ + return _bson_append_document_element (b, BSON_TYPE_ARRAY, name, array); +} + +gboolean +bson_append_binary (bson *b, const gchar *name, bson_binary_subtype subtype, + const guint8 *data, gint32 size) +{ + if (!data || !size || size <= 0) + return FALSE; + + if (!_bson_append_element_header (b, BSON_TYPE_BINARY, name)) + return FALSE; + + _bson_append_int32 (b, GINT32_TO_LE (size)); + _bson_append_byte (b, (guint8)subtype); + + b->data = g_byte_array_append (b->data, data, size); + return TRUE; +} + +gboolean +bson_append_oid (bson *b, const gchar *name, const guint8 *oid) +{ + if (!oid) + return FALSE; + + if (!_bson_append_element_header (b, BSON_TYPE_OID, name)) + return FALSE; + + b->data = g_byte_array_append (b->data, oid, 12); + return TRUE; +} + +gboolean +bson_append_boolean (bson *b, const gchar *name, gboolean value) +{ + if (!_bson_append_element_header (b, BSON_TYPE_BOOLEAN, name)) + return FALSE; + + _bson_append_byte (b, (guint8)value); + return TRUE; +} + +gboolean +bson_append_utc_datetime (bson *b, const gchar *name, gint64 ts) +{ + return _bson_append_int64_element (b, BSON_TYPE_UTC_DATETIME, name, ts); +} + +gboolean +bson_append_null (bson *b, const gchar *name) +{ + return _bson_append_element_header (b, BSON_TYPE_NULL, name); +} + +gboolean +bson_append_regex (bson *b, const gchar *name, const gchar *regexp, + const gchar *options) +{ + if (!regexp || !options) + return FALSE; + + if (!_bson_append_element_header (b, BSON_TYPE_REGEXP, name)) + return FALSE; + + b->data = g_byte_array_append (b->data, (const guint8 *)regexp, + strlen (regexp) + 1); + b->data = g_byte_array_append (b->data, (const guint8 *)options, + strlen (options) + 1); + + return TRUE; +} + +gboolean +bson_append_javascript (bson *b, const gchar *name, const gchar *js, + gint32 len) +{ + return _bson_append_string_element (b, BSON_TYPE_JS_CODE, name, js, len); +} + +gboolean +bson_append_symbol (bson *b, const gchar *name, const gchar *symbol, + gint32 len) +{ + return _bson_append_string_element (b, BSON_TYPE_SYMBOL, name, symbol, len); +} + +gboolean +bson_append_javascript_w_scope (bson *b, const gchar *name, + const gchar *js, gint32 len, + const bson *scope) +{ + gint size; + size_t length; + + if (!js || !scope || bson_size (scope) < 0 || len < -1) + return FALSE; + + if (!_bson_append_element_header (b, BSON_TYPE_JS_CODE_W_SCOPE, name)) + return FALSE; + + length = (len != -1) ? (size_t)len + 1: strlen (js) + 1; + + size = length + sizeof (gint32) + sizeof (gint32) + bson_size (scope); + + _bson_append_int32 (b, GINT32_TO_LE (size)); + + /* Append the JS code */ + _bson_append_int32 (b, GINT32_TO_LE (length)); + b->data = g_byte_array_append (b->data, (const guint8 *)js, length - 1); + _bson_append_byte (b, 0); + + /* Append the scope */ + b->data = g_byte_array_append (b->data, bson_data (scope), + bson_size (scope)); + + return TRUE; +} + +gboolean +bson_append_int32 (bson *b, const gchar *name, gint32 i) +{ + if (!_bson_append_element_header (b, BSON_TYPE_INT32, name)) + return FALSE; + + _bson_append_int32 (b, GINT32_TO_LE (i)); + return TRUE; + } + +gboolean +bson_append_timestamp (bson *b, const gchar *name, gint64 ts) +{ + return _bson_append_int64_element (b, BSON_TYPE_TIMESTAMP, name, ts); +} + +gboolean +bson_append_int64 (bson *b, const gchar *name, gint64 i) +{ + return _bson_append_int64_element (b, BSON_TYPE_INT64, name, i); +} + +/* + * Find & retrieve data + */ +bson_cursor * +bson_cursor_new (const bson *b) +{ + bson_cursor *c; + + if (bson_size (b) == -1) + return NULL; + + c = (bson_cursor *)g_new0 (bson_cursor, 1); + c->obj = b; + + return c; +} + +void +bson_cursor_free (bson_cursor *c) +{ + g_free (c); +} + +/** @internal Figure out the block size of a given type. + * + * Provided a #bson_type and some raw data, figures out the length of + * the block, counted from rigth after the element name's position. + * + * @param type is the type of object we need the size for. + * @param data is the raw data (starting right after the element's + * name). + * + * @returns The size of the block, or -1 on error. + */ +static gint32 +_bson_get_block_size (bson_type type, const guint8 *data) +{ + glong l; + + switch (type) + { + case BSON_TYPE_STRING: + case BSON_TYPE_JS_CODE: + case BSON_TYPE_SYMBOL: + return bson_stream_doc_size (data, 0) + sizeof (gint32); + case BSON_TYPE_DOCUMENT: + case BSON_TYPE_ARRAY: + case BSON_TYPE_JS_CODE_W_SCOPE: + return bson_stream_doc_size (data, 0); + case BSON_TYPE_DOUBLE: + return sizeof (gdouble); + case BSON_TYPE_BINARY: + return bson_stream_doc_size (data, 0) + + sizeof (gint32) + sizeof (guint8); + case BSON_TYPE_OID: + return 12; + case BSON_TYPE_BOOLEAN: + return 1; + case BSON_TYPE_UTC_DATETIME: + case BSON_TYPE_TIMESTAMP: + case BSON_TYPE_INT64: + return sizeof (gint64); + case BSON_TYPE_NULL: + case BSON_TYPE_UNDEFINED: + case BSON_TYPE_MIN: + case BSON_TYPE_MAX: + return 0; + case BSON_TYPE_REGEXP: + l = strlen((gchar *)data); + return l + strlen((gchar *)(data + l + 1)) + 2; + case BSON_TYPE_INT32: + return sizeof (gint32); + case BSON_TYPE_DBPOINTER: + return bson_stream_doc_size (data, 0) + sizeof (gint32) + 12; + case BSON_TYPE_NONE: + default: + return -1; + } +} + +gboolean +bson_cursor_next (bson_cursor *c) +{ + const guint8 *d; + gint32 pos, bs; + + if (!c) + return FALSE; + + d = bson_data (c->obj); + + if (c->pos == 0) + pos = sizeof (guint32); + else + { + bs = _bson_get_block_size (bson_cursor_type (c), d + c->value_pos); + if (bs == -1) + return FALSE; + pos = c->value_pos + bs; + } + + if (pos >= bson_size (c->obj) - 1) + return FALSE; + + c->pos = pos; + c->key = (gchar *) &d[c->pos + 1]; + c->value_pos = c->pos + strlen (c->key) + 2; + + return TRUE; +} + +static inline gboolean +_bson_cursor_find (const bson *b, const gchar *name, size_t start_pos, + gint32 end_pos, gboolean wrap_over, bson_cursor *dest_c) +{ + gint32 pos = start_pos, bs; + const guint8 *d; + gint32 name_len; + + name_len = strlen (name); + + d = bson_data (b); + + while (pos < end_pos) + { + bson_type t = (bson_type) d[pos]; + const gchar *key = (gchar *) &d[pos + 1]; + gint32 key_len = strlen (key); + gint32 value_pos = pos + key_len + 2; + + if (key_len == name_len && memcmp (key, name, key_len) == 0) + { + dest_c->obj = b; + dest_c->key = key; + dest_c->pos = pos; + dest_c->value_pos = value_pos; + + return TRUE; + } + bs = _bson_get_block_size (t, &d[value_pos]); + if (bs == -1) + return FALSE; + pos = value_pos + bs; + } + + if (wrap_over) + return _bson_cursor_find (b, name, sizeof (gint32), start_pos, + FALSE, dest_c); + + return FALSE; +} + +gboolean +bson_cursor_find (bson_cursor *c, const gchar *name) +{ + if (!c || !name) + return FALSE; + + return _bson_cursor_find (c->obj, name, c->pos, bson_size (c->obj) - 1, + TRUE, c); +} + +gboolean +bson_cursor_find_next (bson_cursor *c, const gchar *name) +{ + if (!c || !name) + return FALSE; + + return _bson_cursor_find (c->obj, name, c->pos, bson_size (c->obj) - 1, + FALSE, c); +} + +bson_cursor * +bson_find (const bson *b, const gchar *name) +{ + bson_cursor *c; + + if (bson_size (b) == -1 || !name) + return NULL; + + c = bson_cursor_new (b); + if (_bson_cursor_find (b, name, sizeof (gint32), bson_size (c->obj) - 1, + FALSE, c)) + return c; + bson_cursor_free (c); + return NULL; +} + +bson_type +bson_cursor_type (const bson_cursor *c) +{ + if (!c || c->pos < sizeof (gint32)) + return BSON_TYPE_NONE; + + return (bson_type)(bson_data (c->obj)[c->pos]); +} + +const gchar * +bson_cursor_type_as_string (const bson_cursor *c) +{ + if (!c || c->pos < sizeof (gint32)) + return NULL; + + return bson_type_as_string (bson_cursor_type (c)); +} + +const gchar * +bson_cursor_key (const bson_cursor *c) +{ + if (!c) + return NULL; + + return c->key; +} + +/** @internal Convenience macro to verify a cursor's type. + * + * Verifies that the cursor's type is the same as the type requested + * by the caller, and returns FALSE if there is a mismatch. + */ +#define BSON_CURSOR_CHECK_TYPE(c,type) \ + if (bson_cursor_type(c) != type) \ + return FALSE; + +gboolean +bson_cursor_get_string (const bson_cursor *c, const gchar **dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_STRING); + + *dest = (gchar *)(bson_data (c->obj) + c->value_pos + sizeof (gint32)); + + return TRUE; +} + +gboolean +bson_cursor_get_double (const bson_cursor *c, gdouble *dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_DOUBLE); + + memcpy (dest, bson_data (c->obj) + c->value_pos, sizeof (gdouble)); + *dest = GDOUBLE_FROM_LE (*dest); + + return TRUE; +} + +gboolean +bson_cursor_get_document (const bson_cursor *c, bson **dest) +{ + bson *b; + gint32 size; + + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_DOCUMENT); + + size = bson_stream_doc_size (bson_data(c->obj), c->value_pos) - + sizeof (gint32) - 1; + b = bson_new_sized (size); + b->data = g_byte_array_append (b->data, + bson_data (c->obj) + c->value_pos + + sizeof (gint32), size); + bson_finish (b); + + *dest = b; + + return TRUE; +} + +gboolean +bson_cursor_get_array (const bson_cursor *c, bson **dest) +{ + bson *b; + gint32 size; + + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_ARRAY); + + size = bson_stream_doc_size (bson_data(c->obj), c->value_pos) - + sizeof (gint32) - 1; + b = bson_new_sized (size); + b->data = g_byte_array_append (b->data, + bson_data (c->obj) + c->value_pos + + sizeof (gint32), size); + bson_finish (b); + + *dest = b; + + return TRUE; +} + +gboolean +bson_cursor_get_binary (const bson_cursor *c, + bson_binary_subtype *subtype, + const guint8 **data, gint32 *size) +{ + if (!subtype || !size || !data) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_BINARY); + + *size = bson_stream_doc_size (bson_data(c->obj), c->value_pos); + *subtype = (bson_binary_subtype)(bson_data (c->obj)[c->value_pos + + sizeof (gint32)]); + *data = (guint8 *)(bson_data (c->obj) + c->value_pos + sizeof (gint32) + 1); + + return TRUE; +} + +gboolean +bson_cursor_get_oid (const bson_cursor *c, const guint8 **dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_OID); + + *dest = (guint8 *)(bson_data (c->obj) + c->value_pos); + + return TRUE; +} + +gboolean +bson_cursor_get_boolean (const bson_cursor *c, gboolean *dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_BOOLEAN); + + *dest = (gboolean)(bson_data (c->obj) + c->value_pos)[0]; + + return TRUE; +} + +gboolean +bson_cursor_get_utc_datetime (const bson_cursor *c, + gint64 *dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_UTC_DATETIME); + + memcpy (dest, bson_data (c->obj) + c->value_pos, sizeof (gint64)); + *dest = GINT64_FROM_LE (*dest); + + return TRUE; +} + +gboolean +bson_cursor_get_regex (const bson_cursor *c, const gchar **regex, + const gchar **options) +{ + if (!regex || !options) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_REGEXP); + + *regex = (gchar *)(bson_data (c->obj) + c->value_pos); + *options = (gchar *)(*regex + strlen(*regex) + 1); + + return TRUE; +} + +gboolean +bson_cursor_get_javascript (const bson_cursor *c, const gchar **dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_JS_CODE); + + *dest = (gchar *)(bson_data (c->obj) + c->value_pos + sizeof (gint32)); + + return TRUE; +} + +gboolean +bson_cursor_get_symbol (const bson_cursor *c, const gchar **dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_SYMBOL); + + *dest = (gchar *)(bson_data (c->obj) + c->value_pos + sizeof (gint32)); + + return TRUE; +} + +gboolean +bson_cursor_get_javascript_w_scope (const bson_cursor *c, + const gchar **js, + bson **scope) +{ + bson *b; + gint32 size, docpos; + + if (!js || !scope) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_JS_CODE_W_SCOPE); + + docpos = bson_stream_doc_size (bson_data (c->obj), + c->value_pos + sizeof (gint32)) + + sizeof (gint32) * 2; + size = bson_stream_doc_size (bson_data (c->obj), c->value_pos + docpos) - + sizeof (gint32) - 1; + b = bson_new_sized (size); + b->data = g_byte_array_append (b->data, + bson_data (c->obj) + c->value_pos + docpos + + sizeof (gint32), size); + bson_finish (b); + + *scope = b; + *js = (gchar *)(bson_data (c->obj) + c->value_pos + sizeof (gint32) * 2); + + return TRUE; +} + +gboolean +bson_cursor_get_int32 (const bson_cursor *c, gint32 *dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_INT32); + + memcpy (dest, bson_data (c->obj) + c->value_pos, sizeof (gint32)); + *dest = GINT32_FROM_LE (*dest); + + return TRUE; +} + +gboolean +bson_cursor_get_timestamp (const bson_cursor *c, gint64 *dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_TIMESTAMP); + + memcpy (dest, bson_data (c->obj) + c->value_pos, sizeof (gint64)); + *dest = GINT64_FROM_LE (*dest); + + return TRUE; +} + +gboolean +bson_cursor_get_int64 (const bson_cursor *c, gint64 *dest) +{ + if (!dest) + return FALSE; + + BSON_CURSOR_CHECK_TYPE (c, BSON_TYPE_INT64); + + memcpy (dest, bson_data (c->obj) + c->value_pos, sizeof (gint64)); + *dest = GINT64_FROM_LE (*dest); + + return TRUE; +} diff --git a/src/bson.h b/src/bson.h new file mode 100644 index 0000000..9349ea9 --- /dev/null +++ b/src/bson.h @@ -0,0 +1,856 @@ +/* bson.h - libmongo-client's BSON implementation + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/bson.h + * The BSON API's public header. + */ + +#ifndef LIBMONGO_CLIENT_BSON_H +#define LIBMONGO_CLIENT_BSON_H 1 + +#include <glib.h> +#include <string.h> + +G_BEGIN_DECLS + +/** @defgroup bson_mod BSON + * + * The types, functions and everything else within this module is + * meant to allow one to work with BSON objects easily. + * + * @addtogroup bson_mod + * @{ + */ + +/** @defgroup bson_types Types + * + * @addtogroup bson_types + * @{ + */ + +/** An opaque BSON object. + * A BSON object represents a full BSON document, as specified at + * http://bsonspec.org/. + * + * Each object has two states: open and finished. While the document + * is open, it can be appended to, but it cannot be read from. While + * it is finished, it can be read from, and iterated over, but cannot + * be appended to. + */ +typedef struct _bson bson; + +/** Opaque BSON cursor. + * Cursors are used to represent a single entry within a BSON object, + * and to help iterating over said document. + */ +typedef struct _bson_cursor bson_cursor; + +/** Supported BSON object types. + */ +typedef enum + { + BSON_TYPE_NONE = 0, /**< Only used for errors */ + BSON_TYPE_DOUBLE = 0x01, /**< 8byte double */ + BSON_TYPE_STRING, /**< 4byte length + NULL terminated string */ + BSON_TYPE_DOCUMENT, /**< 4byte length + NULL terminated document */ + BSON_TYPE_ARRAY, /**< 4byte length + NULL terminated document */ + BSON_TYPE_BINARY, /**< 4byte length + 1byte subtype + data */ + BSON_TYPE_UNDEFINED, /* Deprecated*/ + BSON_TYPE_OID, /**< 12byte ObjectID */ + BSON_TYPE_BOOLEAN, /**< 1byte boolean value */ + BSON_TYPE_UTC_DATETIME, /**< 8byte timestamp; milliseconds since + Unix epoch */ + BSON_TYPE_NULL, /**< NULL value, No following data. */ + BSON_TYPE_REGEXP, /**< Two NULL terminated C strings, the regex + itself, and the options. */ + BSON_TYPE_DBPOINTER, /* Deprecated */ + BSON_TYPE_JS_CODE, /**< 4byte length + NULL terminated string */ + BSON_TYPE_SYMBOL, /**< 4byte length + NULL terminated string */ + BSON_TYPE_JS_CODE_W_SCOPE, /**< 4byte length, followed by a + string and a document */ + BSON_TYPE_INT32, /**< 4byte integer */ + BSON_TYPE_TIMESTAMP, /**< 4bytes increment + 4bytes timestamp */ + BSON_TYPE_INT64, /**< 8byte integer */ + BSON_TYPE_MIN = 0xff, + BSON_TYPE_MAX = 0x7f + } bson_type; + +/** Return a type's stringified name. + * + * @param type is the type to stringify. + * + * @returns The stringified type, or NULL on error. + */ +const gchar *bson_type_as_string (bson_type type); + +/** Supported BSON binary subtypes. + */ +typedef enum + { + BSON_BINARY_SUBTYPE_GENERIC = 0x00, /**< The Generic subtype, the + default. */ + BSON_BINARY_SUBTYPE_FUNCTION = 0x01, /**< Binary representation + of a function. */ + BSON_BINARY_SUBTYPE_BINARY = 0x02, /**< Obsolete, do not use. */ + BSON_BINARY_SUBTYPE_UUID = 0x03, /**< Binary representation of an + UUID. */ + BSON_BINARY_SUBTYPE_MD5 = 0x05, /**< Binary representation of an + MD5 sum. */ + BSON_BINARY_SUBTYPE_USER_DEFINED = 0x80 /**< User defined data, + nothing's known about + the structure. */ + } bson_binary_subtype; + +/** @} */ + +/** @defgroup bson_object_access Object Access + * + * Functions that operate on whole BSON objects. + * + * @addtogroup bson_object_access + * @{ + */ + +/** Create a new BSON object. + * + * @note The created object will have no memory pre-allocated for data, + * resulting in possibly more reallocations than neccessary when + * appending elements. + * + * @note If at all possible, use bson_new_sized() instead. + * + * @returns A newly allocated object, or NULL on error. + */ +bson *bson_new (void); + +/** Create a new BSON object, preallocating a given amount of space. + * + * Creates a new BSON object, pre-allocating @a size bytes of space + * for the data. + * + * @param size is the space to pre-allocate for data. + * + * @note It is not an error to pre-allocate either less, or more space + * than what will really end up being added. Pre-allocation does not + * set the size of the final object, it is merely a hint, a way to + * help the system avoid memory reallocations. + * + * @returns A newly allocated object, or NULL on error. + */ +bson *bson_new_sized (gint32 size); + +/** Create a BSON object from existing data. + * + * In order to be able to parse existing BSON, one must load it up + * into a bson object - and this function does just that. + * + * @note Objects created by this function are not final objects, in + * order to be able to extend them. As such, when importing existing + * BSON data, which are terminated by a zero byte, specify the size as + * one smaller than the original data stream. + * + * @note This is because bson_finish() will append a zero byte, thus + * one would end up with an invalid document if it had an extra one. + * + * @param data is the BSON byte stream to import. + * @param size is the size of the byte stream. + * + * @returns A newly allocated object, with a copy of @a data as its + * contents. + */ +bson *bson_new_from_data (const guint8 *data, gint32 size); + +/** Build a BSON object in one go, with full control. + * + * This function can be used to build a BSON object in one simple + * step, chaining all the elements together (including sub-documents, + * created by this same function - more about that later). + * + * One has to specify the type, the key name, and whether he wants to + * see the added object free'd after addition. Each element type is + * freed appropriately, and documents and arrays are finished before + * addition, if they're to be freed afterwards. + * + * This way of operation allows one to build a full BSON object, even + * with embedded documents, without leaking memory. + * + * After the three required parameters, one will need to list the data + * itself, in the same order as one would if he'd add with the + * bson_append family of functions. + * + * The list must be closed with a #BSON_TYPE_NONE element, and the @a + * name and @a free_after parameters are not needed for the closing + * entry. + * + * @param type is the element type we'll be adding. + * @param name is the key name. + * @param free_after determines whether the original variable will be + * freed after adding it to the BSON object. + * + * @returns A newly allocated, unfinished BSON object, which must be + * finalized and freed, once not needed anymore, by the caller. Or + * NULL on error. + */ +bson *bson_build_full (bson_type type, const gchar *name, + gboolean free_after, ...); + +/** Build a BSON object in one go. + * + * Very similar to bson_build_full(), so much so, that it's exactly + * the same, except that the @a free_after parameter is always FALSE, + * and must not be specified in this case. + * + * @param type is the element type we'll be adding. + * @param name is the key name. + * + * @returns A newly allocated, unfinished BSON object, which must be + * finalized and freed, once not needed anymore, by the caller. Or + * NULL on error. + */ +bson *bson_build (bson_type type, const gchar *name, ...); + +/** Finish a BSON object. + * + * Terminate a BSON object. This includes appending the trailing zero + * byte and finalising the length of the object. + * + * The object cannot be appended to after it is finalised. + * + * @param b is the BSON object to close & finish. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_finish (bson *b); + +/** Reset a BSON object. + * + * Resetting a BSON object clears the finished status, and sets its + * size to zero. Resetting is most useful when wants to keep the + * already allocated memory around for reuse. + * + * @param b is the BSON object to reset. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_reset (bson *b); + +/** Free the memory associated with a BSON object. + * + * Frees up all memory associated with a BSON object. The variable + * shall not be used afterwards. + * + * @param b is the BSON object to free. + */ +void bson_free (bson *b); + +/** Return the size of a finished BSON object. + * + * @param b is the finished BSON object. + * + * @returns The size of the document, or -1 on error. + * + * @note Trying to get the size of a BSON object that has not been + * closed by bson_finish() is considered an error. + */ +gint32 bson_size (const bson *b); + +/** Return the raw bytestream form of the BSON object. + * + * @param b is the BSON object to retrieve data from. + * + * @returns The raw datastream or NULL on error. The stream s all not + * be freed. + * + * @note Trying to retrieve the data of an unfinished BSON object is + * considered an error. + */ +const guint8 *bson_data (const bson *b); + +/** Validate a BSON key. + * + * Verifies that a given key is a valid BSON field name. Depending on + * context (togglable by the boolean flags) this means that the string + * must either be free of dots, or must not start with a dollar sign. + * + * @param key is the field name to validate. + * @param forbid_dots toggles whether to disallow dots in the name + * altogether. + * @param no_dollar toggles whether to forbid key names starting with + * a dollar sign. + * + * @returns TRUE if the field name is found to be valid, FALSE + * otherwise. + * + * @note This function does NOT do UTF-8 validation. That is left up + * to the application. + */ +gboolean bson_validate_key (const gchar *key, gboolean forbid_dots, + gboolean no_dollar); + +/** Reads out the 32-bit documents size from a BSON bytestream. + * + * This function can be used when reading data from a stream, and one + * wants to build a BSON object from the bytestream: for + * bson_new_from_data(), one needs the length. This function provides + * that. + * + * @param doc is the byte stream to check the size of. + * @param pos is the position in the bytestream to start reading at. + * + * @returns The size of the document at the appropriate position. + * + * @note The byte stream is expected to be in little-endian byte + * order. + */ +static __inline__ gint32 bson_stream_doc_size (const guint8 *doc, gint32 pos) +{ + gint32 size; + + memcpy (&size, doc + pos, sizeof (gint32)); + return GINT32_FROM_LE (size); +} + +/** @} */ + +/** @defgroup bson_append Appending + * + * @brief Functions to append various kinds of elements to existing + * BSON objects. + * + * Every such function expects the BSON object to be open, and will + * return FALSE immediately if it finds that the object has had + * bson_finish() called on it before. + * + * The only way to append to a finished BSON object is to @a clone it + * with bson_new_from_data(), and append to the newly created object. + * + * @addtogroup bson_append + * @{ + */ + +/** Append a string to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param val is the value to append. + * @param length is the length of value. Use @a -1 to use the full + * string supplied as @a name. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_string (bson *b, const gchar *name, const gchar *val, + gint32 length); + +/** Append a double to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param d is the double value to append. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_double (bson *b, const gchar *name, gdouble d); + +/** Append a BSON document to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param doc is the BSON document to append. + * + * @note @a doc MUST be a finished BSON document. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_document (bson *b, const gchar *name, const bson *doc); + +/** Append a BSON array to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param array is the BSON array to append. + * + * @note @a array MUST be a finished BSON document. + * + * @note The difference between plain documents and arrays - as far as + * this library is concerned, and apart from the type - is that array + * keys must be numbers in increasing order. However, no extra care is + * taken to verify that: it is the responsibility of the caller to set + * the array up appropriately. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_array (bson *b, const gchar *name, const bson *array); + +/** Append a BSON binary blob to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param subtype is the BSON binary subtype to use. + * @param data is a pointer to the blob data. + * @param size is the size of the blob. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_binary (bson *b, const gchar *name, + bson_binary_subtype subtype, + const guint8 *data, gint32 size); + +/** Append an ObjectID to a BSON object. + * + * ObjectIDs are 12 byte values, the first four being a timestamp in + * big endian byte order, the next three a machine ID, then two bytes + * for the PID, and finally three bytes of sequence number, in big + * endian byte order again. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param oid is the ObjectID to append. + * + * @note The OID must be 12 bytes long, and formatting it + * appropriately is the responsiblity of the caller. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_oid (bson *b, const gchar *name, const guint8 *oid); + +/** Append a boolean to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param value is the boolean value to append. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_boolean (bson *b, const gchar *name, gboolean value); + +/** Append an UTC datetime to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param ts is the UTC timestamp: the number of milliseconds since + * the Unix epoch. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_utc_datetime (bson *b, const gchar *name, gint64 ts); + +/** Append a NULL value to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_null (bson *b, const gchar *name); + +/** Append a regexp object to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param regexp is the regexp string itself. + * @param options represents the regexp options, serialised to a + * string. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_regex (bson *b, const gchar *name, const gchar *regexp, + const gchar *options); + +/** Append Javascript code to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param js is the javascript code as a C string. + * @param len is the length of the code, use @a -1 to use the full + * length of the string supplised in @a js. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_javascript (bson *b, const gchar *name, const gchar *js, + gint32 len); + +/** Append a symbol to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param symbol is the symbol to append. + * @param len is the length of the code, use @a -1 to use the full + * length of the string supplised in @a symbol. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_symbol (bson *b, const gchar *name, const gchar *symbol, + gint32 len); + +/** Append Javascript code (with scope) to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param js is the javascript code as a C string. + * @param len is the length of the code, use @a -1 to use the full + * length of the string supplied in @a js. + * @param scope is scope to evaluate the javascript code in. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_javascript_w_scope (bson *b, const gchar *name, + const gchar *js, gint32 len, + const bson *scope); + +/** Append a 32-bit integer to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param i is the integer to append. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_int32 (bson *b, const gchar *name, gint32 i); + +/** Append a timestamp to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param ts is the timestamp to append. + * + * @note The ts param should consists of 4 bytes of increment, + * followed by 4 bytes of timestamp. It is the responsibility of the + * caller to set the variable up appropriately. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_timestamp (bson *b, const gchar *name, gint64 ts); + +/** Append a 64-bit integer to a BSON object. + * + * @param b is the BSON object to append to. + * @param name is the key name. + * @param i is the integer to append. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_append_int64 (bson *b, const gchar *name, gint64 i); + +/** @} */ + +/** @defgroup bson_cursor Cursor & Retrieval + * + * This section documents the cursors, and the data retrieval + * functions. Each and every function here operates on finished BSON + * objects, and will return with an error if passed an open object. + * + * Data can be retrieved from cursors, which in turn point to a + * specific part of the BSON object. + * + * The idea is to place the cursor to the appropriate key first, then + * retrieve the data stored there. Trying to retrieve data that is of + * different type than what the cursor is results in an error. + * + * Functions to iterate to the next key, and retrieve the current + * keys name are also provided. + * + * @addtogroup bson_cursor + * @{ + */ + +/** Create a new cursor. + * + * Creates a new cursor, and positions it to the beginning of the + * supplied BSON object. + * + * @param b is the BSON object to create a cursor for. + * + * @returns A newly allocated cursor, or NULL on error. + */ +bson_cursor *bson_cursor_new (const bson *b); + +/** Create a new cursor positioned at a given key. + * + * Creates a new cursor, and positions it to the supplied key within + * the BSON object. + * + * @param b is the BSON object to create a cursor for. + * @param name is the key name to position to. + * + * @returns A newly allocated cursor, or NULL on error. + */ +bson_cursor *bson_find (const bson *b, const gchar *name); + +/** Delete a cursor, and free up all resources used by it. + * + * @param c is the cursor to free. + */ +void bson_cursor_free (bson_cursor *c); + +/** Position the cursor to the next key. + * + * @param c is the cursor to move forward. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_next (bson_cursor *c); + +/** Move the cursor to a given key, past the current one. + * + * Scans the BSON object past the current key, in search for the + * specified one, and positions the cursor there if found, leaves it + * in place if not. + * + * @param c is the cursor to move forward. + * @param name is the key name to position to. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_find_next (bson_cursor *c, const gchar *name); + +/** Move the cursor to a given key + * + * Like bson_cursor_find_next(), this function will start scanning the + * BSON object at the current position. If the key is not found after + * it, it will wrap over and search up to the original position. + * + * @param c is the cursor to move. + * @param name is the key name to position to. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_find (bson_cursor *c, const gchar *name); + +/** Determine the type of the current element. + * + * @param c is the cursor pointing at the appropriate element. + * + * @returns The type of the element, or #BSON_TYPE_NONE on error. + */ +bson_type bson_cursor_type (const bson_cursor *c); + +/** Retrieve the type of the current element, as string. + * + * @param c is the cursor pointing at the appropriate element. + * + * @returns The type of the element, as string, or NULL on error. + * + * @note The string points to an internal structure, it should not be + * freed or modified. + */ +const gchar *bson_cursor_type_as_string (const bson_cursor *c); + +/** Determine the name of the current elements key. + * + * @param c is the cursor pointing at the appropriate element. + * + * @returns The name of the key, or NULL on error. + * + * @note The name is a pointer to an internal string, one must NOT + * free it. + */ +const gchar *bson_cursor_key (const bson_cursor *c); + +/** Get the value stored at the cursor, as string. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @note The @a dest pointer will be set to point to an internal + * structure, and must not be freed or modified by the caller. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_string (const bson_cursor *c, const gchar **dest); + +/** Get the value stored at the cursor, as a double. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_double (const bson_cursor *c, gdouble *dest); + +/** Get the value stored at the cursor, as a BSON document. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @note The @a dest pointer will be a newly allocated, finished + * object: it is the responsibility of the caller to free it. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_document (const bson_cursor *c, bson **dest); + +/** Get the value stored at the cursor, as a BSON array. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @note The @a dest pointer will be a newly allocated, finished + * object: it is the responsibility of the caller to free it. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_array (const bson_cursor *c, bson **dest); + +/** Get the value stored at the cursor, as binary data. + * + * @param c is the cursor pointing at the appropriate element. + * @param subtype is a pointer to store the binary subtype at. + * @param data is a pointer to where the data shall be stored. + * @param size is a pointer to store the size at. + * + * @note The @a data pointer will be pointing to an internal + * structure, it must not be freed or modified. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_binary (const bson_cursor *c, + bson_binary_subtype *subtype, + const guint8 **data, gint32 *size); + +/** Get the value stored at the cursor, as an ObjectID. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @note The @a dest pointer will be set to point to an internal + * structure, and must not be freed or modified by the caller. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_oid (const bson_cursor *c, const guint8 **dest); + +/** Get the value stored at the cursor, as a boolean. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_boolean (const bson_cursor *c, gboolean *dest); + +/** Get the value stored at the cursor, as an UTC datetime. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_utc_datetime (const bson_cursor *c, gint64 *dest); + +/** Get the value stored at the cursor, as a regexp. + * + * @param c is the cursor pointing at the appropriate element. + * @param regex is a pointer to a variable where the regex can be + * stored. + * @param options is a pointer to a variable where the options can be + * stored. + * + * @note Both the @a regex and @a options pointers will be set to + * point to an internal structure, and must not be freed or modified + * by the caller. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_regex (const bson_cursor *c, const gchar **regex, + const gchar **options); + +/** Get the value stored at the cursor, as javascript code. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @note The @a dest pointer will be set to point to an internal + * structure, and must not be freed or modified by the caller. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_javascript (const bson_cursor *c, const gchar **dest); + +/** Get the value stored at the cursor, as a symbol. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @note The @a dest pointer will be set to point to an internal + * structure, and must not be freed or modified by the caller. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_symbol (const bson_cursor *c, const gchar **dest); + +/** Get the value stored at the cursor, as javascript code w/ scope. + * + * @param c is the cursor pointing at the appropriate element. + * @param js is a pointer to a variable where the javascript code can + * be stored. + * @param scope is a pointer to a variable where the scope can be + * stored. + * + * @note The @a scope pointer will be a newly allocated, finished + * BSON object: it is the responsibility of the caller to free it. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_javascript_w_scope (const bson_cursor *c, + const gchar **js, + bson **scope); + +/** Get the value stored at the cursor, as a 32-bit integer. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_int32 (const bson_cursor *c, gint32 *dest); + +/** Get the value stored at the cursor, as a timestamp. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_timestamp (const bson_cursor *c, gint64 *dest); + +/** Get the value stored at the cursor, as a 64-bit integer. + * + * @param c is the cursor pointing at the appropriate element. + * @param dest is a pointer to a variable where the value can be + * stored. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean bson_cursor_get_int64 (const bson_cursor *c, gint64 *dest); + +/** @} */ + +/** @} */ + +G_END_DECLS + +#endif diff --git a/src/compat.c b/src/compat.c new file mode 100644 index 0000000..d0b1be4 --- /dev/null +++ b/src/compat.c @@ -0,0 +1,108 @@ +/* compat.c - Various compatibility functions + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" + +#if WITH_OPENSSL + +#include "compat.h" +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <openssl/md5.h> + +struct _GChecksum +{ + GChecksumType type; + char hex_digest[33]; + + MD5_CTX context; +}; + +GChecksum * +g_checksum_new (GChecksumType checksum_type) +{ + GChecksum *chk; + + if (checksum_type != G_CHECKSUM_MD5) + { + errno = ENOSYS; + return NULL; + } + + chk = calloc (1, sizeof (GChecksum)); + chk->type = checksum_type; + + MD5_Init (&chk->context); + + return chk; +} + +void +g_checksum_free (GChecksum *checksum) +{ + if (checksum) + free (checksum); +} + +void +g_checksum_update (GChecksum *checksum, + const unsigned char *data, + ssize_t length) +{ + size_t l = length; + + if (!checksum || !data || length == 0) + { + errno = EINVAL; + return; + } + errno = 0; + + if (length < 0) + l = strlen ((const char *)data); + + MD5_Update (&checksum->context, (const void *)data, l); +} + +const char * +g_checksum_get_string (GChecksum *checksum) +{ + unsigned char digest[16]; + static const char hex[16] = + {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + 'a', 'b', 'c', 'd', 'e', 'f'}; + int i; + + if (!checksum) + { + errno = EINVAL; + return NULL; + } + + MD5_Final (digest, &checksum->context); + + for (i = 0; i < 16; i++) + { + checksum->hex_digest[2 * i] = hex[(digest[i] & 0xf0) >> 4]; + checksum->hex_digest[2 * i + 1] = hex[digest[i] & 0x0f]; + } + checksum->hex_digest[32] = '\0'; + + return checksum->hex_digest; +} + +#endif /* WITH_OPENSSL */ diff --git a/src/compat.h b/src/compat.h new file mode 100644 index 0000000..f5ab52f --- /dev/null +++ b/src/compat.h @@ -0,0 +1,50 @@ +/* compat.h - Various compatibility functions + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMONGO_COMPAT_H +#define LIBMONGO_COMPAT_H 1 + +#include "config.h" + +#include <sys/types.h> +#include <sys/socket.h> + +#if WITH_OPENSSL + +#include <openssl/md5.h> + +typedef enum { + G_CHECKSUM_MD5, + G_CHECKSUM_SHA1, + G_CHECKSUM_SHA256 +} GChecksumType; + +typedef struct _GChecksum GChecksum; + +GChecksum *g_checksum_new (GChecksumType checksum_type); +void g_checksum_free (GChecksum *checksum); +void g_checksum_update (GChecksum *checksum, + const unsigned char *data, + ssize_t length); +const char *g_checksum_get_string (GChecksum *checksum); + +#endif /* WITH_OPENSSL */ + +#ifndef MSG_WAITALL +#define MSG_WAITALL 0x40 +#endif + +#endif diff --git a/src/libmongo-client.pc.in b/src/libmongo-client.pc.in new file mode 100644 index 0000000..ce3a783 --- /dev/null +++ b/src/libmongo-client.pc.in @@ -0,0 +1,12 @@ +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: libmongo-client +Version: @VERSION@ +Description: MongoDB client library +URL: https://github.com/algernon/libmongo-client +Requires.private: glib-2.0 +Libs: -L${libdir} -lmongo-client +Cflags: -I${includedir}/mongo-client diff --git a/src/libmongo-client.ver b/src/libmongo-client.ver new file mode 100644 index 0000000..58f08f1 --- /dev/null +++ b/src/libmongo-client.ver @@ -0,0 +1,163 @@ +LMC_0.1.0_INTERNAL { + global: + mongo_wire_cmd_kill_cursors_va; + mongo_wire_packet_get_header_raw; + mongo_wire_packet_set_header_raw; + local: + *; +}; + +LMC_0.1.0 { + bson_append_array; + bson_append_binary; + bson_append_boolean; + bson_append_document; + bson_append_double; + bson_append_int32; + bson_append_int64; + bson_append_javascript; + bson_append_javascript_w_scope; + bson_append_null; + bson_append_oid; + bson_append_regex; + bson_append_string; + bson_append_symbol; + bson_append_timestamp; + bson_append_utc_datetime; + bson_build; + bson_build_full; + bson_cursor_free; + bson_cursor_get_array; + bson_cursor_get_binary; + bson_cursor_get_boolean; + bson_cursor_get_document; + bson_cursor_get_double; + bson_cursor_get_int32; + bson_cursor_get_int64; + bson_cursor_get_javascript; + bson_cursor_get_javascript_w_scope; + bson_cursor_get_oid; + bson_cursor_get_regex; + bson_cursor_get_string; + bson_cursor_get_symbol; + bson_cursor_get_timestamp; + bson_cursor_get_utc_datetime; + bson_cursor_key; + bson_cursor_new; + bson_cursor_next; + bson_cursor_type; + bson_cursor_type_as_string; + bson_data; + bson_find; + bson_finish; + bson_free; + bson_new; + bson_new_from_data; + bson_new_sized; + bson_reset; + bson_size; + bson_type_as_string; + mongo_connection_get_requestid; + mongo_disconnect; + mongo_packet_recv; + mongo_packet_send; + mongo_sync_cmd_authenticate; + mongo_sync_cmd_count; + mongo_sync_cmd_custom; + mongo_sync_cmd_delete; + mongo_sync_cmd_drop; + mongo_sync_cmd_get_last_error; + mongo_sync_cmd_get_more; + mongo_sync_cmd_insert; + mongo_sync_cmd_insert_n; + mongo_sync_cmd_is_master; + mongo_sync_cmd_kill_cursors; + mongo_sync_cmd_ping; + mongo_sync_cmd_query; + mongo_sync_cmd_reset_error; + mongo_sync_cmd_update; + mongo_sync_cmd_user_add; + mongo_sync_cmd_user_remove; + mongo_sync_conn_get_auto_reconnect; + mongo_sync_conn_get_max_insert_size; + mongo_sync_conn_get_safe_mode; + mongo_sync_conn_get_slaveok; + mongo_sync_conn_seed_add; + mongo_sync_conn_set_auto_reconnect; + mongo_sync_conn_set_max_insert_size; + mongo_sync_conn_set_safe_mode; + mongo_sync_conn_set_slaveok; + mongo_sync_disconnect; + mongo_sync_pool_free; + mongo_sync_pool_new; + mongo_sync_pool_pick; + mongo_sync_pool_return; + mongo_sync_reconnect; + mongo_util_oid_init; + mongo_util_oid_new; + mongo_util_oid_new_with_time; + mongo_util_parse_addr; + mongo_wire_cmd_custom; + mongo_wire_cmd_delete; + mongo_wire_cmd_get_more; + mongo_wire_cmd_insert; + mongo_wire_cmd_insert_n; + mongo_wire_cmd_kill_cursors; + mongo_wire_cmd_kill_cursors_va; + mongo_wire_cmd_query; + mongo_wire_cmd_update; + mongo_wire_packet_free; + mongo_wire_packet_get_data; + mongo_wire_packet_get_header; + mongo_wire_packet_get_header_raw; + mongo_wire_packet_new; + mongo_wire_packet_set_data; + mongo_wire_packet_set_header; + mongo_wire_packet_set_header_raw; + mongo_wire_reply_packet_get_data; + mongo_wire_reply_packet_get_header; + mongo_wire_reply_packet_get_nth_document; +} LMC_0.1.0_INTERNAL; + +LMC_0.1.1 { + bson_validate_key; + bson_cursor_find_next; + bson_stream_doc_size; + mongo_sync_cursor_*; +} LMC_0.1.0; + +LMC_0.1.2 { + bson_cursor_find; + mongo_connection_set_timeout; + mongo_sync_cmd_index_*; +} LMC_0.1.1; + +LMC_0.1.3 { + mongo_sync_gridfs_*; + mongo_sync_cmd_create; + mongo_sync_cmd_exists; + mongo_util_oid_as_string; +} LMC_0.1.2; + +LMC_0.1.6 { + global: + mongo_connect; + mongo_sync_connect; + local: + mongo_tcp_connect; + mongo_sync_connect_0_1_0; +} LMC_0.1.3; + +LMC_0.1.7 { + mongo_sync_cmd_user_add_with_roles; +} LMC_0.1.6; + +LMC_0.1.8 { + mongo_sync_conn_recovery_cache_new; + mongo_sync_conn_recovery_cache_free; + mongo_sync_conn_recovery_cache_discard; + mongo_sync_conn_recovery_cache_seed_add; + mongo_sync_connect_recovery_cache; + mongo_sync_conn_get_last_error; + mongo_sync_cmd_get_last_error_full; +} LMC_0.1.7; diff --git a/src/libmongo-macros.h b/src/libmongo-macros.h new file mode 100644 index 0000000..644fbe8 --- /dev/null +++ b/src/libmongo-macros.h @@ -0,0 +1,51 @@ +/* libmongo-macros.h - helper macros for libmongo-client. + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMONGO_MACROS_H +#define LIBMONGO_MACROS_H 1 + +#include <glib.h> + +inline static gdouble +GDOUBLE_SWAP_LE_BE(gdouble in) +{ + union + { + guint64 i; + gdouble d; + } u; + + u.d = in; + u.i = GUINT64_SWAP_LE_BE (u.i); + return u.d; +} + +#if G_BYTE_ORDER == G_LITTLE_ENDIAN +#define GDOUBLE_TO_LE(val) ((gdouble) (val)) +#define GDOUBLE_TO_BE(val) (GDOUBLE_SWAP_LE_BE (val)) + +#elif G_BYTE_ORDER == G_BIG_ENDIAN +#define GDOUBLE_TO_LE(val) (GDOUBLE_SWAP_LE_BE (val)) +#define GDOUBLE_TO_BE(val) ((gdouble) (val)) + +#else /* !G_LITTLE_ENDIAN && !G_BIG_ENDIAN */ +#error unknown ENDIAN type +#endif /* !G_LITTLE_ENDIAN && !G_BIG_ENDIAN */ + +#define GDOUBLE_FROM_LE(val) (GDOUBLE_TO_LE (val)) +#define GDOUBLE_FROM_BE(val) (GDOUBLE_TO_BE (val)) + +#endif diff --git a/src/libmongo-private.h b/src/libmongo-private.h new file mode 100644 index 0000000..e13f0da --- /dev/null +++ b/src/libmongo-private.h @@ -0,0 +1,276 @@ +/* libmongo-private.h - private headers for libmongo-client + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file libmongo-private.h + * + * Private types and functions, for internal use in libmongo-client only. + */ + +#ifndef LIBMONGO_PRIVATE_H +#define LIBMONGO_PRIVATE_H 1 + +#include "mongo.h" +#include "compat.h" + +/** @internal BSON structure. + */ +struct _bson +{ + GByteArray *data; /**< The actual data of the BSON object. */ + gboolean finished; /**< Flag to indicate whether the object is open + or finished. */ +}; + +/** @internal Mongo Connection state object. */ +struct _mongo_connection +{ + gint fd; /**< The file descriptor associated with the connection. */ + gint32 request_id; /**< The last sent command's requestID. */ +}; + +/** @internal Mongo Replica Set object. */ +typedef struct _replica_set +{ + GList *seeds; /**< Replica set seeds, as a list of strings. */ + GList *hosts; /**< Replica set members, as a list of strings. */ + gchar *primary; /**< The replica master, if any. */ +} replica_set; /**< Replica Set properties. */ + +/** @internal MongoDb Authentication Credentials object. + * These values are mlock()'ed. + */ +typedef struct _auth_credentials +{ + gchar *db; /**< The database to authenticate against. */ + gchar *user; /**< The username to authenticate with. */ + gchar *pw; /**< The password to authenticate with. */ +} auth_credentials; + +/** @internal Connection Recovery Cache for MongoDb. */ +struct _mongo_sync_conn_recovery_cache +{ + replica_set rs; /**< The replica set. */ + auth_credentials auth; /**< The authentication credentials.*/ +}; + +/** @internal Synchronous connection object. */ +struct _mongo_sync_connection +{ + mongo_connection super; /**< The parent object. */ + gboolean slaveok; /**< Whether queries against slave nodes are + acceptable. */ + gboolean safe_mode; /**< Safe-mode signal flag. */ + gboolean auto_reconnect; /**< Auto-reconnect flag. */ + + gchar *last_error; /**< The last error from the server, caught + during queries. */ + gint32 max_insert_size; /**< Maximum number of bytes an insert + command can be before being split to + smaller chunks. Used for bulk inserts. */ + + replica_set rs; /**< Replica set. */ + auth_credentials auth; /**< Authentication credentials. */ + + mongo_sync_conn_recovery_cache *recovery_cache; /**< Reference to the externally managed recovery cache. */ +}; + +/** @internal MongoDB cursor object. + * + * The cursor object can be used to conveniently iterate over a query + * result set. + */ +struct _mongo_sync_cursor +{ + mongo_sync_connection *conn; /**< The connection associated with + the cursor. Owned by the caller. */ + gchar *ns; /**< The namespace of the cursor. */ + mongo_packet *results; /**< The current result set, as a mongo + packet. */ + + gint32 offset; /**< Offset of the cursor within the active result + set. */ + mongo_reply_packet_header ph; /**< The reply headers extracted from + the active result set. */ +}; + +/** @internal Synchronous pool connection object. */ +struct _mongo_sync_pool_connection +{ + mongo_sync_connection super; /**< The parent object. */ + + gint pool_id; /**< ID of the connection. */ + gboolean in_use; /**< Whether the object is in use or not. */ +}; + +/** @internal GridFS object */ +struct _mongo_sync_gridfs +{ + mongo_sync_connection *conn; /**< Connection the object is + associated to. */ + + struct + { + gchar *prefix; /**< The namespace prefix. */ + gchar *files; /**< The file metadata namespace. */ + gchar *chunks; /**< The chunk namespace. */ + + gchar *db; /**< The database part of the namespace. */ + } ns; /**< Namespaces */ + + gint32 chunk_size; /**< The default chunk size. */ +}; + +/** @internal GridFS file types. */ +typedef enum +{ + LMC_GRIDFS_FILE_CHUNKED, /**< Chunked file. */ + LMC_GRIDFS_FILE_STREAM_READER, /**< Streamed file, reader. */ + LMC_GRIDFS_FILE_STREAM_WRITER, /**< Streamed file, writer. */ +} _mongo_gridfs_type; + +/** @internal GridFS common file properties. + * + * This is shared between chunked and streamed files. + */ +typedef struct +{ + gint32 chunk_size; /**< Maximum chunk size for this file. */ + gint64 length; /**< Total length of the file. */ + + union + { + /** Chunked file data. */ + struct + { + const guint8 *oid; /**< The file's ObjectID. */ + const gchar *md5; /**< MD5 sum of the file. */ + gint64 date; /**< The upload date. */ + bson *metadata; /**< Full file metadata, including user-set + keys. */ + }; + + /** Streamed file data */ + struct + { + gint64 offset; /**< Offset we're into the file. */ + gint64 current_chunk; /**< The current chunk we're on. */ + guint8 *id; /**< A copy of the file's ObjectID. */ + }; + }; + + _mongo_gridfs_type type; /**< The type of the GridFS file. */ +} mongo_sync_gridfs_file_common; + +/** @internal GridFS file object. */ +struct _mongo_sync_gridfs_chunked_file +{ + mongo_sync_gridfs_file_common meta; /**< The file metadata. */ + mongo_sync_gridfs *gfs; /**< The GridFS the file is on. */ +}; + +/** @internal GridFS file stream object. */ +struct _mongo_sync_gridfs_stream +{ + mongo_sync_gridfs_file_common file; /**< Common file data. */ + mongo_sync_gridfs *gfs; /**< The GridFS the file is on. */ + + /** Reader & Writer structure union. + */ + union + { + /** Reader-specific data. + */ + struct + { + bson *bson; /**< The current chunk as BSON. */ + + /** Chunk state information. + */ + struct + { + const guint8 *data; /**< The current chunk data, pointing + into ->reader.bson. */ + gint32 start_offset; /**< Offset to start reading data from, + needed to support the binary subtype. */ + gint32 size; /**< Size of the current chunk. */ + gint32 offset; /**< Offset we're into the chunk. */ + } chunk; + } reader; + + /** Writer-specific data. + */ + struct + { + bson *metadata; /**< Copy of the user-supplied metadata. */ + guint8 *buffer; /**< The current output buffer. */ + gint32 buffer_offset; /**< Offset into the output buffer. */ + + GChecksum *checksum; /**< The running checksum of the output + file. */ + } writer; + }; +}; + +/** @internal Construct a kill cursors command, using a va_list. + * + * @param id is the sequence id. + * @param n is the number of cursors to delete. + * @param ap is the va_list of cursors to kill. + * + * @note One must supply exaclty @a n number of cursor IDs. + * + * @returns A newly allocated packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_wire_cmd_kill_cursors_va (gint32 id, gint32 n, + va_list ap); + +/** @internal Get the header data of a packet, without conversion. + * + * Retrieve the mongo packet's header data, but do not convert the + * values from little-endian. Use only when the source has the data in + * the right byte order already. + * + * @param p is the packet which header we seek. + * @param header is a pointer to a variable which will hold the data. + * + * @note Allocating the @a header is the responsibility of the caller. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean +mongo_wire_packet_get_header_raw (const mongo_packet *p, + mongo_packet_header *header); + +/** @internal Set the header data of a packet, without conversion. + * + * Override the mongo packet's header data, but do not convert the + * values from little-endian. Use only when the source has the data in + * the right byte order already. + * + * @note No sanity checks are done, use this function with great care. + * + * @param p is the packet whose header we want to override. + * @param header is the header structure to use. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean +mongo_wire_packet_set_header_raw (mongo_packet *p, + const mongo_packet_header *header); + +#endif diff --git a/src/mongo-client.c b/src/mongo-client.c new file mode 100644 index 0000000..a46cc0d --- /dev/null +++ b/src/mongo-client.c @@ -0,0 +1,331 @@ +/* mongo-client.c - libmongo-client user API + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-client.c + * MongoDB client API implementation. + */ + +#include "config.h" +#include "mongo-client.h" +#include "bson.h" +#include "mongo-wire.h" +#include "libmongo-private.h" + +#include <glib.h> + +#include <string.h> +#include <arpa/inet.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <netdb.h> +#include <sys/uio.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> +#include <stdlib.h> +#include <errno.h> + +#ifndef HAVE_MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +static const int one = 1; + +mongo_connection * +mongo_tcp_connect (const char *host, int port) +{ + struct addrinfo *res = NULL, *r; + struct addrinfo hints; + int e, fd = -1; + gchar *port_s; + mongo_connection *conn; + + if (!host) + { + errno = EINVAL; + return NULL; + } + + memset (&hints, 0, sizeof (hints)); + hints.ai_socktype = SOCK_STREAM; + +#ifdef __linux__ + hints.ai_flags = AI_ADDRCONFIG; +#endif + + port_s = g_strdup_printf ("%d", port); + e = getaddrinfo (host, port_s, &hints, &res); + if (e != 0) + { + int err = errno; + + g_free (port_s); + errno = err; + return NULL; + } + g_free (port_s); + + for (r = res; r != NULL; r = r->ai_next) + { + fd = socket (r->ai_family, r->ai_socktype, r->ai_protocol); + if (fd != -1 && connect (fd, r->ai_addr, r->ai_addrlen) == 0) + break; + if (fd != -1) + { + close (fd); + fd = -1; + } + } + freeaddrinfo (res); + + if (fd == -1) + { + errno = EADDRNOTAVAIL; + return NULL; + } + + setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof (one)); + + conn = g_new0 (mongo_connection, 1); + conn->fd = fd; + + return conn; +} + +static mongo_connection * +mongo_unix_connect (const char *path) +{ + int fd = -1; + mongo_connection *conn; + struct sockaddr_un remote; + + if (!path || strlen (path) >= sizeof (remote.sun_path)) + { + errno = path ? ENAMETOOLONG : EINVAL; + return NULL; + } + + fd = socket (AF_UNIX, SOCK_STREAM, 0); + if (fd == -1) + { + errno = EADDRNOTAVAIL; + return NULL; + } + + remote.sun_family = AF_UNIX; + strncpy (remote.sun_path, path, sizeof (remote.sun_path)); + if (connect (fd, (struct sockaddr *)&remote, sizeof (remote)) == -1) + { + close (fd); + errno = EADDRNOTAVAIL; + return NULL; + } + + conn = g_new0 (mongo_connection, 1); + conn->fd = fd; + + return conn; +} + +mongo_connection * +mongo_connect (const char *address, int port) +{ + if (port == MONGO_CONN_LOCAL) + return mongo_unix_connect (address); + + return mongo_tcp_connect (address, port); +} + +#if VERSIONED_SYMBOLS +__asm__(".symver mongo_tcp_connect,mongo_connect@LMC_0.1.0"); +#endif + +void +mongo_disconnect (mongo_connection *conn) +{ + if (!conn) + { + errno = ENOTCONN; + return; + } + + if (conn->fd >= 0) + close (conn->fd); + + g_free (conn); + errno = 0; +} + +gboolean +mongo_packet_send (mongo_connection *conn, const mongo_packet *p) +{ + const guint8 *data; + gint32 data_size; + mongo_packet_header h; + struct iovec iov[2]; + struct msghdr msg; + + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + if (!p) + { + errno = EINVAL; + return FALSE; + } + + if (conn->fd < 0) + { + errno = EBADF; + return FALSE; + } + + if (!mongo_wire_packet_get_header_raw (p, &h)) + return FALSE; + + data_size = mongo_wire_packet_get_data (p, &data); + + if (data_size == -1) + return FALSE; + + iov[0].iov_base = (void *)&h; + iov[0].iov_len = sizeof (h); + iov[1].iov_base = (void *)data; + iov[1].iov_len = data_size; + + memset (&msg, 0, sizeof (struct msghdr)); + msg.msg_iov = iov; + msg.msg_iovlen = 2; + + if (sendmsg (conn->fd, &msg, MSG_NOSIGNAL) != (gint32)sizeof (h) + data_size) + return FALSE; + + conn->request_id = h.id; + + return TRUE; +} + +mongo_packet * +mongo_packet_recv (mongo_connection *conn) +{ + mongo_packet *p; + guint8 *data; + guint32 size; + mongo_packet_header h; + + if (!conn) + { + errno = ENOTCONN; + return NULL; + } + + if (conn->fd < 0) + { + errno = EBADF; + return NULL; + } + + memset (&h, 0, sizeof (h)); + if (recv (conn->fd, &h, sizeof (mongo_packet_header), + MSG_NOSIGNAL | MSG_WAITALL) != sizeof (mongo_packet_header)) + { + return NULL; + } + + h.length = GINT32_FROM_LE (h.length); + h.id = GINT32_FROM_LE (h.id); + h.resp_to = GINT32_FROM_LE (h.resp_to); + h.opcode = GINT32_FROM_LE (h.opcode); + + p = mongo_wire_packet_new (); + + if (!mongo_wire_packet_set_header_raw (p, &h)) + { + int e = errno; + + mongo_wire_packet_free (p); + errno = e; + return NULL; + } + + size = h.length - sizeof (mongo_packet_header); + data = g_new0 (guint8, size); + if ((guint32)recv (conn->fd, data, size, MSG_NOSIGNAL | MSG_WAITALL) != size) + { + int e = errno; + + g_free (data); + mongo_wire_packet_free (p); + errno = e; + return NULL; + } + + if (!mongo_wire_packet_set_data (p, data, size)) + { + int e = errno; + + g_free (data); + mongo_wire_packet_free (p); + errno = e; + return NULL; + } + + g_free (data); + + return p; +} + +gint32 +mongo_connection_get_requestid (const mongo_connection *conn) +{ + if (!conn) + { + errno = ENOTCONN; + return -1; + } + + return conn->request_id; +} + +gboolean +mongo_connection_set_timeout (mongo_connection *conn, gint timeout) +{ + struct timeval tv; + + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + if (timeout < 0) + { + errno = ERANGE; + return FALSE; + } + + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout % 1000) * 1000; + + if (setsockopt (conn->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof (tv)) == -1) + return FALSE; + if (setsockopt (conn->fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof (tv)) == -1) + return FALSE; + return TRUE; +} diff --git a/src/mongo-client.h b/src/mongo-client.h new file mode 100644 index 0000000..d31b273 --- /dev/null +++ b/src/mongo-client.h @@ -0,0 +1,116 @@ +/* mongo-client.h - libmongo-client user API + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-client.h + * MongoDB client API public header. + */ + +#ifndef LIBMONGO_CLIENT_H +#define LIBMONGO_CLIENT_H 1 + +#include <bson.h> +#include <mongo-wire.h> + +#include <glib.h> + +G_BEGIN_DECLS + +/** @defgroup mongo_client Mongo Client + * + * @addtogroup mongo_client + * @{ + */ + +/** Opaque MongoDB connection object type. */ +typedef struct _mongo_connection mongo_connection; + +/** Constant to signal that a connection is local (unix socket). + * + * When passed to mongo_connect() or mongo_sync_connect() as the port + * parameter, it signals that the address is to be interpreted as a + * unix socket path, not a hostname or IP. + */ +#define MONGO_CONN_LOCAL -1 + +/** Connect to a MongoDB server. + * + * Connects to a single MongoDB server. + * + * @param address is the address of the server (IP or unix socket path). + * @param port is the port to connect to, or #MONGO_CONN_LOCAL if + * address is a unix socket. + * + * @returns A newly allocated mongo_connection object or NULL on + * error. It is the responsibility of the caller to free it once it is + * not used anymore. + */ +mongo_connection *mongo_connect (const char *address, int port); + +/** Disconnect from a MongoDB server. + * + * @param conn is the connection object to disconnect from. + * + * @note This also frees up the object. + */ +void mongo_disconnect (mongo_connection *conn); + +/** Sends an assembled command packet to MongoDB. + * + * @param conn is the connection to use for sending. + * @param p is the packet to send. + * + * @returns TRUE on success, when the whole packet was sent, FALSE + * otherwise. + */ +gboolean mongo_packet_send (mongo_connection *conn, const mongo_packet *p); + +/** Receive a packet from MongoDB. + * + * @param conn is the connection to use for receiving. + * + * @returns A response packet, or NULL upon error. + */ +mongo_packet *mongo_packet_recv (mongo_connection *conn); + +/** Get the last requestID from a connection object. + * + * @param conn is the connection to get the requestID from. + * + * @returns The last requestID used, or -1 on error. + */ +gint32 mongo_connection_get_requestid (const mongo_connection *conn); + +/** Set a timeout for read/write operations on a connection + * + * On systems that support it, set a timeout for read/write operations + * on a socket. + * + * @param conn is the connection to set a timeout on. + * @param timeout is the timeout to set, in milliseconds. + * + * @returns TRUE on success, FALSE otherwise. + * + * @note The timeout is not preserved accross reconnects, if using the + * Sync API, however. It only applies to the active connection, and + * nothing else. + */ +gboolean mongo_connection_set_timeout (mongo_connection *conn, gint timeout); + +/** @} */ + +G_END_DECLS + +#endif diff --git a/src/mongo-sync-cursor.c b/src/mongo-sync-cursor.c new file mode 100644 index 0000000..b2492be --- /dev/null +++ b/src/mongo-sync-cursor.c @@ -0,0 +1,118 @@ +/* mongo-sync-cursor.c - libmongo-client cursor API on top of Sync + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-sync-cursor.c + * MongoDB Cursor API implementation. + */ + +#include "config.h" +#include "mongo.h" +#include "libmongo-private.h" + +#include <errno.h> + +mongo_sync_cursor * +mongo_sync_cursor_new (mongo_sync_connection *conn, const gchar *ns, + mongo_packet *packet) +{ + mongo_sync_cursor *c; + + if (!conn) + { + errno = ENOTCONN; + return NULL; + } + if (!ns || !packet) + { + errno = EINVAL; + return NULL; + } + + c = g_new0 (mongo_sync_cursor, 1); + c->conn = conn; + c->ns = g_strdup (ns); + c->results = packet; + c->offset = -1; + + mongo_wire_reply_packet_get_header (c->results, &c->ph); + + return c; +} + +gboolean +mongo_sync_cursor_next (mongo_sync_cursor *cursor) +{ + if (!cursor) + { + errno = EINVAL; + return FALSE; + } + errno = 0; + + if (cursor->offset >= cursor->ph.returned - 1) + { + gint32 ret = cursor->ph.returned; + gint64 cid = cursor->ph.cursor_id; + + mongo_wire_packet_free (cursor->results); + cursor->offset = -1; + cursor->results = mongo_sync_cmd_get_more (cursor->conn, cursor->ns, + ret, cid); + if (!cursor->results) + return FALSE; + mongo_wire_reply_packet_get_header (cursor->results, &cursor->ph); + } + cursor->offset++; + return TRUE; +} + +void +mongo_sync_cursor_free (mongo_sync_cursor *cursor) +{ + if (!cursor) + { + errno = ENOTCONN; + return; + } + errno = 0; + + mongo_sync_cmd_kill_cursors (cursor->conn, 1, cursor->ph.cursor_id); + g_free (cursor->ns); + mongo_wire_packet_free (cursor->results); + g_free (cursor); +} + +bson * +mongo_sync_cursor_get_data (mongo_sync_cursor *cursor) +{ + bson *r; + + if (!cursor) + { + errno = EINVAL; + return NULL; + } + + if (!mongo_wire_reply_packet_get_nth_document (cursor->results, + cursor->offset + 1, + &r)) + { + errno = ERANGE; + return NULL; + } + bson_finish (r); + return r; +} diff --git a/src/mongo-sync-cursor.h b/src/mongo-sync-cursor.h new file mode 100644 index 0000000..949cc65 --- /dev/null +++ b/src/mongo-sync-cursor.h @@ -0,0 +1,103 @@ +/* mongo-sync-cursor.h - libmongo-client cursor API on top of Sync + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-sync-cursor.h + * MongoDB cursor API public header. + * + * @addtogroup mongo_sync + * @{ + */ + +#ifndef LIBMONGO_SYNC_CURSOR_H +#define LIBMONGO_SYNC_CURSOR_H 1 + +#include <glib.h> +#include <mongo-sync.h> + +G_BEGIN_DECLS + +/** @defgroup mongo_sync_cursor Mongo Sync Cursor API + * + * @addtogroup mongo_sync_cursor + * @{ + */ + +/** Opaque Mongo Cursor object. */ +typedef struct _mongo_sync_cursor mongo_sync_cursor; + +/** Create a new MongoDB Cursor. + * + * This function can be used to create a new cursor, with which one + * can conveniently iterate over using mongo_sync_cursor_next(). + * + * The @a packet argument is supposed to be the output of - for + * example - mongo_sync_cmd_query(). + * + * @param conn is the connection to associate with the cursor. + * @param ns is the namespace to use with the cursor. + * @param packet is a reply packet on which the cursor should be + * based. The packet should not be freed or touched by the application + * afterwards, it will be handled by the cursor functions. + * + * @returns A newly allocated cursor, or NULL on error. + */ +mongo_sync_cursor *mongo_sync_cursor_new (mongo_sync_connection *conn, + const gchar *ns, + mongo_packet *packet); + +/** Iterate a MongoDB cursor. + * + * Iterating the cursor will move its position to the next document in + * the result set, querying the database if so need be. + * + * Queries will be done in bulks, provided that the original query was + * done so aswell. + * + * @param cursor is the cursor to advance. + * + * @returns TRUE if the cursor could be advanced, FALSE otherwise. If + * the cursor could not be advanced due to an error, then errno will + * be set appropriately. + */ +gboolean mongo_sync_cursor_next (mongo_sync_cursor *cursor); + +/** Retrieve the BSON document at the cursor's position. + * + * @param cursor is the cursor to retrieve data from. + * + * @returns A newly allocated BSON object, or NULL on failure. It is + * the responsiblity of the caller to free the BSON object once it is + * no longer needed. + */ +bson *mongo_sync_cursor_get_data (mongo_sync_cursor *cursor); + +/** Free a MongoDB cursor. + * + * Freeing a MongoDB cursor involves destroying the active cursor the + * database is holding, and then freeing up the resources allocated + * for it. + * + * @param cursor is the cursor to destroy. + */ +void mongo_sync_cursor_free (mongo_sync_cursor *cursor); + +/** @} */ + +/** @} */ + +G_END_DECLS + +#endif diff --git a/src/mongo-sync-pool.c b/src/mongo-sync-pool.c new file mode 100644 index 0000000..52f5042 --- /dev/null +++ b/src/mongo-sync-pool.c @@ -0,0 +1,269 @@ +/* mongo-sync-pool.c - libmongo-client connection pool implementation + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-sync-pool.c + * MongoDB connection pool API implementation. + */ + +#include <errno.h> +#include <string.h> +#include <glib.h> +#include <mongo.h> +#include "libmongo-private.h" + +/** @internal A connection pool object. */ +struct _mongo_sync_pool +{ + gint nmasters; /**< Number of master connections in the pool. */ + gint nslaves; /**< Number of slave connections in the pool. */ + + GList *masters; /**< List of master connections in the pool. */ + GList *slaves; /**< List of slave connections in the pool. */ +}; + +static mongo_sync_pool_connection * +_mongo_sync_pool_connect (const gchar *host, gint port, gboolean slaveok) +{ + mongo_sync_connection *c; + mongo_sync_pool_connection *conn; + + c = mongo_sync_connect (host, port, slaveok); + if (!c) + return NULL; + conn = g_realloc (c, sizeof (mongo_sync_pool_connection)); + conn->pool_id = 0; + conn->in_use = FALSE; + + return conn; +} + +mongo_sync_pool * +mongo_sync_pool_new (const gchar *host, + gint port, + gint nmasters, gint nslaves) +{ + mongo_sync_pool *pool; + mongo_sync_pool_connection *conn; + gint i, j = 0; + + if (!host || port < 0) + { + errno = EINVAL; + return NULL; + } + if (nmasters < 0 || nslaves < 0) + { + errno = ERANGE; + return NULL; + } + if (nmasters + nslaves <= 0) + { + errno = EINVAL; + return NULL; + } + + conn = _mongo_sync_pool_connect (host, port, FALSE); + if (!conn) + return FALSE; + + if (!mongo_sync_cmd_is_master ((mongo_sync_connection *)conn)) + { + mongo_sync_disconnect ((mongo_sync_connection *)conn); + errno = EPROTO; + return NULL; + } + + pool = g_new0 (mongo_sync_pool, 1); + pool->nmasters = nmasters; + pool->nslaves = nslaves; + + for (i = 0; i < pool->nmasters; i++) + { + mongo_sync_pool_connection *c; + + c = _mongo_sync_pool_connect (host, port, FALSE); + c->pool_id = i; + + pool->masters = g_list_append (pool->masters, c); + } + + for (i = 0; i < pool->nslaves; i++) + { + mongo_sync_pool_connection *c; + gchar *shost = NULL; + gint sport = 27017; + GList *l; + gboolean found = FALSE; + gboolean need_restart = (j != 0); + + /* Select the next secondary */ + l = g_list_nth (conn->super.rs.hosts, j); + + do + { + j++; + if (l && mongo_util_parse_addr ((gchar *)l->data, &shost, &sport)) + { + if (sport != port || strcmp (host, shost) != 0) + { + found = TRUE; + break; + } + } + l = g_list_next (l); + if (!l && need_restart) + { + need_restart = FALSE; + j = 0; + l = g_list_nth (conn->super.rs.hosts, j); + } + } + while (l); + + if (!found) + { + pool->nslaves = i - 1; + break; + } + + /* Connect to it*/ + c = _mongo_sync_pool_connect (shost, sport, TRUE); + c->pool_id = pool->nmasters + i + 1; + + pool->slaves = g_list_append (pool->slaves, c); + } + + mongo_sync_disconnect ((mongo_sync_connection *)conn); + return pool; +} + +void +mongo_sync_pool_free (mongo_sync_pool *pool) +{ + GList *l; + + if (!pool) + return; + + l = pool->masters; + while (l) + { + mongo_sync_disconnect ((mongo_sync_connection *)l->data); + l = g_list_delete_link (l, l); + } + + l = pool->slaves; + while (l) + { + mongo_sync_disconnect ((mongo_sync_connection *)l->data); + l = g_list_delete_link (l, l); + } + + g_free (pool); +} + +mongo_sync_pool_connection * +mongo_sync_pool_pick (mongo_sync_pool *pool, + gboolean want_master) +{ + GList *l; + + if (!pool) + { + errno = ENOTCONN; + return NULL; + } + + if (!want_master) + { + l = pool->slaves; + + while (l) + { + mongo_sync_pool_connection *c; + + c = (mongo_sync_pool_connection *)l->data; + if (!c->in_use) + { + c->in_use = TRUE; + return c; + } + l = g_list_next (l); + } + } + + l = pool->masters; + while (l) + { + mongo_sync_pool_connection *c; + + c = (mongo_sync_pool_connection *)l->data; + if (!c->in_use) + { + c->in_use = TRUE; + return c; + } + l = g_list_next (l); + } + + errno = EAGAIN; + return NULL; +} + +gboolean +mongo_sync_pool_return (mongo_sync_pool *pool, + mongo_sync_pool_connection *conn) +{ + if (!pool) + { + errno = ENOTCONN; + return FALSE; + } + if (!conn) + { + errno = EINVAL; + return FALSE; + } + + if (conn->pool_id > pool->nmasters) + { + mongo_sync_pool_connection *c; + + if (conn->pool_id - pool->nmasters > pool->nslaves || + pool->nslaves == 0) + { + errno = ERANGE; + return FALSE; + } + + c = (mongo_sync_pool_connection *)g_list_nth_data + (pool->slaves, conn->pool_id - pool->nmasters - 1); + c->in_use = FALSE; + return TRUE; + } + else + { + mongo_sync_pool_connection *c; + + c = (mongo_sync_pool_connection *)g_list_nth_data (pool->masters, + conn->pool_id); + c->in_use = FALSE; + return TRUE; + } + + errno = ENOENT; + return FALSE; +} diff --git a/src/mongo-sync-pool.h b/src/mongo-sync-pool.h new file mode 100644 index 0000000..8750815 --- /dev/null +++ b/src/mongo-sync-pool.h @@ -0,0 +1,133 @@ +/* mongo-sync-pool.h - libmongo-client connection pool API + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-sync-pool.h + * MongoDB connection pool API public header. + * + * @addtogroup mongo_sync + * @{ + */ + +#ifndef LIBMONGO_POOL_H +#define LIBMONGO_POOL_H 1 + +#include <mongo-sync.h> +#include <glib.h> + +G_BEGIN_DECLS + +/** @defgroup mongo_sync_pool_api Mongo Sync Pool API + * + * These commands implement connection pooling over the mongo_sync + * family of commands. + * + * Once a pool is set up, one can pick and return connections at one's + * leisure. Picking is done in a round-robin fashion (excluding + * connections that have been picked but not returned yet). + * + * @addtogroup mongo_sync_pool_api + * @{ + */ + +/** Opaque synchronous connection pool object. + * + * This represents a single connection within the pool. + */ +typedef struct _mongo_sync_pool_connection mongo_sync_pool_connection; + +/** Opaque synchronous pool object. + * + * This is the entire connection pool, with all its meta-data. + */ +typedef struct _mongo_sync_pool mongo_sync_pool; + +/** Create a new synchronous connection pool. + * + * Sets up a connection pool towards a given MongoDB server, and all + * its secondaries (if any). + * + * @param host is the address of the server. + * @param port is the port to connect to. + * @param nmasters is the number of connections to make towards the + * master. + * @param nslaves is the number of connections to make towards the + * secondaries. + * + * @note Either @a nmasters or @a nslaves can be zero, but not both at + * the same time. + * + * @note The @a host MUST be a master, otherwise the function will + * return an error. + * + * @returns A newly allocated mongo_sync_pool object, or NULL on + * error. It is the responsibility of the caller to close and free the + * pool when appropriate. + */ +mongo_sync_pool *mongo_sync_pool_new (const gchar *host, + gint port, + gint nmasters, gint nslaves); + +/** Close and free a synchronous connection pool. + * + * @param pool is the pool to shut down. + * + * @note The object will be freed, and shall not be used afterwards! + */ +void mongo_sync_pool_free (mongo_sync_pool *pool); + +/** Pick a connection from a synchronous connection pool. + * + * Based on given preferences, selects a free connection object from + * the pool, and returns it. + * + * @param pool is the pool to select from. + * @param want_master flags whether the caller wants a master connection, + * or secondaries are acceptable too. + * + * @note For write operations, always select a master! + * + * @returns A connection object from the pool. + * + * @note The returned object can be safely casted to + * mongo_sync_connection, and passed to any of the mongo_sync family + * of commands. Do note however, that one shall not close or otherwise + * free a connection object returned by this function. + */ +mongo_sync_pool_connection *mongo_sync_pool_pick (mongo_sync_pool *pool, + gboolean want_master); + +/** Return a connection to the synchronous connection pool. + * + * Once one is not using a connection anymore, it should be returned + * to the pool using this function. + * + * @param pool is the pool to return to. + * @param conn is the connection to return. + * + * @returns TRUE on success, FALSE otherwise. + * + * @note The returned connection should not be used afterwards. + */ +gboolean mongo_sync_pool_return (mongo_sync_pool *pool, + mongo_sync_pool_connection *conn); + +/** @} */ + +/** @} */ + +G_END_DECLS + +#endif diff --git a/src/mongo-sync.c b/src/mongo-sync.c new file mode 100644 index 0000000..cd37ec5 --- /dev/null +++ b/src/mongo-sync.c @@ -0,0 +1,2155 @@ +/* mongo-sync.c - libmongo-client synchronous wrapper API + * Copyright 2011, 2012, 2013, 2014 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-sync.c + * MongoDB synchronous wrapper API implementation. + */ + +#include "config.h" +#include "mongo.h" +#include "libmongo-private.h" + +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> +#include <sys/mman.h> + +static void +_list_free_full (GList **list) +{ + GList *l; + + if (!list || !*list) + return; + + l = *list; + while (l) + { + g_free (l->data); + l = g_list_delete_link (l, l); + } + + *list = NULL; +} + +static void +_mongo_auth_prop_destroy (gchar **prop) +{ + size_t l; + + if (!prop || !*prop) + return; + + l = strlen (*prop); + memset (*prop, 0, l); + munlock (*prop, l); + g_free (*prop); + + *prop = NULL; +} + +static void +_replica_set_free(replica_set *rs) +{ + g_free (rs->primary); + + _list_free_full (&rs->hosts); + _list_free_full (&rs->seeds); + + rs->hosts = NULL; + rs->seeds = NULL; + rs->primary = NULL; +} + +static GList * +_list_copy_full (GList *list) +{ + GList *new_list = NULL; + guint i; + + for (i = 0; i < g_list_length (list); i++) + { + gchar *data = (gchar *)g_list_nth_data (list, i); + new_list = g_list_append (new_list, g_strdup (data)); + } + + return new_list; +} + +static void +_recovery_cache_store (mongo_sync_conn_recovery_cache *cache, + mongo_sync_connection *conn) +{ + mongo_sync_conn_recovery_cache_discard (cache); + cache->rs.seeds = _list_copy_full (conn->rs.seeds); + cache->rs.hosts = _list_copy_full (conn->rs.hosts); + cache->rs.primary = g_strdup (conn->rs.primary); + + if (conn->auth.db) + { + cache->auth.db = g_strdup (conn->auth.db); + mlock (cache->auth.db, strlen (cache->auth.db)); + _mongo_auth_prop_destroy (&conn->auth.db); + } + + if (conn->auth.user) + { + cache->auth.user = g_strdup (conn->auth.user); + mlock (cache->auth.user, strlen (cache->auth.user)); + _mongo_auth_prop_destroy (&conn->auth.user); + } + + if (conn->auth.pw) + { + cache->auth.pw = g_strdup (conn->auth.pw); + mlock (cache->auth.pw, strlen (cache->auth.pw)); + _mongo_auth_prop_destroy (&conn->auth.pw); + } +} + +static void +_recovery_cache_load (mongo_sync_conn_recovery_cache *cache, + mongo_sync_connection *conn) +{ + conn->rs.seeds = _list_copy_full (cache->rs.seeds); + conn->rs.hosts = _list_copy_full (cache->rs.hosts); + conn->rs.primary = g_strdup (cache->rs.primary); + + _mongo_auth_prop_destroy (&conn->auth.db); + if (cache->auth.db) + { + conn->auth.db = g_strdup (cache->auth.db); + mlock (conn->auth.db, strlen (conn->auth.db)); + } + + _mongo_auth_prop_destroy (&conn->auth.user); + if (cache->auth.user) + { + conn->auth.user = g_strdup (cache->auth.user); + mlock (conn->auth.user, strlen (conn->auth.user)); + } + + _mongo_auth_prop_destroy (&conn->auth.pw); + if (cache->auth.pw) + { + conn->auth.pw = g_strdup (cache->auth.pw); + mlock (conn->auth.pw, strlen (conn->auth.pw)); + } + + conn->recovery_cache = cache; +} + +static void +_mongo_sync_conn_init (mongo_sync_connection *conn, gboolean slaveok) +{ + conn->slaveok = slaveok; + conn->safe_mode = FALSE; + conn->auto_reconnect = FALSE; + conn->last_error = NULL; + conn->max_insert_size = MONGO_SYNC_DEFAULT_MAX_INSERT_SIZE; + conn->recovery_cache = NULL; + conn->rs.seeds = NULL; + conn->rs.hosts = NULL; + conn->rs.primary = NULL; + conn->auth.db = NULL; + conn->auth.user = NULL; + conn->auth.pw = NULL; +} + +static mongo_sync_connection * +_recovery_cache_connect (mongo_sync_conn_recovery_cache *cache, + const gchar *address, gint port, + gboolean slaveok) +{ + mongo_sync_connection *s; + mongo_connection *c; + + c = mongo_connect (address, port); + if (!c) + return NULL; + s = g_realloc (c, sizeof (mongo_sync_connection)); + + _mongo_sync_conn_init (s, slaveok); + + if (!cache) + { + s->rs.seeds = g_list_append (NULL, g_strdup_printf ("%s:%d", address, port)); + } + else + { + _recovery_cache_load (cache, s); + } + + return s; +} + +mongo_sync_connection * +mongo_sync_connect (const gchar *address, gint port, + gboolean slaveok) +{ + return _recovery_cache_connect (NULL, address, port, slaveok); +} + +mongo_sync_connection * +mongo_sync_connect_0_1_0 (const gchar *host, gint port, + gboolean slaveok) +{ + return mongo_sync_connect (host, port, slaveok); +} + +#if VERSIONED_SYMBOLS +__asm__(".symver mongo_sync_connect_0_1_0,mongo_sync_connect@LMC_0.1.0"); +#endif + +gboolean +mongo_sync_conn_seed_add (mongo_sync_connection *conn, + const gchar *host, gint port) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + if (!host || port < 0) + { + errno = EINVAL; + return FALSE; + } + + conn->rs.seeds = g_list_append (conn->rs.seeds, + g_strdup_printf ("%s:%d", host, port)); + + return TRUE; +} + +static void +_mongo_sync_connect_replace (mongo_sync_connection *old, + mongo_sync_connection *new) +{ + if (!old || !new) + return; + + g_free (old->rs.primary); + old->rs.primary = NULL; + + /* Delete the host list. */ + _list_free_full (&old->rs.hosts); + + /* Free the replicaset struct in the new connection. These aren't + copied, in order to avoid infinite loops. */ + _list_free_full (&new->rs.hosts); + _list_free_full (&new->rs.seeds); + g_free (new->rs.primary); + + g_free (new->last_error); + if (old->super.fd && (old->super.fd != new->super.fd)) + close (old->super.fd); + + old->super.fd = new->super.fd; + old->super.request_id = -1; + old->slaveok = new->slaveok; + g_free (old->last_error); + old->last_error = NULL; + + g_free (new); +} + +mongo_sync_connection * +mongo_sync_reconnect (mongo_sync_connection *conn, + gboolean force_master) +{ + gboolean ping = FALSE; + guint i; + mongo_sync_connection *nc; + gchar *host; + gint port; + + if (!conn) + { + errno = ENOTCONN; + return NULL; + } + + ping = mongo_sync_cmd_ping (conn); + + if (ping) + { + if (!force_master) + return conn; + if (force_master && mongo_sync_cmd_is_master (conn)) + return conn; + + /* Force refresh the host list. */ + mongo_sync_cmd_is_master (conn); + } + + /* We either didn't ping, or we're not master, and have to + * reconnect. + * + * First, check if we have a primary, and if we can connect there. + */ + + if (conn->rs.primary) + { + if (mongo_util_parse_addr (conn->rs.primary, &host, &port)) + { + nc = mongo_sync_connect (host, port, conn->slaveok); + + g_free (host); + if (nc) + { + int e; + + /* We can call ourselves here, since connect does not set + conn->rs, thus, we won't end up in an infinite loop. */ + nc = mongo_sync_reconnect (nc, force_master); + e = errno; + _mongo_sync_connect_replace (conn, nc); + errno = e; + if (conn->auth.db && conn->auth.user && conn->auth.pw) + mongo_sync_cmd_authenticate (conn, conn->auth.db, + conn->auth.user, + conn->auth.pw); + return conn; + } + } + } + + /* No primary found, or we couldn't connect, try the rest of the + hosts. */ + + for (i = 0; i < g_list_length (conn->rs.hosts); i++) + { + gchar *addr = (gchar *)g_list_nth_data (conn->rs.hosts, i); + int e; + + if (!mongo_util_parse_addr (addr, &host, &port)) + continue; + + nc = mongo_sync_connect (host, port, conn->slaveok); + g_free (host); + if (!nc) + continue; + + nc = mongo_sync_reconnect (nc, force_master); + e = errno; + _mongo_sync_connect_replace (conn, nc); + errno = e; + + if (conn->auth.db && conn->auth.user && conn->auth.pw) + mongo_sync_cmd_authenticate (conn, conn->auth.db, + conn->auth.user, + conn->auth.pw); + + return conn; + } + + /* And if that failed too, try the seeds. */ + + for (i = 0; i < g_list_length (conn->rs.seeds); i++) + { + gchar *addr = (gchar *)g_list_nth_data (conn->rs.seeds, i); + int e; + + if (!mongo_util_parse_addr (addr, &host, &port)) + continue; + + nc = mongo_sync_connect (host, port, conn->slaveok); + + g_free (host); + + if (!nc) + continue; + + nc = mongo_sync_reconnect (nc, force_master); + e = errno; + _mongo_sync_connect_replace (conn, nc); + errno = e; + + if (conn->auth.db && conn->auth.user && conn->auth.pw) + mongo_sync_cmd_authenticate (conn, conn->auth.db, + conn->auth.user, + conn->auth.pw); + + return conn; + } + + errno = EHOSTUNREACH; + return NULL; +} + +void +mongo_sync_disconnect (mongo_sync_connection *conn) +{ + if (!conn) + return; + + g_free (conn->last_error); + + if (conn->recovery_cache) + { + _recovery_cache_store (conn->recovery_cache, conn); + } + + _mongo_auth_prop_destroy (&conn->auth.db); + _mongo_auth_prop_destroy (&conn->auth.user); + _mongo_auth_prop_destroy (&conn->auth.pw); + + _replica_set_free (&conn->rs); + + mongo_disconnect ((mongo_connection *)conn); +} + +gint32 +mongo_sync_conn_get_max_insert_size (mongo_sync_connection *conn) +{ + if (!conn) + { + errno = ENOTCONN; + return -1; + } + return conn->max_insert_size; +} + +gboolean +mongo_sync_conn_set_max_insert_size (mongo_sync_connection *conn, + gint32 max_size) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + if (max_size <= 0) + { + errno = ERANGE; + return FALSE; + } + + errno = 0; + conn->max_insert_size = max_size; + return TRUE; +} + +gboolean +mongo_sync_conn_get_safe_mode (const mongo_sync_connection *conn) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + errno = 0; + return conn->safe_mode; +} + +gboolean +mongo_sync_conn_set_safe_mode (mongo_sync_connection *conn, + gboolean safe_mode) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + errno = 0; + conn->safe_mode = safe_mode; + return TRUE; +} + +gboolean +mongo_sync_conn_get_auto_reconnect (const mongo_sync_connection *conn) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + errno = 0; + return conn->auto_reconnect; +} + +gboolean +mongo_sync_conn_set_auto_reconnect (mongo_sync_connection *conn, + gboolean auto_reconnect) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + conn->auto_reconnect = auto_reconnect; + return TRUE; +} + +gboolean +mongo_sync_conn_get_slaveok (const mongo_sync_connection *conn) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + errno = 0; + return conn->slaveok; +} + +gboolean +mongo_sync_conn_set_slaveok (mongo_sync_connection *conn, + gboolean slaveok) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + errno = 0; + conn->slaveok = slaveok; + return TRUE; +} + +#define _SLAVE_FLAG(c) ((c->slaveok) ? MONGO_WIRE_FLAG_QUERY_SLAVE_OK : 0) + +static inline gboolean +_mongo_cmd_ensure_conn (mongo_sync_connection *conn, + gboolean force_master) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + if (force_master || !conn->slaveok) + { + errno = 0; + if (!mongo_sync_cmd_is_master (conn)) + { + if (errno == EPROTO) + return FALSE; + if (!conn->auto_reconnect) + { + errno = ENOTCONN; + return FALSE; + } + if (!mongo_sync_reconnect (conn, TRUE)) + return FALSE; + } + return TRUE; + } + + errno = 0; + if (!mongo_sync_cmd_ping (conn)) + { + if (errno == EPROTO) + return FALSE; + if (!conn->auto_reconnect) + { + errno = ENOTCONN; + return FALSE; + } + if (!mongo_sync_reconnect (conn, FALSE)) + { + errno = ENOTCONN; + return FALSE; + } + } + errno = 0; + return TRUE; +} + +static inline gboolean +_mongo_cmd_verify_slaveok (mongo_sync_connection *conn) +{ + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + if (conn->slaveok || !conn->safe_mode) + return TRUE; + + errno = 0; + if (!mongo_sync_cmd_is_master (conn)) + { + if (errno == EPROTO) + return FALSE; + if (!conn->auto_reconnect) + { + errno = ENOTCONN; + return FALSE; + } + if (!mongo_sync_reconnect (conn, TRUE)) + return FALSE; + } + return TRUE; +} + +static inline gboolean +_mongo_sync_packet_send (mongo_sync_connection *conn, + mongo_packet *p, + gboolean force_master, + gboolean auto_reconnect) +{ + gboolean out = FALSE; + + if (force_master) + if (!_mongo_cmd_ensure_conn (conn, force_master)) + { + mongo_wire_packet_free (p); + return FALSE; + } + + for (;;) + { + if (!mongo_packet_send ((mongo_connection *)conn, p)) + { + int e = errno; + + if (!auto_reconnect || (conn && !conn->auto_reconnect)) + { + mongo_wire_packet_free (p); + errno = e; + return FALSE; + } + + if (out || !mongo_sync_reconnect (conn, force_master)) + { + mongo_wire_packet_free (p); + errno = e; + return FALSE; + } + + out = TRUE; + continue; + } + break; + } + mongo_wire_packet_free (p); + return TRUE; +} + +static inline mongo_packet * +_mongo_sync_packet_recv (mongo_sync_connection *conn, gint32 rid, gint32 flags) +{ + mongo_packet *p; + mongo_packet_header h; + mongo_reply_packet_header rh; + + p = mongo_packet_recv ((mongo_connection *)conn); + if (!p) + return NULL; + + if (!mongo_wire_packet_get_header_raw (p, &h)) + { + int e = errno; + + mongo_wire_packet_free (p); + errno = e; + return NULL; + } + + if (h.resp_to != rid) + { + mongo_wire_packet_free (p); + errno = EPROTO; + return NULL; + } + + if (!mongo_wire_reply_packet_get_header (p, &rh)) + { + int e = errno; + + mongo_wire_packet_free (p); + errno = e; + return NULL; + } + + if (rh.flags & flags) + { + mongo_wire_packet_free (p); + errno = EPROTO; + return NULL; + } + + if (rh.returned == 0) + { + mongo_wire_packet_free (p); + errno = ENOENT; + return NULL; + } + + return p; +} + +static gboolean +_mongo_sync_check_ok (bson *b) +{ + bson_cursor *c; + gdouble d; + + c = bson_find (b, "ok"); + if (!c) + { + errno = ENOENT; + return FALSE; + } + + if (!bson_cursor_get_double (c, &d)) + { + bson_cursor_free (c); + errno = EINVAL; + return FALSE; + } + bson_cursor_free (c); + errno = (d == 1) ? 0 : EPROTO; + return (d == 1); +} + +static gboolean +_mongo_sync_get_error (const bson *rep, gchar **error) +{ + bson_cursor *c; + + if (!error) + return FALSE; + + c = bson_find (rep, "err"); + if (!c) + { + c = bson_find (rep, "errmsg"); + if (!c) + { + errno = EPROTO; + return FALSE; + } + } + if (bson_cursor_type (c) == BSON_TYPE_NONE || + bson_cursor_type (c) == BSON_TYPE_NULL) + { + *error = NULL; + bson_cursor_free (c); + return TRUE; + } + else if (bson_cursor_type (c) == BSON_TYPE_STRING) + { + const gchar *err; + + bson_cursor_get_string (c, &err); + *error = g_strdup (err); + bson_cursor_free (c); + return TRUE; + } + errno = EPROTO; + return FALSE; +} + +static mongo_packet * +_mongo_sync_packet_check_error (mongo_sync_connection *conn, mongo_packet *p, + gboolean check_ok) +{ + bson *b; + gboolean error; + + if (!p) + return NULL; + + if (!mongo_wire_reply_packet_get_nth_document (p, 1, &b)) + { + mongo_wire_packet_free (p); + errno = EPROTO; + return NULL; + } + bson_finish (b); + + if (check_ok) + { + if (!_mongo_sync_check_ok (b)) + { + int e = errno; + + g_free (conn->last_error); + conn->last_error = NULL; + _mongo_sync_get_error (b, &conn->last_error); + bson_free (b); + mongo_wire_packet_free (p); + errno = e; + return NULL; + } + bson_free (b); + return p; + } + + g_free (conn->last_error); + conn->last_error = NULL; + error = _mongo_sync_get_error (b, &conn->last_error); + bson_free (b); + + if (error) + { + mongo_wire_packet_free (p); + return NULL; + } + return p; +} + +static inline gboolean +_mongo_sync_cmd_verify_result (mongo_sync_connection *conn, + const gchar *ns) +{ + gchar *error = NULL, *db, *tmp; + gboolean res; + + if (!conn || !ns) + return FALSE; + if (!conn->safe_mode) + return TRUE; + + tmp = g_strstr_len (ns, -1, "."); + if (tmp) + db = g_strndup (ns, tmp - ns); + else + db = g_strdup (ns); + + res = mongo_sync_cmd_get_last_error (conn, db, &error); + g_free (db); + res = res && ((error) ? FALSE : TRUE); + g_free (error); + + return res; +} + +static void +_set_last_error (mongo_sync_connection *conn, int err) +{ + g_free (conn->last_error); + conn->last_error = g_strdup(strerror(err)); +} + +gboolean +mongo_sync_cmd_update (mongo_sync_connection *conn, + const gchar *ns, + gint32 flags, const bson *selector, + const bson *update) +{ + mongo_packet *p; + gint32 rid; + + rid = mongo_connection_get_requestid ((mongo_connection *)conn) + 1; + + p = mongo_wire_cmd_update (rid, ns, flags, selector, update); + if (!p) + return FALSE; + + if (!_mongo_sync_packet_send (conn, p, TRUE, TRUE)) + return FALSE; + + return _mongo_sync_cmd_verify_result (conn, ns); +} + +gboolean +mongo_sync_cmd_insert_n (mongo_sync_connection *conn, + const gchar *ns, gint32 n, + const bson **docs) +{ + mongo_packet *p; + gint32 rid; + gint32 pos = 0, c, i = 0; + gint32 size = 0; + + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + if (!ns || !docs) + { + errno = EINVAL; + return FALSE; + } + if (n <= 0) + { + errno = EINVAL; + return FALSE; + } + + for (i = 0; i < n; i++) + { + if (bson_size (docs[i]) >= conn->max_insert_size) + { + errno = EMSGSIZE; + return FALSE; + } + } + + do + { + i = pos; + c = 0; + + while (i < n && size < conn->max_insert_size) + { + size += bson_size (docs[i++]); + c++; + } + size = 0; + if (i < n) + c--; + + rid = mongo_connection_get_requestid ((mongo_connection *)conn) + 1; + + p = mongo_wire_cmd_insert_n (rid, ns, c, &docs[pos]); + if (!p) + return FALSE; + + if (!_mongo_sync_packet_send (conn, p, TRUE, TRUE)) + { + _set_last_error (conn, errno); + return FALSE; + } + + if (!_mongo_sync_cmd_verify_result (conn, ns)) + return FALSE; + + pos += c; + } while (pos < n); + + return TRUE; +} + +gboolean +mongo_sync_cmd_insert (mongo_sync_connection *conn, + const gchar *ns, ...) +{ + gboolean b; + bson **docs, *d; + gint32 n = 0; + va_list ap; + + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + + if (!ns) + { + errno = EINVAL; + return FALSE; + } + + docs = (bson **)g_new0 (bson *, 1); + + va_start (ap, ns); + while ((d = (bson *)va_arg (ap, gpointer))) + { + if (bson_size (d) < 0) + { + g_free (docs); + errno = EINVAL; + return FALSE; + } + + docs = (bson **)g_renew (bson *, docs, n + 1); + docs[n++] = d; + } + va_end (ap); + + b = mongo_sync_cmd_insert_n (conn, ns, n, (const bson **)docs); + g_free (docs); + return b; +} + +mongo_packet * +mongo_sync_cmd_query (mongo_sync_connection *conn, + const gchar *ns, gint32 flags, + gint32 skip, gint32 ret, + const bson *query, const bson *sel) +{ + mongo_packet *p; + gint32 rid; + + if (!_mongo_cmd_verify_slaveok (conn)) + return FALSE; + + rid = mongo_connection_get_requestid ((mongo_connection *)conn) + 1; + + p = mongo_wire_cmd_query (rid, ns, flags | _SLAVE_FLAG (conn), + skip, ret, query, sel); + if (!p) + return NULL; + + if (!_mongo_sync_packet_send (conn, p, + !((conn && conn->slaveok) || + (flags & MONGO_WIRE_FLAG_QUERY_SLAVE_OK)), + TRUE)) + return NULL; + + p = _mongo_sync_packet_recv (conn, rid, MONGO_REPLY_FLAG_QUERY_FAIL); + return _mongo_sync_packet_check_error (conn, p, FALSE); +} + +mongo_packet * +mongo_sync_cmd_get_more (mongo_sync_connection *conn, + const gchar *ns, + gint32 ret, gint64 cursor_id) +{ + mongo_packet *p; + gint32 rid; + + if (!_mongo_cmd_verify_slaveok (conn)) + return FALSE; + + rid = mongo_connection_get_requestid ((mongo_connection *)conn) + 1; + + p = mongo_wire_cmd_get_more (rid, ns, ret, cursor_id); + if (!p) + return NULL; + + if (!_mongo_sync_packet_send (conn, p, FALSE, TRUE)) + return FALSE; + + p = _mongo_sync_packet_recv (conn, rid, MONGO_REPLY_FLAG_NO_CURSOR); + return _mongo_sync_packet_check_error (conn, p, FALSE); +} + +gboolean +mongo_sync_cmd_delete (mongo_sync_connection *conn, const gchar *ns, + gint32 flags, const bson *sel) +{ + mongo_packet *p; + gint32 rid; + + rid = mongo_connection_get_requestid ((mongo_connection *)conn) + 1; + + p = mongo_wire_cmd_delete (rid, ns, flags, sel); + if (!p) + return FALSE; + + return _mongo_sync_packet_send (conn, p, TRUE, TRUE); +} + +gboolean +mongo_sync_cmd_kill_cursors (mongo_sync_connection *conn, + gint32 n, ...) +{ + mongo_packet *p; + gint32 rid; + va_list ap; + + if (n <= 0) + { + errno = EINVAL; + return FALSE; + } + + rid = mongo_connection_get_requestid ((mongo_connection *)conn) + 1; + + va_start (ap, n); + p = mongo_wire_cmd_kill_cursors_va (rid, n, ap); + if (!p) + { + int e = errno; + + va_end (ap); + errno = e; + return FALSE; + } + va_end (ap); + + return _mongo_sync_packet_send (conn, p, FALSE, TRUE); +} + +static mongo_packet * +_mongo_sync_cmd_custom (mongo_sync_connection *conn, + const gchar *db, + const bson *command, + gboolean check_conn, + gboolean force_master) +{ + mongo_packet *p; + gint32 rid; + + if (!conn) + { + errno = ENOTCONN; + return NULL; + } + + rid = mongo_connection_get_requestid ((mongo_connection *)conn) + 1; + + p = mongo_wire_cmd_custom (rid, db, _SLAVE_FLAG (conn), command); + if (!p) + return NULL; + + if (!_mongo_sync_packet_send (conn, p, force_master, check_conn)) + return NULL; + + p = _mongo_sync_packet_recv (conn, rid, MONGO_REPLY_FLAG_QUERY_FAIL); + return _mongo_sync_packet_check_error (conn, p, TRUE); +} + +mongo_packet * +mongo_sync_cmd_custom (mongo_sync_connection *conn, + const gchar *db, + const bson *command) +{ + return _mongo_sync_cmd_custom (conn, db, command, TRUE, FALSE); +} + +gdouble +mongo_sync_cmd_count (mongo_sync_connection *conn, + const gchar *db, const gchar *coll, + const bson *query) +{ + mongo_packet *p; + bson *cmd; + bson_cursor *c; + gdouble d; + + cmd = bson_new_sized (bson_size (query) + 32); + bson_append_string (cmd, "count", coll, -1); + if (query) + bson_append_document (cmd, "query", query); + bson_finish (cmd); + + p = _mongo_sync_cmd_custom (conn, db, cmd, TRUE, FALSE); + if (!p) + { + int e = errno; + + bson_free (cmd); + errno = e; + return -1; + } + bson_free (cmd); + + if (!mongo_wire_reply_packet_get_nth_document (p, 1, &cmd)) + { + int e = errno; + + mongo_wire_packet_free (p); + errno = e; + return -1; + } + mongo_wire_packet_free (p); + bson_finish (cmd); + + c = bson_find (cmd, "n"); + if (!c) + { + bson_free (cmd); + errno = ENOENT; + return -1; + } + if (!bson_cursor_get_double (c, &d)) + { + bson_free (cmd); + bson_cursor_free (c); + errno = EINVAL; + return -1; + } + bson_cursor_free (c); + bson_free (cmd); + + return d; +} + +gboolean +mongo_sync_cmd_create (mongo_sync_connection *conn, + const gchar *db, const gchar *coll, + gint flags, ...) +{ + mongo_packet *p; + bson *cmd; + + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + if (!db || !coll) + { + errno = EINVAL; + return FALSE; + } + + cmd = bson_new_sized (128); + bson_append_string (cmd, "create", coll, -1); + if (flags & MONGO_COLLECTION_AUTO_INDEX_ID) + bson_append_boolean (cmd, "autoIndexId", TRUE); + if (flags & MONGO_COLLECTION_CAPPED || + flags & MONGO_COLLECTION_CAPPED_MAX || + flags & MONGO_COLLECTION_SIZED) + { + va_list ap; + gint64 i; + + if (flags & MONGO_COLLECTION_CAPPED || + flags & MONGO_COLLECTION_CAPPED_MAX) + bson_append_boolean (cmd, "capped", TRUE); + + va_start (ap, flags); + i = (gint64)va_arg (ap, gint64); + if (i <= 0) + { + bson_free (cmd); + errno = ERANGE; + return FALSE; + } + bson_append_int64 (cmd, "size", i); + + if (flags & MONGO_COLLECTION_CAPPED_MAX) + { + i = (gint64)va_arg (ap, gint64); + if (i <= 0) + { + bson_free (cmd); + errno = ERANGE; + return FALSE; + } + bson_append_int64 (cmd, "max", i); + } + va_end (ap); + } + bson_finish (cmd); + + p = _mongo_sync_cmd_custom (conn, db, cmd, TRUE, TRUE); + if (!p) + { + int e = errno; + + bson_free (cmd); + errno = e; + return FALSE; + } + bson_free (cmd); + mongo_wire_packet_free (p); + + return TRUE; +} + +bson * +mongo_sync_cmd_exists (mongo_sync_connection *conn, + const gchar *db, const gchar *coll) +{ + bson *cmd, *r; + mongo_packet *p; + gchar *ns, *sys; + gint32 rid; + + if (!conn) + { + errno = ENOTCONN; + return NULL; + } + if (!db || !coll) + { + errno = EINVAL; + return NULL; + } + + rid = mongo_connection_get_requestid ((mongo_connection *)conn) + 1; + + ns = g_strconcat (db, ".", coll, NULL); + cmd = bson_new_sized (128); + bson_append_string (cmd, "name", ns, -1); + bson_finish (cmd); + g_free (ns); + + sys = g_strconcat (db, ".system.namespaces", NULL); + + p = mongo_wire_cmd_query (rid, sys, _SLAVE_FLAG (conn), 0, 1, cmd, NULL); + if (!p) + { + int e = errno; + + bson_free (cmd); + g_free (sys); + + errno = e; + return NULL; + } + g_free (sys); + bson_free (cmd); + + if (!_mongo_sync_packet_send (conn, p, !conn->slaveok, TRUE)) + return NULL; + + p = _mongo_sync_packet_recv (conn, rid, MONGO_REPLY_FLAG_QUERY_FAIL); + if (!p) + return NULL; + + p = _mongo_sync_packet_check_error (conn, p, FALSE); + if (!p) + return NULL; + + if (!mongo_wire_reply_packet_get_nth_document (p, 1, &r)) + { + int e = errno; + + mongo_wire_packet_free (p); + errno = e; + return NULL; + } + mongo_wire_packet_free (p); + bson_finish (r); + + return r; +} + +gboolean +mongo_sync_cmd_drop (mongo_sync_connection *conn, + const gchar *db, const gchar *coll) +{ + mongo_packet *p; + bson *cmd; + + cmd = bson_new_sized (64); + bson_append_string (cmd, "drop", coll, -1); + bson_finish (cmd); + + p = _mongo_sync_cmd_custom (conn, db, cmd, TRUE, TRUE); + if (!p) + { + int e = errno; + + bson_free (cmd); + errno = e; + return FALSE; + } + bson_free (cmd); + mongo_wire_packet_free (p); + + return TRUE; +} + +gboolean +mongo_sync_cmd_get_last_error_full (mongo_sync_connection *conn, + const gchar *db, bson **error) +{ + mongo_packet *p; + bson *cmd; + + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + if (!error) + { + errno = EINVAL; + return FALSE; + } + + cmd = bson_new_sized (64); + bson_append_int32 (cmd, "getlasterror", 1); + bson_finish (cmd); + + p = _mongo_sync_cmd_custom (conn, db, cmd, FALSE, FALSE); + if (!p) + { + int e = errno; + + bson_free (cmd); + errno = e; + _set_last_error (conn, e); + return FALSE; + } + bson_free (cmd); + + if (!mongo_wire_reply_packet_get_nth_document (p, 1, error)) + { + int e = errno; + + mongo_wire_packet_free (p); + errno = e; + _set_last_error (conn, e); + return FALSE; + } + mongo_wire_packet_free (p); + bson_finish (*error); + + return TRUE; +} + +gboolean +mongo_sync_cmd_get_last_error (mongo_sync_connection *conn, + const gchar *db, gchar **error) +{ + bson *err_bson; + + if (!error) + { + errno = EINVAL; + return FALSE; + } + + if (!mongo_sync_cmd_get_last_error_full (conn, db, &err_bson)) + return FALSE; + + if (!_mongo_sync_get_error (err_bson, error)) + { + int e = errno; + + bson_free (err_bson); + errno = e; + _set_last_error (conn, e); + return FALSE; + } + bson_free (err_bson); + + if (*error == NULL) + *error = g_strdup (conn->last_error); + else + { + g_free (conn->last_error); + conn->last_error = g_strdup(*error); + } + + return TRUE; +} + +gboolean +mongo_sync_cmd_reset_error (mongo_sync_connection *conn, + const gchar *db) +{ + mongo_packet *p; + bson *cmd; + + if (conn) + { + g_free (conn->last_error); + conn->last_error = NULL; + } + + cmd = bson_new_sized (32); + bson_append_int32 (cmd, "reseterror", 1); + bson_finish (cmd); + + p = _mongo_sync_cmd_custom (conn, db, cmd, FALSE, FALSE); + if (!p) + { + int e = errno; + + bson_free (cmd); + errno = e; + return FALSE; + } + bson_free (cmd); + mongo_wire_packet_free (p); + return TRUE; +} + +gboolean +mongo_sync_cmd_is_master (mongo_sync_connection *conn) +{ + bson *cmd, *res, *hosts; + mongo_packet *p; + bson_cursor *c; + gboolean b; + + cmd = bson_new_sized (32); + bson_append_int32 (cmd, "ismaster", 1); + bson_finish (cmd); + + p = _mongo_sync_cmd_custom (conn, "system", cmd, FALSE, FALSE); + if (!p) + { + int e = errno; + + bson_free (cmd); + errno = e; + return FALSE; + } + bson_free (cmd); + + if (!mongo_wire_reply_packet_get_nth_document (p, 1, &res)) + { + int e = errno; + + mongo_wire_packet_free (p); + errno = e; + return FALSE; + } + mongo_wire_packet_free (p); + bson_finish (res); + + c = bson_find (res, "ismaster"); + if (!bson_cursor_get_boolean (c, &b)) + { + bson_cursor_free (c); + bson_free (res); + errno = EPROTO; + return FALSE; + } + bson_cursor_free (c); + + if (!b) + { + const gchar *s; + + /* We're not the master, so we should have a 'primary' key in + the response. */ + c = bson_find (res, "primary"); + if (bson_cursor_get_string (c, &s)) + { + g_free (conn->rs.primary); + conn->rs.primary = g_strdup (s); + } + bson_cursor_free (c); + } + + /* Find all the members of the set, and cache them. */ + c = bson_find (res, "hosts"); + if (!c) + { + bson_free (res); + errno = 0; + return b; + } + + if (!bson_cursor_get_array (c, &hosts)) + { + bson_cursor_free (c); + bson_free (res); + errno = 0; + return b; + } + bson_cursor_free (c); + bson_finish (hosts); + + /* Delete the old host list. */ + _list_free_full (&conn->rs.hosts); + + c = bson_cursor_new (hosts); + while (bson_cursor_next (c)) + { + const gchar *s; + + if (bson_cursor_get_string (c, &s)) + conn->rs.hosts = g_list_append (conn->rs.hosts, g_strdup (s)); + } + bson_cursor_free (c); + bson_free (hosts); + + c = bson_find (res, "passives"); + if (bson_cursor_get_array (c, &hosts)) + { + bson_cursor_free (c); + bson_finish (hosts); + + c = bson_cursor_new (hosts); + while (bson_cursor_next (c)) + { + const gchar *s; + + if (bson_cursor_get_string (c, &s)) + conn->rs.hosts = g_list_append (conn->rs.hosts, g_strdup (s)); + } + bson_free (hosts); + } + bson_cursor_free (c); + + bson_free (res); + errno = 0; + return b; +} + +gboolean +mongo_sync_cmd_ping (mongo_sync_connection *conn) +{ + bson *cmd; + mongo_packet *p; + + cmd = bson_new_sized (32); + bson_append_int32 (cmd, "ping", 1); + bson_finish (cmd); + + p = _mongo_sync_cmd_custom (conn, "system", cmd, FALSE, FALSE); + if (!p) + { + int e = errno; + + bson_free (cmd); + errno = e; + return FALSE; + } + bson_free (cmd); + mongo_wire_packet_free (p); + + errno = 0; + return TRUE; +} + +static gchar * +_pass_digest (const gchar *user, const gchar *pw) +{ + GChecksum *chk; + gchar *digest; + + chk = g_checksum_new (G_CHECKSUM_MD5); + g_checksum_update (chk, (const guchar *)user, -1); + g_checksum_update (chk, (const guchar *)":mongo:", 7); + g_checksum_update (chk, (const guchar *)pw, -1); + digest = g_strdup (g_checksum_get_string (chk)); + g_checksum_free (chk); + + return digest; +} + +gboolean +mongo_sync_cmd_user_add_with_roles (mongo_sync_connection *conn, + const gchar *db, + const gchar *user, + const gchar *pw, + const bson *roles) +{ + bson *s, *u; + gchar *userns; + gchar *hex_digest; + + if (!db || !user || !pw) + { + errno = EINVAL; + return FALSE; + } + + userns = g_strconcat (db, ".system.users", NULL); + + hex_digest = _pass_digest (user, pw); + + s = bson_build (BSON_TYPE_STRING, "user", user, -1, + BSON_TYPE_NONE); + bson_finish (s); + u = bson_build_full (BSON_TYPE_DOCUMENT, "$set", TRUE, + bson_build (BSON_TYPE_STRING, "pwd", hex_digest, -1, + BSON_TYPE_NONE), + BSON_TYPE_NONE); + if (roles) + bson_append_array (u, "roles", roles); + bson_finish (u); + g_free (hex_digest); + + if (!mongo_sync_cmd_update (conn, userns, MONGO_WIRE_FLAG_UPDATE_UPSERT, + s, u)) + { + int e = errno; + + bson_free (s); + bson_free (u); + g_free (userns); + errno = e; + return FALSE; + } + bson_free (s); + bson_free (u); + g_free (userns); + + return TRUE; +} + +gboolean +mongo_sync_cmd_user_add (mongo_sync_connection *conn, + const gchar *db, + const gchar *user, + const gchar *pw) +{ + return mongo_sync_cmd_user_add_with_roles (conn, db, user, pw, NULL); +} + +gboolean +mongo_sync_cmd_user_remove (mongo_sync_connection *conn, + const gchar *db, + const gchar *user) +{ + bson *s; + gchar *userns; + + if (!db || !user) + { + errno = EINVAL; + return FALSE; + } + + userns = g_strconcat (db, ".system.users", NULL); + + s = bson_build (BSON_TYPE_STRING, "user", user, -1, + BSON_TYPE_NONE); + bson_finish (s); + + if (!mongo_sync_cmd_delete (conn, userns, 0, s)) + { + int e = errno; + + bson_free (s); + g_free (userns); + errno = e; + return FALSE; + } + bson_free (s); + g_free (userns); + + return TRUE; +} + +gboolean +mongo_sync_cmd_authenticate (mongo_sync_connection *conn, + const gchar *db, + const gchar *user, + const gchar *pw) +{ + bson *b; + mongo_packet *p; + const gchar *s; + gchar *nonce; + bson_cursor *c; + + GChecksum *chk; + gchar *hex_digest; + const gchar *digest; + gchar *ndb, *nuser, *npw; + + if (!db || !user || !pw) + { + errno = EINVAL; + return FALSE; + } + + /* Obtain nonce */ + b = bson_new_sized (32); + bson_append_int32 (b, "getnonce", 1); + bson_finish (b); + + p = mongo_sync_cmd_custom (conn, db, b); + if (!p) + { + int e = errno; + + bson_free (b); + errno = e; + return FALSE; + } + bson_free (b); + + if (!mongo_wire_reply_packet_get_nth_document (p, 1, &b)) + { + int e = errno; + + mongo_wire_packet_free (p); + errno = e; + return FALSE; + } + mongo_wire_packet_free (p); + bson_finish (b); + + c = bson_find (b, "nonce"); + if (!c) + { + bson_free (b); + errno = EPROTO; + return FALSE; + } + if (!bson_cursor_get_string (c, &s)) + { + bson_free (b); + errno = EPROTO; + return FALSE; + } + nonce = g_strdup (s); + bson_cursor_free (c); + bson_free (b); + + /* Generate the password digest. */ + hex_digest = _pass_digest (user, pw); + + /* Generate the key */ + chk = g_checksum_new (G_CHECKSUM_MD5); + g_checksum_update (chk, (const guchar *)nonce, -1); + g_checksum_update (chk, (const guchar *)user, -1); + g_checksum_update (chk, (const guchar *)hex_digest, -1); + g_free (hex_digest); + + digest = g_checksum_get_string (chk); + + /* Run the authenticate command. */ + b = bson_build (BSON_TYPE_INT32, "authenticate", 1, + BSON_TYPE_STRING, "user", user, -1, + BSON_TYPE_STRING, "nonce", nonce, -1, + BSON_TYPE_STRING, "key", digest, -1, + BSON_TYPE_NONE); + bson_finish (b); + g_free (nonce); + g_checksum_free (chk); + + p = mongo_sync_cmd_custom (conn, db, b); + if (!p) + { + int e = errno; + + bson_free (b); + errno = e; + return FALSE; + } + bson_free (b); + mongo_wire_packet_free (p); + + ndb = g_strdup (db); + _mongo_auth_prop_destroy (&conn->auth.db); + conn->auth.db = ndb; + mlock (conn->auth.db, strlen (ndb)); + + nuser = g_strdup (user); + _mongo_auth_prop_destroy (&conn->auth.user); + conn->auth.user = nuser; + mlock (conn->auth.user, strlen (nuser)); + + npw = g_strdup (pw); + _mongo_auth_prop_destroy (&conn->auth.pw); + conn->auth.pw = npw; + mlock (conn->auth.pw, strlen (npw)); + + return TRUE; +} + +static GString * +_mongo_index_gen_name (const bson *key) +{ + bson_cursor *c; + GString *name; + + name = g_string_new ("_"); + c = bson_cursor_new (key); + while (bson_cursor_next (c)) + { + gint64 v = 0; + + g_string_append (name, bson_cursor_key (c)); + g_string_append_c (name, '_'); + + switch (bson_cursor_type (c)) + { + case BSON_TYPE_BOOLEAN: + { + gboolean vb; + + bson_cursor_get_boolean (c, &vb); + v = vb; + break; + } + case BSON_TYPE_INT32: + { + gint32 vi; + + bson_cursor_get_int32 (c, &vi); + v = vi; + break; + } + case BSON_TYPE_INT64: + { + gint64 vl; + + bson_cursor_get_int64 (c, &vl); + v = vl; + break; + } + case BSON_TYPE_DOUBLE: + { + gdouble vd; + + bson_cursor_get_double (c, &vd); + v = (gint64)vd; + break; + } + default: + bson_cursor_free (c); + g_string_free (name, TRUE); + return NULL; + } + if (v != 0) + g_string_append_printf (name, "%" G_GINT64_FORMAT "_", v); + } + bson_cursor_free (c); + + return name; +} + +gboolean +mongo_sync_cmd_index_create (mongo_sync_connection *conn, + const gchar *ns, + const bson *key, + gint options) +{ + GString *name; + gchar *idxns, *t; + bson *cmd; + + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + if (!ns || !key) + { + errno = EINVAL; + return FALSE; + } + if (strchr (ns, '.') == NULL) + { + errno = EINVAL; + return FALSE; + } + + name = _mongo_index_gen_name (key); + if (!name) + { + errno = ENOTSUP; + return FALSE; + } + + cmd = bson_new_sized (bson_size (key) + name->len + 128); + bson_append_document (cmd, "key", key); + bson_append_string (cmd, "ns", ns, -1); + bson_append_string (cmd, "name", name->str, name->len); + if (options & MONGO_INDEX_UNIQUE) + bson_append_boolean (cmd, "unique", TRUE); + if (options & MONGO_INDEX_DROP_DUPS) + bson_append_boolean (cmd, "dropDups", TRUE); + if (options & MONGO_INDEX_BACKGROUND) + bson_append_boolean (cmd, "background", TRUE); + if (options & MONGO_INDEX_SPARSE) + bson_append_boolean (cmd, "sparse", TRUE); + bson_finish (cmd); + g_string_free (name, TRUE); + + t = g_strdup (ns); + *(strchr (t, '.')) = '\0'; + idxns = g_strconcat (t, ".system.indexes", NULL); + g_free (t); + + if (!mongo_sync_cmd_insert_n (conn, idxns, 1, (const bson **)&cmd)) + { + int e = errno; + + bson_free (cmd); + g_free (idxns); + errno = e; + return FALSE; + } + bson_free (cmd); + g_free (idxns); + + return TRUE; +} + +static gboolean +_mongo_sync_cmd_index_drop (mongo_sync_connection *conn, + const gchar *full_ns, + const gchar *index_name) +{ + bson *cmd; + gchar *db, *ns; + mongo_packet *p; + + if (!conn) + { + errno = ENOTCONN; + return FALSE; + } + if (!full_ns || !index_name) + { + errno = EINVAL; + return FALSE; + } + ns = strchr (full_ns, '.'); + if (ns == NULL) + { + errno = EINVAL; + return FALSE; + } + ns++; + + cmd = bson_new_sized (256 + strlen (index_name)); + bson_append_string (cmd, "deleteIndexes", ns, -1); + bson_append_string (cmd, "index", index_name, -1); + bson_finish (cmd); + + db = g_strndup (full_ns, ns - full_ns - 1); + p = mongo_sync_cmd_custom (conn, db, cmd); + if (!p) + { + int e = errno; + + bson_free (cmd); + g_free (db); + errno = e; + return FALSE; + } + mongo_wire_packet_free (p); + g_free (db); + bson_free (cmd); + + return TRUE; +} + +gboolean +mongo_sync_cmd_index_drop (mongo_sync_connection *conn, + const gchar *ns, + const bson *key) +{ + GString *name; + gboolean b; + + if (!key) + { + errno = EINVAL; + return FALSE; + } + + name = _mongo_index_gen_name (key); + + b = _mongo_sync_cmd_index_drop (conn, ns, name->str); + g_string_free (name, TRUE); + return b; +} + +gboolean +mongo_sync_cmd_index_drop_all (mongo_sync_connection *conn, + const gchar *ns) +{ + return _mongo_sync_cmd_index_drop (conn, ns, "*"); +} + +mongo_sync_conn_recovery_cache * +mongo_sync_conn_recovery_cache_new (void) +{ + mongo_sync_conn_recovery_cache *cache; + + cache = g_new0 (mongo_sync_conn_recovery_cache, 1); + + return cache; +} + +void +mongo_sync_conn_recovery_cache_free (mongo_sync_conn_recovery_cache *cache) +{ + mongo_sync_conn_recovery_cache_discard(cache); + + g_free(cache); +} + +void +mongo_sync_conn_recovery_cache_discard (mongo_sync_conn_recovery_cache *cache) +{ + _mongo_auth_prop_destroy (&cache->auth.db); + _mongo_auth_prop_destroy (&cache->auth.user); + _mongo_auth_prop_destroy (&cache->auth.pw); + + _replica_set_free (&cache->rs); +} + +gboolean +mongo_sync_conn_recovery_cache_seed_add (mongo_sync_conn_recovery_cache *cache, + const gchar *host, + gint port) +{ + if (!host) + { + errno = EINVAL; + return FALSE; + } + + cache->rs.seeds = g_list_append (cache->rs.seeds, g_strdup_printf ("%s:%d", host, port)); + + return TRUE; +} + +static mongo_sync_connection * +_recovery_cache_pick_connect_from_list (mongo_sync_conn_recovery_cache *cache, + GList *address_list, + gboolean slaveok) +{ + gint port; + guint i; + gchar *host; + mongo_sync_connection *c = NULL; + + if (address_list) + { + for (i = 0; i < g_list_length (address_list); i++) + { + gchar *addr = (gchar *)g_list_nth_data (address_list, i); + + if (!mongo_util_parse_addr (addr, &host, &port)) + continue; + + c = _recovery_cache_connect (cache, host, port, slaveok); + g_free (host); + if (c) + { + if (slaveok) + return c; + mongo_sync_conn_recovery_cache_discard (c->recovery_cache); + return mongo_sync_reconnect (c, FALSE); + } + } + } + + return NULL; +} + +mongo_sync_connection * +mongo_sync_connect_recovery_cache (mongo_sync_conn_recovery_cache *cache, + gboolean slaveok) +{ + mongo_sync_connection *c = NULL; + gchar *host; + gint port; + + if (cache->rs.primary && mongo_util_parse_addr (cache->rs.primary, &host, &port)) + { + if ( (c = _recovery_cache_connect (cache, host, port, slaveok)) ) + { + g_free (host); + if (slaveok) + return c; + mongo_sync_conn_recovery_cache_discard (c->recovery_cache); + return mongo_sync_reconnect (c, FALSE); + } + } + + c = _recovery_cache_pick_connect_from_list (cache, cache->rs.seeds, slaveok); + + if (!c) + c = _recovery_cache_pick_connect_from_list (cache, cache->rs.hosts, slaveok); + + return c; +} + +const gchar * +mongo_sync_conn_get_last_error (mongo_sync_connection *conn) +{ + return conn->last_error; +} diff --git a/src/mongo-sync.h b/src/mongo-sync.h new file mode 100644 index 0000000..0ae813b --- /dev/null +++ b/src/mongo-sync.h @@ -0,0 +1,640 @@ +/* mongo-sync.h - libmongo-client synchronous wrapper API + * Copyright 2011, 2012, 2013, 2014 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-sync.h + * MongoDB synchronous wrapper API public header. + */ + +#ifndef LIBMONGO_SYNC_H +#define LIBMONGO_SYNC_H 1 + +#include <mongo-client.h> + +#include <glib.h> + +G_BEGIN_DECLS + +/** Default maximum size for a single bulk insert. + * + * Defaults to somewhat shy of 4Mb. + */ +#define MONGO_SYNC_DEFAULT_MAX_INSERT_SIZE 4 * 1000 * 1000 + +/** @defgroup mongo_sync Mongo Sync API + * + * These commands provide wrappers for the most often used MongoDB + * commands. All of these will send the command, and receive any + * results, thus saving the caller from having to do that himself. + * + * However, these are only of use when blocking the application is not + * an issue. For asynchronous operation, one should still construct + * the packets himself, and send / receive when appropriate. + * + * @addtogroup mongo_sync + * @{ + */ + +/** Opaque synchronous connection object. */ +typedef struct _mongo_sync_connection mongo_sync_connection; + +/** synchronous connection recovery cache object */ +typedef struct _mongo_sync_conn_recovery_cache mongo_sync_conn_recovery_cache; + +/** Create a new connection recovery cache object. + * + * @return the newly created recovery cache object + */ +mongo_sync_conn_recovery_cache *mongo_sync_conn_recovery_cache_new (void); + +/** Free a connection recovery cache object. + * + * @param cache is the recovery cache object + */ +void mongo_sync_conn_recovery_cache_free (mongo_sync_conn_recovery_cache *cache); + +/** Discards a connection recovery cache object. + * + * @param cache is the recovery cache object + */ +void mongo_sync_conn_recovery_cache_discard (mongo_sync_conn_recovery_cache *cache); + +/** Add a seed to a connection recovery cache object. + * + * The seed list will be used for reconnects, prioritized before the + * automatically discovered host list. + * + * @param cache is the connection recovery cache to add a seed to. + * @param host is the seed host to add. + * @param port is the seed's port. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_conn_recovery_cache_seed_add (mongo_sync_conn_recovery_cache *cache, + const gchar *host, gint port); + +/** Synchronously connect to a MongoDB server using an external + * connection recovery cache object. + * + * Sets up a synchronous connection to a MongoDB server. + * + * @param cache is the externally managed connection recovery cache object. + * @param slaveok signals whether queries made against a slave are + * acceptable. + * + * @returns A newly allocated mongo_sync_connection object, or NULL on + * error. It is the responsibility of the caller to close and free the + * connection when appropriate. + */ +mongo_sync_connection *mongo_sync_connect_recovery_cache (mongo_sync_conn_recovery_cache *cache, + gboolean slaveok); + +/** Synchronously connect to a MongoDB server. + * + * Sets up a synchronous connection to a MongoDB server. + * + * @param address is the address of the server (IP or unix socket path). + * @param port is the port to connect to, or #MONGO_CONN_LOCAL if + * address is a unix socket. + * @param slaveok signals whether queries made against a slave are + * acceptable. + * + * @returns A newly allocated mongo_sync_connection object, or NULL on + * error. It is the responsibility of the caller to close and free the + * connection when appropriate. + */ +mongo_sync_connection *mongo_sync_connect (const gchar *address, + gint port, + gboolean slaveok); + +/** Add a seed to an existing MongoDB connection. + * + * The seed list will be used for reconnects, prioritized before the + * automatically discovered host list. + * + * @param conn is the connection to add a seed to. + * @param host is the seed host to add. + * @param port is the seed's port. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_conn_seed_add (mongo_sync_connection *conn, + const gchar *host, gint port); + +/** Attempt to connect to another member of a replica set. + * + * Given an existing connection, this function will try to connect to + * an available node (enforcing that it's a primary, if asked to) by + * trying all known hosts until it finds one available. + * + * @param conn is an existing MongoDB connection. + * @param force_master signals whether a primary node should be found. + * + * @returns A mongo_sync_collection object, or NULL if the reconnect fails + * for one reason or the other. + * + * @note The original connection object will be updated too! + */ +mongo_sync_connection *mongo_sync_reconnect (mongo_sync_connection *conn, + gboolean force_master); + +/** Close and free a synchronous MongoDB connection. + * + * @param conn is the connection to close. + * + * @note The object will be freed, and shall not be used afterwards! + */ +void mongo_sync_disconnect (mongo_sync_connection *conn); + +/** Retrieve the state of the SLAVE_OK flag from a sync connection. + * + * @param conn is the connection to check the flag on. + * + * @returns The state of the SLAVE_OK flag. + */ +gboolean mongo_sync_conn_get_slaveok (const mongo_sync_connection *conn); + +/** Set the SLAVE_OK flag on a sync connection. + * + * @param conn is the connection to set the flag on. + * @param slaveok is the state to set. + * + * @returns TRUE on sucess, FALSE otherwise. + */ +gboolean mongo_sync_conn_set_slaveok (mongo_sync_connection *conn, + gboolean slaveok); + +/** Retrieve the state of the safe mode flag from a sync connection. + * + * @param conn is the connection to check the flag on. + * + * @returns The state of the safe mode flag. + */ +gboolean mongo_sync_conn_get_safe_mode (const mongo_sync_connection *conn); + +/** Set the safe mode flag on a sync connection. + * + * Enabling safe mode will result in an additional getLastError() call + * after each insert or update, and extra checks performed on other + * commands aswell. + * + * The upside is more guarantees that the commands succeed, at the + * expense of network traffic and speed. + * + * @param conn is the connection to set the flag on. + * @param safe_mode is the state to set it to. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_conn_set_safe_mode (mongo_sync_connection *conn, + gboolean safe_mode); + +/** Get the state of the auto-reconnect flag from a sync connection. + * + * @param conn is the connection to check the flag on. + * + * @returns The state of the auto-reconnect flag. + */ +gboolean mongo_sync_conn_get_auto_reconnect (const mongo_sync_connection *conn); + +/** Set the state of the auto-reconnect flag on a sync connection. + * + * When auto-reconnect is enabled, the library will automatically + * attempt to reconnect to a server behind the scenes, when it detects + * an error. + * + * If safe-mode is turned on aswell, then auto-reconnect will only + * happen if the error is detected before a command is sent towards + * the database. + * + * @param conn is the connection to set auto-reconnect on. + * @param auto_reconnect is the state to set it to. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_conn_set_auto_reconnect (mongo_sync_connection *conn, + gboolean auto_reconnect); + +/** Get the maximum size of a bulk insert package. + * + * @param conn is the connection to get the maximum size from. + * + * @returns The maximum size, or -1 on failiure. + */ +gint32 mongo_sync_conn_get_max_insert_size (mongo_sync_connection *conn); + +/** Set the maximum size of a bulk insert package. + * + * When inserting multiple documents at a time, the library can + * automatically split the pack up into smaller chunks. With this + * function, one can set the maximum size, past which, the request + * will be split into smaller chunks. + * + * @param conn is the connection to set the maximum size for. + * @param max_size is the maximum size, in bytes. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_conn_set_max_insert_size (mongo_sync_connection *conn, + gint32 max_size); + +/** Send an update command to MongoDB. + * + * Constructs and sends an update command to MongoDB. + * + * @param conn is the connection to work with. + * @param ns is the namespace to work in. + * @param flags are the flags for the update command. See + * mongo_wire_cmd_update(). + * @param selector is the BSON document that will act as the selector. + * @param update is the BSON document that contains the updated + * values. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_update (mongo_sync_connection *conn, + const gchar *ns, + gint32 flags, const bson *selector, + const bson *update); + +/** Send an insert command to MongoDB. + * + * Constructs and sends an insert command to MongodB. + * + * @param conn is the connection to work with. + * @param ns is the namespace to work in. + * @tparam docs are the documents to insert. One must close the list + * with a NULL value. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_insert (mongo_sync_connection *conn, + const gchar *ns, ...) G_GNUC_NULL_TERMINATED; + + +/** Send an insert command to MongoDB. + * + * Constructs and sends an insert command to MongodB. + * + * @param conn is the connection to work with. + * @param ns is the namespace to work in. + * @param n is the number of documents to insert. + * @param docs is the array the documents to insert. There must be at + * least @a n documents in the array. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_insert_n (mongo_sync_connection *conn, + const gchar *ns, gint32 n, + const bson **docs); + +/** Send a query command to MongoDB. + * + * @param conn is the connection to work with. + * @param ns is the namespace, the database and collection name + * concatenated, and separated with a single dot. + * @param flags are the query options. See mongo_wire_cmd_query(). + * @param skip is the number of documents to skip. + * @param ret is the number of documents to return. + * @param query is the query BSON object. + * @param sel is the (optional) selector BSON object indicating the + * fields to return. Passing NULL will return all fields. + * + * @returns A newly allocated reply packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_sync_cmd_query (mongo_sync_connection *conn, + const gchar *ns, gint32 flags, + gint32 skip, gint32 ret, const bson *query, + const bson *sel); + +/** Send a get more command to MongoDB. + * + * @param conn is the connection to work with. + * @param ns is the namespace, the database and collection name + * concatenated, and separated with a single dot. + * @param ret is the number of documents to return. + * @param cursor_id is the ID of the cursor to use. + * + * @returns A newly allocated reply packet, or NULL on error. It is + * the responsibility of the caller to free the packet once it is not + * used anymore. + */ +mongo_packet *mongo_sync_cmd_get_more (mongo_sync_connection *conn, + const gchar *ns, + gint32 ret, gint64 cursor_id); + +/** Send a delete command to MongoDB. + * + * @param conn is the connection to work with. + * @param ns is the namespace, the database and collection name + * concatenated, and separated with a single dot. + * @param flags are the delete options. See mongo_wire_cmd_delete(). + * @param sel is the BSON object to use as a selector. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_delete (mongo_sync_connection *conn, const gchar *ns, + gint32 flags, const bson *sel); + +/** Send a kill_cursors command to MongoDB. + * + * @param conn is the connection to work with. + * @param n is the number of cursors to kill. + * @tparam cursor_ids is the list of cursor ids to kill. + * + * @note One must supply exaclty @a n number of cursor IDs. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_kill_cursors (mongo_sync_connection *conn, + gint32 n, ...); + +/** Send a custom command to MongoDB. + * + * Custom commands are queries run in the db.$cmd namespace. The + * commands themselves are queries, and as such, BSON objects. + * + * @param conn is the connection to work with. + * @param db is the database in which the command shall be run. + * @param command is the BSON object representing the command. + * + * @returns A newly allocated reply packet, or NULL on error. It is + * the responsibility of the caller to free the packet once it is not + * used anymore. + */ +mongo_packet *mongo_sync_cmd_custom (mongo_sync_connection *conn, + const gchar *db, + const bson *command); + +/** Send a count() command to MongoDB. + * + * The count command is an efficient way to count tha available + * documents matching a selector. + * + * @param conn is the connection to work with. + * @param db is the name of the database. + * @param coll is the name of the collection. + * @param query is the optional selector (NULL will count all + * documents within the collection). + * + * @returns The number of matching documents, or -1 on error. + */ +gdouble mongo_sync_cmd_count (mongo_sync_connection *conn, + const gchar *db, const gchar *coll, + const bson *query); + +/** Flags that can be set during collection creation. */ +enum + { + /** Default options. */ + MONGO_COLLECTION_DEFAULTS = 0, + /** The collection is capped. */ + MONGO_COLLECTION_CAPPED = 1 << 0, + /** The collection is capped by element number aswell. */ + MONGO_COLLECTION_CAPPED_MAX = 1 << 1, + /** The collection's _id should be autoindexed. */ + MONGO_COLLECTION_AUTO_INDEX_ID = 1 << 2, + /** The collection needs to be pre-allocated. */ + MONGO_COLLECTION_SIZED = 1 << 3 + }; + +/** Create a new MongoDB collection. + * + * This command can be used to explicitly create a MongoDB collection, + * with various parameters pre-set. + * + * @param conn is the connection to work with. + * @param db is the name of the database. + * @param coll is the name of the collection to create. + * @param flags is a collection of flags for the collection. Any + * combination of MONGO_COLLECTION_DEFAULTS, MONGO_COLLECTION_CAPPED, + * MONGO_COLLECTION_CAPPED_MAX, MONGO_COLLECTION_SIZED and + * MONGO_COLLECTION_AUTO_INDEX_ID is acceptable. + * + * @tparam size @b MUST be a 64-bit integer, if + * MONGO_COLLECTION_CAPPED or MONGO_COLLECTION_SIZED is specified, and + * it must follow the @a flags parameter. + * @tparam max @b MUST be a 64-bit integer, if + * MONGO_COLLECTION_CAPPED_MAX is specified, and must follow @a size. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_create (mongo_sync_connection *conn, + const gchar *db, const gchar *coll, + gint flags, ...); + +/** Check whether a collection exists in MongoDB. + * + * @param conn is the connection to work with. + * @param db is the database to search for the collection. + * @param coll is the collection to search for. + * + * @returns A newly allocated BSON object, with data about the + * collection on success, NULL otherwise. It is the responsiblity of + * the caller to free the BSON object once it is no longer needed. + */ +bson *mongo_sync_cmd_exists (mongo_sync_connection *conn, + const gchar *db, const gchar *coll); + +/** Send a drop() command to MongoDB. + * + * With this command, one can easily drop a collection. + * + * @param conn is the connection to work with. + * @param db is the name of the database. + * @param coll is the name of the collection to drop. + * + * @returns TRUE if the collection was dropped, FALSE otherwise. + */ +gboolean mongo_sync_cmd_drop (mongo_sync_connection *conn, + const gchar *db, const gchar *coll); + +/** Get the last error from MongoDB. + * + * Retrieves the last error from MongoDB. + * + * @param conn is the connection to work with. + * @param db is the name of the database. + * @param error is a pointer to a string variable that will hold the + * error message. + * + * @returns TRUE if the error was succesfully retrieved, FALSE + * otherwise. The output variable @a error is only set if the function + * is returning TRUE. + */ +gboolean mongo_sync_cmd_get_last_error (mongo_sync_connection *conn, + const gchar *db, gchar **error); + +/** Get the last error from MongoDB. + * + * Retrieves the last error from MongoDB. + * + * @param conn is the connection to work with. + * @param db is the name of the database. + * @param error is a pointer to a BSON variable that will hold the + * error message. + * + * @returns TRUE if the error was succesfully retrieved, FALSE + * otherwise. The output variable @a error is only set if the function + * is returning TRUE. + */ +gboolean mongo_sync_cmd_get_last_error_full (mongo_sync_connection *conn, + const gchar *db, bson **error); + +/** Reset the last error variable in MongoDB. + * + * @param conn is the connection to work with. + * @param db is the name of the database. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_reset_error (mongo_sync_connection *conn, + const gchar *db); + +/** Check whether the current node is the master. + * + * @param conn is the connection to work with. + * + * @returns TRUE if it is master, FALSE otherwise and on errors. + */ +gboolean mongo_sync_cmd_is_master (mongo_sync_connection *conn); + +/** Send a PING command to MongoDB. + * + * @param conn is the connection to work with. + * + * @returns TRUE if the connection is alive and kicking, FALSE + * otherwise. + */ +gboolean mongo_sync_cmd_ping (mongo_sync_connection *conn); + +/** Add a user to MongoDB. + * + * @param conn is the connection to work with. + * @param db is the database to add the user to. + * @param user is the user to add. + * @param pw is the password. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_user_add (mongo_sync_connection *conn, + const gchar *db, + const gchar *user, + const gchar *pw); + +/** Add a user to MongoDB, with roles. + * + * @param conn is the connection to work with. + * @param db is the database to add the user to. + * @param user is the user to add. + * @param pw is the password. + * @param roles is a BSON array containing the roles for the user. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_user_add_with_roles (mongo_sync_connection *conn, + const gchar *db, + const gchar *user, + const gchar *pw, + const bson *roles); + +/** Remove a user from MongoDB. + * + * @param conn is the connection to work with. + * @param db is the database to remove the user from. + * @param user is the username to remove. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_user_remove (mongo_sync_connection *conn, + const gchar *db, + const gchar *user); + +/** Authenticate a user with MongoDB. + * + * @param conn is the connection to work with. + * @param db is the database to authenticate against. + * @param user is the username. + * @param pw is the password. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_authenticate (mongo_sync_connection *conn, + const gchar *db, + const gchar *user, + const gchar *pw); + +/** Flags that can be set at index creation. */ +enum + { + MONGO_INDEX_UNIQUE = 0x01, /**< Create a unique index. */ + MONGO_INDEX_DROP_DUPS = 0x02, /**< Drop duplicate entries when + creating the indexes. */ + MONGO_INDEX_BACKGROUND = 0x04, /**< Create indexes in the + background. */ + MONGO_INDEX_SPARSE = 0x08 /**< Create sparse indexes. */ + }; + +/** Create an index. + * + * @param conn is the connection to work with. + * @param ns is the namespace to create indexes for. + * @param key is the key pattern to base indexes on. + * @param options are the index options. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_index_create (mongo_sync_connection *conn, + const gchar *ns, + const bson *key, + gint options); + +/** Drop an index. + * + * @param conn is the connection to work with. + * @param ns is the namespace to drop the index from. + * @param key is the index pattern to drop. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_index_drop (mongo_sync_connection *conn, + const gchar *ns, + const bson *key); + +/** Drop all indexes from a namespace. + * + * @param conn is the connection to work with. + * @param ns is the namespace whose indexes to drop. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_cmd_index_drop_all (mongo_sync_connection *conn, + const gchar *ns); + +/** Get the last error message on a connection + * + * @param conn is the connection + * + * @returns pointer to the error message, if exists, NULL otherwise + */ +const gchar *mongo_sync_conn_get_last_error (mongo_sync_connection *conn); + +/** @} */ + +G_END_DECLS + +#endif diff --git a/src/mongo-utils.c b/src/mongo-utils.c new file mode 100644 index 0000000..6676aa9 --- /dev/null +++ b/src/mongo-utils.c @@ -0,0 +1,197 @@ +/* mongo-utils.c - libmongo-client utility functions + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-utils.c + * Implementation for various libmongo-client helper functions. + */ + +#include <glib.h> +#include <glib/gprintf.h> + +#include <sys/types.h> +#include <string.h> +#include <stdlib.h> +#include <time.h> +#include <unistd.h> +#include <errno.h> + +#include "mongo-client.h" + +static guint32 machine_id = 0; +static gint16 pid = 0; + +void +mongo_util_oid_init (gint32 mid) +{ + pid_t p = getpid (); + + if (mid == 0) + { + srand (time (NULL)); + machine_id = rand (); + } + else + machine_id = mid; + + /* + * If our pid has more than 16 bits, let half the bits modulate the + * machine_id. + */ + if (sizeof (pid_t) > 2) + { + machine_id ^= pid >> 16; + } + pid = (gint16)p; +} + +guint8 * +mongo_util_oid_new_with_time (gint32 ts, gint32 seq) +{ + guint8 *oid; + gint32 t = GINT32_TO_BE (ts); + gint32 tmp = GINT32_TO_BE (seq); + + if (machine_id == 0 || pid == 0) + return NULL; + + oid = (guint8 *)g_new0 (guint8, 12); + + /* Sequence number, last 3 bytes + * For simplicity's sake, we put this in first, and overwrite the + * first byte later. + */ + memcpy (oid + 4 + 2 + 2, &tmp, 4); + /* First four bytes: the time, BE byte order */ + memcpy (oid, &t, 4); + /* Machine ID, byte order doesn't matter, 3 bytes */ + memcpy (oid + 4, &machine_id, 3); + /* PID, byte order doesn't matter, 2 bytes */ + memcpy (oid + 4 + 3, &pid, 2); + + return oid; +} + +guint8 * +mongo_util_oid_new (gint32 seq) +{ + return mongo_util_oid_new_with_time (time (NULL), seq); +} + +gchar * +mongo_util_oid_as_string (const guint8 *oid) +{ + gchar *str; + gint j; + + if (!oid) + return NULL; + + str = g_new (gchar, 26); + for (j = 0; j < 12; j++) + g_sprintf (&str[j * 2], "%02x", oid[j]); + str[25] = 0; + return str; +} + +gboolean +mongo_util_parse_addr (const gchar *addr, gchar **host, gint *port) +{ + gchar *port_s, *ep; + glong p; + + if (!addr || !host || !port) + { + if (host) + *host = NULL; + if (port) + *port = -1; + errno = EINVAL; + return FALSE; + } + + /* Check for IPv6 literal */ + if (addr[0] == '[') + { + /* Host is everything between [] */ + port_s = strchr (addr + 1, ']'); + if (!port_s || port_s - addr == 1) + { + *host = NULL; + *port = -1; + errno = EINVAL; + return FALSE; + } + *host = g_strndup (addr + 1, port_s - addr - 1); + + port_s += 2; + if (port_s - addr >= (glong)strlen (addr)) + { + *port = -1; + return TRUE; + } + } + else + { + /* Dealing with something that's not an IPv6 literal */ + + /* Split up to host:port */ + port_s = g_strrstr (addr, ":"); + if (!port_s) + { + *host = g_strdup (addr); + *port = -1; + return TRUE; + } + if (port_s == addr) + { + *host = NULL; + *port = -1; + errno = EINVAL; + return FALSE; + } + port_s++; + *host = g_strndup (addr, port_s - addr - 1); + } + + p = strtol (port_s, &ep, 10); + if (p == LONG_MIN || p == LONG_MAX) + { + g_free (*host); + *host = NULL; + *port = -1; + errno = ERANGE; + return FALSE; + } + if ((p != MONGO_CONN_LOCAL) && (p < 0 || p > INT_MAX)) + { + g_free (*host); + *host = NULL; + *port = -1; + errno = ERANGE; + return FALSE; + } + *port = (gint)p; + + if (ep && *ep) + { + g_free (*host); + *host = NULL; + *port = -1; + errno = EINVAL; + return FALSE; + } + return TRUE; +} diff --git a/src/mongo-utils.h b/src/mongo-utils.h new file mode 100644 index 0000000..3c3b5df --- /dev/null +++ b/src/mongo-utils.h @@ -0,0 +1,121 @@ +/* mongo-utils.h - libmongo-client utility functions + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-utils.h + * Public header for various libmongo-client helper functions. + */ + +#ifndef LIBMONGO_CLIENT_UTILS_H +#define LIBMONGO_CLIENT_UTILS_H 1 + +#include <glib.h> + +G_BEGIN_DECLS + +/** @defgroup mongo_util Mongo Utils + * + * Various utility functions related to MongoDB. + * + * @addtogroup mongo_util + * @{ + */ + +/** Intitialize the static ObjectID components. + * + * @param machine_id is the machine id to use, or zero to generate one + * automatically. + * + * This function needs to be called once, before any OIDs are + * generated. It is also a good idea to call it whenever the calling + * program's PID might change. + */ +void mongo_util_oid_init (gint32 machine_id); + +/** Generate a new ObjectID. + * + * Based on the current time, the pre-determined pid and machine ID + * and a supplied sequence number, generate a new ObjectID. + * + * The machine id and the PID are updated whenever + * mongo_util_oid_init() is called. + * + * @param seq is the sequence number to use. + * + * @note The ObjectID has space for only 24 bits of sequence bytes, so + * it should be noted that while @a seq is 32 bits wide, only 24 of + * that will be used. + * + * @returns A newly allocated ObjectID or NULL on error. Freeing it is + * the responsibility of the caller. + */ +guint8 *mongo_util_oid_new (gint32 seq); + +/** Generate a new ObjectID, with a predefined timestamp. + * + * Based on the suppiled time and sequence number, and the + * pre-determined pid and machine ID, generate a new ObjectID. + * + * The machine id and the PID are updated whenever + * mongo_util_oid_init() is called. + * + * @param time is the timestamp to use. + * @param seq is the sequence number to use. + * + * + * @note The ObjectID has space for only 24 bits of sequence bytes, so + * it should be noted that while @a seq is 32 bits wide, only 24 of + * that will be used. + * + * @returns A newly allocated ObjectID or NULL on error. Freeing it is + * the responsibility of the caller. + */ +guint8 *mongo_util_oid_new_with_time (gint32 time, gint32 seq); + +/** Convert an ObjectID to its string representation. + * + * Turns a binary ObjectID into a hexadecimal string. + * + * @param oid is the binary ObjectID. + * + * @returns A newly allocated string representation of the ObjectID, + * or NULL on error. It is the responsibility of the caller to free it + * once it is no longer needed. + */ +gchar *mongo_util_oid_as_string (const guint8 *oid); + +/** Parse a HOST:IP pair. + * + * Given a HOST:IP pair, split it up into a host and a port. IPv6 + * addresses supported, the function cuts at the last ":". + * + * @param addr is the address to split. + * @param host is a pointer to a string where the host part will be + * stored. + * @param port is a pointer to an integer, where the port part will be + * stored. + * + * @returns TRUE on success, FALSE otherwise. The @a host parameter + * will contain a newly allocated string on succes. On failiure, host + * will be set to NULL, and port to -1. + */ +gboolean mongo_util_parse_addr (const gchar *addr, gchar **host, + gint *port); + +/** @} */ + +G_END_DECLS + +#endif diff --git a/src/mongo-wire.c b/src/mongo-wire.c new file mode 100644 index 0000000..cf140a5 --- /dev/null +++ b/src/mongo-wire.c @@ -0,0 +1,645 @@ +/* mongo-wire.c - libmongo-client's MongoDB wire protocoll implementation. + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-wire.c + * Implementation of the MongoDB Wire Protocol. + */ + +#include <glib.h> +#include <string.h> +#include <stdarg.h> +#include <errno.h> + +#include "bson.h" +#include "mongo-wire.h" +#include "libmongo-private.h" + +/** @internal Constant zero value. */ +static const gint32 zero = 0; + +/** @internal A MongoDB command, as it appears on the wire. + * + * For the sake of clarity, and sanity of the library, the header and + * data parts are stored separately, and as such, will need to be sent + * separately aswell. + */ +struct _mongo_packet +{ + mongo_packet_header header; /**< The packet header. */ + guint8 *data; /**< The actual data of the packet. */ + gint32 data_size; /**< Size of the data payload. */ +}; + +/** @internal Mongo command opcodes. */ +typedef enum + { + OP_REPLY = 1, /**< Message is a reply. Only sent by the server. */ + OP_MSG = 1000, /**< Message is a generic message. */ + OP_UPDATE = 2001, /**< Message is an update command. */ + OP_INSERT = 2002, /**< Message is an insert command. */ + OP_RESERVED = 2003, /**< Reserved and unused. */ + OP_QUERY = 2004, /**< Message is a query command. */ + OP_GET_MORE = 2005, /**< Message is a get more command. */ + OP_DELETE = 2006, /**< Message is a delete command. */ + OP_KILL_CURSORS = 2007 /**< Message is a kill cursors command. */ + } mongo_wire_opcode; + +mongo_packet * +mongo_wire_packet_new (void) +{ + mongo_packet *p = (mongo_packet *)g_new0 (mongo_packet, 1); + + p->header.length = GINT32_TO_LE (sizeof (mongo_packet_header)); + return p; +} + +gboolean +mongo_wire_packet_get_header (const mongo_packet *p, + mongo_packet_header *header) +{ + if (!p || !header) + { + errno = EINVAL; + return FALSE; + } + + header->length = GINT32_FROM_LE (p->header.length); + header->id = GINT32_FROM_LE (p->header.id); + header->resp_to = GINT32_FROM_LE (p->header.resp_to); + header->opcode = GINT32_FROM_LE (p->header.opcode); + + return TRUE; +} + +gboolean +mongo_wire_packet_get_header_raw (const mongo_packet *p, + mongo_packet_header *header) +{ + if (!p || !header) + { + errno = EINVAL; + return FALSE; + } + + header->length = p->header.length; + header->id = p->header.id; + header->resp_to = p->header.resp_to; + header->opcode = p->header.opcode; + + return TRUE; +} + +gboolean +mongo_wire_packet_set_header (mongo_packet *p, + const mongo_packet_header *header) +{ + if (!p || !header) + { + errno = EINVAL; + return FALSE; + } + if (GINT32_FROM_LE (header->length) < (gint32)sizeof (mongo_packet_header)) + { + errno = ERANGE; + return FALSE; + } + + p->header.length = GINT32_TO_LE (header->length); + p->header.id = GINT32_TO_LE (header->id); + p->header.resp_to = GINT32_TO_LE (header->resp_to); + p->header.opcode = GINT32_TO_LE (header->opcode); + + p->data_size = header->length - sizeof (mongo_packet_header); + + return TRUE; +} + +gboolean +mongo_wire_packet_set_header_raw (mongo_packet *p, + const mongo_packet_header *header) +{ + if (!p || !header) + { + errno = EINVAL; + return FALSE; + } + + p->header.length = header->length; + p->header.id = header->id; + p->header.resp_to = header->resp_to; + p->header.opcode = header->opcode; + + p->data_size = header->length - sizeof (mongo_packet_header); + + return TRUE; +} + +gint32 +mongo_wire_packet_get_data (const mongo_packet *p, const guint8 **data) +{ + if (!p || !data) + { + errno = EINVAL; + return -1; + } + if (p->data == NULL) + { + errno = EINVAL; + return -1; + } + + *data = (const guint8 *)p->data; + return p->data_size; +} + +gboolean +mongo_wire_packet_set_data (mongo_packet *p, const guint8 *data, gint32 size) +{ + if (!p || !data || size <= 0) + { + errno = EINVAL; + return FALSE; + } + + if (p->data) + g_free (p->data); + p->data = g_malloc (size); + memcpy (p->data, data, size); + + p->data_size = size; + p->header.length = + GINT32_TO_LE (p->data_size + sizeof (mongo_packet_header)); + + return TRUE; +} + +void +mongo_wire_packet_free (mongo_packet *p) +{ + if (!p) + { + errno = EINVAL; + return; + } + + if (p->data) + g_free (p->data); + g_free (p); +} + +mongo_packet * +mongo_wire_cmd_update (gint32 id, const gchar *ns, gint32 flags, + const bson *selector, const bson *update) +{ + mongo_packet *p; + gint32 t_flags = GINT32_TO_LE (flags); + gint nslen; + + if (!ns || !selector || !update) + { + errno = EINVAL; + return NULL; + } + + if (bson_size (selector) < 0 || + bson_size (update) < 0) + { + errno = EINVAL; + return NULL; + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_UPDATE); + + nslen = strlen (ns) + 1; + p->data_size = bson_size (selector) + bson_size (update) + + sizeof (gint32) * 2 + nslen; + + p->data = g_malloc (p->data_size); + + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, nslen); + memcpy (p->data + sizeof (gint32) + nslen, (void *)&t_flags, + sizeof (gint32)); + memcpy (p->data + sizeof (gint32) * 2 + nslen, + bson_data (selector), bson_size (selector)); + memcpy (p->data + sizeof (gint32) * 2 + nslen + bson_size (selector), + bson_data (update), bson_size (update)); + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_insert_n (gint32 id, const gchar *ns, gint32 n, + const bson **docs) +{ + mongo_packet *p; + gint32 pos, dsize = 0; + gint32 i; + + if (!ns || !docs) + { + errno = EINVAL; + return NULL; + } + + if (n <= 0) + { + errno = ERANGE; + return NULL; + } + + for (i = 0; i < n; i++) + { + if (bson_size (docs[i]) <= 0) + { + errno = EINVAL; + return NULL; + } + dsize += bson_size (docs[i]); + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_INSERT); + + pos = sizeof (gint32) + strlen (ns) + 1; + p->data_size = pos + dsize; + p->data = (guint8 *)g_malloc (p->data_size); + + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, strlen (ns) + 1); + + for (i = 0; i < n; i++) + { + memcpy (p->data + pos, bson_data (docs[i]), bson_size (docs[i])); + pos += bson_size (docs[i]); + } + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_insert (gint32 id, const gchar *ns, ...) +{ + mongo_packet *p; + bson **docs, *d; + gint32 n = 0; + va_list ap; + + if (!ns) + { + errno = EINVAL; + return NULL; + } + + docs = (bson **)g_new0 (bson *, 1); + + va_start (ap, ns); + while ((d = (bson *)va_arg (ap, gpointer))) + { + if (bson_size (d) < 0) + { + g_free (docs); + errno = EINVAL; + return NULL; + } + + docs = (bson **)g_renew (bson *, docs, n + 1); + docs[n++] = d; + } + va_end (ap); + + p = mongo_wire_cmd_insert_n (id, ns, n, (const bson **)docs); + g_free (docs); + return p; +} + +mongo_packet * +mongo_wire_cmd_query (gint32 id, const gchar *ns, gint32 flags, + gint32 skip, gint32 ret, const bson *query, + const bson *sel) +{ + mongo_packet *p; + gint32 tmp, nslen; + + if (!ns || !query) + { + errno = EINVAL; + return NULL; + } + + if (bson_size (query) < 0 || (sel && bson_size (sel) < 0)) + { + errno = EINVAL; + return NULL; + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_QUERY); + + nslen = strlen (ns) + 1; + p->data_size = + sizeof (gint32) + nslen + sizeof (gint32) * 2 + bson_size (query); + + if (sel) + p->data_size += bson_size (sel); + p->data = g_malloc (p->data_size); + + tmp = GINT32_TO_LE (flags); + memcpy (p->data, (void *)&tmp, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, nslen); + tmp = GINT32_TO_LE (skip); + memcpy (p->data + sizeof (gint32) + nslen, (void *)&tmp, sizeof (gint32)); + tmp = GINT32_TO_LE (ret); + memcpy (p->data + sizeof (gint32) * 2 + nslen, + (void *)&tmp, sizeof (gint32)); + memcpy (p->data + sizeof (gint32) * 3 + nslen, bson_data (query), + bson_size (query)); + + if (sel) + memcpy (p->data + sizeof (gint32) * 3 + nslen + bson_size (query), + bson_data (sel), bson_size (sel)); + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_get_more (gint32 id, const gchar *ns, + gint32 ret, gint64 cursor_id) +{ + mongo_packet *p; + gint32 t_ret; + gint64 t_cid; + gint32 nslen; + + if (!ns) + { + errno = EINVAL; + return NULL; + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_GET_MORE); + + t_ret = GINT32_TO_LE (ret); + t_cid = GINT64_TO_LE (cursor_id); + + nslen = strlen (ns) + 1; + p->data_size = sizeof (gint32) + nslen + sizeof (gint32) + sizeof (gint64); + p->data = g_malloc (p->data_size); + + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, nslen); + memcpy (p->data + sizeof (gint32) + nslen, (void *)&t_ret, sizeof (gint32)); + memcpy (p->data + sizeof (gint32) * 2 + nslen, + (void *)&t_cid, sizeof (gint64)); + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_delete (gint32 id, const gchar *ns, + gint32 flags, const bson *sel) +{ + mongo_packet *p; + gint32 t_flags, nslen; + + if (!ns || !sel) + { + errno = EINVAL; + return NULL; + } + + if (bson_size (sel) < 0) + { + errno = EINVAL; + return NULL; + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_DELETE); + + nslen = strlen (ns) + 1; + p->data_size = sizeof (gint32) + nslen + sizeof (gint32) + bson_size (sel); + p->data = g_malloc (p->data_size); + + t_flags = GINT32_TO_LE (flags); + + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, nslen); + memcpy (p->data + sizeof (gint32) + nslen, + (void *)&t_flags, sizeof (gint32)); + memcpy (p->data + sizeof (gint32) * 2 + nslen, + bson_data (sel), bson_size (sel)); + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_kill_cursors_va (gint32 id, gint32 n, va_list ap) +{ + mongo_packet *p; + gint32 i, t_n, pos; + gint64 t_cid; + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_KILL_CURSORS); + + p->data_size = sizeof (gint32) + sizeof (gint32) + sizeof (gint64)* n; + p->data = g_malloc (p->data_size); + + t_n = GINT32_TO_LE (n); + pos = sizeof (gint32) * 2; + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)&t_n, sizeof (gint32)); + + for (i = 1; i <= n; i++) + { + t_cid = va_arg (ap, gint64); + t_cid = GINT64_TO_LE (t_cid); + + memcpy (p->data + pos, (void *)&t_cid, sizeof (gint64)); + pos += sizeof (gint64); + } + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_kill_cursors (gint32 id, gint32 n, ...) +{ + va_list ap; + mongo_packet *p; + + if (n <= 0) + { + errno = EINVAL; + return NULL; + } + + va_start (ap, n); + p = mongo_wire_cmd_kill_cursors_va (id, n, ap); + va_end (ap); + + return p; +} + +mongo_packet * +mongo_wire_cmd_custom (gint32 id, const gchar *db, gint32 flags, + const bson *command) +{ + mongo_packet *p; + gchar *ns; + bson *empty; + + if (!db || !command) + { + errno = EINVAL; + return NULL; + } + + if (bson_size (command) < 0) + { + errno = EINVAL; + return NULL; + } + + ns = g_strconcat (db, ".$cmd", NULL); + + empty = bson_new (); + bson_finish (empty); + + p = mongo_wire_cmd_query (id, ns, flags, 0, 1, command, empty); + g_free (ns); + bson_free (empty); + return p; +} + +gboolean +mongo_wire_reply_packet_get_header (const mongo_packet *p, + mongo_reply_packet_header *hdr) +{ + mongo_reply_packet_header h; + const guint8 *data; + + if (!p || !hdr) + { + errno = EINVAL; + return FALSE; + } + + if (p->header.opcode != OP_REPLY) + { + errno = EPROTO; + return FALSE; + } + + if (mongo_wire_packet_get_data (p, &data) == -1) + return FALSE; + + memcpy (&h, data, sizeof (mongo_reply_packet_header)); + + hdr->flags = GINT32_FROM_LE (h.flags); + hdr->cursor_id = GINT64_FROM_LE (h.cursor_id); + hdr->start = GINT32_FROM_LE (h.start); + hdr->returned = GINT32_FROM_LE (h.returned); + + return TRUE; +} + +gboolean +mongo_wire_reply_packet_get_data (const mongo_packet *p, + const guint8 **data) +{ + const guint8 *d; + + if (!p || !data) + { + errno = EINVAL; + return FALSE; + } + + if (p->header.opcode != OP_REPLY) + { + errno = EPROTO; + return FALSE; + } + + if (mongo_wire_packet_get_data (p, &d) == -1) + return FALSE; + + *data = d + sizeof (mongo_reply_packet_header); + return TRUE; +} + +gboolean +mongo_wire_reply_packet_get_nth_document (const mongo_packet *p, + gint32 n, + bson **doc) +{ + const guint8 *d; + mongo_reply_packet_header h; + gint32 i; + gint32 pos = 0; + + if (!p || !doc || n <= 0) + { + errno = EINVAL; + return FALSE; + } + + if (p->header.opcode != OP_REPLY) + { + errno = EPROTO; + return FALSE; + } + + if (!mongo_wire_reply_packet_get_header (p, &h)) + return FALSE; + + if (h.returned < n) + { + errno = ERANGE; + return FALSE; + } + + if (!mongo_wire_reply_packet_get_data (p, &d)) + return FALSE; + + for (i = 1; i < n; i++) + pos += bson_stream_doc_size (d, pos); + + *doc = bson_new_from_data (d + pos, bson_stream_doc_size (d, pos) - 1); + return TRUE; +} diff --git a/src/mongo-wire.h b/src/mongo-wire.h new file mode 100644 index 0000000..081a3e2 --- /dev/null +++ b/src/mongo-wire.h @@ -0,0 +1,433 @@ +/* mongo-wire.h - libmongo-client's MongoDB wire protocoll implementation. + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-wire.h + * MongoDB Wire Protocol API public header. + */ + +#ifndef LIBMONGO_CLIENT_MONGO_WIRE_H +#define LIBMONGO_CLIENT_MONGO_WIRE_H 1 + +#include <glib.h> + +#include <bson.h> + +G_BEGIN_DECLS + +/** @defgroup mongo_wire Mongo Wire Protocol + * + * The structures and functions within this module implement the + * MongoDB wire protocol: functions to assemble various commands into + * binary blobs that can be sent over the wire. + * + * @see mongo_client + * + * @addtogroup mongo_wire + * @{ + */ + +/** @defgroup mongo_wire_packet Packets + * + * @addtogroup mongo_wire_packet + * @{ + */ + +/** Mongo packet header. + * + * Every mongo packet has a header like this. Normally, one does not + * need to touch it, though. + */ +typedef struct +{ + gint32 length; /**< Full length of the packet, including the + header. */ + gint32 id; /**< Sequence ID, used when MongoDB responds to a + command. */ + gint32 resp_to; /**< ID the response is an answer to. Only sent by + the MongoDB server, never set on client-side. */ + gint32 opcode; /**< The opcode of the command. @see + mongo_wire_opcode. <*/ +} mongo_packet_header; + +/** An opaque Mongo Packet on the wire. + * + * This structure contains the binary data that can be written + * straight to the wire. + */ +typedef struct _mongo_packet mongo_packet; + +/** Create an empty packet. + * + * Creates an empty packet to be filled in later with + * mongo_wire_packet_set_header() and mongo_packet_set_data(). + * + * @returns A newly allocated packet, or NULL on error. + */ +mongo_packet *mongo_wire_packet_new (void); + +/** Get the header data of a packet. + * + * Retrieve the mongo packet's header data. + * + * @param p is the packet which header we seek. + * @param header is a pointer to a variable which will hold the data. + * + * @note Allocating the @a header is the responsibility of the caller. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_wire_packet_get_header (const mongo_packet *p, + mongo_packet_header *header); + +/** Set the header data of a packet. + * + * Override the mongo packet's header data. + * + * @note No sanity checks are done, use this function with great care. + * + * @param p is the packet whose header we want to override. + * @param header is the header structure to use. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_wire_packet_set_header (mongo_packet *p, + const mongo_packet_header *header); + +/** Get the data part of a packet. + * + * Retrieve the raw binary blob of the mongo packet's data. + * + * @param p is the packet which header we seek. + * @param data is a pointer to a variable which will hold the data. + * + * @note The @a data parameter will point to an internal structure, + * which shall not be freed or written to. + * + * @returns The size of the data, or -1 on error. + */ +gint32 mongo_wire_packet_get_data (const mongo_packet *p, const guint8 **data); + +/** Set the data part of a packet. + * + * Overrides the data part of a packet, adjusting the packet length in + * the header too. + * + * @note No sanity checks are performed on the data, it is the + * caller's responsibility to supply valid information. + * + * @param p is the packet whose data is to be set. + * @param data is the data to set. + * @param size is the size of the data. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_wire_packet_set_data (mongo_packet *p, const guint8 *data, + gint32 size); + +/** Free up a mongo packet. + * + * @param p is the packet to free. + * + * @note The packet shall not be used afterwards. + */ +void mongo_wire_packet_free (mongo_packet *p); + +/** @} */ + +/** @defgroup mongo_wire_reply Reply handling + * + * @addtogroup mongo_wire_reply + * @{ + */ + +/** Flags the server can set in replies. */ +enum + { + /** Set when get_more is called but the cursor id is invalid. */ + MONGO_REPLY_FLAG_NO_CURSOR = 0x1, + /** Set when the query failed. */ + MONGO_REPLY_FLAG_QUERY_FAIL = 0x2, + /** Set when the server suppots the AwaitData query option. + * If not set, the client should sleep a little between get_more + * calls on a tailable cursor. On Mongo >= 1.6, this flag is + * always set. + */ + MONGO_REPLY_FLAG_AWAITCAPABLE = 0x8 + }; + +/** Mongo reply packet header. + */ +#pragma pack(1) +typedef struct +{ + gint32 flags; /**< Response flags. */ + gint64 cursor_id; /**< Cursor ID, in case the client needs to do + get_more requests. */ + gint32 start; /**< Starting position of the reply within the + cursor. */ + gint32 returned; /**< Number of documents returned in the reply. */ +} mongo_reply_packet_header; +#pragma pack() + +/** Get the header of a reply packet. + * + * @param p is the packet to retrieve the reply header from. + * @param hdr is a pointer to a variable where the reply header will + * be stored. + * + * @note It is the responsibility of the caller to allocate space for + * the header. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_wire_reply_packet_get_header (const mongo_packet *p, + mongo_reply_packet_header *hdr); + +/** Get the full data part of a reply packet. + * + * The result will include the full, unparsed data part of the reply. + * + * @param p is the packet to retrieve the data from. + * @param data is a pointer to a variable where the replys data can be + * stored. + * + * @note The @a data variable will point to an internal structure, + * which must not be freed or modified. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_wire_reply_packet_get_data (const mongo_packet *p, + const guint8 **data); + +/** Get the Nth document from a reply packet. + * + * @param p is the packet to retrieve a document from. + * @param n is the number of the document to retrieve. + * @param doc is a pointer to a variable to hold the BSON document. + * + * @note The @a doc variable will be a newly allocated object, it is + * the responsibility of the caller to free it once it is not needed + * anymore. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_wire_reply_packet_get_nth_document (const mongo_packet *p, + gint32 n, + bson **doc); + +/** @}*/ + +/** @defgroup mongo_wire_cmd Commands + * + * Each command has an @a id parameter, which can be used to track + * replies to various commands. It is the responsibility of the caller + * to keep track of IDs. + * + * @addtogroup mongo_wire_cmd + * @{ + */ + +/** Flags available for the update command. + * @see mongo_wire_cmd_update(). + */ +enum + { + /** When set, inserts if no matching document was found. */ + MONGO_WIRE_FLAG_UPDATE_UPSERT = 0x1, + /** When set, all matching documents will be updated, not just + the first. */ + MONGO_WIRE_FLAG_UPDATE_MULTI = 0x2 + }; + +/** Construct an update command. + * + * @param id is the sequence id. + * @param ns is the namespace, the database and collection name + * concatenated, and separated with a single dot. + * @param flags are the flags for the update command. Available flags + * are #MONGO_WIRE_FLAG_UPDATE_UPSERT and + * #MONGO_WIRE_FLAG_UPDATE_MULTI. + * @param selector is the BSON document that will act as the selector. + * @param update is the BSON document that contains the updated values. + * + * @returns A newly allocated packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_wire_cmd_update (gint32 id, const gchar *ns, + gint32 flags, const bson *selector, + const bson *update); + +/** Construct an insert command. + * + * @param id is the sequence id. + * @param ns is the namespace, the database and collection name + * concatenated, and separated with a single dot. + * @tparam docs are the BSON documents to insert. One must close the + * list with a NULL value. + * + * @returns A newly allocated packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_wire_cmd_insert (gint32 id, const gchar *ns, ...) + G_GNUC_NULL_TERMINATED; + +/** Construct an insert command with N documents. + * + * @param id is the sequence id. + * @param ns is the namespace, the database and collection name + * concatenated, and separated with a single dot. + * @param n is the number of documents to insert. + * @param docs is the array containing the bson documents to insert. + * + * @returns A newly allocated packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_wire_cmd_insert_n (gint32 id, const gchar *ns, gint32 n, + const bson **docs); + +/** Flags available for the query command. + * @see mongo_wire_cmd_query(). + */ +enum + { + /** Set the TailableCursor flag on the query. */ + MONGO_WIRE_FLAG_QUERY_TAILABLE_CURSOR = 1 << 1, + /** Allow queries made against a replica slave. */ + MONGO_WIRE_FLAG_QUERY_SLAVE_OK = 1 << 2, + /** Disable cursor timeout. */ + MONGO_WIRE_FLAG_QUERY_NO_CURSOR_TIMEOUT = 1 << 4, + /** Block if at the end of the data block, awaiting data. + * Use only with #MONGO_WIRE_FLAG_QUERY_TAILABLE_CURSOR! + */ + MONGO_WIRE_FLAG_QUERY_AWAIT_DATA = 1 << 5, + /** Stream the data down full blast in multiple packages. + * When set, the client is not allowed not to read all the data, + * unless it closes connection. + */ + MONGO_WIRE_FLAG_QUERY_EXHAUST = 1 << 6, + /** Allow partial results in a sharded environment. + * In case one or more required shards are down, with this flag + * set, partial results will be returned instead of failing. + */ + MONGO_WIRE_FLAG_QUERY_PARTIAL_RESULTS = 1 << 7 + }; + +/** Construct a query command. + * + * @param id is the sequence id. + * @param ns is the namespace, the database and collection name + * concatenated, and separated with a single dot. + * @param flags are the query options. Available flags are: + * #MONGO_WIRE_FLAG_QUERY_TAILABLE_CURSOR, + * #MONGO_WIRE_FLAG_QUERY_SLAVE_OK, + * #MONGO_WIRE_FLAG_QUERY_NO_CURSOR_TIMEOUT, + * #MONGO_WIRE_FLAG_QUERY_AWAIT_DATA, #MONGO_WIRE_FLAG_QUERY_EXHAUST. + * @param skip is the number of documents to skip. + * @param ret is the number of documents to return. + * @param query is the query BSON object. + * @param sel is the (optional) selector BSON object indicating the + * fields to return. Passing NULL will return all fields. + * + * @returns A newly allocated packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_wire_cmd_query (gint32 id, const gchar *ns, gint32 flags, + gint32 skip, gint32 ret, const bson *query, + const bson *sel); + +/** Construct a get more command. + * + * @param id is the sequence id. + * @param ns is the namespace, the database and collection name + * concatenated, and separated with a single dot. + * @param ret is the number of documents to return. + * @param cursor_id is the ID of the cursor to use. + * + * @returns A newly allocated packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_wire_cmd_get_more (gint32 id, const gchar *ns, + gint32 ret, gint64 cursor_id); + +/** Flags available for the delete command. + */ +enum + { + /** Only remove the first match. */ + MONGO_WIRE_FLAG_DELETE_SINGLE = 0x1 + }; + +/** Construct a delete command. + * + * @param id is the sequence id. + * @param ns is the namespace, the database and collection name + * concatenated, and separated with a single dot. + * @param flags are the delete options. The only available flag is + * MONGO_WIRE_FLAG_DELETE_SINGLE. + * @param sel is the BSON object to use as a selector. + * + * @returns A newly allocated packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_wire_cmd_delete (gint32 id, const gchar *ns, + gint32 flags, const bson *sel); + +/** Construct a kill cursors command. + * + * @param id is the sequence id. + * @param n is the number of cursors to delete. + * @tparam cursor_ids are the ids of the cursors to delete. + * + * @note One must supply exaclty @a n number of cursor IDs. + * + * @returns A newly allocated packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_wire_cmd_kill_cursors (gint32 id, gint32 n, ...); + +/** Construct a custom command. + * + * Custom commands are queries run in the db.$cmd namespace. The + * commands themselves are queries, and as such, BSON objects. + * + * @param id is the sequence id. + * @param db is the database in which the command shall be run. + * @param flags are the query flags. See mongo_wire_cmd_query() for a + * list. + * @param command is the BSON object representing the command. + * + * @returns A newly allocated packet, or NULL on error. It is the + * responsibility of the caller to free the packet once it is not used + * anymore. + */ +mongo_packet *mongo_wire_cmd_custom (gint32 id, const gchar *db, + gint32 flags, + const bson *command); + +/** @} */ + +/** @} */ + +G_END_DECLS + +#endif diff --git a/src/mongo.h b/src/mongo.h new file mode 100644 index 0000000..49f0187 --- /dev/null +++ b/src/mongo.h @@ -0,0 +1,49 @@ +/* mongo.h - libmongo-client general header + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo.h + * libmongo-client meta-header. + * + * This header includes all the rest, it is advised for applications + * to include this header, and this header only. + */ + +#include <bson.h> +#include <mongo-wire.h> +#include <mongo-client.h> +#include <mongo-utils.h> +#include <mongo-sync.h> +#include <mongo-sync-cursor.h> +#include <mongo-sync-pool.h> +#include <sync-gridfs.h> +#include <sync-gridfs-chunk.h> +#include <sync-gridfs-stream.h> + +/** @mainpage libmongo-client + * + * @section Introduction + * + * libmongo-client is an alternative MongoDB driver for the C + * language, with clarity, correctness and completeness in mind. + * + * Contents: + * @htmlonly + * <ul> + * <li><a href="modules.html"><b>API Documentation</b></a></li> + * <li><a href="tutorial.html"><b>Tutorial</b></a></li> + * </ul> + * @endhtmlonly + */ diff --git a/src/sync-gridfs-chunk.c b/src/sync-gridfs-chunk.c new file mode 100644 index 0000000..9bcc62e --- /dev/null +++ b/src/sync-gridfs-chunk.c @@ -0,0 +1,329 @@ +/* sync-gridfs-chunk.c - libmongo-client GridFS chunk access implementation + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/sync-gridfs-chunk.c + * MongoDB GridFS chunk access implementation. + */ + +#include "sync-gridfs-chunk.h" +#include "libmongo-private.h" + +#include <unistd.h> +#include <errno.h> + +void +mongo_sync_gridfs_chunked_file_free (mongo_sync_gridfs_chunked_file *gfile) +{ + if (!gfile) + { + errno = ENOTCONN; + return; + } + bson_free (gfile->meta.metadata); + g_free (gfile); + + errno = 0; +} + +mongo_sync_gridfs_chunked_file * +mongo_sync_gridfs_chunked_find (mongo_sync_gridfs *gfs, const bson *query) +{ + mongo_sync_gridfs_chunked_file *f; + mongo_packet *p; + bson_cursor *c; + + if (!gfs) + { + errno = ENOTCONN; + return NULL; + } + if (!query) + { + errno = EINVAL; + return NULL; + } + + p = mongo_sync_cmd_query (gfs->conn, gfs->ns.files, 0, 0, 1, query, NULL); + if (!p) + return NULL; + + f = g_new0 (mongo_sync_gridfs_chunked_file, 1); + f->gfs = gfs; + f->meta.type = LMC_GRIDFS_FILE_CHUNKED; + + mongo_wire_reply_packet_get_nth_document (p, 1, &f->meta.metadata); + bson_finish (f->meta.metadata); + mongo_wire_packet_free (p); + + c = bson_find (f->meta.metadata, "_id"); + if (!bson_cursor_get_oid (c, &f->meta.oid)) + { + mongo_sync_gridfs_chunked_file_free (f); + bson_cursor_free (c); + errno = EPROTO; + return NULL; + } + + bson_cursor_find (c, "length"); + bson_cursor_get_int64 (c, &f->meta.length); + + if (f->meta.length == 0) + { + gint32 i = 0; + + bson_cursor_get_int32 (c, &i); + f->meta.length = i; + } + + bson_cursor_find (c, "chunkSize"); + bson_cursor_get_int32 (c, &f->meta.chunk_size); + + if (f->meta.length == 0 || f->meta.chunk_size == 0) + { + bson_cursor_free (c); + mongo_sync_gridfs_chunked_file_free (f); + errno = EPROTO; + return NULL; + } + + bson_cursor_find (c, "uploadDate"); + if (!bson_cursor_get_utc_datetime (c, &f->meta.date)) + { + mongo_sync_gridfs_chunked_file_free (f); + bson_cursor_free (c); + errno = EPROTO; + return NULL; + } + + bson_cursor_find (c, "md5"); + if (!bson_cursor_get_string (c, &f->meta.md5)) + { + mongo_sync_gridfs_chunked_file_free (f); + bson_cursor_free (c); + errno = EPROTO; + return NULL; + } + bson_cursor_free (c); + + return f; +} + +mongo_sync_cursor * +mongo_sync_gridfs_chunked_file_cursor_new (mongo_sync_gridfs_chunked_file *gfile, + gint start, gint num) +{ + bson *q; + mongo_sync_cursor *cursor; + mongo_packet *p; + + if (!gfile) + { + errno = ENOTCONN; + return NULL; + } + if (start < 0 || num < 0) + { + errno = EINVAL; + return NULL; + } + + q = bson_build_full (BSON_TYPE_DOCUMENT, "$query", TRUE, + bson_build (BSON_TYPE_OID, "files_id", gfile->meta.oid, BSON_TYPE_NONE), + BSON_TYPE_DOCUMENT, "$orderby", TRUE, + bson_build (BSON_TYPE_INT32, "n", 1, BSON_TYPE_NONE), + BSON_TYPE_NONE); + bson_finish (q); + + p = mongo_sync_cmd_query (gfile->gfs->conn, gfile->gfs->ns.chunks, 0, + start, num, q, NULL); + cursor = mongo_sync_cursor_new (gfile->gfs->conn, + gfile->gfs->ns.chunks, p); + bson_free (q); + + return cursor; +} + +guint8 * +mongo_sync_gridfs_chunked_file_cursor_get_chunk (mongo_sync_cursor *cursor, + gint32 *size) +{ + bson *b; + bson_cursor *c; + const guint8 *d; + guint8 *data; + gint32 s; + bson_binary_subtype sub = BSON_BINARY_SUBTYPE_USER_DEFINED; + gboolean r; + + if (!cursor) + { + errno = ENOTCONN; + return NULL; + } + + b = mongo_sync_cursor_get_data (cursor); + c = bson_find (b, "data"); + r = bson_cursor_get_binary (c, &sub, &d, &s); + if (!r || (sub != BSON_BINARY_SUBTYPE_GENERIC && + sub != BSON_BINARY_SUBTYPE_BINARY)) + { + bson_cursor_free (c); + errno = EPROTO; + return NULL; + } + bson_cursor_free (c); + + if (sub == BSON_BINARY_SUBTYPE_BINARY) + { + s -= 4; + data = g_malloc (s); + memcpy (data, d + 4, s); + } + else + { + data = g_malloc (s); + memcpy (data, d, s); + } + + if (size) + *size = s; + + bson_free (b); + return data; +} + +mongo_sync_gridfs_chunked_file * +mongo_sync_gridfs_chunked_file_new_from_buffer (mongo_sync_gridfs *gfs, + const bson *metadata, + const guint8 *data, + gint64 size) +{ + mongo_sync_gridfs_chunked_file *gfile; + bson *meta; + bson_cursor *c; + guint8 *oid; + gint64 pos = 0, chunk_n = 0, upload_date; + GTimeVal tv; + GChecksum *chk; + + if (!gfs) + { + errno = ENOTCONN; + return NULL; + } + if (!data || size <= 0) + { + errno = EINVAL; + return NULL; + } + + oid = mongo_util_oid_new + (mongo_connection_get_requestid ((mongo_connection *)gfs->conn)); + if (!oid) + { + errno = EFAULT; + return NULL; + } + + chk = g_checksum_new (G_CHECKSUM_MD5); + + /* Insert chunks first */ + while (pos < size) + { + bson *chunk; + gint32 csize = gfs->chunk_size; + + if (size - pos < csize) + csize = size - pos; + + chunk = bson_new_sized (gfs->chunk_size + 128); + bson_append_oid (chunk, "files_id", oid); + bson_append_int64 (chunk, "n", (gint64)chunk_n); + bson_append_binary (chunk, "data", BSON_BINARY_SUBTYPE_GENERIC, + data + pos, csize); + bson_finish (chunk); + + g_checksum_update (chk, data + pos, csize); + + if (!mongo_sync_cmd_insert (gfs->conn, gfs->ns.chunks, chunk, NULL)) + { + int e = errno; + + bson_free (chunk); + g_free (oid); + errno = e; + return NULL; + } + bson_free (chunk); + + pos += csize; + chunk_n++; + } + + /* Insert metadata */ + if (metadata) + meta = bson_new_from_data (bson_data (metadata), + bson_size (metadata) - 1); + else + meta = bson_new_sized (128); + + g_get_current_time (&tv); + upload_date = (((gint64) tv.tv_sec) * 1000) + (gint64)(tv.tv_usec / 1000); + + bson_append_int64 (meta, "length", size); + bson_append_int32 (meta, "chunkSize", gfs->chunk_size); + bson_append_utc_datetime (meta, "uploadDate", upload_date); + bson_append_string (meta, "md5", g_checksum_get_string (chk), -1); + bson_append_oid (meta, "_id", oid); + bson_finish (meta); + + g_checksum_free (chk); + + if (!mongo_sync_cmd_insert (gfs->conn, gfs->ns.files, meta, NULL)) + { + int e = errno; + + bson_free (meta); + g_free (oid); + errno = e; + return NULL; + } + + /* Return the resulting gfile. + * No need to check cursor errors here, as we constructed the BSON + * just above, and all the fields exist and have the appropriate + * types. + */ + gfile = g_new0 (mongo_sync_gridfs_chunked_file, 1); + gfile->gfs = gfs; + + gfile->meta.metadata = meta; + gfile->meta.length = size; + gfile->meta.chunk_size = gfs->chunk_size; + gfile->meta.date = 0; + gfile->meta.type = LMC_GRIDFS_FILE_CHUNKED; + + c = bson_find (meta, "_id"); + bson_cursor_get_oid (c, &gfile->meta.oid); + + bson_cursor_find (c, "md5"); + bson_cursor_get_string (c, &gfile->meta.md5); + bson_cursor_free (c); + + g_free (oid); + + return gfile; +} diff --git a/src/sync-gridfs-chunk.h b/src/sync-gridfs-chunk.h new file mode 100644 index 0000000..e567328 --- /dev/null +++ b/src/sync-gridfs-chunk.h @@ -0,0 +1,134 @@ +/* sync-gridfs-chunk.h - libmong-client GridFS chunk API + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/sync-gridfs-chunk.h + * MongoDB GridFS Chunk API. + * + * @addtogroup mongo_sync_gridfs_api + * @{ + */ + +#ifndef LIBMONGO_SYNC_GRIDFS_CHUNK_H +#define LIBMONGO_SYNC_GRIDFS_CHUNK_H 1 + +#include <sync-gridfs.h> +#include <glib.h> + +G_BEGIN_DECLS + +/** @defgroup mongo_sync_gridfs_chunk_api Mongo GridFS Chunk API + * + * This submodule provides chunk-based access to GridFS + * files. Chunk-based access has the advantage of being reasonably + * lightweight and fast, and the disadvantage of making it harder to + * do arbitrary reads or multi-part writes. + * + * It's best used when the whole file needs to be retrieved, or when + * uploading files that either fit in a buffer, or can be mmapped. + * + * @addtogroup mongo_sync_gridfs_chunk_api + * @{ + */ + +/** Opaque GridFS chunked file object. */ +typedef struct _mongo_sync_gridfs_chunked_file mongo_sync_gridfs_chunked_file; + +/** Find a file on GridFS. + * + * Finds a file on GridFS, based on a custom query. + * + * @param gfs is the GridFS to find the file in. + * @param query is the custom query based on which the file shall be + * sought. + * + * @returns A newly allocated chunked file object, or NULL on + * error. It is the responsibility of the caller to free the returned + * object once it is no longer needed. + */ +mongo_sync_gridfs_chunked_file *mongo_sync_gridfs_chunked_find (mongo_sync_gridfs *gfs, + const bson *query); + +/** Upload a file to GridFS from a buffer. + * + * Create a new file on GridFS from a buffer, using custom meta-data. + * + * @param gfs is the GridFS to create the file on. + * @param metadata is the (optional) file metadata. + * @param data is the data to store on GridFS. + * @param size is the size of the data. + * + * @returns A newly allocated file object, or NULL on error. It is the + * responsibility of the caller to free the returned object once it is + * no longer needed. + * + * @note The metadata MUST NOT contain any of the required GridFS + * metadata fields (_id, length, chunkSize, uploadDate, md5), + * otherwise a conflict will occurr, against which the function does + * not guard by design. + */ +mongo_sync_gridfs_chunked_file *mongo_sync_gridfs_chunked_file_new_from_buffer (mongo_sync_gridfs *gfs, + const bson *metadata, + const guint8 *data, + gint64 size); +/** Free a GridFS chunked file object. + * + * @param gfile is the file object to free. + */ +void mongo_sync_gridfs_chunked_file_free (mongo_sync_gridfs_chunked_file *gfile); + +/* Data access */ + +/** Create a cursor for a GridFS chunked file. + * + * The cursor can be used (via + * mongo_sync_gridfs_file_cursor_get_chunk()) to retrieve a GridFS + * file chunk by chunk. + * + * @param gfile is the GridFS chunked file to work with. + * @param start is the starting chunk. + * @param num is the total number of chunks to make a cursor for. + * + * @returns A newly allocated cursor object, or NULL on error. It is + * the responsibility of the caller to free the cursor once it is no + * longer needed. + */ +mongo_sync_cursor *mongo_sync_gridfs_chunked_file_cursor_new (mongo_sync_gridfs_chunked_file *gfile, + gint start, gint num); + +/** Get the data of a GridFS file chunk, via a cursor. + * + * Once we have a cursor, it can be iterated over with + * mongo_sync_cursor_next(), and its data can be conveniently accessed + * with this function. + * + * @param cursor is the cursor object to work with. + * @param size is a pointer to a variable where the chunk's actual + * size can be stored. + * + * @returns A pointer to newly allocated memory that holds the current + * chunk's data, or NULL on error. It is the responsibility of the + * caller to free this once it is no longer needed. + */ +guint8 *mongo_sync_gridfs_chunked_file_cursor_get_chunk (mongo_sync_cursor *cursor, + gint32 *size); + +/** @} */ + +G_END_DECLS + +/** @} */ + +#endif diff --git a/src/sync-gridfs-stream.c b/src/sync-gridfs-stream.c new file mode 100644 index 0000000..c9b11ed --- /dev/null +++ b/src/sync-gridfs-stream.c @@ -0,0 +1,507 @@ +/* sync-gridfs-stream.c - libmongo-client GridFS streaming implementation + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/sync-gridfs-stream.c + * MongoDB GridFS Streaming API implementation. + */ + +#include "sync-gridfs-stream.h" +#include "libmongo-private.h" + +#include <unistd.h> +#include <errno.h> + +mongo_sync_gridfs_stream * +mongo_sync_gridfs_stream_find (mongo_sync_gridfs *gfs, + const bson *query) +{ + mongo_sync_gridfs_stream *stream; + bson *meta = NULL; + bson_cursor *c; + mongo_packet *p; + const guint8 *oid; + + if (!gfs) + { + errno = ENOTCONN; + return NULL; + } + if (!query) + { + errno = EINVAL; + return NULL; + } + + p = mongo_sync_cmd_query (gfs->conn, gfs->ns.files, 0, 0, 1, query, NULL); + if (!p) + return NULL; + + stream = g_new0 (mongo_sync_gridfs_stream, 1); + stream->gfs = gfs; + stream->file.type = LMC_GRIDFS_FILE_STREAM_READER; + + mongo_wire_reply_packet_get_nth_document (p, 1, &meta); + bson_finish (meta); + mongo_wire_packet_free (p); + + c = bson_find (meta, "_id"); + if (!bson_cursor_get_oid (c, &oid)) + { + bson_cursor_free (c); + bson_free (meta); + g_free (stream); + + errno = EPROTO; + return NULL; + } + stream->file.id = g_malloc (12); + memcpy (stream->file.id, oid, 12); + + bson_cursor_find (c, "length"); + bson_cursor_get_int64 (c, &stream->file.length); + if (stream->file.length == 0) + { + gint32 i = 0; + + bson_cursor_get_int32 (c, &i); + stream->file.length = i; + } + + bson_cursor_find (c, "chunkSize"); + bson_cursor_get_int32 (c, &stream->file.chunk_size); + + bson_cursor_free (c); + bson_free (meta); + + if (stream->file.length == 0 || + stream->file.chunk_size == 0) + { + g_free (stream->file.id); + g_free (stream); + + errno = EPROTO; + return NULL; + } + + return stream; +} + +mongo_sync_gridfs_stream * +mongo_sync_gridfs_stream_new (mongo_sync_gridfs *gfs, + const bson *metadata) +{ + mongo_sync_gridfs_stream *stream; + bson_cursor *c; + + if (!gfs) + { + errno = ENOTCONN; + return NULL; + } + + stream = g_new0 (mongo_sync_gridfs_stream, 1); + stream->file.type = LMC_GRIDFS_FILE_STREAM_WRITER; + stream->gfs = gfs; + + stream->file.chunk_size = gfs->chunk_size; + + stream->writer.metadata = bson_new_from_data (bson_data (metadata), + bson_size (metadata) - 1); + + c = bson_find (metadata, "_id"); + if (!c) + { + stream->file.id = mongo_util_oid_new + (mongo_connection_get_requestid ((mongo_connection *)gfs->conn)); + if (!stream->file.id) + { + bson_free (stream->writer.metadata); + g_free (stream); + + errno = EFAULT; + return NULL; + } + bson_append_oid (stream->writer.metadata, "_id", stream->file.id); + } + else + { + const guint8 *oid; + + if (!bson_cursor_get_oid (c, &oid)) + { + bson_cursor_free (c); + bson_free (stream->writer.metadata); + g_free (stream); + + errno = EPROTO; + return NULL; + } + + stream->file.id = g_malloc (12); + memcpy (stream->file.id, oid, 12); + } + bson_cursor_free (c); + bson_finish (stream->writer.metadata); + + stream->writer.buffer = g_malloc (stream->file.chunk_size); + stream->writer.checksum = g_checksum_new (G_CHECKSUM_MD5); + + return stream; +} + +static inline gboolean +_stream_seek_chunk (mongo_sync_gridfs_stream *stream, + gint64 chunk) +{ + bson *b; + mongo_packet *p; + bson_cursor *c; + bson_binary_subtype subt = BSON_BINARY_SUBTYPE_USER_DEFINED; + gboolean r; + + b = bson_new_sized (32); + bson_append_oid (b, "files_id", stream->file.id); + bson_append_int64 (b, "n", chunk); + bson_finish (b); + + p = mongo_sync_cmd_query (stream->gfs->conn, + stream->gfs->ns.chunks, 0, + 0, 1, b, NULL); + bson_free (b); + + bson_free (stream->reader.bson); + stream->reader.bson = NULL; + stream->reader.chunk.data = NULL; + + mongo_wire_reply_packet_get_nth_document (p, 1, &stream->reader.bson); + mongo_wire_packet_free (p); + bson_finish (stream->reader.bson); + + c = bson_find (stream->reader.bson, "data"); + r = bson_cursor_get_binary (c, &subt, &stream->reader.chunk.data, + &stream->reader.chunk.size); + if (!r || (subt != BSON_BINARY_SUBTYPE_GENERIC && + subt != BSON_BINARY_SUBTYPE_BINARY)) + { + bson_cursor_free (c); + bson_free (stream->reader.bson); + stream->reader.bson = NULL; + stream->reader.chunk.data = NULL; + + errno = EPROTO; + return FALSE; + } + bson_cursor_free (c); + + if (subt == BSON_BINARY_SUBTYPE_BINARY) + { + stream->reader.chunk.start_offset = 4; + stream->reader.chunk.size -= 4; + } + stream->reader.chunk.offset = 0; + + return TRUE; +} + +gint64 +mongo_sync_gridfs_stream_read (mongo_sync_gridfs_stream *stream, + guint8 *buffer, + gint64 size) +{ + gint64 pos = 0; + + if (!stream) + { + errno = ENOENT; + return -1; + } + if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER) + { + errno = EOPNOTSUPP; + return -1; + } + if (!buffer || size <= 0) + { + errno = EINVAL; + return -1; + } + + if (!stream->reader.chunk.data) + { + if (!_stream_seek_chunk (stream, 0)) + return -1; + } + + while (pos < size && stream->file.offset + + stream->reader.chunk.start_offset < stream->file.length) + { + gint32 csize = stream->reader.chunk.size - stream->reader.chunk.offset; + + if (size - pos < csize) + csize = size - pos; + + memcpy (buffer + pos, + stream->reader.chunk.data + + stream->reader.chunk.start_offset + + stream->reader.chunk.offset, csize); + + stream->reader.chunk.offset += csize; + stream->file.offset += csize; + pos += csize; + + if (stream->reader.chunk.offset + stream->reader.chunk.start_offset >= + stream->reader.chunk.size && + stream->file.offset + stream->reader.chunk.start_offset < + stream->file.length) + { + stream->file.current_chunk++; + if (!_stream_seek_chunk (stream, stream->file.current_chunk)) + return -1; + } + } + + return pos; +} + +static gboolean +_stream_chunk_write (mongo_sync_gridfs *gfs, + const guint8 *oid, gint64 n, + const guint8 *buffer, gint32 size) +{ + bson *chunk; + + chunk = bson_new_sized (size + 128); + bson_append_oid (chunk, "files_id", oid); + bson_append_int64 (chunk, "n", n); + bson_append_binary (chunk, "data", BSON_BINARY_SUBTYPE_GENERIC, + buffer, size); + bson_finish (chunk); + + if (!mongo_sync_cmd_insert (gfs->conn, gfs->ns.chunks, chunk, NULL)) + { + int e = errno; + + bson_free (chunk); + errno = e; + return FALSE; + } + bson_free (chunk); + + return TRUE; +} + +gboolean +mongo_sync_gridfs_stream_write (mongo_sync_gridfs_stream *stream, + const guint8 *buffer, + gint64 size) +{ + gint64 pos = 0; + + if (!stream) + { + errno = ENOENT; + return FALSE; + } + if (stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER) + { + errno = EOPNOTSUPP; + return FALSE; + } + if (!buffer || size <= 0) + { + errno = EINVAL; + return FALSE; + } + + while (pos < size) + { + gint32 csize = stream->file.chunk_size - stream->writer.buffer_offset; + + if (size - pos < csize) + csize = size - pos; + + memcpy (stream->writer.buffer + stream->writer.buffer_offset, + buffer + pos, csize); + stream->writer.buffer_offset += csize; + stream->file.offset += csize; + stream->file.length += csize; + pos += csize; + + if (stream->writer.buffer_offset == stream->file.chunk_size) + { + if (!_stream_chunk_write (stream->gfs, + stream->file.id, + stream->file.current_chunk, + stream->writer.buffer, + stream->file.chunk_size)) + return FALSE; + g_checksum_update (stream->writer.checksum, stream->writer.buffer, + stream->file.chunk_size); + + stream->writer.buffer_offset = 0; + stream->file.current_chunk++; + } + } + + return TRUE; +} + +gboolean +mongo_sync_gridfs_stream_seek (mongo_sync_gridfs_stream *stream, + gint64 pos, + gint whence) +{ + gint64 real_pos = 0; + gint64 chunk; + gint32 offs; + + if (!stream) + { + errno = ENOENT; + return FALSE; + } + if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER) + { + errno = EOPNOTSUPP; + return FALSE; + } + + switch (whence) + { + case SEEK_SET: + if (pos == stream->file.offset) + return TRUE; + if (pos < 0 || pos > stream->file.length) + { + errno = ERANGE; + return FALSE; + } + real_pos = pos; + break; + case SEEK_CUR: + if (pos + stream->file.offset < 0 || + pos + stream->file.offset > stream->file.length) + { + errno = ERANGE; + return FALSE; + } + if (pos == 0) + return TRUE; + real_pos = pos + stream->file.offset; + break; + case SEEK_END: + if (pos > 0 || pos + stream->file.length < 0) + { + errno = ERANGE; + return FALSE; + } + real_pos = pos + stream->file.length; + break; + default: + errno = EINVAL; + return FALSE; + } + + chunk = real_pos / stream->file.chunk_size; + offs = real_pos % stream->file.chunk_size; + + if (!_stream_seek_chunk (stream, chunk)) + return FALSE; + + stream->reader.chunk.offset = offs; + stream->file.current_chunk = chunk; + stream->file.offset = real_pos; + + return TRUE; +} + +gboolean +mongo_sync_gridfs_stream_close (mongo_sync_gridfs_stream *stream) +{ + if (!stream) + { + errno = ENOENT; + return FALSE; + } + + if (stream->file.type != LMC_GRIDFS_FILE_STREAM_READER && + stream->file.type != LMC_GRIDFS_FILE_STREAM_WRITER) + { + errno = EINVAL; + return FALSE; + } + + if (stream->file.type == LMC_GRIDFS_FILE_STREAM_WRITER) + { + bson *meta; + gint64 upload_date; + GTimeVal tv; + gboolean closed = FALSE; + + if (stream->writer.buffer_offset > 0) + { + closed = _stream_chunk_write (stream->gfs, + stream->file.id, + stream->file.current_chunk, + stream->writer.buffer, + stream->writer.buffer_offset); + + if (closed) + g_checksum_update (stream->writer.checksum, + stream->writer.buffer, + stream->writer.buffer_offset); + } + + if (closed) + { + g_get_current_time (&tv); + upload_date = (((gint64) tv.tv_sec) * 1000) + + (gint64)(tv.tv_usec / 1000); + + /* _id is guaranteed by _stream_new() */ + meta = bson_new_from_data (bson_data (stream->writer.metadata), + bson_size (stream->writer.metadata) - 1); + bson_append_int64 (meta, "length", stream->file.length); + bson_append_int32 (meta, "chunkSize", stream->file.chunk_size); + bson_append_utc_datetime (meta, "uploadDate", upload_date); + if (stream->file.length) + bson_append_string (meta, "md5", + g_checksum_get_string (stream->writer.checksum), -1); + bson_finish (meta); + + if (!mongo_sync_cmd_insert (stream->gfs->conn, + stream->gfs->ns.files, meta, NULL)) + { + int e = errno; + + bson_free (meta); + errno = e; + return FALSE; + } + bson_free (meta); + } + + bson_free (stream->writer.metadata); + g_checksum_free (stream->writer.checksum); + g_free (stream->writer.buffer); + } + else + bson_free (stream->reader.bson); + + g_free (stream->file.id); + g_free (stream); + return TRUE; +} diff --git a/src/sync-gridfs-stream.h b/src/sync-gridfs-stream.h new file mode 100644 index 0000000..017f2ea --- /dev/null +++ b/src/sync-gridfs-stream.h @@ -0,0 +1,141 @@ +/* sync-gridfs-stream.h - libmong-client GridFS streaming API + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/sync-gridfs-stream.h + * MongoDB GridFS Streaming API. + * + * @addtogroup mongo_sync_gridfs_api + * @{ + */ + +#ifndef LIBMONGO_SYNC_GRIDFS_STREAM_H +#define LIBMONGO_SYNC_GRIDFS_STREAM_H 1 + +#include <sync-gridfs.h> +#include <glib.h> + +G_BEGIN_DECLS + +/** @defgroup mongo_sync_gridfs_stream_api Mongo GridFS Streaming API + * + * Ths submodule provides stream-based access to GridFS files. Stream + * based access has the advantage of allowing arbitrary reads and + * multi-part writes, at the cost of slightly higher memory usage and + * lower performance speed. + * + * It's best used when one needs only part of a file (and not + * neccessarily a full chunk, or the parts cross chunk boundaries), or + * when uploading a file from a source that cannot be fully stored in + * a memory buffer, and cannot be mmapped. Such as a network + * connection. + * + * @addtogroup mongo_sync_gridfs_stream_api + * @{ + */ + +/** Opaque GridFS file stream object type. */ +typedef struct _mongo_sync_gridfs_stream mongo_sync_gridfs_stream; + +/** Create a stream reader by finding the file matching a query. + * + * @param gfs is the GridFS to search on. + * @param query is the query based on which the file should be + * searched. + * + * @returns A newly allocated read-only stream object, or NULL on + * error. + * + * @note It is the responsiblity of the caller to free the stream once + * it is no longer needed. + */ +mongo_sync_gridfs_stream *mongo_sync_gridfs_stream_find (mongo_sync_gridfs *gfs, + const bson *query); + +/** Create a new GridFS stream writer. + * + * @param gfs is the GridFS to create a file on. + * @param metadata is the optional extra file metadata to use. + * + * @returns A newly allocated write-only stream object, or NULL on + * error. + * + * @note It is the responsiblity of the caller to free the stream once + * it is no longer needed. + */ +mongo_sync_gridfs_stream *mongo_sync_gridfs_stream_new (mongo_sync_gridfs *gfs, + const bson *metadata); + +/** Read an arbitrary number of bytes from a GridFS stream. + * + * @param stream is the read-only stream to read from. + * @param buffer is the buffer to store the read data in. + * @param size is the maximum number of bytes to read. + * + * @returns The number of bytes read, or -1 on error. + * + * @note The @a buffer parameter must have enough space allocated to + * hold at most @a size bytes. + */ +gint64 mongo_sync_gridfs_stream_read (mongo_sync_gridfs_stream *stream, + guint8 *buffer, + gint64 size); + +/** Write an arbitrary number of bytes to a GridFS stream. + * + * @param stream is the write-only stream to write to. + * @param buffer is the data to write. + * @param size is the amount of data to write. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_gridfs_stream_write (mongo_sync_gridfs_stream *stream, + const guint8 *buffer, + gint64 size); + +/** Seek to an arbitrary position in a GridFS stream. + * + * @param stream is the read-only stream to seek in. + * @param pos is the position to seek to. + * @param whence is used to determine how to seek. Possible values are + * @b SEEK_SET which means seek to the given position, @b SEEK_CUR + * meaning seek to the current position plus @a pos and @b SEEK_END + * which will seek from the end of the file. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_gridfs_stream_seek (mongo_sync_gridfs_stream *stream, + gint64 pos, + gint whence); + +/** Close a GridFS stream. + * + * Closes the GridFS stream, by writing out the buffered data, and the + * metadata if it's a write stream, and freeing up all resources in + * all cases. + * + * @param stream is the GridFS stream to close and free. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_gridfs_stream_close (mongo_sync_gridfs_stream *stream); + +/** @} */ + +G_END_DECLS + +/** @} */ + +#endif diff --git a/src/sync-gridfs.c b/src/sync-gridfs.c new file mode 100644 index 0000000..7d1af24 --- /dev/null +++ b/src/sync-gridfs.c @@ -0,0 +1,345 @@ +/* sync-gridfs.c - libmongo-client GridFS implementation + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/sync-gridfs.c + * MongoDB GridFS implementation. + */ + +#include "sync-gridfs.h" +#include "libmongo-private.h" + +#include <errno.h> + +mongo_sync_gridfs * +mongo_sync_gridfs_new (mongo_sync_connection *conn, + const gchar *ns_prefix) +{ + mongo_sync_gridfs *gfs; + bson *index; + gchar *db; + + if (!conn) + { + errno = ENOTCONN; + return NULL; + } + if (!ns_prefix) + { + errno = EINVAL; + return NULL; + } + db = strchr (ns_prefix, '.'); + if (!db) + { + errno = EINVAL; + return NULL; + } + + gfs = g_new (mongo_sync_gridfs, 1); + gfs->conn = conn; + + gfs->ns.prefix = g_strdup (ns_prefix); + gfs->ns.files = g_strconcat (gfs->ns.prefix, ".files", NULL); + gfs->ns.chunks = g_strconcat (gfs->ns.prefix, ".chunks", NULL); + gfs->ns.db = g_strndup (ns_prefix, db - ns_prefix); + + gfs->chunk_size = 256 * 1024; + + index = bson_new_sized (256); + bson_append_int32 (index, "files_id", 1); + bson_append_int32 (index, "n", 1); + bson_finish (index); + + if (!mongo_sync_cmd_index_create (conn, gfs->ns.chunks, index, + MONGO_INDEX_UNIQUE)) + { + bson_free (index); + mongo_sync_gridfs_free (gfs, FALSE); + + errno = EPROTO; + return NULL; + } + bson_free (index); + + return gfs; +} + +void +mongo_sync_gridfs_free (mongo_sync_gridfs *gfs, gboolean disconnect) +{ + if (!gfs) + { + errno = ENOTCONN; + return; + } + + g_free (gfs->ns.prefix); + g_free (gfs->ns.files); + g_free (gfs->ns.chunks); + g_free (gfs->ns.db); + + if (disconnect) + mongo_sync_disconnect (gfs->conn); + + g_free (gfs); + errno = 0; +} + +gint32 +mongo_sync_gridfs_get_chunk_size (mongo_sync_gridfs *gfs) +{ + if (!gfs) + { + errno = ENOTCONN; + return -1; + } + return gfs->chunk_size; +} + +gboolean +mongo_sync_gridfs_set_chunk_size (mongo_sync_gridfs *gfs, + gint32 chunk_size) +{ + if (!gfs) + { + errno = ENOTCONN; + return FALSE; + } + if (chunk_size < 1) + { + errno = EINVAL; + return FALSE; + } + + gfs->chunk_size = chunk_size; + return TRUE; +} + +mongo_sync_cursor * +mongo_sync_gridfs_list (mongo_sync_gridfs *gfs, + const bson *query) +{ + mongo_sync_cursor *cursor; + bson *q = NULL; + + if (!gfs) + { + errno = ENOTCONN; + return NULL; + } + + if (!query) + { + q = bson_new (); + bson_finish (q); + } + + cursor = mongo_sync_cursor_new + (gfs->conn, gfs->ns.files, + mongo_sync_cmd_query (gfs->conn, gfs->ns.files, 0, 0, 0, + (q) ? q : query, NULL)); + if (!cursor) + { + int e = errno; + + bson_free (q); + errno = e; + return NULL; + } + bson_free (q); + return cursor; +} + +const guint8 * +mongo_sync_gridfs_file_get_id (gpointer gfile) +{ + mongo_sync_gridfs_chunked_file *c = (mongo_sync_gridfs_chunked_file *)gfile; + mongo_sync_gridfs_stream *s = (mongo_sync_gridfs_stream *)gfile; + + if (!gfile) + { + errno = ENOTCONN; + return NULL; + } + if (c->meta.type == LMC_GRIDFS_FILE_CHUNKED) + return c->meta.oid; + else + return s->file.id; +} + +gint64 +mongo_sync_gridfs_file_get_length (gpointer gfile) +{ + mongo_sync_gridfs_file_common *f = (mongo_sync_gridfs_file_common *)gfile; + + if (!gfile) + { + errno = ENOTCONN; + return -1; + } + return f->length; +} + +gint32 +mongo_sync_gridfs_file_get_chunk_size (gpointer gfile) +{ + mongo_sync_gridfs_file_common *f = (mongo_sync_gridfs_file_common *)gfile; + + if (!gfile) + { + errno = ENOTCONN; + return -1; + } + return f->chunk_size; +} + +const gchar * +mongo_sync_gridfs_file_get_md5 (gpointer gfile) +{ + mongo_sync_gridfs_chunked_file *f = (mongo_sync_gridfs_chunked_file *)gfile; + + if (!gfile) + { + errno = ENOTCONN; + return NULL; + } + if (f->meta.type != LMC_GRIDFS_FILE_CHUNKED) + { + errno = EOPNOTSUPP; + return NULL; + } + + return f->meta.md5; +} + +gint64 +mongo_sync_gridfs_file_get_date (gpointer gfile) +{ + mongo_sync_gridfs_chunked_file *f = (mongo_sync_gridfs_chunked_file *)gfile; + + if (!gfile) + { + errno = ENOTCONN; + return -1; + } + if (f->meta.type != LMC_GRIDFS_FILE_CHUNKED) + { + errno = EOPNOTSUPP; + return -1; + } + + return f->meta.date; +} + +const bson * +mongo_sync_gridfs_file_get_metadata (gpointer gfile) +{ + mongo_sync_gridfs_chunked_file *f = (mongo_sync_gridfs_chunked_file *)gfile; + + if (!gfile) + { + errno = ENOTCONN; + return NULL; + } + if (f->meta.type != LMC_GRIDFS_FILE_CHUNKED) + { + errno = EOPNOTSUPP; + return NULL; + } + + return f->meta.metadata; +} + +gint64 +mongo_sync_gridfs_file_get_chunks (gpointer gfile) +{ + mongo_sync_gridfs_file_common *f = (mongo_sync_gridfs_file_common *)gfile; + double chunk_count; + + if (!gfile) + { + errno = ENOTCONN; + return -1; + } + + chunk_count = (double)f->length / (double)f->chunk_size; + return (chunk_count - (gint64)chunk_count > 0) ? + (gint64)(chunk_count + 1) : (gint64)(chunk_count); +} + +gboolean +mongo_sync_gridfs_remove (mongo_sync_gridfs *gfs, + const bson *query) +{ + mongo_sync_cursor *fc; + + fc = mongo_sync_gridfs_list (gfs, query); + if (!fc) + { + if (errno != ENOTCONN) + errno = ENOENT; + return FALSE; + } + + while (mongo_sync_cursor_next (fc)) + { + bson *meta = mongo_sync_cursor_get_data (fc), *q; + bson_cursor *c; + const guint8 *ooid; + guint8 oid[12]; + + c = bson_find (meta, "_id"); + if (!bson_cursor_get_oid (c, &ooid)) + { + bson_free (meta); + bson_cursor_free (c); + mongo_sync_cursor_free (fc); + + errno = EPROTO; + return FALSE; + } + bson_cursor_free (c); + memcpy (oid, ooid, 12); + bson_free (meta); + + /* Delete metadata */ + q = bson_build (BSON_TYPE_OID, "_id", oid, + BSON_TYPE_NONE); + bson_finish (q); + + if (!mongo_sync_cmd_delete (gfs->conn, gfs->ns.files, 0, q)) + { + bson_free (q); + mongo_sync_cursor_free (fc); + return FALSE; + } + bson_free (q); + + /* Delete chunks */ + q = bson_build (BSON_TYPE_OID, "files_id", oid, + BSON_TYPE_NONE); + bson_finish (q); + + /* Chunks may or may not exist, an error in this case is + non-fatal. */ + mongo_sync_cmd_delete (gfs->conn, gfs->ns.chunks, 0, q); + bson_free (q); + } + + mongo_sync_cursor_free (fc); + + return TRUE; +} diff --git a/src/sync-gridfs.h b/src/sync-gridfs.h new file mode 100644 index 0000000..5d9ae1c --- /dev/null +++ b/src/sync-gridfs.h @@ -0,0 +1,193 @@ +/* sync-gridfs.h - libmong-client GridFS API + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/sync-gridfs.h + * MongoDB GridFS API. + * + * @addtogroup mongo_sync + * @{ + */ + +#ifndef LIBMONGO_SYNC_GRIDFS_H +#define LIBMONGO_SYNC_GRIDFS_H 1 + +#include <mongo-sync.h> +#include <mongo-sync-cursor.h> +#include <glib.h> + +G_BEGIN_DECLS + +/** @defgroup mongo_sync_gridfs_api Mongo GridFS API + * + * The GridFS API - and related modules, like @ref + * mongo_sync_gridfs_chunk_api and @ref mongo_sync_gridfs_stream_api - + * provide a conveneint way to work with GridFS, and files stored on + * it. + * + * This module implements the GridFS support functions, which allow + * one to connect to or create new GridFS instances, list or remove + * files, or retrieve metadata about files opened by one of the + * sub-modules. + * + * @addtogroup mongo_sync_gridfs_api + * @{ + */ + +/** Opaque GridFS object. */ +typedef struct _mongo_sync_gridfs mongo_sync_gridfs; + +/** Create a new GridFS object. + * + * @param conn is the MongoDB connection to base the filesystem object + * on. + * @param ns_prefix is the prefix the GridFS collections should be + * under. + * + * @returns A newly allocated GridFS object, or NULL on error. + */ +mongo_sync_gridfs *mongo_sync_gridfs_new (mongo_sync_connection *conn, + const gchar *ns_prefix); + +/** Close and free a GridFS object. + * + * @param gfs is the GridFS object to free up. + * @param disconnect signals whether to free the underlying connection + * aswell. + */ +void mongo_sync_gridfs_free (mongo_sync_gridfs *gfs, gboolean disconnect); + +/** Get the default chunk size of a GridFS object. + * + * @param gfs is the GridFS object to get the default chunk size of. + * + * @returns The chunk size in bytes, or -1 on error. + */ +gint32 mongo_sync_gridfs_get_chunk_size (mongo_sync_gridfs *gfs); + +/** Set the default chunk size of a GridFS object. + * + * @param gfs is the GridFS object to set the default chunk size of. + * @param chunk_size is the desired default chunk size. + * + * @returns TRUE on success, FALSE otherwise. + */ +gboolean mongo_sync_gridfs_set_chunk_size (mongo_sync_gridfs *gfs, + gint32 chunk_size); + +/** List GridFS files matching a query. + * + * Finds all files on a GridFS, based on a custom query. + * + * @param gfs is the GridFS to list files from. + * @param query is the custom query based on which files shall be + * sought. Passing a NULL query will find all files, without + * restriction. + * + * @returns A newly allocated cursor object, or NULL on error. It is + * the responsibility of the caller to free the returned cursor once + * it is no longer needed. + */ +mongo_sync_cursor *mongo_sync_gridfs_list (mongo_sync_gridfs *gfs, + const bson *query); + +/** Delete files matching a query from GridFS. + * + * Finds all files on a GridFS, based on a custom query, and removes + * them. + * + * @param gfs is the GridFS to delete files from. + * @param query is the custom query based on which files shall be + * sought. Passing a NULL query will find all files, without + * restriction. + * + * @returns TRUE if all files were deleted successfully, FALSE + * otherwise. + */ +gboolean mongo_sync_gridfs_remove (mongo_sync_gridfs *gfs, + const bson *query); + +/* Metadata */ + +/** Get the file ID of a GridFS file. + * + * @param gfile is the GridFS file to work with. + * + * @returns The ObjectID of the file, or NULL on error. The returned + * pointer points to an internal area, and should not be modified or + * freed, and is only valid as long as the file object is valid. + */ +const guint8 *mongo_sync_gridfs_file_get_id (gpointer gfile); + +/** Get the length of a GridFS file. + * + * @param gfile is the GridFS file to work with. + * + * @returns The length of the file, or -1 on error. + */ +gint64 mongo_sync_gridfs_file_get_length (gpointer gfile); + +/** Get the chunk size of a GridFS file. + * + * @param gfile is the GridFS file to work with. + * + * @returns The maximum size of the chunks of the file, or -1 on error. + */ +gint32 mongo_sync_gridfs_file_get_chunk_size (gpointer gfile); + +/** Get the MD5 digest of a GridFS file. + * + * @param gfile is the GridFS file to work with. + * + * @returns The MD5 digest of the file, or NULL on error. The returned + * pointer points to an internal area, and should not be modified or + * freed, and is only valid as long as the file object is valid. + */ +const gchar *mongo_sync_gridfs_file_get_md5 (gpointer gfile); + +/** Get the upload date of a GridFS file. + * + * @param gfile is the GridFS file to work with. + * + * @returns The upload date of the file, or -1 on error. + */ +gint64 mongo_sync_gridfs_file_get_date (gpointer gfile); + +/** Get the full metadata of a GridFS file + * + * @param gfile is the GridFS file to work with. + * + * @returns A BSON object containing the full metadata, or NULL on + * error. The returned pointer points to an internal area, and should + * not be modified or freed, and is only valid as long as the file + * object is valid. + */ +const bson *mongo_sync_gridfs_file_get_metadata (gpointer gfile); + +/** Get the number of chunks in a GridFS file. + * + * @param gfile is the GridFS file to work with. + * + * @returns The number of chunks in the GridFS file, or -1 on error. + */ +gint64 mongo_sync_gridfs_file_get_chunks (gpointer gfile); + +/** @} */ + +G_END_DECLS + +/** @} */ + +#endif |