Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to observe on selected columns of a collection #609

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1cdf6f8
feat: add support to observe selected columns on raw records
pranjal-jain Jan 20, 2020
2f4c634
test: Update QueryDescription and encodeQuery tests
pranjal-jain Jan 20, 2020
08bf04c
refactor: Change API to observe on selected columns and add tests
pranjal-jain Jan 29, 2020
47102f0
Fix a few types
azizhk Jan 29, 2020
be79a10
refactor: rename fetchSelected to experimentalFetchColumns
pranjal-jain Jan 30, 2020
57a8a9b
add changelog
pranjal-jain Jan 30, 2020
2365dad
Merge branch 'observable_columns' into observable_columns
pranjal-jain Jan 30, 2020
91a7501
Merge pull request #1 from azizhk/observable_columns
pranjal-jain Jan 30, 2020
07cd83a
refactor: flow fixes
pranjal-jain Jan 30, 2020
776c93d
add api to typescript declations
pranjal-jain Jan 30, 2020
cbcfc96
refacotor API and code cleanup
pranjal-jain Jan 30, 2020
266c7fa
Add changelog entry and update docs to include new API
pranjal-jain Jan 30, 2020
ab72313
fix: minor change in doc
pranjal-jain Jan 30, 2020
a8fd663
Merge branch 'master' of github.com:Nozbe/WatermelonDB into observabl…
pranjal-jain Jan 30, 2020
97460e4
fix: typescript declarations
pranjal-jain Jan 30, 2020
7eaabf0
refactor: remove unused imports
pranjal-jain Jan 30, 2020
8786d99
fix: typing of RecordState
pranjal-jain Jan 31, 2020
97e41d8
fix: bypass caching for select queries
pranjal-jain Feb 2, 2020
1a7d946
fix: minor fix in android database driver
pranjal-jain Feb 5, 2020
7c75d08
fix: run matcher on change set before returning
pranjal-jain Feb 25, 2020
c967b16
Merge branch 'master' of github.com:Nozbe/WatermelonDB into observabl…
zhirzh Nov 10, 2021
e34922d
Merge pull request #4 from zhirzh/observable_columns
pranjal-jain Aug 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-Unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

- [adapters] Adapter objects can now be distinguished by checking their `static adapterType`
- [Query] New `Q.includes('foo')` query for case-sensitive exact string includes comparison
- [Query] Added experimental `query.experimentalFetchColumns(['col1', 'col2'])` and `query.experimentalObserveColumns(['col1', 'col2'])` and methods to fetch and observe on record states with only the selected columns.

### Performance

Expand Down
42 changes: 42 additions & 0 deletions docs-master/Query.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,28 @@ const comments = await post.comments
const verifiedCommentCount = await post.verifiedComments.count
```

To fetch record states with selected columns only, use `experimentalFetchColumns(['col1', 'col2'])`.

```js
const commentStates = await post.comments.experimentalFetchColumns(['created_at', 'is_verified'])
/**
* commentStates = [{
* id: "abcd",
* created_at: 1580384390117,
* is_verified: true
* },
* {
* id: "efgh",
* created_at: 1580388020008,
* is_verified: true
* }]
**/
const sortedComments = commentStates.sort((comment1, comment2) => comment1.created_at - comment2.created_at)
const isLastCommentVerified = !!sortedComments.length && sortedComments[0].is_verified
```

This will query **all** comments of the post and fetch comment states with only `id`, `created_at` and `is_verified` columns.

## Query conditions

```js
Expand Down Expand Up @@ -267,6 +289,26 @@ tasksCollection.query(

Call `query.observeWithColumns(['foo', 'bar'])` to create an Observable that emits a value not only when the list of matching records changes (new records/deleted records), but also when any of the matched records changes its `foo` or `bar` column. [Use this for observing sorted lists](./Components.md)

### Observing on columns

Call `query.experimentalObserveColumns(['col1', 'col2'])` to create an Observable that emits matching records' states with only the selected columns populated (`id` is included implicitly). The observable will emit not only when a matching record is added or deleted but also when any of the matched records changes its observed columns (any of 'col1' or 'col2' in this case)

This is same as

```js
query.observe().pipe(
map(records =>
records.map(
record => ({
id: record._raw.id,
col1: record._raw.col1,
col2: record._raw.col2
})
)
)
)
```

#### Count throttling

By default, calling `query.observeCount()` returns an Observable that is throttled to emit at most once every 250ms. You can disable throttling using `query.observeCount(false)`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ class DatabaseBridge(private val reactContext: ReactApplicationContext) :

@ReactMethod
fun query(tag: ConnectionTag, table: TableName, query: SQL, args: ReadableArray, promise: Promise) =
withDriver(tag, promise) { it.query(table, query, args.toArrayList().toArray()) }

@ReactMethod
fun cachedQuery(tag: ConnectionTag, table: TableName, query: SQL, args: ReadableArray, promise: Promise) =
withDriver(tag, promise) { it.cachedQuery(table, query, args.toArrayList().toArray()) }

@ReactMethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,16 @@ class DatabaseDriver(context: Context, dbName: String) {
}
}

fun query(table: TableName, query: SQL, args: QueryArgs): WritableArray {
return query(table, query, false, args)
}

fun cachedQuery(table: TableName, query: SQL, args: QueryArgs): WritableArray {
// log?.info("Cached Query: $query")
return query(table, query, true, args)
}

private fun query(table: TableName, query: SQL, willCache: Boolean, args: QueryArgs): WritableArray {
val resultArray = Arguments.createArray()
database.rawQuery(query, args).use {
if (it.count > 0 && it.columnNames.contains("id")) {
Expand All @@ -63,7 +71,9 @@ class DatabaseDriver(context: Context, dbName: String) {
if (isCached(table, id)) {
resultArray.pushString(id)
} else {
markAsCached(table, id)
if (willCache) {
markAsCached(table, id)
}
resultArray.pushMapFromCursor(it)
}
}
Expand Down
6 changes: 6 additions & 0 deletions native/ios/WatermelonDB/DatabaseBridge.m
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ @interface RCT_EXTERN_REMAP_MODULE(DatabaseBridge, DatabaseBridge, NSObject)
args:(nonnull NSArray *)args
)

WMELON_BRIDGE_METHOD(cachedQuery,
table:(nonnull NSString *)table
query:(nonnull NSString *)query
args:(nonnull NSArray *)args
)

WMELON_BRIDGE_METHOD(queryIds,
query:(nonnull NSString *)query
args:(nonnull NSArray *)args
Expand Down
11 changes: 11 additions & 0 deletions native/ios/WatermelonDB/DatabaseBridge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ extension DatabaseBridge {
query: Database.SQL,
args: Database.QueryArgs,
resolve: @escaping RCTPromiseResolveBlock, reject: @escaping RCTPromiseRejectBlock) {
withDriver(tag, resolve, reject) {
try $0.query(table: table, query: query, args: args)
}
}

@objc(cachedQuery:table:query:args:resolve:reject:)
func cachedQuery(tag: ConnectionTag,
table: Database.TableName,
query: Database.SQL,
args: Database.QueryArgs,
resolve: @escaping RCTPromiseResolveBlock, reject: @escaping RCTPromiseRejectBlock) {
withDriver(tag, resolve, reject) {
try $0.cachedQuery(table: table, query: query, args: args)
}
Expand Down
12 changes: 11 additions & 1 deletion native/ios/WatermelonDB/DatabaseDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,24 @@ class DatabaseDriver {
return record.resultDictionary!
}

func query(table: Database.TableName, query: Database.SQL, args: Database.QueryArgs = []) throws -> [Any] {
return try _query(table, query, false, args)
}

func cachedQuery(table: Database.TableName, query: Database.SQL, args: Database.QueryArgs = []) throws -> [Any] {
return try _query(table, query, true, args)
}

private func _query(_ table: Database.TableName, _ query: Database.SQL, _ willCache: Bool, _ args: Database.QueryArgs = []) throws -> [Any] {
return try database.queryRaw(query, args).map { row in
let id = row.string(forColumn: "id")!

if isCached(table, id) {
return id
} else {
markAsCached(table, id)
if willCache {
markAsCached(table, id)
}
return row.resultDictionary!
}
}
Expand Down
12 changes: 11 additions & 1 deletion native/shared/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,14 @@ jsi::Value Database::find(jsi::String &tableName, jsi::String &id) {
}

jsi::Value Database::query(jsi::String &tableName, jsi::String &sql, jsi::Array &arguments) {
return query(tableName, sql, false, arguments);
}

jsi::Value Database::cachedQuery(jsi::String &tableName, jsi::String &sql, jsi::Array &arguments) {
return query(tableName, sql, true, arguments);
}

jsi::Value Database::query(jsi::String &tableName, jsi::String &sql, bool willCache, jsi::Array &arguments) {
auto &rt = getRt();
const std::lock_guard<std::mutex> lock(mutex_);

Expand All @@ -434,7 +442,9 @@ jsi::Value Database::query(jsi::String &tableName, jsi::String &sql, jsi::Array
jsi::String jsiId = jsi::String::createFromAscii(rt, id);
records.push_back(std::move(jsiId));
} else {
markAsCached(cacheKey(tableName.utf8(rt), std::string(id)));
if (willCache) {
markAsCached(cacheKey(tableName.utf8(rt), std::string(id)));
}
jsi::Object record = resultDictionary(statement.stmt);
records.push_back(std::move(record));
}
Expand Down
2 changes: 2 additions & 0 deletions native/shared/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class Database : public jsi::HostObject {

jsi::Value find(jsi::String &tableName, jsi::String &id);
jsi::Value query(jsi::String &tableName, jsi::String &sql, jsi::Array &arguments);
jsi::Value cachedQuery(jsi::String &tableName, jsi::String &sql, jsi::Array &arguments);
jsi::Value query(jsi::String &tableName, jsi::String &sql, bool willCache, jsi::Array &arguments);
jsi::Value queryAsArray(jsi::String &tableName, jsi::String &sql, jsi::Array &arguments);
jsi::Array queryIds(jsi::String &sql, jsi::Array &arguments);
jsi::Array unsafeQueryRaw(jsi::String &sql, jsi::Array &arguments);
Expand Down
7 changes: 7 additions & 0 deletions native/shared/DatabaseInstallation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ void Database::install(jsi::Runtime *runtime) {
jsi::Array arguments = args[2].getObject(rt).getArray(rt);
return database->query(tableName, sql, arguments);
});
createMethod(rt, adapter, "cachedQuery", 3, [database](jsi::Runtime &rt, const jsi::Value *args) {
assert(database->initialized_);
jsi::String tableName = args[0].getString(rt);
jsi::String sql = args[1].getString(rt);
jsi::Array arguments = args[2].getObject(rt).getArray(rt);
return database->cachedQuery(tableName, sql, arguments);
});
createMethod(rt, adapter, "queryAsArray", 3, [database](jsi::Runtime &rt, const jsi::Value *args) {
assert(database->initialized_);
jsi::String tableName = args[0].getString(rt);
Expand Down
17 changes: 15 additions & 2 deletions src/Collection/RecordCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import logger from '../utils/common/logger'
import type Model, { RecordId } from '../Model'
import type Collection from './index'
import type { CachedQueryResult } from '../adapters/type'
import type { TableName } from '../Schema'
import type { RawRecord } from '../RawRecord'
import type { TableName, ColumnName } from '../Schema'
import type { RawRecord, RecordState } from '../RawRecord'
import { getRecordState } from '../RawRecord'

type Instantiator<T> = (RawRecord) => T

Expand Down Expand Up @@ -57,6 +58,18 @@ export default class RecordCache<Record: Model> {
return this._modelForRaw(result)
}

recordStatesFromQueryResult(result: CachedQueryResult, columns: ColumnName[]): RecordState[] {
return result.map(res => this.recordStateFromQueryResult(res, columns))
}

recordStateFromQueryResult(result: RecordId | RawRecord, columns: ColumnName[]): RecordState {
let rawRecord = result
if (typeof rawRecord === 'string') {
rawRecord = this._cachedModelForId(rawRecord)._raw
}
return getRecordState(rawRecord, columns)
}

_cachedModelForId(id: RecordId): Record {
const record = this.map.get(id)

Expand Down
11 changes: 9 additions & 2 deletions src/Collection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type Database from '../Database'
import type Model, { RecordId } from '../Model'
import type { Clause } from '../QueryDescription'
import { type TableName, type TableSchema } from '../Schema'
import { type DirtyRaw } from '../RawRecord'
import { type DirtyRaw, type RecordState } from '../RawRecord'

import RecordCache from './RecordCache'

Expand Down Expand Up @@ -148,11 +148,18 @@ export default class Collection<Record: Model> {

// See: Query.fetch
_fetchQuery(query: Query<Record>, callback: ResultCallback<Record[]>): void {
this.database.adapter.underlyingAdapter.query(query.serialize(), (result) =>
this.database.adapter.underlyingAdapter.cachedQuery(query.serialize(), (result) =>
callback(mapValue((rawRecords) => this._cache.recordsFromQueryResult(rawRecords), result)),
)
}

_fetchQuerySelect(query: Query<Record>, callback: ResultCallback<RecordState[]>): void {
const columns = query.getSelectedColumns()
this.database.adapter.underlyingAdapter.query(query.serialize(), (result) =>
callback(mapValue((rawRecords) => this._cache.recordStatesFromQueryResult(rawRecords, columns),result)),
)
}

_fetchIds(query: Query<Record>, callback: ResultCallback<RecordId[]>): void {
this.database.adapter.underlyingAdapter.queryIds(query.serialize(), callback)
}
Expand Down
10 changes: 5 additions & 5 deletions src/Collection/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ describe('fetching queries', () => {
it('fetches queries and caches records', async () => {
const { tasks: collection, adapter } = mockDatabase()

adapter.query = jest
adapter.cachedQuery = jest
.fn()
.mockImplementation((query, cb) => cb({ value: [{ id: 'm1' }, { id: 'm2' }] }))

Expand All @@ -112,13 +112,13 @@ describe('fetching queries', () => {
expect(collection._cache.map.get('m2')).toBe(models[1])

// check if query was passed correctly
expect(adapter.query.mock.calls.length).toBe(1)
expect(adapter.query.mock.calls[0][0]).toEqual(query.serialize())
expect(adapter.cachedQuery.mock.calls.length).toBe(1)
expect(adapter.cachedQuery.mock.calls[0][0]).toEqual(query.serialize())
})
it('fetches query records from cache if possible', async () => {
const { tasks: collection, adapter } = mockDatabase()

adapter.query = jest.fn().mockImplementation((query, cb) => cb({ value: ['m1', { id: 'm2' }] }))
adapter.cachedQuery = jest.fn().mockImplementation((query, cb) => cb({ value: ['m1', { id: 'm2' }] }))

const m1 = new MockTask(collection, { id: 'm1' })
collection._cache.add(m1)
Expand All @@ -135,7 +135,7 @@ describe('fetching queries', () => {
it('fetches query records from cache even if full raw object was sent', async () => {
const { tasks: collection, adapter } = mockDatabase()

adapter.query = jest
adapter.cachedQuery = jest
.fn()
.mockImplementation((query, cb) => cb({ value: [{ id: 'm1' }, { id: 'm2' }] }))

Expand Down
6 changes: 5 additions & 1 deletion src/Query/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
declare module '@nozbe/watermelondb/Query' {
import { Collection, ColumnName, Model, TableName, RecordId } from '@nozbe/watermelondb'
import { Collection, ColumnName, Model, TableName, RecordId, RecordState } from '@nozbe/watermelondb'
import { AssociationInfo } from '@nozbe/watermelondb/Model'
import { Clause, QueryDescription } from '@nozbe/watermelondb/QueryDescription'
import { Observable } from 'rxjs'
Expand All @@ -22,10 +22,14 @@ declare module '@nozbe/watermelondb/Query' {

public fetch(): Promise<Record[]>

public experimentalFetchColumns<T>(rawFields: Array<keyof T>): Promise<RecordState<T>[]>

public observe(): Observable<Record[]>

public observeWithColumns(rawFields: ColumnName[]): Observable<Record[]>

public experimentalObserveColumns<T>(rawFields: Array<keyof T>): Observable<RecordState<T>[]>

public fetchIds(): Promise<RecordId[]>

public fetchCount(): Promise<number>
Expand Down
25 changes: 25 additions & 0 deletions src/Query/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import lazy from '../decorators/lazy' // import from decorarators break the app
import subscribeToCount from '../observation/subscribeToCount'
import subscribeToQuery from '../observation/subscribeToQuery'
import subscribeToQueryWithColumns from '../observation/subscribeToQueryWithColumns'
import subscribeToQueryWithSelect from '../observation/subscribeToQueryWithSelect'
import * as Q from '../QueryDescription'
import type { Clause, QueryDescription } from '../QueryDescription'
import type Model, { AssociationInfo, RecordId } from '../Model'
import type Collection from '../Collection'
import type { TableName, ColumnName } from '../Schema'
import type { RecordState } from '../RawRecord'

import { getAssociations } from './helpers'

Expand Down Expand Up @@ -75,6 +77,7 @@ export default class Query<Record: Model> {
extend(...clauses: Clause[]): Query<Record> {
const { collection } = this
const {
select,
where,
sortBy,
take,
Expand All @@ -91,6 +94,7 @@ export default class Query<Record: Model> {
return new Query(collection, [
Q.experimentalJoinTables(joinTables),
...nestedJoinTables.map(({ from, to }) => Q.experimentalNestedJoin(from, to)),
...select,
...where,
...sortBy,
...(take ? [Q.take(take)] : []),
Expand All @@ -117,6 +121,11 @@ export default class Query<Record: Model> {
return this.fetch().then(onFulfill, onReject)
}

experimentalFetchColumns(columnNames: ColumnName[]): Promise<any[]> {
const queryWithSelect = this.extend(Q.experimentalSelect(columnNames))
return toPromise(callback => this.collection._fetchQuerySelect(queryWithSelect, callback))
}

// Emits an array of matching records, then emits a new array every time it changes
observe(): Observable<Record[]> {
return Observable.create((observer) =>
Expand All @@ -136,6 +145,18 @@ export default class Query<Record: Model> {
)
}

// Same as `observeWithColumns(columnNames)` but emits raw records with only the
// selected `columnNames` (and `id` property added implicitly).
// Note: This is an experimental feature and this API might change in future versions.
experimentalObserveColumns(columnNames: ColumnName[]): Observable<RecordState[]> {
const queryWithSelect = this.extend(Q.experimentalSelect(columnNames))
return Observable.create(observer =>
subscribeToQueryWithSelect(queryWithSelect, records => {
observer.next(records)
}),
)
}

// Queries database and returns the number of matching records
fetchCount(): Promise<number> {
return toPromise((callback) => this.collection._fetchCount(this, callback))
Expand Down Expand Up @@ -193,6 +214,10 @@ export default class Query<Record: Model> {
return this._cachedCountSubscribable.subscribe(subscriber)
}

getSelectedColumns(): ColumnName[] {
return Q.getSelectedColumns(this.description)
}

// Marks as deleted all records matching the query
async markAllAsDeleted(): Promise<void> {
const records = await this.fetch()
Expand Down
Loading