-- JSON DBC: Database connectivity over a JSON-RPC link. -- Client-side code -- Safe to reload live. require "jsonrpc/client" db = db or {} db._rawsql_mt = db._rawsql_mt or {} local _rawsql_mt = db._rawsql_mt function rawsql(str) local t = {} t.sql = str setmetatable(t, _rawsql_mt) return t end -- use this as a table value (for InsertDBRow/UpdateDBRow/etc) to force the -- 'null' to appear in the query since lua will cull out the 'nil' in your -- table. Not necessary for db.format. db.null = rawsql('null') local function dbquote(thing) if(type(thing) == "string") then return "'"..db_escape(thing).."'" elseif type(thing) == "number" then return thing elseif type(thing) == "nil" then return "null" elseif type(thing) == "table" and getmetatable(thing) == _rawsql_mt then return thing.sql else error('Trying to use a '..type(thing)..' as a SQL argument!') end end function db.format(fmt, ...) local n=0 return string.gsub(fmt, "?", function() n=n+1 return dbquote(arg[n]) end) end -- currently-executing transactions db.xacn = db.xacn or {} db.xid = db.xid or 1 local xacn = db.xacn function db.connect(userid, jsonrpc_host, jsonrpc_port, jsonrpc_login_magic) db.rpc = assert(JSONRPC.make_client(jsonrpc_host, jsonrpc_port, { dbdata = function(rpc, reqid, row, idx) if xacn[reqid] then xacn[reqid](row, idx) end end}) ) assert(db.rpc:login(userid, jsonrpc_login_magic) == "ok") end -- synchronous; execute fn(row, index) for all rows in a query function db.foreach(query, fn) local xid = db.xid db.xid=db.xid+1 xacn[xid] = fn local nrows, err = db.rpc:dbquery(query, xid) if not nrows then log.error('Error executing query: '..err..'\nquery was: '..query) end xacn[xid] = nil return nrows, err end -- synchronous; returns a table containing all rows function db.fetchall(query) local rows = {} local ok, err = db.foreach(query, function(row) table.insert(rows, row) end) if not ok then return ok, err end return rows end -- asynchronous version function db.foreach_async(query, callback) local xid = db.xid db.xid=db.xid+1 xacn[xid] = callback CreateThread(function() local nrows, err = db.rpc:dbquery(query, xid) if not nrows then log.error('Error executing query: '..tostring(err)..'\nquery was: '..tostring(query)) end xacn[xid] = nil end) end -- asynchronous query execution: no result function db.execute(query) local nrows, err = db.rpc:notify('execute', query) end -- synchronous; blocks until all pending executions have been sent function db.flush() -- all we have to do is insert a "marker" RPC query into the outgoing -- stream; when we receive the response to this query, we know the server -- has received all data up to this point. return db.rpc:checkpoint() end function db.executef(query, ...) return db.execute(db.format(query, unpack(arg))) end function db.fetchallf(query, ...) return db.fetchall(db.format(query, unpack(arg))) end -- these have tricky argument orders, so make sure fn is really a function... I -- don't really like it this way but I don't have a better way in mind either. function db.foreachf(query, fn, ...) assert(type(fn) == "function") return db.foreach(db.format(query, unpack(arg)), fn) end function db.foreachf_async(query, cb, ...) assert(cb == nil or type(cb) == "function") return db.foreach_async(db.format(query, unpack(arg)), cb) end function db.executebatch(updatetbl, finishedfn, timeout) -- Just queue up all the updates as asynchronous execute calls en masse -- It'd be better to 'stream' these on demand as the output buffer needs -- filling, though, to avoid using huge amounts of RAM to buffer these. for _,q in ipairs(updatetbl) do db.execute(q) end local numq = table.getn(updatetbl) updatetbl = nil -- allow this huge mess of updates to be collected timeout = timeout or 60 local tmr = Timer() tmr:SetTimeout(timeout*1000, function() log.error("db.executebatch: timed out!") tmr:Kill() if finishedfn then finishedfn(false, numq) end return end) -- Create a spawn a process that waits for our batch to be acknowledged by -- the server and calls our callback. CreateThread(function() db.flush() tmr:Kill() if finishedfn then finishedfn(true, numq) end end) return true end function db.stresstest_query(limit) local n=0 limit = limit or 1000 local t = os.time() print('Starting stress test...') db.foreach("select * from characterdb limit "..limit, function(row, index) n=n+1 end) print("Got "..n.."/"..limit.." characters; finished in "..(os.time() - t)..' seconds.') end function db.insertrow(tablename, values, qtype) local qtype = qtype or 'insert' local fields_sql = {} local values_sql = {} local function add_param(param,val) table.insert(fields_sql, param) table.insert(values_sql, dbquote(val)) end for param,val in pairs(values) do add_param(param,val) end return db.execute(qtype..' into '..tablename..' ('.. table.concat(fields_sql, ',')..') VALUES ('..table.concat(values_sql,',')..')') end function db.replacerow(tablename, values) return db.insertrow(tablename, values, 'replace') end function db.updaterow(tablename, values, whereclause, ...) local values_sql = {} local function add_param(param,val) table.insert(values_sql, param..'='..dbquote(val)) end for param,val in pairs(values) do add_param(param,val) end whereclause = whereclause and (' where '..formatquery(whereclause, unpack(arg))) or '' return db.execute('update '..tablename..' set '..table.concat(values_sql, ',')..whereclause) end