Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

team ingestor #240

Merged
merged 8 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 40 additions & 53 deletions modules/ingestor/src/main/scala/ingestor.forum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@ import java.time.Instant
import scala.concurrent.duration.*

trait ForumIngestor:
// Utilize change events functionality of MongoDB to watch for changes in the forum posts collection.
def watch(): fs2.Stream[IO, Unit]
// Fetch posts from since to until and ingest to data
// watch change events from MongoDB and ingest forum posts into elastic search
def watch: fs2.Stream[IO, Unit]
// Fetch posts in [since, until] and ingest into elastic search
def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit]

object ForumIngestor:

private val index = Index.Forum

private val topicProjection = Projection.include(List("_id", "name"))

private val interestedOperations = List(DELETE, INSERT, REPLACE).map(_.getValue)
private val eventFilter = Filter.in("operationType", interestedOperations)

private val interestedFields = List("_id", "text", "topicId", "troll", "createdAt", "userId", "erasedAt")
private val interestedFields = List(_id, F.text, F.topicId, F.troll, F.createdAt, F.userId, F.erasedAt)
private val postProjection = Projection.include(interestedFields)

private val interestedEventFields =
Expand All @@ -49,7 +47,7 @@ object ForumIngestor:
posts: MongoCollection
)(using Logger[IO]): ForumIngestor = new:

def watch(): fs2.Stream[IO, Unit] =
def watch: fs2.Stream[IO, Unit] =
fs2.Stream
.eval(startAt.flatTap(since => info"Starting forum ingestor from $since"))
.flatMap: last =>
Expand All @@ -58,13 +56,13 @@ object ForumIngestor:
val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption
val (toDelete, toIndex) = events.partition(_.isDelete)
storeBulk(toIndex.flatten(_.fullDocument))
*> deleteMany(toDelete)
*> elastic.deleteMany(index, toDelete)
*> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now()))

def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit] =
val filter = range("createdAt")(since, until)
.or(range("updatedAt")(since, until))
.or(range("erasedAt")(since, until))
val filter = range(F.createdAt)(since, until)
.or(range(F.updatedAt)(since, until))
.or(range(F.erasedAt)(since, until))
posts
.find(filter)
.projection(postProjection)
Expand All @@ -77,37 +75,17 @@ object ForumIngestor:
dryRun.fold(
toIndex.traverse_(doc => debug"Would index $doc")
*> toDelete.traverse_(doc => debug"Would delete $doc"),
storeBulk(toIndex) *> deleteMany(toDelete)
storeBulk(toIndex) *> elastic.deleteMany(index, toDelete)
)

private def storeBulk(events: List[Document]): IO[Unit] =
info"Received ${events.size} forum posts to index" *>
IO.whenA(events.nonEmpty):
events.toSources
.flatMap: sources =>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_._id).mkString(", ")}")

@scala.annotation.targetName("deleteManyWithDocs")
private def deleteMany(events: List[Document]): IO[Unit] =
info"Received ${events.size} forum posts to delete" *>
IO.whenA(events.nonEmpty):
deleteMany(events.flatMap(_._id).map(Id.apply))

@scala.annotation.targetName("deleteManyWithChanges")
private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] =
info"Received ${events.size} forum posts to delete" *>
deleteMany(events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty)

@scala.annotation.targetName("deleteManyWithIds")
private def deleteMany(ids: List[Id]): IO[Unit] =
IO.whenA(ids.nonEmpty):
elastic
.deleteMany(index, ids)
.flatTap(_ => info"Deleted ${ids.size} forum posts")
private def storeBulk(docs: List[Document]): IO[Unit] =
info"Received ${docs.size} forum posts to index" *>
docs.toSources
.flatMap: sources =>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to delete forum posts: ${ids.map(_.value).mkString(", ")}")
Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_.id).mkString(", ")}")
.whenA(docs.nonEmpty)

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.value, time)
Expand All @@ -119,10 +97,10 @@ object ForumIngestor:
// Fetches topic names by their ids
private def topicByIds(ids: Seq[String]): IO[Map[String, String]] =
topics
.find(Filter.in("_id", ids))
.projection(topicProjection)
.find(Filter.in(_id, ids))
.projection(Projection.include(List(_id, Topic.name)))
.all
.map(_.map(doc => (doc.getString("_id") -> doc.getString("name")).mapN(_ -> _)).flatten.toMap)
.map(_.map(doc => (doc.id -> doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap)

private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
val builder = posts.watch(aggregate)
Expand Down Expand Up @@ -155,37 +133,46 @@ object ForumIngestor:
extension (doc: Document)

private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId]] =
(doc._id, doc.getString("topicId"))
(doc.id, doc.topicId)
.flatMapN: (id, topicId) =>
doc.toSource(topicMap.get(topicId), topicId).map(id -> _)
.match
case Some(value) => value.some.pure[IO]
case _ =>
val reason = doc._id.fold("missing doc._id; ")(_ => "")
val reason = doc.id.fold("missing doc._id; ")(_ => "")
+ doc.topicId.fold("missing doc.topicId; ")(_ => "")
+ doc.topicId
.map(id => topicMap.get(id).fold("topic or topicName is missing")(_ => ""))
.map(id => topicMap.get(id).fold("topic or topic.name is missing")(_ => ""))
.getOrElse("")
info"failed to convert document to source: $doc because $reason".as(none)

private def toSource(topicName: Option[String], topicId: String): Option[ForumSource] =
(
doc.getString("text"),
doc.getString(F.text),
topicName,
doc.getBoolean("troll"),
doc.getNested("createdAt").flatMap(_.asInstant).map(_.toEpochMilli()),
doc.getString("userId").some
doc.getBoolean(F.troll),
doc.getNested(F.createdAt).flatMap(_.asInstant).map(_.toEpochMilli),
doc.getString(F.userId).some
).mapN(ForumSource.apply(_, _, topicId, _, _, _))

private def isErased: Boolean =
doc.get("erasedAt").isDefined

private def _id: Option[String] =
doc.getString("_id")

private def topicId: Option[String] =
doc.getString("topicId")
doc.getString(F.topicId)

extension (event: ChangeStreamDocument[Document])
private def isDelete: Boolean =
event.operationType == DELETE || event.fullDocument.exists(_.isErased)

object F:
val text = "text"
val topicId = "topicId"
val troll = "troll"
val createdAt = "createdAt"
val userId = "userId"
val erasedAt = "erasedAt"
val updatedAt = "updatedAt"

object Topic:
val name = "name"
9 changes: 6 additions & 3 deletions modules/ingestor/src/main/scala/ingestor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lila.search
package ingestor

import cats.effect.*
import cats.syntax.all.*
import mongo4cats.database.MongoDatabase
import org.typelevel.log4cats.Logger

Expand All @@ -13,6 +14,8 @@ object Ingestor:
def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig)(using
Logger[IO]
): IO[Ingestor] =
ForumIngestor(mongo, elastic, store, config.forum).map: f =>
new Ingestor:
def run(): IO[Unit] = f.watch().compile.drain
(ForumIngestor(mongo, elastic, store, config.forum), TeamIngestor(mongo, elastic, store, config.team))
.mapN: (forum, team) =>
new Ingestor:
def run() =
forum.watch.merge(team.watch).compile.drain
112 changes: 112 additions & 0 deletions modules/ingestor/src/main/scala/ingestor.team.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package lila.search
package ingestor

import cats.effect.IO
import cats.syntax.all.*
import com.mongodb.client.model.changestream.FullDocument
import com.mongodb.client.model.changestream.OperationType.*
import lila.search.spec.TeamSource
import mongo4cats.bson.Document
import mongo4cats.database.MongoDatabase
import mongo4cats.models.collection.ChangeStreamDocument
import mongo4cats.operations.{ Aggregate, Filter, Projection }
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.syntax.*

import java.time.Instant
import scala.concurrent.duration.*

trait TeamIngestor:
/** Stream that watches change events from MongoDB and ingests team data into elastic search.
  *
  * Each emitted `Unit` corresponds to one processed batch of change events.
  */
def watch: fs2.Stream[IO, Unit]

object TeamIngestor:

// index handed to every ESClient / KVStore call made by this ingestor
private val index = Index.Team

// change-stream operation types this ingestor reacts to; every other operation is filtered out
private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue)
private val eventFilter          = Filter.in("operationType", interestedOperations)

// Fields read from a team document: the id, the three TeamSource fields, and `enabled`,
// which decides whether an event is treated as a delete. Each field is listed exactly once.
private val interestedFields = List(_id, F.name, F.description, F.nbMembers, F.enabled)

// the same fields as they appear inside a change event, plus the event metadata we rely on
private val interestedEventFields =
  List("operationType", "clusterTime", "documentKey._id") ++ interestedFields.map("fullDocument." + _)
private val eventProjection = Projection.include(interestedEventFields)

// server-side pipeline: keep only the interesting operations, then trim each event to the projection
private val aggregate = Aggregate.matchBy(eventFilter).combinedWith(Aggregate.project(eventProjection))

/** Builds a [[TeamIngestor]] backed by the `team` collection of the given database. */
def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(
    using Logger[IO]
): IO[TeamIngestor] =
  for teams <- mongo.getCollection("team")
  yield apply(elastic, store, config)(teams)

def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(teams: MongoCollection)(using
Logger[IO]
): TeamIngestor = new:
// Resolves the starting point, then processes the change stream batch by batch:
// index the upserts, delete the removals, and finally persist the checkpoint.
def watch =
  val startingPoint = startAt.flatTap(since => info"Starting team ingestor from $since")
  fs2.Stream
    .eval(startingPoint)
    .flatMap: last =>
      changeStream(last)
        .filter(_.nonEmpty)
        .evalMap: batch =>
          val (deletions, upserts) = batch.partition(_.isDelete)
          // checkpoint is the cluster time of the last event in the batch, or "now" when it has none
          val checkpoint = batch.lastOption
            .flatMap(_.clusterTime)
            .flatMap(_.asInstant)
            .getOrElse(Instant.now)
          for
            _ <- storeBulk(upserts.flatten(_.fullDocument))
            _ <- elastic.deleteMany(index, deletions)
            _ <- saveLastIndexedTimestamp(checkpoint)
          yield ()

// Converts the documents to sources and indexes them in one bulk request.
//
// Failures of the bulk request are logged (with the ids of the affected documents) instead of
// being raised, so a failing batch does not terminate the watch stream. Nothing is sent to
// elastic search when there is no source to index, e.g. for a batch made only of deletions.
private def storeBulk(docs: List[Document]): IO[Unit] =
  val sources = docs.toSources
  info"Received ${docs.size} teams to index" *>
    // parenthesised so the error handler covers the bulk request itself, not just the log line
    (elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} teams")
      .handleErrorWith: e =>
        Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_.id).mkString(", ")}")
      .whenA(sources.nonEmpty)

// Persists `time` in the key-value store under this index's name, then logs it.
private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
  for
    _ <- store.put(index.value, time)
    _ <- info"Stored last indexed time ${time.getEpochSecond} for $index"
  yield ()

// Starting point of the ingestor: the configured `startAt` (epoch seconds) when present,
// otherwise whatever the key-value store holds for this index.
private def startAt: IO[Option[Instant]] =
  config.startAt match
    case Some(epochSeconds) => IO.pure(Instant.ofEpochSecond(epochSeconds).some)
    case None               => store.get(index.value)

// Opens the change stream on the team collection and emits the events in batches of at most
// `config.batchSize`, flushed at the latest every `config.timeWindows` seconds.
private def changeStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
  val watcher = teams.watch(aggregate)
  val positioned = since match
    case Some(instant) => watcher.startAtOperationTime(instant.asBsonTimestamp)
    case None          => watcher
  // when resuming from a specific timestamp the first event is dropped,
  // since the event at that timestamp is already indexed
  val alreadyIndexed = if since.isDefined then 1 else 0
  val events = positioned
    .batchSize(config.batchSize)
    .fullDocument(FullDocument.UPDATE_LOOKUP) // this is required for update event
    .boundedStream(config.batchSize)
    .drop(alreadyIndexed)
    .evalTap(event => debug"Team change stream event: $event")
  events
    .groupWithin(config.batchSize, config.timeWindows.second)
    .map(_.toList)

extension (docs: List[Document])
  // Pairs every document with its id; documents lacking an id or a convertible source are dropped.
  private def toSources: List[(String, TeamSource)] =
    docs.flatMap(doc => doc.id.zip(doc.toSource))

extension (doc: Document)
  // Builds a TeamSource from the name, description and nbMembers fields;
  // yields None as soon as one of them cannot be read.
  private def toSource: Option[TeamSource] =
    for
      name        <- doc.getString(F.name)
      description <- doc.getString(F.description)
      nbMembers   <- doc.getInt(F.nbMembers)
    yield TeamSource(name, description, nbMembers)

  // A team counts as enabled unless its `enabled` field is explicitly false.
  private def isEnabled =
    !doc.getBoolean(F.enabled).contains(false)

extension (event: ChangeStreamDocument[Document])
  // An event removes the team from the index when it is a DELETE operation,
  // or when the full document it carries is not enabled.
  private def isDelete: Boolean =
    event.operationType == DELETE ||
      event.fullDocument.exists(!_.isEnabled)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.exists(


// Names of the team document fields referenced by this ingestor,
// kept in one place so projections and readers use the same keys.
object F:
val name = "name"
val description = "description"
val nbMembers = "nbMembers"
// read by `isEnabled`; a missing value is treated as enabled there
val enabled = "enabled"
38 changes: 33 additions & 5 deletions modules/ingestor/src/main/scala/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lila.search
package ingestor

import cats.effect.IO
import cats.syntax.all.*
import com.github.plokhotnyuk.jsoniter_scala.core.*
import com.sksamuel.elastic4s.Indexable
import lila.search.spec.{ ForumSource, Source }
Expand All @@ -10,15 +11,22 @@ import mongo4cats.collection.GenericMongoCollection
import mongo4cats.models.collection.ChangeStreamDocument
import mongo4cats.operations.Filter
import org.bson.BsonTimestamp
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.syntax.*
import smithy4s.json.Json.given
import smithy4s.schema.Schema

import java.time.Instant

val _id = "_id"

type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]]

extension [A](change: ChangeStreamDocument[A])
def docId: Option[String] = change.documentKey.flatMap(_.getString("_id"))
extension [A](change: ChangeStreamDocument[A]) def docId: Option[String] = change.documentKey.flatMap(_.id)

extension (doc: Document)
// Reads the document's `_id` field through `getString`; the result is an Option.
private def id: Option[String] =
doc.getString(_id)

given [A: Schema]: Indexable[A] = (a: A) => writeToString(a)
given Indexable[Source] =
Expand All @@ -32,6 +40,26 @@ extension (instant: Instant)
def asBsonTimestamp: BsonTimestamp = BsonTimestamp(instant.getEpochSecond.toInt, 1)

def range(field: String)(since: Instant, until: Option[Instant]): Filter =
import Filter.*
val gtes = gte(field, since)
until.fold(gtes)(until => gtes.and(lt(field, until)))
val gtes = Filter.gte(field, since)
until.fold(gtes)(until => gtes.and(Filter.lt(field, until)))

// Logging wrappers around `ESClient.deleteMany`, shared by every ingestor.
// All log messages are derived from `index` so they stay accurate for any index.
extension (elastic: ESClient[IO])
  /** Deletes `ids` from `index`, logging the outcome.
    *
    * Nothing is executed when `ids` is empty. A failing delete is logged together with the
    * affected ids and is not re-raised.
    */
  @scala.annotation.targetName("deleteManyWithIds")
  def deleteMany(index: Index, ids: List[Id])(using Logger[IO]): IO[Unit] =
    elastic
      .deleteMany(index, ids)
      .flatTap(_ => Logger[IO].info(s"Deleted ${ids.size} ${index.value}s"))
      .handleErrorWith: e =>
        Logger[IO].error(e)(s"Failed to delete ${index.value}s: ${ids.map(_.value).mkString(", ")}")
      .whenA(ids.nonEmpty)

  /** Deletes from `index` every document of `events` that carries an id.
    *
    * The number of received documents is always logged; the delete itself only runs when
    * `events` is non-empty.
    */
  @scala.annotation.targetName("deleteManyWithDocs")
  def deleteMany(index: Index, events: List[Document])(using Logger[IO]): IO[Unit] =
    info"Received ${events.size} ${index.value}s to delete" *>
      deleteMany(index, events.flatMap(_.id).map(Id.apply))
        .whenA(events.nonEmpty)

  /** Deletes from `index` the documents referenced by the given change events (via `docId`).
    *
    * The number of received events is always logged; the delete itself only runs when
    * `events` is non-empty.
    */
  @scala.annotation.targetName("deleteManyWithChanges")
  def deleteMany(index: Index, events: List[ChangeStreamDocument[Document]])(using Logger[IO]): IO[Unit] =
    info"Received ${events.size} ${index.value}s to delete" *>
      deleteMany(index, events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty)
Loading