Skip to content

Commit 95cb897

Browse files
haileyokdevinivy
andauthored
Send mark-read-generic notification on updateSeen (#2567)
* send `mark-read-generic` notification on `updateSeen` add `reason` add `recipientDid` push `mark-read-generic` notification on `updateSeen` add `client_controlled` * unique id, change `alwaysDeliver` to false * use murmur id * organize import * bsky: fix tests, making courier config optional. fix unread count query. --------- Co-authored-by: Devin Ivy <[email protected]>
1 parent 80450cb commit 95cb897

File tree

11 files changed

+67
-16
lines changed

11 files changed

+67
-16
lines changed

packages/bsky/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
"key-encoder": "^2.0.3",
5252
"kysely": "^0.22.0",
5353
"multiformats": "^9.9.0",
54+
"murmurhash": "^2.0.1",
5455
"p-queue": "^6.6.2",
5556
"pg": "^8.10.0",
5657
"pino": "^8.21.0",

packages/bsky/proto/courier.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ message Notification {
3232
bool always_deliver = 6;
3333
google.protobuf.Timestamp timestamp = 7;
3434
google.protobuf.Struct additional = 8;
35+
bool client_controlled = 9;
3536
}
3637

3738
message PushNotificationsRequest {

packages/bsky/src/api/app/bsky/notification/registerPush.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
import { InvalidRequestError } from '@atproto/xrpc-server'
1+
import {
2+
InvalidRequestError,
3+
MethodNotImplementedError,
4+
} from '@atproto/xrpc-server'
25
import { Server } from '../../../../lexicon'
36
import AppContext from '../../../../context'
47
import { AppPlatform } from '../../../../proto/courier_pb'
@@ -7,6 +10,11 @@ export default function (server: Server, ctx: AppContext) {
710
server.app.bsky.notification.registerPush({
811
auth: ctx.authVerifier.standard,
912
handler: async ({ auth, input }) => {
13+
if (!ctx.courierClient) {
14+
throw new MethodNotImplementedError(
15+
'This service is not configured to support push token registration.',
16+
)
17+
}
1018
const { token, platform, serviceDid, appId } = input.body
1119
const did = auth.credentials.iss
1220
if (serviceDid !== auth.credentials.aud) {

packages/bsky/src/api/app/bsky/notification/updateSeen.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { Timestamp } from '@bufbuild/protobuf'
1+
import murmur from 'murmurhash'
2+
import { Struct, Timestamp } from '@bufbuild/protobuf'
23
import { Server } from '../../../../lexicon'
34
import AppContext from '../../../../context'
45

@@ -20,7 +21,29 @@ export default function (server: Server, ctx: AppContext) {
2021
timestamp: Timestamp.fromDate(seenAt),
2122
priority: true,
2223
}),
24+
ctx.courierClient?.pushNotifications({
25+
notifications: [
26+
{
27+
id: getNotifId(viewer, seenAt),
28+
clientControlled: true,
29+
recipientDid: viewer,
30+
alwaysDeliver: false,
31+
collapseKey: 'mark-read-generic',
32+
timestamp: Timestamp.fromDate(new Date()),
33+
additional: Struct.fromJson({
34+
reason: 'mark-read-generic',
35+
}),
36+
},
37+
],
38+
}),
2339
])
2440
},
2541
})
2642
}
43+
44+
function getNotifId(viewer: string, seenAt: Date) {
45+
const key = ['mark-read-generic', viewer, seenAt.getTime().toString()].join(
46+
'::',
47+
)
48+
return murmur.v3(key).toString(16)
49+
}

packages/bsky/src/config.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export interface ServerConfigValues {
1717
bsyncApiKey?: string
1818
bsyncHttpVersion?: '1.1' | '2'
1919
bsyncIgnoreBadTls?: boolean
20-
courierUrl: string
20+
courierUrl?: string
2121
courierApiKey?: string
2222
courierHttpVersion?: '1.1' | '2'
2323
courierIgnoreBadTls?: boolean
@@ -92,7 +92,6 @@ export class ServerConfig {
9292
const bsyncIgnoreBadTls = process.env.BSKY_BSYNC_IGNORE_BAD_TLS === 'true'
9393
assert(bsyncHttpVersion === '1.1' || bsyncHttpVersion === '2')
9494
const courierUrl = process.env.BSKY_COURIER_URL || undefined
95-
assert(courierUrl)
9695
const courierApiKey = process.env.BSKY_COURIER_API_KEY || undefined
9796
const courierHttpVersion = process.env.BSKY_COURIER_HTTP_VERSION || '2'
9897
const courierIgnoreBadTls =

packages/bsky/src/context.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export class AppContext {
3030
signingKey: Keypair
3131
idResolver: IdResolver
3232
bsyncClient: BsyncClient
33-
courierClient: CourierClient
33+
courierClient: CourierClient | undefined
3434
authVerifier: AuthVerifier
3535
featureGates: FeatureGates
3636
},
@@ -76,7 +76,7 @@ export class AppContext {
7676
return this.opts.bsyncClient
7777
}
7878

79-
get courierClient(): CourierClient {
79+
get courierClient(): CourierClient | undefined {
8080
return this.opts.courierClient
8181
}
8282

packages/bsky/src/data-plane/server/routes/notifs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ export default (db: Database): Partial<ServiceImpl<typeof Service>> => ({
116116
.selectFrom('follow')
117117
.select(sql<boolean>`${true}`.as('val'))
118118
.where('creator', '=', actorDid)
119-
.whereRef('subjectDid', '=', ref('notif.author')),
119+
.whereRef('subjectDid', '=', ref('notification.author')),
120120
),
121121
)
122122
.executeTakeFirst()

packages/bsky/src/index.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,16 @@ export class BskyAppView {
110110
interceptors: config.bsyncApiKey ? [bsyncAuth(config.bsyncApiKey)] : [],
111111
})
112112

113-
const courierClient = createCourierClient({
114-
baseUrl: config.courierUrl,
115-
httpVersion: config.courierHttpVersion ?? '2',
116-
nodeOptions: { rejectUnauthorized: !config.courierIgnoreBadTls },
117-
interceptors: config.courierApiKey
118-
? [courierAuth(config.courierApiKey)]
119-
: [],
120-
})
113+
const courierClient = config.courierUrl
114+
? createCourierClient({
115+
baseUrl: config.courierUrl,
116+
httpVersion: config.courierHttpVersion ?? '2',
117+
nodeOptions: { rejectUnauthorized: !config.courierIgnoreBadTls },
118+
interceptors: config.courierApiKey
119+
? [courierAuth(config.courierApiKey)]
120+
: [],
121+
})
122+
: undefined
121123

122124
const entrywayJwtPublicKey = config.entrywayJwtPublicKeyHex
123125
? createPublicKeyObject(config.entrywayJwtPublicKeyHex)

packages/bsky/src/proto/courier_pb.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ export class Notification extends Message<Notification> {
175175
*/
176176
additional?: Struct
177177

178+
/**
179+
* @generated from field: bool client_controlled = 9;
180+
*/
181+
clientControlled = false
182+
178183
constructor(data?: PartialMessage<Notification>) {
179184
super()
180185
proto3.util.initPartial(data, this)
@@ -206,6 +211,12 @@ export class Notification extends Message<Notification> {
206211
},
207212
{ no: 7, name: 'timestamp', kind: 'message', T: Timestamp },
208213
{ no: 8, name: 'additional', kind: 'message', T: Struct },
214+
{
215+
no: 9,
216+
name: 'client_controlled',
217+
kind: 'scalar',
218+
T: 8 /* ScalarType.BOOL */,
219+
},
209220
])
210221

211222
static fromBinary(

packages/dev-env/src/bsky.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ export class TestBsky {
6262
dataplaneHttpVersion: '1.1',
6363
bsyncUrl: `http://localhost:${bsyncPort}`,
6464
bsyncHttpVersion: '1.1',
65-
courierUrl: 'https://fake.example',
6665
modServiceDid: cfg.modServiceDid ?? 'did:example:invalidMod',
6766
labelsFromIssuerDids: [EXAMPLE_LABELER],
6867
...cfg,

0 commit comments

Comments
 (0)