Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions duka/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import time
from collections import deque
from datetime import timedelta, date
import requests

from ..core import decompress, fetch_day, Logger
from ..core.csv_dumper import CSVDumper
from ..core.utils import is_debug_mode, TimeFrame


session = None

SATURDAY = 5
day_counter = 0

Expand Down Expand Up @@ -67,9 +71,18 @@ def name(symbol, timeframe, start, end):
return name + ext


def app(symbols, start, end, threads, timeframe, folder, header):
def app(symbols, start, end, threads, timeframe, folder, header, proxy=None):
if start > end:
return

# Create session here
session = requests.Session()
if proxy:
session.proxies.update({'http': proxy, 'https': proxy})
Logger.info(f"Using proxy: {proxy}")
else:
Logger.info("No proxy configured")

lock = threading.Lock()
global day_counter
total_days = how_many_days(start, end)
Expand All @@ -85,7 +98,9 @@ def do_work(symbol, day, csv):
star_time = time.time()
Logger.info("Fetching day {0}".format(day))
try:
csv.append(day, decompress(symbol, day, fetch_day(symbol, day)))
# Pass session to fetch_day
data = decompress(symbol, day, fetch_day(symbol, day, session=session))
csv.append(day, data)
except Exception as e:
print("ERROR for {0}, {1} Exception : {2}".format(day, symbol, str(e)))
elapsed_time = time.time() - star_time
Expand Down Expand Up @@ -115,3 +130,6 @@ def do_work(symbol, day, csv):
file.dump()

update_progress(day_counter, total_days, avg(last_fetch), threads)

# Clean up
session.close()
101 changes: 71 additions & 30 deletions duka/core/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,72 +9,113 @@

from ..core.utils import Logger, is_dst

URL = "https://www.dukascopy.com/datafeed/{currency}/{year}/{month:02d}/{day:02d}/{hour:02d}h_ticks.bi5"

URL = "https://datafeed.dukascopy.com/datafeed/{currency}/{year}/{month:02d}/{day:02d}/{hour:02d}h_ticks.bi5"
ATTEMPTS = 5


async def get(url):
async def get(url, session=None):
    """
    Asynchronously download one hourly .bi5 file.

    Runs the blocking HTTP request in the default executor so the event loop
    stays responsive. Uses the provided requests.Session (e.g. one configured
    with a proxy) when given, otherwise falls back to the module-level
    requests API — both expose the same .get() interface.

    :param url: fully formatted Dukascopy datafeed URL
    :param session: optional requests.Session for proxy / connection reuse
    :return: memoryview over the downloaded (possibly empty) payload
    :raises Exception: when all ATTEMPTS attempts fail
    """
    loop = asyncio.get_event_loop()
    id_str = url[35:].replace('/', " ")  # compact identifier for log lines
    start = time.time()
    Logger.info(f"Fetching {id_str}")

    req = session if session is not None else requests

    for attempt in range(ATTEMPTS):
        # Fresh buffer per attempt: a partially-read response from a failed
        # attempt must not leave stale bytes in front of the retried download.
        buffer = BytesIO()
        try:
            res = await loop.run_in_executor(
                None, lambda: req.get(url, stream=True, timeout=30)
            )

            if res.status_code == 200:
                for chunk in res.iter_content(DEFAULT_BUFFER_SIZE):
                    if chunk:
                        buffer.write(chunk)
                elapsed = time.time() - start
                size = len(buffer.getbuffer())
                Logger.info(f"Fetched {id_str} completed in {elapsed:.1f}s ({size} bytes)")
                if size == 0:
                    # Dukascopy serves empty files for hours without ticks;
                    # this is a valid result, not an error.
                    Logger.info(f"Buffer for {id_str} is empty")
                return buffer.getbuffer()

            Logger.warn(f"Request to {id_str} failed with status {res.status_code}")

        except Exception as e:
            Logger.warn(f"Attempt {attempt + 1}/{ATTEMPTS} failed for {id_str}: {e}")

        # Linear back-off before the next try — applied to HTTP error
        # responses and exceptions alike.
        if attempt < ATTEMPTS - 1:
            time.sleep(0.5 * (attempt + 1))

    raise Exception(f"Request failed for {id_str} after {ATTEMPTS} attempts")

def create_tasks(symbol, day, session=None):
    """
    Build one download task per hour (0-23) for the given symbol and day.

    :param symbol: currency pair, e.g. "EURUSD"
    :param day: date of the trading day to fetch
    :param session: optional requests.Session forwarded to get() (proxy support)
    :return: list of asyncio futures, one per hour
    """
    # Dukascopy encodes the month 0-based in the URL path.
    url_info = {
        'currency': symbol,
        'year': day.year,
        'month': day.month - 1,
        'day': day.day
    }

    tasks = [
        asyncio.ensure_future(get(URL.format(**url_info, hour=hour), session=session))
        for hour in range(24)
    ]

    # Optional: also fetch hour 0 of the following day around DST transitions.
    # Kept disabled to preserve current behaviour; enable if needed.
    # if is_dst(day):
    #     next_day = day + datetime.timedelta(days=1)
    #     next_url_info = {
    #         'currency': symbol,
    #         'year': next_day.year,
    #         'month': next_day.month - 1,
    #         'day': next_day.day
    #     }
    #     tasks.append(asyncio.ensure_future(get(URL.format(**next_url_info, hour=0), session=session)))

    return tasks


def fetch_day(symbol, day):
def fetch_day(symbol, day, session=None):
    """
    Fetch all 24 hourly tick files for *symbol* on *day* and concatenate them.

    :param symbol: currency pair, e.g. "EURUSD"
    :param day: date of the trading day to download
    :param session: optional requests.Session (proxy support), passed through
        to every hourly download
    :return: memoryview over the concatenated raw .bi5 payloads
    """
    # NOTE(review): threading.local() is re-created on every call, so the
    # getattr below never finds a cached loop and a fresh event loop is built
    # each time. Hoisting local_data to module level would enable real reuse;
    # kept function-local here to preserve current behaviour.
    local_data = threading.local()
    loop = getattr(local_data, 'loop', None)

    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        local_data.loop = loop

    asyncio.set_event_loop(loop)

    tasks = create_tasks(symbol, day, session=session)

    # Run all hourly downloads concurrently on this thread's loop.
    loop.run_until_complete(asyncio.wait(tasks))

    def add(acc, task):
        # Append each successful hour; log and skip hours that failed so a
        # single bad hour does not lose the whole day.
        try:
            data = task.result()
            if data:
                acc.write(data)
        except Exception as e:
            Logger.error(f"Task failed: {e}")
        return acc

    return reduce(add, tasks, BytesIO()).getbuffer()
4 changes: 3 additions & 1 deletion duka/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def main():
help='use candles instead of ticks. Accepted values M1 M2 M5 M10 M15 M30 H1 H4',
default=TimeFrame.TICK)
parser.add_argument('--header', action='store_true', help='include CSV header (default false)', default=False)
parser.add_argument('-p', '--proxy', type=str, default=None,
help='Proxy URL (e.g. http://127.0.0.1:1080 or http://user:pass@host:port)')
args = parser.parse_args()

if args.startdate is not None:
Expand All @@ -39,7 +41,7 @@ def main():
end = args.day

set_up_signals()
app(args.symbols, start, end, args.thread, args.candle, args.folder, args.header)
app(args.symbols, start, end, args.thread, args.candle, args.folder, args.header , proxy=args.proxy)


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
requests>=2.9.1
PySocks==1.7.1
requests==2.32.5