Switch search SQL to triple-quote strings. (#14311)

For ease of reading, we switch from concatenated strings to
triple-quoted strings.
This commit is contained in:
Patrick Cloke 2022-10-28 11:44:10 -04:00 committed by GitHub
parent 453914b472
commit 81815e0561
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 100 additions and 89 deletions

View File

@ -0,0 +1 @@
Allow use of Postgres and SQLite full-text search operators in search queries.

View File

@ -80,11 +80,11 @@ class SearchWorkerStore(SQLBaseStore):
if not self.hs.config.server.enable_search: if not self.hs.config.server.enable_search:
return return
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
sql = ( sql = """
"INSERT INTO event_search" INSERT INTO event_search
" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" (event_id, room_id, key, vector, stream_ordering, origin_server_ts)
" VALUES (?,?,?,to_tsvector('english', ?),?,?)" VALUES (?,?,?,to_tsvector('english', ?),?,?)
) """
args1 = ( args1 = (
( (
@ -101,20 +101,20 @@ class SearchWorkerStore(SQLBaseStore):
txn.execute_batch(sql, args1) txn.execute_batch(sql, args1)
elif isinstance(self.database_engine, Sqlite3Engine): elif isinstance(self.database_engine, Sqlite3Engine):
sql = ( self.db_pool.simple_insert_many_txn(
"INSERT INTO event_search (event_id, room_id, key, value)" txn,
" VALUES (?,?,?,?)" table="event_search",
keys=("event_id", "room_id", "key", "value"),
values=(
(
entry.event_id,
entry.room_id,
entry.key,
_clean_value_for_search(entry.value),
)
for entry in entries
),
) )
args2 = (
(
entry.event_id,
entry.room_id,
entry.key,
_clean_value_for_search(entry.value),
)
for entry in entries
)
txn.execute_batch(sql, args2)
else: else:
# This should be unreachable. # This should be unreachable.
@ -162,15 +162,17 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
TYPES = ["m.room.name", "m.room.message", "m.room.topic"] TYPES = ["m.room.name", "m.room.message", "m.room.topic"]
def reindex_search_txn(txn: LoggingTransaction) -> int: def reindex_search_txn(txn: LoggingTransaction) -> int:
sql = ( sql = """
"SELECT stream_ordering, event_id, room_id, type, json, " SELECT stream_ordering, event_id, room_id, type, json, origin_server_ts
" origin_server_ts FROM events" FROM events
" JOIN event_json USING (room_id, event_id)" JOIN event_json USING (room_id, event_id)
" WHERE ? <= stream_ordering AND stream_ordering < ?" WHERE ? <= stream_ordering AND stream_ordering < ?
" AND (%s)" AND (%s)
" ORDER BY stream_ordering DESC" ORDER BY stream_ordering DESC
" LIMIT ?" LIMIT ?
) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),) """ % (
" OR ".join("type = '%s'" % (t,) for t in TYPES),
)
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
@ -284,8 +286,10 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
try: try:
c.execute( c.execute(
"CREATE INDEX CONCURRENTLY event_search_fts_idx" """
" ON event_search USING GIN (vector)" CREATE INDEX CONCURRENTLY event_search_fts_idx
ON event_search USING GIN (vector)
"""
) )
except psycopg2.ProgrammingError as e: except psycopg2.ProgrammingError as e:
logger.warning( logger.warning(
@ -323,12 +327,16 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
# We create with NULLS FIRST so that when we search *backwards* # We create with NULLS FIRST so that when we search *backwards*
# we get the ones with non null origin_server_ts *first* # we get the ones with non null origin_server_ts *first*
c.execute( c.execute(
"CREATE INDEX CONCURRENTLY event_search_room_order ON event_search(" """
"room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)" CREATE INDEX CONCURRENTLY event_search_room_order
ON event_search(room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
"""
) )
c.execute( c.execute(
"CREATE INDEX CONCURRENTLY event_search_order ON event_search(" """
"origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)" CREATE INDEX CONCURRENTLY event_search_order
ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
"""
) )
conn.set_session(autocommit=False) conn.set_session(autocommit=False)
@ -345,14 +353,14 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
) )
def reindex_search_txn(txn: LoggingTransaction) -> Tuple[int, bool]: def reindex_search_txn(txn: LoggingTransaction) -> Tuple[int, bool]:
sql = ( sql = """
"UPDATE event_search AS es SET stream_ordering = e.stream_ordering," UPDATE event_search AS es
" origin_server_ts = e.origin_server_ts" SET stream_ordering = e.stream_ordering, origin_server_ts = e.origin_server_ts
" FROM events AS e" FROM events AS e
" WHERE e.event_id = es.event_id" WHERE e.event_id = es.event_id
" AND ? <= e.stream_ordering AND e.stream_ordering < ?" AND ? <= e.stream_ordering AND e.stream_ordering < ?
" RETURNING es.stream_ordering" RETURNING es.stream_ordering
) """
min_stream_id = max_stream_id - batch_size min_stream_id = max_stream_id - batch_size
txn.execute(sql, (min_stream_id, max_stream_id)) txn.execute(sql, (min_stream_id, max_stream_id))
@ -456,33 +464,33 @@ class SearchStore(SearchBackgroundUpdateStore):
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
search_query = search_term search_query = search_term
tsquery_func = self.database_engine.tsquery_func tsquery_func = self.database_engine.tsquery_func
sql = ( sql = f"""
f"SELECT ts_rank_cd(vector, {tsquery_func}('english', ?)) AS rank," SELECT ts_rank_cd(vector, {tsquery_func}('english', ?)) AS rank,
" room_id, event_id" room_id, event_id
" FROM event_search" FROM event_search
f" WHERE vector @@ {tsquery_func}('english', ?)" WHERE vector @@ {tsquery_func}('english', ?)
) """
args = [search_query, search_query] + args args = [search_query, search_query] + args
count_sql = ( count_sql = f"""
"SELECT room_id, count(*) as count FROM event_search" SELECT room_id, count(*) as count FROM event_search
f" WHERE vector @@ {tsquery_func}('english', ?)" WHERE vector @@ {tsquery_func}('english', ?)
) """
count_args = [search_query] + count_args count_args = [search_query] + count_args
elif isinstance(self.database_engine, Sqlite3Engine): elif isinstance(self.database_engine, Sqlite3Engine):
search_query = _parse_query_for_sqlite(search_term) search_query = _parse_query_for_sqlite(search_term)
sql = ( sql = """
"SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" SELECT rank(matchinfo(event_search)) as rank, room_id, event_id
" FROM event_search" FROM event_search
" WHERE value MATCH ?" WHERE value MATCH ?
) """
args = [search_query] + args args = [search_query] + args
count_sql = ( count_sql = """
"SELECT room_id, count(*) as count FROM event_search" SELECT room_id, count(*) as count FROM event_search
" WHERE value MATCH ?" WHERE value MATCH ?
) """
count_args = [search_query] + count_args count_args = [search_query] + count_args
else: else:
# This should be unreachable. # This should be unreachable.
@ -588,26 +596,27 @@ class SearchStore(SearchBackgroundUpdateStore):
raise SynapseError(400, "Invalid pagination token") raise SynapseError(400, "Invalid pagination token")
clauses.append( clauses.append(
"(origin_server_ts < ?" """
" OR (origin_server_ts = ? AND stream_ordering < ?))" (origin_server_ts < ? OR (origin_server_ts = ? AND stream_ordering < ?))
"""
) )
args.extend([origin_server_ts, origin_server_ts, stream]) args.extend([origin_server_ts, origin_server_ts, stream])
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
search_query = search_term search_query = search_term
tsquery_func = self.database_engine.tsquery_func tsquery_func = self.database_engine.tsquery_func
sql = ( sql = f"""
f"SELECT ts_rank_cd(vector, {tsquery_func}('english', ?)) as rank," SELECT ts_rank_cd(vector, {tsquery_func}('english', ?)) as rank,
" origin_server_ts, stream_ordering, room_id, event_id" origin_server_ts, stream_ordering, room_id, event_id
" FROM event_search" FROM event_search
f" WHERE vector @@ {tsquery_func}('english', ?) AND " WHERE vector @@ {tsquery_func}('english', ?) AND
) """
args = [search_query, search_query] + args args = [search_query, search_query] + args
count_sql = ( count_sql = f"""
"SELECT room_id, count(*) as count FROM event_search" SELECT room_id, count(*) as count FROM event_search
f" WHERE vector @@ {tsquery_func}('english', ?) AND " WHERE vector @@ {tsquery_func}('english', ?) AND
) """
count_args = [search_query] + count_args count_args = [search_query] + count_args
elif isinstance(self.database_engine, Sqlite3Engine): elif isinstance(self.database_engine, Sqlite3Engine):
@ -619,23 +628,24 @@ class SearchStore(SearchBackgroundUpdateStore):
# in the events table to get the topological ordering. We need # in the events table to get the topological ordering. We need
# to use the indexes in this order because sqlite refuses to # to use the indexes in this order because sqlite refuses to
# MATCH unless it uses the full text search index # MATCH unless it uses the full text search index
sql = ( sql = """
"SELECT rank(matchinfo) as rank, room_id, event_id," SELECT
" origin_server_ts, stream_ordering" rank(matchinfo) as rank, room_id, event_id, origin_server_ts, stream_ordering
" FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" FROM (
" FROM event_search" SELECT key, event_id, matchinfo(event_search) as matchinfo
" WHERE value MATCH ?" FROM event_search
" )" WHERE value MATCH ?
" CROSS JOIN events USING (event_id)"
" WHERE "
) )
CROSS JOIN events USING (event_id)
WHERE
"""
search_query = _parse_query_for_sqlite(search_term) search_query = _parse_query_for_sqlite(search_term)
args = [search_query] + args args = [search_query] + args
count_sql = ( count_sql = """
"SELECT room_id, count(*) as count FROM event_search" SELECT room_id, count(*) as count FROM event_search
" WHERE value MATCH ? AND " WHERE value MATCH ? AND
) """
count_args = [search_query] + count_args count_args = [search_query] + count_args
else: else:
# This should be unreachable. # This should be unreachable.
@ -647,10 +657,10 @@ class SearchStore(SearchBackgroundUpdateStore):
# We add an arbitrary limit here to ensure we don't try to pull the # We add an arbitrary limit here to ensure we don't try to pull the
# entire table from the database. # entire table from the database.
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
sql += ( sql += """
" ORDER BY origin_server_ts DESC NULLS LAST," ORDER BY origin_server_ts DESC NULLS LAST, stream_ordering DESC NULLS LAST
" stream_ordering DESC NULLS LAST LIMIT ?" LIMIT ?
) """
elif isinstance(self.database_engine, Sqlite3Engine): elif isinstance(self.database_engine, Sqlite3Engine):
sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?" sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
else: else: