From 089444751c15cde2825d67a61d6183ca1bbd31b7 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Wed, 19 Jun 2024 10:08:34 +0700 Subject: [PATCH 1/8] Initial effort to implement team ingestor --- .../ingestor/src/main/scala/ingestor.scala | 9 +- .../src/main/scala/ingestor.team.scala | 107 ++++++++++++++++++ 2 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 modules/ingestor/src/main/scala/ingestor.team.scala diff --git a/modules/ingestor/src/main/scala/ingestor.scala b/modules/ingestor/src/main/scala/ingestor.scala index 285d4846..cde3798a 100644 --- a/modules/ingestor/src/main/scala/ingestor.scala +++ b/modules/ingestor/src/main/scala/ingestor.scala @@ -2,6 +2,7 @@ package lila.search package ingestor import cats.effect.* +import cats.syntax.all.* import mongo4cats.database.MongoDatabase import org.typelevel.log4cats.Logger @@ -13,6 +14,8 @@ object Ingestor: def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig)(using Logger[IO] ): IO[Ingestor] = - ForumIngestor(mongo, elastic, store, config.forum).map: f => - new Ingestor: - def run(): IO[Unit] = f.watch().compile.drain + (ForumIngestor(mongo, elastic, store, config.forum), TeamIngestor(mongo, elastic, store, config.team)) + .mapN: (forum, team) => + new Ingestor: + def run() = + forum.watch().merge(team.run()).compile.drain diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala new file mode 100644 index 00000000..a51a06ed --- /dev/null +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -0,0 +1,107 @@ +package lila.search +package ingestor + +import cats.effect.IO +import cats.syntax.all.* +import com.mongodb.client.model.changestream.OperationType.* +import lila.search.spec.TeamSource +import mongo4cats.bson.Document +import mongo4cats.database.MongoDatabase +import mongo4cats.models.collection.ChangeStreamDocument +import mongo4cats.operations.{ Aggregate, Filter } +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.syntax.* + +import java.time.Instant +import scala.concurrent.duration.* + +trait TeamIngestor: + def run(): fs2.Stream[IO, Unit] + +object TeamIngestor: + + // def data = TeamData(id, name, description, nbMembers, createdBy) + private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue) + private val eventFilter = Filter.in("operationType", interestedOperations) + // private val eventProjection = Projection.include( + // List( + // "documentKey._id", + // "fullDocument.name", + // "fullDocument.description", + // "fullDocument.nbMembers", + // "fullDocument.createdBy", + // ) + // ) + private val aggregate = Aggregate.matchBy(eventFilter) // .combinedWith(Aggregate.project(eventProjection)) + + private val index = Index.Team + + def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)( + using Logger[IO] + ): IO[TeamIngestor] = + mongo.getCollection("team").map(apply(elastic, store, config)) + + def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(teams: MongoCollection)(using + Logger[IO] + ): TeamIngestor = new: + def run() = + fs2.Stream + .eval(startAt.flatTap(since => info"Starting team ingestor from $since")) + .flatMap: last => + changeStream(last) + .filterNot(_.isEmpty) + .evalMap: events => + val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant) + val (toDelete, toIndex) = events.partition(_.isDelete) + storeBulk(toIndex) + *> deleteMany(toDelete) + *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now())) + + private def storeBulk(events: List[ChangeStreamDocument[Document]]): IO[Unit] = + info"Received ${events.size} teams to index" + *> elastic.storeBulk(index, events.toSources) + *> info"Indexed ${events.size} teams" + .handleErrorWith: e => + Logger[IO].error(e)(s"Failed to index team: ${events.map(_.id).mkString(", ")}") + + private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] = + elastic + .deleteMany(index, events.flatMap(_.id.map(Id.apply))) + .flatTap(_ => info"Deleted ${events.size} teams") + .handleErrorWith: e => + Logger[IO].error(e)(s"Failed to delete teams: ${events.map(_.id).mkString(", ")}") + + private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = + store.put(index.value, time) + *> info"Stored last indexed time ${time.getEpochSecond} for $index" + + private def startAt: IO[Option[Instant]] = + config.startAt.fold(store.get(index.value))(Instant.ofEpochSecond(_).some.pure[IO]) + + private def changeStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] = + val builder = teams.watch(aggregate) + since + .fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp)) + .batchSize(config.batchSize) + .boundedStream(config.batchSize) + .evalTap(IO.println) + .evalTap(x => IO.println(x.fullDocument)) + .groupWithin(config.batchSize, config.timeWindows.second) + .map(_.toList) + + extension (events: List[ChangeStreamDocument[Document]]) + private def toSources: List[(String, TeamSource)] = + events.flatten(event => (event.id, event.fullDocument.flatMap(_.toSource)).mapN(_ -> _)) + + extension (doc: Document) + private def toSource: Option[TeamSource] = + ( + doc.getString("name"), + doc.getString("description"), + doc.getInt("nbMembers") + ).mapN(TeamSource.apply) + + extension (event: ChangeStreamDocument[Document]) + private def isDelete: Boolean = + event.operationType == DELETE || + event.fullDocument.flatMap(_.get("enabled")).contains(false) From a12bed498e163f3d0981fe1623df6e385fd30a4a Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Fri, 21 Jun 2024 14:56:39 +0700 Subject: [PATCH 2/8] TeamIngestor basically works --- .../ingestor/src/main/scala/ingestor.scala | 2 +- .../src/main/scala/ingestor.team.scala | 64 ++++++++++++------- modules/ingestor/src/main/scala/package.scala | 4 ++ 3 files changed, 45 insertions(+), 25 deletions(-) diff --git a/modules/ingestor/src/main/scala/ingestor.scala b/modules/ingestor/src/main/scala/ingestor.scala index cde3798a..f9c04241 100644 --- a/modules/ingestor/src/main/scala/ingestor.scala +++ b/modules/ingestor/src/main/scala/ingestor.scala @@ -18,4 +18,4 @@ object Ingestor: .mapN: (forum, team) => new Ingestor: def run() = - forum.watch().merge(team.run()).compile.drain + forum.watch.merge(team.watch).compile.drain diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala index a51a06ed..8f6d57c2 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -3,6 +3,7 @@ package ingestor import cats.effect.IO import cats.syntax.all.* +import com.mongodb.client.model.changestream.FullDocument import com.mongodb.client.model.changestream.OperationType.* import lila.search.spec.TeamSource import mongo4cats.bson.Document @@ -16,11 +17,13 @@ import java.time.Instant import scala.concurrent.duration.* trait TeamIngestor: - def run(): fs2.Stream[IO, Unit] + // watch change events from MongoDB and ingest team data into elastic search + def watch: fs2.Stream[IO, Unit] object TeamIngestor: - // def data = TeamData(id, name, description, nbMembers, createdBy) + private val index = Index.Team + private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue) private val eventFilter = Filter.in("operationType", interestedOperations) // private val eventProjection = Projection.include( @@ -34,8 +37,6 @@ object TeamIngestor: // ) private val aggregate = Aggregate.matchBy(eventFilter) // .combinedWith(Aggregate.project(eventProjection)) - private val index = Index.Team - def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)( using Logger[IO] ): IO[TeamIngestor] = @@ -44,7 +45,7 @@ object TeamIngestor: def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(teams: MongoCollection)(using Logger[IO] ): TeamIngestor = new: - def run() = + def watch = fs2.Stream .eval(startAt.flatTap(since => info"Starting team ingestor from $since")) .flatMap: last => @@ -53,23 +54,29 @@ object TeamIngestor: .evalMap: events => val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant) val (toDelete, toIndex) = events.partition(_.isDelete) - storeBulk(toIndex) - *> deleteMany(toDelete) - *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now())) - - private def storeBulk(events: List[ChangeStreamDocument[Document]]): IO[Unit] = - info"Received ${events.size} teams to index" - *> elastic.storeBulk(index, events.toSources) - *> info"Indexed ${events.size} teams" + storeBulk(toIndex.flatten(_.fullDocument)) + *> deleteMany(toDelete.flatten(_.fullDocument)) + *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now)) + + private def storeBulk(docs: List[Document]): IO[Unit] = + val sources = docs.toSources + info"Received ${docs.size} teams to index" *> + elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} teams" .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to index team: ${events.map(_.id).mkString(", ")}") + Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_._id).mkString(", ")}") - private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] = + private def deleteMany(docs: List[Document]): IO[Unit] = + info"Received ${docs.size} teams to delete" *> + deleteMany(docs.flatMap(_._id).map(Id.apply)).whenA(docs.nonEmpty) + + @scala.annotation.targetName("deleteManyWithIds") + private def deleteMany(ids: List[Id]): IO[Unit] = elastic - .deleteMany(index, events.flatMap(_.id.map(Id.apply))) - .flatTap(_ => info"Deleted ${events.size} teams") + .deleteMany(index, ids) + .flatTap(_ => info"Deleted ${ids.size} teams") .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to delete teams: ${events.map(_.id).mkString(", ")}") + Logger[IO].error(e)(s"Failed to delete teams: ${ids.map(_.value).mkString(", ")}") + .whenA(ids.nonEmpty) private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = store.put(index.value, time) @@ -83,25 +90,34 @@ object TeamIngestor: since .fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp)) .batchSize(config.batchSize) + .fullDocument(FullDocument.UPDATE_LOOKUP) // this is required for update event .boundedStream(config.batchSize) .evalTap(IO.println) .evalTap(x => IO.println(x.fullDocument)) .groupWithin(config.batchSize, config.timeWindows.second) .map(_.toList) - extension (events: List[ChangeStreamDocument[Document]]) + extension (docs: List[Document]) private def toSources: List[(String, TeamSource)] = - events.flatten(event => (event.id, event.fullDocument.flatMap(_.toSource)).mapN(_ -> _)) + docs.flatten(doc => (doc._id, doc.toSource).mapN(_ -> _)) extension (doc: Document) private def toSource: Option[TeamSource] = ( - doc.getString("name"), - doc.getString("description"), - doc.getInt("nbMembers") + doc.getString(F.name), + doc.getString(F.description), + doc.getInt(F.nbMembers) ).mapN(TeamSource.apply) + private def isEnabled = + doc.getBoolean("enabled").getOrElse(true) + extension (event: ChangeStreamDocument[Document]) private def isDelete: Boolean = event.operationType == DELETE || - event.fullDocument.flatMap(_.get("enabled")).contains(false) + event.fullDocument.fold(false)(x => !x.isEnabled) + + object F: + val name = "name" + val description = "description" + val nbMembers = "nbMembers" diff --git a/modules/ingestor/src/main/scala/package.scala b/modules/ingestor/src/main/scala/package.scala index 6d807747..c69c10eb 100644 --- a/modules/ingestor/src/main/scala/package.scala +++ b/modules/ingestor/src/main/scala/package.scala @@ -20,6 +20,10 @@ type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[I extension [A](change: ChangeStreamDocument[A]) def docId: Option[String] = change.documentKey.flatMap(_.getString("_id")) +extension (doc: Document) + private def _id: Option[String] = + doc.getString("_id") + given [A: Schema]: Indexable[A] = (a: A) => writeToString(a) given Indexable[Source] = _ match From 8e290dc47c4e9babffc549773629c96777ee85f5 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Fri, 21 Jun 2024 14:57:05 +0700 Subject: [PATCH 3/8] Minor refactor and comments tweak --- .../src/main/scala/ingestor.forum.scala | 72 ++++++++++--------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/modules/ingestor/src/main/scala/ingestor.forum.scala b/modules/ingestor/src/main/scala/ingestor.forum.scala index e6a9f1f0..b824a13e 100644 --- a/modules/ingestor/src/main/scala/ingestor.forum.scala +++ b/modules/ingestor/src/main/scala/ingestor.forum.scala @@ -16,9 +16,9 @@ import java.time.Instant import scala.concurrent.duration.* trait ForumIngestor: - // Utilize change events functionality of MongoDB to watch for changes in the forum posts collection. - def watch(): fs2.Stream[IO, Unit] - // Fetch posts from since to until and ingest to data + // watch change events from MongoDB and ingest forum posts into elastic search + def watch: fs2.Stream[IO, Unit] + // Fetch posts in [since, until] and ingest into elastic search def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit] object ForumIngestor: @@ -30,7 +30,7 @@ object ForumIngestor: private val interestedOperations = List(DELETE, INSERT, REPLACE).map(_.getValue) private val eventFilter = Filter.in("operationType", interestedOperations) - private val interestedFields = List("_id", "text", "topicId", "troll", "createdAt", "userId", "erasedAt") + private val interestedFields = List("_id", F.text, F.topicId, F.troll, F.createdAt, F.userId, F.erasedAt) private val postProjection = Projection.include(interestedFields) private val interestedEventFields = @@ -49,7 +49,7 @@ object ForumIngestor: posts: MongoCollection )(using Logger[IO]): ForumIngestor = new: - def watch(): fs2.Stream[IO, Unit] = + def watch: fs2.Stream[IO, Unit] = fs2.Stream .eval(startAt.flatTap(since => info"Starting forum ingestor from $since")) .flatMap: last => @@ -62,9 +62,9 @@ object ForumIngestor: *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now())) def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit] = - val filter = range("createdAt")(since, until) - .or(range("updatedAt")(since, until)) - .or(range("erasedAt")(since, until)) + val filter = range(F.createdAt)(since, until) + .or(range(F.updatedAt)(since, until)) + .or(range(F.erasedAt)(since, until)) posts .find(filter) .projection(postProjection) @@ -80,20 +80,20 @@ object ForumIngestor: storeBulk(toIndex) *> deleteMany(toDelete) ) - private def storeBulk(events: List[Document]): IO[Unit] = - info"Received ${events.size} forum posts to index" *> - IO.whenA(events.nonEmpty): - events.toSources - .flatMap: sources => - elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts" - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_._id).mkString(", ")}") + private def storeBulk(docs: List[Document]): IO[Unit] = + info"Received ${docs.size} forum posts to index" *> + docs.toSources + .flatMap: sources => + elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts" + .handleErrorWith: e => + Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_._id).mkString(", ")}") + .whenA(docs.nonEmpty) @scala.annotation.targetName("deleteManyWithDocs") private def deleteMany(events: List[Document]): IO[Unit] = info"Received ${events.size} forum posts to delete" *> - IO.whenA(events.nonEmpty): - deleteMany(events.flatMap(_._id).map(Id.apply)) + deleteMany(events.flatMap(_._id).map(Id.apply)) + .whenA(events.nonEmpty) @scala.annotation.targetName("deleteManyWithChanges") private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] = @@ -102,12 +102,12 @@ object ForumIngestor: @scala.annotation.targetName("deleteManyWithIds") private def deleteMany(ids: List[Id]): IO[Unit] = - IO.whenA(ids.nonEmpty): - elastic - .deleteMany(index, ids) - .flatTap(_ => info"Deleted ${ids.size} forum posts") - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to delete forum posts: ${ids.map(_.value).mkString(", ")}") + elastic + .deleteMany(index, ids) + .flatTap(_ => info"Deleted ${ids.size} forum posts") + .handleErrorWith: e => + Logger[IO].error(e)(s"Failed to delete forum posts: ${ids.map(_.value).mkString(", ")}") + .whenA(ids.nonEmpty) private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = store.put(index.value, time) @@ -155,7 +155,7 @@ object ForumIngestor: extension (doc: Document) private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId]] = - (doc._id, doc.getString("topicId")) + (doc._id, doc.topicId) .flatMapN: (id, topicId) => doc.toSource(topicMap.get(topicId), topicId).map(id -> _) .match @@ -170,22 +170,28 @@ object ForumIngestor: private def toSource(topicName: Option[String], topicId: String): Option[ForumSource] = ( - doc.getString("text"), + doc.getString(F.text), topicName, - doc.getBoolean("troll"), - doc.getNested("createdAt").flatMap(_.asInstant).map(_.toEpochMilli()), - doc.getString("userId").some + doc.getBoolean(F.troll), + doc.getNested(F.createdAt).flatMap(_.asInstant).map(_.toEpochMilli), + doc.getString(F.userId).some ).mapN(ForumSource.apply(_, _, topicId, _, _, _)) private def isErased: Boolean = doc.get("erasedAt").isDefined - private def _id: Option[String] = - doc.getString("_id") - private def topicId: Option[String] = - doc.getString("topicId") + doc.getString(F.topicId) extension (event: ChangeStreamDocument[Document]) private def isDelete: Boolean = event.operationType == DELETE || event.fullDocument.exists(_.isErased) + + object F: + val text = "text" + val topicId = "topicId" + val troll = "troll" + val createdAt = "createdAt" + val userId = "userId" + val erasedAt = "erasedAt" + val updatedAt = "updatedAt" From 9bebe1e2550fb3941463d56623e91fbfed14e5e3 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Fri, 21 Jun 2024 15:31:04 +0700 Subject: [PATCH 4/8] With delete event, We don't have full document --- modules/ingestor/src/main/scala/ingestor.team.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala index 8f6d57c2..dc5c3a6d 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -55,7 +55,7 @@ object TeamIngestor: val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant) val (toDelete, toIndex) = events.partition(_.isDelete) storeBulk(toIndex.flatten(_.fullDocument)) - *> deleteMany(toDelete.flatten(_.fullDocument)) + *> deleteMany(toDelete) *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now)) private def storeBulk(docs: List[Document]): IO[Unit] = @@ -65,9 +65,9 @@ object TeamIngestor: .handleErrorWith: e => Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_._id).mkString(", ")}") - private def deleteMany(docs: List[Document]): IO[Unit] = - info"Received ${docs.size} teams to delete" *> - deleteMany(docs.flatMap(_._id).map(Id.apply)).whenA(docs.nonEmpty) + private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] = + info"Received ${events.size} teams to delete" *> + deleteMany(events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty) @scala.annotation.targetName("deleteManyWithIds") private def deleteMany(ids: List[Id]): IO[Unit] = From c79bcd0be4c33d38791e4347ac3178a010ded15d Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Fri, 21 Jun 2024 16:41:29 +0700 Subject: [PATCH 5/8] Only fetch interested fields and clean up --- .../src/main/scala/ingestor.team.scala | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala index dc5c3a6d..f94c8e01 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -9,7 +9,7 @@ import lila.search.spec.TeamSource import mongo4cats.bson.Document import mongo4cats.database.MongoDatabase import mongo4cats.models.collection.ChangeStreamDocument -import mongo4cats.operations.{ Aggregate, Filter } +import mongo4cats.operations.{ Aggregate, Filter, Projection } import org.typelevel.log4cats.Logger import org.typelevel.log4cats.syntax.* @@ -26,16 +26,14 @@ object TeamIngestor: private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue) private val eventFilter = Filter.in("operationType", interestedOperations) - // private val eventProjection = Projection.include( - // List( - // "documentKey._id", - // "fullDocument.name", - // "fullDocument.description", - // "fullDocument.nbMembers", - // "fullDocument.createdBy", - // ) - // ) - private val aggregate = Aggregate.matchBy(eventFilter) // .combinedWith(Aggregate.project(eventProjection)) + + private val interestedFields = List("_id", F.name, F.description, F.nbMembers, F.name, F.enabled) + + private val interestedEventFields = + List("operationType", "clusterTime", "documentKey._id") ++ interestedFields.map("fullDocument." + _) + private val eventProjection = Projection.include(interestedEventFields) + + private val aggregate = Aggregate.matchBy(eventFilter).combinedWith(Aggregate.project(eventProjection)) def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)( using Logger[IO] @@ -92,8 +90,7 @@ object TeamIngestor: .batchSize(config.batchSize) .fullDocument(FullDocument.UPDATE_LOOKUP) // this is required for update event .boundedStream(config.batchSize) - .evalTap(IO.println) - .evalTap(x => IO.println(x.fullDocument)) + .evalTap(x => debug"Team change stream event: $x") .groupWithin(config.batchSize, config.timeWindows.second) .map(_.toList) @@ -110,7 +107,7 @@ object TeamIngestor: ).mapN(TeamSource.apply) private def isEnabled = - doc.getBoolean("enabled").getOrElse(true) + doc.getBoolean(F.enabled).getOrElse(true) extension (event: ChangeStreamDocument[Document]) private def isDelete: Boolean = @@ -121,3 +118,4 @@ object TeamIngestor: val name = "name" val description = "description" val nbMembers = "nbMembers" + val enabled = "enabled" From 4fccc39b8d01302b72993c05e909b81fe9316325 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Sat, 22 Jun 2024 12:16:19 +0700 Subject: [PATCH 6/8] Refactor deleteMany out of ingestors --- .../src/main/scala/ingestor.forum.scala | 24 ++----------------- .../src/main/scala/ingestor.team.scala | 15 +----------- modules/ingestor/src/main/scala/package.scala | 24 +++++++++++++++++++ 3 files changed, 27 insertions(+), 36 deletions(-) diff --git a/modules/ingestor/src/main/scala/ingestor.forum.scala b/modules/ingestor/src/main/scala/ingestor.forum.scala index b824a13e..4bb4ac2f 100644 --- a/modules/ingestor/src/main/scala/ingestor.forum.scala +++ b/modules/ingestor/src/main/scala/ingestor.forum.scala @@ -58,7 +58,7 @@ object ForumIngestor: val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption val (toDelete, toIndex) = events.partition(_.isDelete) storeBulk(toIndex.flatten(_.fullDocument)) - *> deleteMany(toDelete) + *> elastic.deleteMany(index, toDelete) *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now())) def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit] = @@ -77,7 +77,7 @@ object ForumIngestor: dryRun.fold( toIndex.traverse_(doc => debug"Would index $doc") *> toDelete.traverse_(doc => debug"Would delete $doc"), - storeBulk(toIndex) *> deleteMany(toDelete) + storeBulk(toIndex) *> elastic.deleteMany(index, toDelete) ) private def storeBulk(docs: List[Document]): IO[Unit] = @@ -89,26 +89,6 @@ object ForumIngestor: Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_._id).mkString(", ")}") .whenA(docs.nonEmpty) - @scala.annotation.targetName("deleteManyWithDocs") - private def deleteMany(events: List[Document]): IO[Unit] = - info"Received ${events.size} forum posts to delete" *> - deleteMany(events.flatMap(_._id).map(Id.apply)) - .whenA(events.nonEmpty) - - @scala.annotation.targetName("deleteManyWithChanges") - private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] = - info"Received ${events.size} forum posts to delete" *> - deleteMany(events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty) - - @scala.annotation.targetName("deleteManyWithIds") - private def deleteMany(ids: List[Id]): IO[Unit] = - elastic - .deleteMany(index, ids) - .flatTap(_ => info"Deleted ${ids.size} forum posts") - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to delete forum posts: ${ids.map(_.value).mkString(", ")}") - .whenA(ids.nonEmpty) - private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = store.put(index.value, time) *> info"Stored last indexed time ${time.getEpochSecond} for $index" diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala index f94c8e01..d5411c7a 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -53,7 +53,7 @@ object TeamIngestor: val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant) val (toDelete, toIndex) = events.partition(_.isDelete) storeBulk(toIndex.flatten(_.fullDocument)) - *> deleteMany(toDelete) + *> elastic.deleteMany(index, toDelete) *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now)) private def storeBulk(docs: List[Document]): IO[Unit] = @@ -63,19 +63,6 @@ object TeamIngestor: .handleErrorWith: e => Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_._id).mkString(", ")}") - private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] = - info"Received ${events.size} teams to delete" *> - deleteMany(events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty) - - @scala.annotation.targetName("deleteManyWithIds") - private def deleteMany(ids: List[Id]): IO[Unit] = - elastic - .deleteMany(index, ids) - .flatTap(_ => info"Deleted ${ids.size} teams") - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to delete teams: ${ids.map(_.value).mkString(", ")}") - .whenA(ids.nonEmpty) - private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = store.put(index.value, time) *> info"Stored last indexed time ${time.getEpochSecond} for $index" diff --git a/modules/ingestor/src/main/scala/package.scala b/modules/ingestor/src/main/scala/package.scala index c69c10eb..bbcae5da 100644 --- a/modules/ingestor/src/main/scala/package.scala +++ b/modules/ingestor/src/main/scala/package.scala @@ -2,6 +2,7 @@ package lila.search package ingestor import cats.effect.IO +import cats.syntax.all.* import com.github.plokhotnyuk.jsoniter_scala.core.* import com.sksamuel.elastic4s.Indexable import lila.search.spec.{ ForumSource, Source } @@ -10,6 +11,8 @@ import mongo4cats.collection.GenericMongoCollection import mongo4cats.models.collection.ChangeStreamDocument import mongo4cats.operations.Filter import org.bson.BsonTimestamp +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.syntax.* import smithy4s.json.Json.given import smithy4s.schema.Schema @@ -39,3 +42,24 @@ def range(field: String)(since: Instant, until: Option[Instant]): Filter = import Filter.* val gtes = gte(field, since) until.fold(gtes)(until => gtes.and(lt(field, until))) + +extension (elastic: ESClient[IO]) + @scala.annotation.targetName("deleteManyWithIds") + def deleteMany(index: Index, ids: List[Id])(using Logger[IO]): IO[Unit] = + elastic + .deleteMany(index, ids) + .flatTap(_ => Logger[IO].info(s"Deleted ${ids.size} ${index.value}s")) + .handleErrorWith: e => + Logger[IO].error(e)(s"Failed to delete ${index.value}s: ${ids.map(_.value).mkString(", ")}") + .whenA(ids.nonEmpty) + + @scala.annotation.targetName("deleteManyWithDocs") + def deleteMany(index: Index, events: List[Document])(using Logger[IO]): IO[Unit] = + info"Received ${events.size} forum posts to delete" *> + deleteMany(index, events.flatMap(_._id).map(Id.apply)) + .whenA(events.nonEmpty) + + @scala.annotation.targetName("deleteManyWithChanges") + def deleteMany(index: Index, events: List[ChangeStreamDocument[Document]])(using Logger[IO]): IO[Unit] = + info"Received ${events.size} forum posts to delete" *> + deleteMany(index, events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty) From 203af567aaaed836b42fb095264e6f4dcd34c51f Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Sat, 22 Jun 2024 13:19:30 +0700 Subject: [PATCH 7/8] Add drop for team ingestor --- modules/ingestor/src/main/scala/ingestor.team.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala index d5411c7a..9817d3e3 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -71,12 +71,16 @@ object TeamIngestor: config.startAt.fold(store.get(index.value))(Instant.ofEpochSecond(_).some.pure[IO]) private def changeStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] = + // skip the first event if we're starting from a specific timestamp + // since the event at that timestamp is already indexed + val skip = since.fold(0)(_ => 1) val builder = teams.watch(aggregate) since .fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp)) .batchSize(config.batchSize) .fullDocument(FullDocument.UPDATE_LOOKUP) // this is required for update event .boundedStream(config.batchSize) + .drop(skip) .evalTap(x => debug"Team change stream event: $x") .groupWithin(config.batchSize, config.timeWindows.second) .map(_.toList) From ace4d7a81eb97c2df0db45abaf67e8720c86ebd2 Mon Sep 17 00:00:00 2001 From: Thanh Le Date: Sat, 22 Jun 2024 14:24:21 +0700 Subject: [PATCH 8/8] code tweak --- .../src/main/scala/ingestor.forum.scala | 21 ++++++++++--------- .../src/main/scala/ingestor.team.scala | 4 ++-- modules/ingestor/src/main/scala/package.scala | 16 +++++++------- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/modules/ingestor/src/main/scala/ingestor.forum.scala b/modules/ingestor/src/main/scala/ingestor.forum.scala index 4bb4ac2f..0be42825 100644 --- a/modules/ingestor/src/main/scala/ingestor.forum.scala +++ b/modules/ingestor/src/main/scala/ingestor.forum.scala @@ -25,12 +25,10 @@ object ForumIngestor: private val index = Index.Forum - private val topicProjection = Projection.include(List("_id", "name")) - private val interestedOperations = List(DELETE, INSERT, REPLACE).map(_.getValue) private val eventFilter = Filter.in("operationType", interestedOperations) - private val interestedFields = List("_id", F.text, F.topicId, F.troll, F.createdAt, F.userId, F.erasedAt) + private val interestedFields = List(_id, F.text, F.topicId, F.troll, F.createdAt, F.userId, F.erasedAt) private val postProjection = Projection.include(interestedFields) private val interestedEventFields = @@ -86,7 +84,7 @@ object ForumIngestor: .flatMap: sources => elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts" .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_._id).mkString(", ")}") + Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_.id).mkString(", ")}") .whenA(docs.nonEmpty) private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = @@ -99,10 +97,10 @@ object ForumIngestor: // Fetches topic names by their ids private def topicByIds(ids: Seq[String]): IO[Map[String, String]] = topics - .find(Filter.in("_id", ids)) - .projection(topicProjection) + .find(Filter.in(_id, ids)) + .projection(Projection.include(List(_id, Topic.name))) .all - .map(_.map(doc => (doc.getString("_id") -> doc.getString("name")).mapN(_ -> _)).flatten.toMap) + .map(_.map(doc => (doc.id -> doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap) private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] = val builder = posts.watch(aggregate) @@ -135,16 +133,16 @@ object ForumIngestor: extension (doc: Document) private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId]] = - (doc._id, doc.topicId) + (doc.id, doc.topicId) .flatMapN: (id, topicId) => doc.toSource(topicMap.get(topicId), topicId).map(id -> _) .match case Some(value) => value.some.pure[IO] case _ => - val reason = doc._id.fold("missing doc._id; ")(_ => "") + val reason = doc.id.fold("missing doc._id; ")(_ => "") + doc.topicId.fold("missing doc.topicId; ")(_ => "") + doc.topicId - .map(id => topicMap.get(id).fold("topic or topicName is missing")(_ => "")) + .map(id => topicMap.get(id).fold("topic or topic.name is missing")(_ => "")) .getOrElse("") info"failed to convert document to source: $doc because $reason".as(none) @@ -175,3 +173,6 @@ object ForumIngestor: val userId = "userId" val erasedAt = "erasedAt" val updatedAt = "updatedAt" + + object Topic: + val name = "name" diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala index 9817d3e3..410d942e 100644 --- a/modules/ingestor/src/main/scala/ingestor.team.scala +++ b/modules/ingestor/src/main/scala/ingestor.team.scala @@ -61,7 +61,7 @@ object TeamIngestor: info"Received ${docs.size} teams to index" *> elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} teams" .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_._id).mkString(", ")}") + Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_.id).mkString(", ")}") private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = store.put(index.value, time) @@ -87,7 +87,7 @@ object TeamIngestor: extension (docs: List[Document]) private def toSources: List[(String, TeamSource)] = - docs.flatten(doc => (doc._id, doc.toSource).mapN(_ -> _)) + docs.flatten(doc => (doc.id, doc.toSource).mapN(_ -> _)) extension (doc: Document) private def toSource: Option[TeamSource] = diff --git a/modules/ingestor/src/main/scala/package.scala b/modules/ingestor/src/main/scala/package.scala index bbcae5da..191f5fb5 100644 --- a/modules/ingestor/src/main/scala/package.scala +++ b/modules/ingestor/src/main/scala/package.scala @@ -18,14 +18,15 @@ import smithy4s.schema.Schema import java.time.Instant +val _id = "_id" + type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]] -extension [A](change: ChangeStreamDocument[A]) - def docId: Option[String] = change.documentKey.flatMap(_.getString("_id")) +extension [A](change: ChangeStreamDocument[A]) def docId: Option[String] = change.documentKey.flatMap(_.id) extension (doc: Document) - private def _id: Option[String] = - doc.getString("_id") + private def id: Option[String] = + doc.getString(_id) given [A: Schema]: Indexable[A] = (a: A) => writeToString(a) given Indexable[Source] = @@ -39,9 +40,8 @@ extension (instant: Instant) def asBsonTimestamp: BsonTimestamp = BsonTimestamp(instant.getEpochSecond.toInt, 1) def range(field: String)(since: Instant, until: Option[Instant]): Filter = - import Filter.* - val gtes = gte(field, since) - until.fold(gtes)(until => gtes.and(lt(field, until))) + val gtes = Filter.gte(field, since) + until.fold(gtes)(until => gtes.and(Filter.lt(field, until))) extension (elastic: ESClient[IO]) @scala.annotation.targetName("deleteManyWithIds") @@ -56,7 +56,7 @@ extension (elastic: ESClient[IO]) @scala.annotation.targetName("deleteManyWithDocs") def deleteMany(index: Index, events: List[Document])(using Logger[IO]): IO[Unit] = info"Received ${events.size} forum posts to delete" *> - deleteMany(index, events.flatMap(_._id).map(Id.apply)) + deleteMany(index, events.flatMap(_.id).map(Id.apply)) .whenA(events.nonEmpty) @scala.annotation.targetName("deleteManyWithChanges")