11use std:: {
2- collections:: HashSet ,
32 env,
43 fmt:: Display ,
54 sync:: {
65 atomic:: { AtomicBool , Ordering } ,
7- Arc , Once ,
6+ Once ,
87 } ,
98 time:: Duration ,
109} ;
@@ -38,10 +37,7 @@ use summarization::SummarizationApi;
3837use tokio:: {
3938 net:: TcpListener ,
4039 select, signal,
41- sync:: {
42- mpsc:: { self , Receiver , Sender } ,
43- RwLock ,
44- } ,
40+ sync:: mpsc:: { self , Receiver , Sender } ,
4541 task:: JoinHandle ,
4642} ;
4743use tower:: { BoxError , ServiceBuilder } ;
@@ -288,19 +284,17 @@ struct Job {
288284 data : Json < JobData > ,
289285}
290286
291- #[ allow( clippy:: too_many_arguments) ]
292287async fn handle_webhooks_wrapper (
293288 rx : Receiver < EventData > ,
294289 embedding_api : EmbeddingApi ,
295290 github_api : GithubApi ,
296291 huggingface_api : HuggingfaceApi ,
297- ongoing_indexation : Arc < RwLock < HashSet < String > > > ,
298292 slack : Slack ,
299293 summarization_api : SummarizationApi ,
300294 pool : Pool < Postgres > ,
301295) -> anyhow:: Result < ( ) > {
302296 select ! {
303- _ = handle_webhooks( rx, embedding_api, github_api, huggingface_api, ongoing_indexation , slack, summarization_api, pool) => { Ok ( ( ) ) } ,
297+ _ = handle_webhooks( rx, embedding_api, github_api, huggingface_api, slack, summarization_api, pool) => { Ok ( ( ) ) } ,
304298 _ = shutdown_signal( ) => { Ok ( ( ) ) } ,
305299 }
306300}
@@ -311,7 +305,6 @@ async fn handle_webhooks(
311305 embedding_api : EmbeddingApi ,
312306 github_api : GithubApi ,
313307 huggingface_api : HuggingfaceApi ,
314- ongoing_indexation : Arc < RwLock < HashSet < String > > > ,
315308 slack : Slack ,
316309 summarization_api : SummarizationApi ,
317310 pool : Pool < Postgres > ,
@@ -561,22 +554,13 @@ async fn handle_webhooks(
561554 let embedding_api = embedding_api. clone ( ) ;
562555 let github_api = github_api. clone ( ) ;
563556 let pool = pool. clone ( ) ;
564- let ongoing_indexation = ongoing_indexation. clone ( ) ;
565557 let span = info_span ! (
566558 "repository_indexation" ,
567559 repository = repo_data. full_name,
568560 source = repo_data. source. to_string( )
569561 ) ;
570562 tokio:: spawn ( async move {
571563 info ! ( "indexing started" ) ;
572- let contained_in_set = ongoing_indexation
573- . write ( )
574- . await
575- . insert ( repo_data. full_name . clone ( ) ) ;
576- if !contained_in_set {
577- error ! ( "indexation already ongoing" ) ;
578- return ;
579- }
580564 let job = match sqlx:: query_as!(
581565 Job ,
582566 r#"select data as "data: Json<JobData>" from jobs where repository_full_name = $1 and job_type = $2"# ,
@@ -701,10 +685,6 @@ async fn handle_webhooks(
701685 }
702686 }
703687 }
704- ongoing_indexation
705- . write ( )
706- . await
707- . remove ( & repo_data. full_name ) ;
708688 if let Err ( err) = sqlx:: query!(
709689 "delete from jobs where repository_full_name = $1" ,
710690 repo_data. full_name
@@ -1048,7 +1028,6 @@ async fn main() -> anyhow::Result<()> {
10481028 let embedding_api = EmbeddingApi :: new ( config. embedding_api ) ?;
10491029 let github_api = GithubApi :: new ( config. github_api , config. message_config . clone ( ) ) ?;
10501030 let huggingface_api = HuggingfaceApi :: new ( config. huggingface_api , config. message_config ) ?;
1051- let ongoing_indexation = Arc :: new ( RwLock :: new ( HashSet :: new ( ) ) ) ;
10521031 let slack = Slack :: new ( & config. slack ) ?;
10531032 let summarization_api = SummarizationApi :: new ( config. summarization_api ) ?;
10541033
@@ -1075,7 +1054,6 @@ async fn main() -> anyhow::Result<()> {
10751054 embedding_api,
10761055 github_api,
10771056 huggingface_api,
1078- ongoing_indexation,
10791057 slack,
10801058 summarization_api,
10811059 pool
0 commit comments