diff options
Diffstat (limited to 'src/mongo-wire.c')
-rw-r--r-- | src/mongo-wire.c | 645 |
1 files changed, 645 insertions, 0 deletions
diff --git a/src/mongo-wire.c b/src/mongo-wire.c new file mode 100644 index 0000000..cf140a5 --- /dev/null +++ b/src/mongo-wire.c @@ -0,0 +1,645 @@ +/* mongo-wire.c - libmongo-client's MongoDB wire protocoll implementation. + * Copyright 2011, 2012 Gergely Nagy <algernon@balabit.hu> + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** @file src/mongo-wire.c + * Implementation of the MongoDB Wire Protocol. + */ + +#include <glib.h> +#include <string.h> +#include <stdarg.h> +#include <errno.h> + +#include "bson.h" +#include "mongo-wire.h" +#include "libmongo-private.h" + +/** @internal Constant zero value. */ +static const gint32 zero = 0; + +/** @internal A MongoDB command, as it appears on the wire. + * + * For the sake of clarity, and sanity of the library, the header and + * data parts are stored separately, and as such, will need to be sent + * separately aswell. + */ +struct _mongo_packet +{ + mongo_packet_header header; /**< The packet header. */ + guint8 *data; /**< The actual data of the packet. */ + gint32 data_size; /**< Size of the data payload. */ +}; + +/** @internal Mongo command opcodes. */ +typedef enum + { + OP_REPLY = 1, /**< Message is a reply. Only sent by the server. */ + OP_MSG = 1000, /**< Message is a generic message. */ + OP_UPDATE = 2001, /**< Message is an update command. */ + OP_INSERT = 2002, /**< Message is an insert command. */ + OP_RESERVED = 2003, /**< Reserved and unused. */ + OP_QUERY = 2004, /**< Message is a query command. */ + OP_GET_MORE = 2005, /**< Message is a get more command. */ + OP_DELETE = 2006, /**< Message is a delete command. */ + OP_KILL_CURSORS = 2007 /**< Message is a kill cursors command. */ + } mongo_wire_opcode; + +mongo_packet * +mongo_wire_packet_new (void) +{ + mongo_packet *p = (mongo_packet *)g_new0 (mongo_packet, 1); + + p->header.length = GINT32_TO_LE (sizeof (mongo_packet_header)); + return p; +} + +gboolean +mongo_wire_packet_get_header (const mongo_packet *p, + mongo_packet_header *header) +{ + if (!p || !header) + { + errno = EINVAL; + return FALSE; + } + + header->length = GINT32_FROM_LE (p->header.length); + header->id = GINT32_FROM_LE (p->header.id); + header->resp_to = GINT32_FROM_LE (p->header.resp_to); + header->opcode = GINT32_FROM_LE (p->header.opcode); + + return TRUE; +} + +gboolean +mongo_wire_packet_get_header_raw (const mongo_packet *p, + mongo_packet_header *header) +{ + if (!p || !header) + { + errno = EINVAL; + return FALSE; + } + + header->length = p->header.length; + header->id = p->header.id; + header->resp_to = p->header.resp_to; + header->opcode = p->header.opcode; + + return TRUE; +} + +gboolean +mongo_wire_packet_set_header (mongo_packet *p, + const mongo_packet_header *header) +{ + if (!p || !header) + { + errno = EINVAL; + return FALSE; + } + if (GINT32_FROM_LE (header->length) < (gint32)sizeof (mongo_packet_header)) + { + errno = ERANGE; + return FALSE; + } + + p->header.length = GINT32_TO_LE (header->length); + p->header.id = GINT32_TO_LE (header->id); + p->header.resp_to = GINT32_TO_LE (header->resp_to); + p->header.opcode = GINT32_TO_LE (header->opcode); + + p->data_size = header->length - sizeof (mongo_packet_header); + + return TRUE; +} + +gboolean +mongo_wire_packet_set_header_raw (mongo_packet *p, + const mongo_packet_header *header) +{ + if (!p || !header) + { + errno = EINVAL; + return FALSE; + } + + p->header.length = header->length; + p->header.id = header->id; + p->header.resp_to = header->resp_to; + p->header.opcode = header->opcode; + + p->data_size = header->length - sizeof (mongo_packet_header); + + return TRUE; +} + +gint32 +mongo_wire_packet_get_data (const mongo_packet *p, const guint8 **data) +{ + if (!p || !data) + { + errno = EINVAL; + return -1; + } + if (p->data == NULL) + { + errno = EINVAL; + return -1; + } + + *data = (const guint8 *)p->data; + return p->data_size; +} + +gboolean +mongo_wire_packet_set_data (mongo_packet *p, const guint8 *data, gint32 size) +{ + if (!p || !data || size <= 0) + { + errno = EINVAL; + return FALSE; + } + + if (p->data) + g_free (p->data); + p->data = g_malloc (size); + memcpy (p->data, data, size); + + p->data_size = size; + p->header.length = + GINT32_TO_LE (p->data_size + sizeof (mongo_packet_header)); + + return TRUE; +} + +void +mongo_wire_packet_free (mongo_packet *p) +{ + if (!p) + { + errno = EINVAL; + return; + } + + if (p->data) + g_free (p->data); + g_free (p); +} + +mongo_packet * +mongo_wire_cmd_update (gint32 id, const gchar *ns, gint32 flags, + const bson *selector, const bson *update) +{ + mongo_packet *p; + gint32 t_flags = GINT32_TO_LE (flags); + gint nslen; + + if (!ns || !selector || !update) + { + errno = EINVAL; + return NULL; + } + + if (bson_size (selector) < 0 || + bson_size (update) < 0) + { + errno = EINVAL; + return NULL; + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_UPDATE); + + nslen = strlen (ns) + 1; + p->data_size = bson_size (selector) + bson_size (update) + + sizeof (gint32) * 2 + nslen; + + p->data = g_malloc (p->data_size); + + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, nslen); + memcpy (p->data + sizeof (gint32) + nslen, (void *)&t_flags, + sizeof (gint32)); + memcpy (p->data + sizeof (gint32) * 2 + nslen, + bson_data (selector), bson_size (selector)); + memcpy (p->data + sizeof (gint32) * 2 + nslen + bson_size (selector), + bson_data (update), bson_size (update)); + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_insert_n (gint32 id, const gchar *ns, gint32 n, + const bson **docs) +{ + mongo_packet *p; + gint32 pos, dsize = 0; + gint32 i; + + if (!ns || !docs) + { + errno = EINVAL; + return NULL; + } + + if (n <= 0) + { + errno = ERANGE; + return NULL; + } + + for (i = 0; i < n; i++) + { + if (bson_size (docs[i]) <= 0) + { + errno = EINVAL; + return NULL; + } + dsize += bson_size (docs[i]); + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_INSERT); + + pos = sizeof (gint32) + strlen (ns) + 1; + p->data_size = pos + dsize; + p->data = (guint8 *)g_malloc (p->data_size); + + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, strlen (ns) + 1); + + for (i = 0; i < n; i++) + { + memcpy (p->data + pos, bson_data (docs[i]), bson_size (docs[i])); + pos += bson_size (docs[i]); + } + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_insert (gint32 id, const gchar *ns, ...) +{ + mongo_packet *p; + bson **docs, *d; + gint32 n = 0; + va_list ap; + + if (!ns) + { + errno = EINVAL; + return NULL; + } + + docs = (bson **)g_new0 (bson *, 1); + + va_start (ap, ns); + while ((d = (bson *)va_arg (ap, gpointer))) + { + if (bson_size (d) < 0) + { + g_free (docs); + errno = EINVAL; + return NULL; + } + + docs = (bson **)g_renew (bson *, docs, n + 1); + docs[n++] = d; + } + va_end (ap); + + p = mongo_wire_cmd_insert_n (id, ns, n, (const bson **)docs); + g_free (docs); + return p; +} + +mongo_packet * +mongo_wire_cmd_query (gint32 id, const gchar *ns, gint32 flags, + gint32 skip, gint32 ret, const bson *query, + const bson *sel) +{ + mongo_packet *p; + gint32 tmp, nslen; + + if (!ns || !query) + { + errno = EINVAL; + return NULL; + } + + if (bson_size (query) < 0 || (sel && bson_size (sel) < 0)) + { + errno = EINVAL; + return NULL; + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_QUERY); + + nslen = strlen (ns) + 1; + p->data_size = + sizeof (gint32) + nslen + sizeof (gint32) * 2 + bson_size (query); + + if (sel) + p->data_size += bson_size (sel); + p->data = g_malloc (p->data_size); + + tmp = GINT32_TO_LE (flags); + memcpy (p->data, (void *)&tmp, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, nslen); + tmp = GINT32_TO_LE (skip); + memcpy (p->data + sizeof (gint32) + nslen, (void *)&tmp, sizeof (gint32)); + tmp = GINT32_TO_LE (ret); + memcpy (p->data + sizeof (gint32) * 2 + nslen, + (void *)&tmp, sizeof (gint32)); + memcpy (p->data + sizeof (gint32) * 3 + nslen, bson_data (query), + bson_size (query)); + + if (sel) + memcpy (p->data + sizeof (gint32) * 3 + nslen + bson_size (query), + bson_data (sel), bson_size (sel)); + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_get_more (gint32 id, const gchar *ns, + gint32 ret, gint64 cursor_id) +{ + mongo_packet *p; + gint32 t_ret; + gint64 t_cid; + gint32 nslen; + + if (!ns) + { + errno = EINVAL; + return NULL; + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_GET_MORE); + + t_ret = GINT32_TO_LE (ret); + t_cid = GINT64_TO_LE (cursor_id); + + nslen = strlen (ns) + 1; + p->data_size = sizeof (gint32) + nslen + sizeof (gint32) + sizeof (gint64); + p->data = g_malloc (p->data_size); + + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, nslen); + memcpy (p->data + sizeof (gint32) + nslen, (void *)&t_ret, sizeof (gint32)); + memcpy (p->data + sizeof (gint32) * 2 + nslen, + (void *)&t_cid, sizeof (gint64)); + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_delete (gint32 id, const gchar *ns, + gint32 flags, const bson *sel) +{ + mongo_packet *p; + gint32 t_flags, nslen; + + if (!ns || !sel) + { + errno = EINVAL; + return NULL; + } + + if (bson_size (sel) < 0) + { + errno = EINVAL; + return NULL; + } + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_DELETE); + + nslen = strlen (ns) + 1; + p->data_size = sizeof (gint32) + nslen + sizeof (gint32) + bson_size (sel); + p->data = g_malloc (p->data_size); + + t_flags = GINT32_TO_LE (flags); + + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)ns, nslen); + memcpy (p->data + sizeof (gint32) + nslen, + (void *)&t_flags, sizeof (gint32)); + memcpy (p->data + sizeof (gint32) * 2 + nslen, + bson_data (sel), bson_size (sel)); + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_kill_cursors_va (gint32 id, gint32 n, va_list ap) +{ + mongo_packet *p; + gint32 i, t_n, pos; + gint64 t_cid; + + p = (mongo_packet *)g_new0 (mongo_packet, 1); + p->header.id = GINT32_TO_LE (id); + p->header.opcode = GINT32_TO_LE (OP_KILL_CURSORS); + + p->data_size = sizeof (gint32) + sizeof (gint32) + sizeof (gint64)* n; + p->data = g_malloc (p->data_size); + + t_n = GINT32_TO_LE (n); + pos = sizeof (gint32) * 2; + memcpy (p->data, (void *)&zero, sizeof (gint32)); + memcpy (p->data + sizeof (gint32), (void *)&t_n, sizeof (gint32)); + + for (i = 1; i <= n; i++) + { + t_cid = va_arg (ap, gint64); + t_cid = GINT64_TO_LE (t_cid); + + memcpy (p->data + pos, (void *)&t_cid, sizeof (gint64)); + pos += sizeof (gint64); + } + + p->header.length = GINT32_TO_LE (sizeof (p->header) + p->data_size); + + return p; +} + +mongo_packet * +mongo_wire_cmd_kill_cursors (gint32 id, gint32 n, ...) +{ + va_list ap; + mongo_packet *p; + + if (n <= 0) + { + errno = EINVAL; + return NULL; + } + + va_start (ap, n); + p = mongo_wire_cmd_kill_cursors_va (id, n, ap); + va_end (ap); + + return p; +} + +mongo_packet * +mongo_wire_cmd_custom (gint32 id, const gchar *db, gint32 flags, + const bson *command) +{ + mongo_packet *p; + gchar *ns; + bson *empty; + + if (!db || !command) + { + errno = EINVAL; + return NULL; + } + + if (bson_size (command) < 0) + { + errno = EINVAL; + return NULL; + } + + ns = g_strconcat (db, ".$cmd", NULL); + + empty = bson_new (); + bson_finish (empty); + + p = mongo_wire_cmd_query (id, ns, flags, 0, 1, command, empty); + g_free (ns); + bson_free (empty); + return p; +} + +gboolean +mongo_wire_reply_packet_get_header (const mongo_packet *p, + mongo_reply_packet_header *hdr) +{ + mongo_reply_packet_header h; + const guint8 *data; + + if (!p || !hdr) + { + errno = EINVAL; + return FALSE; + } + + if (p->header.opcode != OP_REPLY) + { + errno = EPROTO; + return FALSE; + } + + if (mongo_wire_packet_get_data (p, &data) == -1) + return FALSE; + + memcpy (&h, data, sizeof (mongo_reply_packet_header)); + + hdr->flags = GINT32_FROM_LE (h.flags); + hdr->cursor_id = GINT64_FROM_LE (h.cursor_id); + hdr->start = GINT32_FROM_LE (h.start); + hdr->returned = GINT32_FROM_LE (h.returned); + + return TRUE; +} + +gboolean +mongo_wire_reply_packet_get_data (const mongo_packet *p, + const guint8 **data) +{ + const guint8 *d; + + if (!p || !data) + { + errno = EINVAL; + return FALSE; + } + + if (p->header.opcode != OP_REPLY) + { + errno = EPROTO; + return FALSE; + } + + if (mongo_wire_packet_get_data (p, &d) == -1) + return FALSE; + + *data = d + sizeof (mongo_reply_packet_header); + return TRUE; +} + +gboolean +mongo_wire_reply_packet_get_nth_document (const mongo_packet *p, + gint32 n, + bson **doc) +{ + const guint8 *d; + mongo_reply_packet_header h; + gint32 i; + gint32 pos = 0; + + if (!p || !doc || n <= 0) + { + errno = EINVAL; + return FALSE; + } + + if (p->header.opcode != OP_REPLY) + { + errno = EPROTO; + return FALSE; + } + + if (!mongo_wire_reply_packet_get_header (p, &h)) + return FALSE; + + if (h.returned < n) + { + errno = ERANGE; + return FALSE; + } + + if (!mongo_wire_reply_packet_get_data (p, &d)) + return FALSE; + + for (i = 1; i < n; i++) + pos += bson_stream_doc_size (d, pos); + + *doc = bson_new_from_data (d + pos, bson_stream_doc_size (d, pos) - 1); + return TRUE; +} |